From 04dfd6275e6e607a4a2013bc9b5dd7746555b811 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Sun, 18 Dec 2022 17:28:50 +0800 Subject: [PATCH] pkg/cri/sbserver: add timeout to drain exec io Signed-off-by: Wei Fu --- pkg/cri/sbserver/container_execsync.go | 53 ++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index 9dddabbf5..0016155ed 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -37,6 +37,23 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) +// defaultDrainExecIOTimeout is used to drain exec io after exec process exits. +// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. +// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quits after creating `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make sense that CRI plugin +// should wait for it. +// +// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec +// process to stop copying io on the shim side. 
+const defaultDrainExecIOTimeout = 15 * time.Second + type cappedWriter struct { w io.WriteCloser remain int @@ -206,8 +223,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont exitRes := <-exitCh log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) + } + return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err()) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -215,8 +235,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err) + } return &code, nil } } @@ -249,3 +271,26 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } + +func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error { + timer := time.NewTimer(defaultDrainExecIOTimeout) + defer timer.Stop() + + select { + case <-timer.C: + + case <-attachDone: + log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) + return nil + } + + log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. 
Trying to delete exec process to release io", execProcess.ID()) + _, err := execProcess.Delete(ctx, containerd.WithProcessKill) + if err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("failed to release exec io by deleting exec process %q: %w", + execProcess.ID(), err) + } + } + return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID()) +}