pkg/cri/sbserver: add timeout to drain exec io
Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
parent
82c0f4ff86
commit
04dfd6275e
@ -37,6 +37,23 @@ import (
|
|||||||
cioutil "github.com/containerd/containerd/pkg/ioutil"
|
cioutil "github.com/containerd/containerd/pkg/ioutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// defaultDrainExecIOTimeout is used to drain exec io after exec process exits.
|
||||||
|
//
|
||||||
|
// By default, the child processes spawned by exec process will inherit standard
|
||||||
|
// io file descriptors. The shim server creates a pipe as data channel. Both
|
||||||
|
// exec process and its children write data into the write end of the pipe.
|
||||||
|
// And the shim server will read data from the pipe. If the write end is still
|
||||||
|
// open, the shim server will continue to wait for data from pipe.
|
||||||
|
//
|
||||||
|
// If the exec command is like `bash -c "sleep 365d &"`, the exec process
|
||||||
|
// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
|
||||||
|
// the write end of the pipe for a year! It doesn't make senses that CRI plugin
|
||||||
|
// should wait for it.
|
||||||
|
//
|
||||||
|
// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec
|
||||||
|
// process to stop copy io in shim side.
|
||||||
|
const defaultDrainExecIOTimeout = 15 * time.Second
|
||||||
|
|
||||||
type cappedWriter struct {
|
type cappedWriter struct {
|
||||||
w io.WriteCloser
|
w io.WriteCloser
|
||||||
remain int
|
remain int
|
||||||
@ -206,8 +223,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
|
|||||||
exitRes := <-exitCh
|
exitRes := <-exitCh
|
||||||
log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
|
log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
|
||||||
execID, exitRes.ExitCode(), exitRes.Error())
|
execID, exitRes.ExitCode(), exitRes.Error())
|
||||||
<-attachDone
|
|
||||||
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)
|
if err := drainExecIO(ctx, process, attachDone); err != nil {
|
||||||
|
log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
|
||||||
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err())
|
return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err())
|
||||||
case exitRes := <-exitCh:
|
case exitRes := <-exitCh:
|
||||||
code, _, err := exitRes.Result()
|
code, _, err := exitRes.Result()
|
||||||
@ -215,8 +235,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
|
return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
|
||||||
}
|
}
|
||||||
<-attachDone
|
|
||||||
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)
|
if err := drainExecIO(ctx, process, attachDone); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
|
||||||
|
}
|
||||||
return &code, nil
|
return &code, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -249,3 +271,26 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
|
|||||||
|
|
||||||
return c.execInternal(ctx, cntr.Container, id, opts)
|
return c.execInternal(ctx, cntr.Container, id, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error {
|
||||||
|
timer := time.NewTimer(defaultDrainExecIOTimeout)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
|
||||||
|
case <-attachDone:
|
||||||
|
log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. Trying to delete exec process to release io", execProcess.ID())
|
||||||
|
_, err := execProcess.Delete(ctx, containerd.WithProcessKill)
|
||||||
|
if err != nil {
|
||||||
|
if !errdefs.IsNotFound(err) {
|
||||||
|
return fmt.Errorf("failed to release exec io by deleting exec process %q: %w",
|
||||||
|
execProcess.ID(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID())
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user