Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: kill tidb [session id] can't stop executors and release resources quickly #9844

Merged
merged 16 commits into from
Apr 1, 2019
Prev Previous commit
Next Next commit
update
  • Loading branch information
qw4990 committed Mar 27, 2019
commit 933fd0493d7d10954055a32d8167d452ecf98e92
45 changes: 18 additions & 27 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 220,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
if err != nil {
return nil, errors.Trace(err)
}
e := wrapCtxWatcher(exec)
e := wrapReentrant(exec)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand Down Expand Up @@ -269,7 269,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execCtxWatcher) (sqlexec.RecordSet, error) {
func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e *execReentrantWrapper) (sqlexec.RecordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand Down Expand Up @@ -486,49 486,40 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco
}
}

// execCtxWatcher used to watch the context specified in Open and
const (
reentrantWrapperNotStart = iota
reentrantWrapperStartFail
reentrantWrapperRunning
reentrantWrapperClosed
)

// execReentrantWrapper used to watch the context specified in Open and
// stop the wrapped executor when this context is cancelled.
type execCtxWatcher struct {
type execReentrantWrapper struct {
Executor
// 0, 1, 2, 3 represent not started, failed in start, running, closed
status int32
exit chan struct{}
}

func wrapCtxWatcher(exec Executor) *execCtxWatcher {
return &execCtxWatcher{exec, 0, make(chan struct{})}
func wrapReentrant(exec Executor) *execReentrantWrapper {
return &execReentrantWrapper{exec, reentrantWrapperNotStart}
}

func (ecw *execCtxWatcher) Open(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&ecw.status, 0, 2) {
func (ecw *execReentrantWrapper) Open(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperNotStart, reentrantWrapperRunning) {
return nil
}

if err := ecw.Executor.Open(ctx); err != nil {
atomic.StoreInt32(&ecw.status, 1)
atomic.StoreInt32(&ecw.status, reentrantWrapperStartFail)
return err
}
go ecw.watch(ctx)

return nil
}

func (ecw *execCtxWatcher) Close() error {
if !atomic.CompareAndSwapInt32(&ecw.status, 2, 3) {
func (ecw *execReentrantWrapper) Close() error {
if !atomic.CompareAndSwapInt32(&ecw.status, reentrantWrapperRunning, reentrantWrapperClosed) {
return nil
}
close(ecw.exit)
return ecw.Executor.Close()
}

func (ecw *execCtxWatcher) watch(ctx context.Context) {
select {
case <-ctx.Done():
if err := ecw.Close(); err != nil {
logutil.Logger(ctx).Error("cancel executor in execCtxWatcher err", zap.Error(err))
}
return
case <-ecw.exit:
return
}
}
115 changes: 0 additions & 115 deletions executor/adapter_test.go

This file was deleted.