Merge pull request #7832 from fuweid/fix-7802

pkg/cri: add timeout to drain exec io
This commit is contained in:
Derek McGowan
2023-03-03 13:54:53 -08:00
committed by GitHub
11 changed files with 448 additions and 12 deletions

View File

@@ -351,6 +351,15 @@ type PluginConfig struct {
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
ImagePullProgressTimeout string `toml:"image_pull_progress_timeout" json:"imagePullProgressTimeout"`
// DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync
// API' IO EOF event after exec init process exits. A zero value means
// there is no timeout.
//
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
//
// For example, the value can be '5h', '2h30m', '10s'.
DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
}
// X509KeyPairStreaming contains the x509 configuration for streaming
@@ -509,5 +518,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) error {
return fmt.Errorf("invalid image pull progress timeout: %w", err)
}
}
// Validation for drain_exec_sync_io_timeout
if c.DrainExecSyncIOTimeout != "" {
if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
return fmt.Errorf("invalid `drain_exec_sync_io_timeout`: %w", err)
}
}
return nil
}

View File

@@ -383,6 +383,20 @@ func TestValidateConfig(t *testing.T) {
},
expectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled",
},
"invalid drain_exec_sync_io_timeout input": {
config: &PluginConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
RuntimeDefault: {
Type: "default",
},
},
},
DrainExecSyncIOTimeout: "10",
},
expectedErr: "invalid `drain_exec_sync_io_timeout`",
},
} {
t.Run(desc, func(t *testing.T) {
err := ValidatePluginConfig(context.Background(), test.config)

View File

@@ -110,5 +110,6 @@ func DefaultConfig() PluginConfig {
EnableCDI: false,
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
ImagePullProgressTimeout: time.Minute.String(),
DrainExecSyncIOTimeout: "0s",
}
}

View File

@@ -86,5 +86,6 @@ func DefaultConfig() PluginConfig {
KeyModel: KeyModelNode,
},
ImagePullProgressTimeout: time.Minute.String(),
DrainExecSyncIOTimeout: "0s",
}
}

View File

@@ -117,6 +117,17 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var drainExecSyncIOTimeout time.Duration
var err error
if c.config.DrainExecSyncIOTimeout != "" {
drainExecSyncIOTimeout, err = time.ParseDuration(c.config.DrainExecSyncIOTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w",
c.config.DrainExecSyncIOTimeout, err)
}
}
spec, err := container.Spec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get container spec: %w", err)
@@ -159,7 +170,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
defer func() {
deferCtx, deferCancel := util.DeferContext()
defer deferCancel()
if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil {
if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id)
}
}()
@@ -206,8 +217,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
exitRes := <-exitCh
log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
execID, exitRes.ExitCode(), exitRes.Error())
<-attachDone
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)
if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
}
return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err())
case exitRes := <-exitCh:
code, _, err := exitRes.Result()
@@ -215,8 +229,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
if err != nil {
return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
}
<-attachDone
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)
if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
}
return &code, nil
}
}
@@ -249,3 +265,44 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
return c.execInternal(ctx, cntr.Container, id, opts)
}
// drainExecSyncIO drains process IO with timeout after exec init process exits.
//
// By default, the child processes spawned by exec process will inherit standard
// io file descriptors. The shim server creates a pipe as data channel. Both
// exec process and its children write data into the write end of the pipe.
// And the shim server will read data from the pipe. If the write end is still
// open, the shim server will continue to wait for data from pipe.
//
// If the exec command is like `bash -c "sleep 365d &"`, the exec process
// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
// the write end of the pipe for a year! It doesn't make senses that CRI plugin
// should wait for it.
func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
var timerCh <-chan time.Time
if drainExecIOTimeout != 0 {
timer := time.NewTimer(drainExecIOTimeout)
defer timer.Stop()
timerCh = timer.C
}
select {
case <-timerCh:
case <-attachDone:
log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
return nil
}
log.G(ctx).Debugf("Exec process %q exits but the io is still held by other processes. Trying to delete exec process to release io", execProcess.ID())
_, err := execProcess.Delete(ctx, containerd.WithProcessKill)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to release exec io by deleting exec process %q: %w",
execProcess.ID(), err)
}
}
return fmt.Errorf("failed to drain exec process %q io in %s because io is still held by other processes",
execProcess.ID(), drainExecIOTimeout)
}

View File

@@ -18,8 +18,14 @@ package sbserver
import (
"bytes"
"context"
"os"
"syscall"
"testing"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
cioutil "github.com/containerd/containerd/pkg/ioutil"
"github.com/stretchr/testify/assert"
)
@@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
err := cw.Close()
assert.NoError(t, err)
}
func TestDrainExecSyncIO(t *testing.T) {
ctx := context.TODO()
t.Run("NoTimeout", func(t *testing.T) {
ep := &fakeExecProcess{
id: t.Name(),
pid: uint32(os.Getpid()),
}
attachDoneCh := make(chan struct{})
time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
assert.Equal(t, 0, len(ep.actionEvents))
})
t.Run("With3Seconds", func(t *testing.T) {
ep := &fakeExecProcess{
id: t.Name(),
pid: uint32(os.Getpid()),
}
attachDoneCh := make(chan struct{})
time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
assert.Equal(t, []string{"Delete"}, ep.actionEvents)
})
}
type fakeExecProcess struct {
id string
pid uint32
actionEvents []string
}
// ID of the process
func (p *fakeExecProcess) ID() string {
return p.id
}
// Pid is the system specific process id
func (p *fakeExecProcess) Pid() uint32 {
return p.pid
}
// Start starts the process executing the user's defined binary
func (p *fakeExecProcess) Start(context.Context) error {
p.actionEvents = append(p.actionEvents, "Start")
return nil
}
// Delete removes the process and any resources allocated returning the exit status
func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
p.actionEvents = append(p.actionEvents, "Delete")
return nil, nil
}
// Kill sends the provided signal to the process
func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
p.actionEvents = append(p.actionEvents, "Kill")
return nil
}
// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
p.actionEvents = append(p.actionEvents, "Wait")
return nil, nil
}
// CloseIO allows various pipes to be closed on the process
func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
p.actionEvents = append(p.actionEvents, "CloseIO")
return nil
}
// Resize changes the width and height of the process's terminal
func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
p.actionEvents = append(p.actionEvents, "Resize")
return nil
}
// IO returns the io set for the process
func (p *fakeExecProcess) IO() cio.IO {
p.actionEvents = append(p.actionEvents, "IO")
return nil
}
// Status returns the executing status of the process
func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
p.actionEvents = append(p.actionEvents, "Status")
return containerd.Status{}, nil
}

View File

@@ -117,6 +117,17 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var drainExecSyncIOTimeout time.Duration
var err error
if c.config.DrainExecSyncIOTimeout != "" {
drainExecSyncIOTimeout, err = time.ParseDuration(c.config.DrainExecSyncIOTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w",
c.config.DrainExecSyncIOTimeout, err)
}
}
spec, err := container.Spec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get container spec: %w", err)
@@ -159,7 +170,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
defer func() {
deferCtx, deferCancel := util.DeferContext()
defer deferCancel()
if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil {
if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id)
}
}()
@@ -206,8 +217,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
exitRes := <-exitCh
log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
execID, exitRes.ExitCode(), exitRes.Error())
<-attachDone
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)
if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
}
return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err())
case exitRes := <-exitCh:
code, _, err := exitRes.Result()
@@ -215,8 +229,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
if err != nil {
return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
}
<-attachDone
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)
if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
}
return &code, nil
}
}
@@ -249,3 +265,44 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
return c.execInternal(ctx, cntr.Container, id, opts)
}
// drainExecSyncIO drains process IO with timeout after exec init process exits.
//
// By default, the child processes spawned by exec process will inherit standard
// io file descriptors. The shim server creates a pipe as data channel. Both
// exec process and its children write data into the write end of the pipe.
// And the shim server will read data from the pipe. If the write end is still
// open, the shim server will continue to wait for data from pipe.
//
// If the exec command is like `bash -c "sleep 365d &"`, the exec process
// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
// the write end of the pipe for a year! It doesn't make senses that CRI plugin
// should wait for it.
func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
var timerCh <-chan time.Time
if drainExecIOTimeout != 0 {
timer := time.NewTimer(drainExecIOTimeout)
defer timer.Stop()
timerCh = timer.C
}
select {
case <-timerCh:
case <-attachDone:
log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
return nil
}
log.G(ctx).Debugf("Exec process %q exits but the io is still held by other processes. Trying to delete exec process to release io", execProcess.ID())
_, err := execProcess.Delete(ctx, containerd.WithProcessKill)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to release exec io by deleting exec process %q: %w",
execProcess.ID(), err)
}
}
return fmt.Errorf("failed to drain exec process %q io in %s because io is still held by other processes",
execProcess.ID(), drainExecIOTimeout)
}

View File

@@ -18,8 +18,14 @@ package server
import (
"bytes"
"context"
"os"
"syscall"
"testing"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
cioutil "github.com/containerd/containerd/pkg/ioutil"
"github.com/stretchr/testify/assert"
)
@@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
err := cw.Close()
assert.NoError(t, err)
}
func TestDrainExecSyncIO(t *testing.T) {
ctx := context.TODO()
t.Run("NoTimeout", func(t *testing.T) {
ep := &fakeExecProcess{
id: t.Name(),
pid: uint32(os.Getpid()),
}
attachDoneCh := make(chan struct{})
time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
assert.Equal(t, 0, len(ep.actionEvents))
})
t.Run("With3Seconds", func(t *testing.T) {
ep := &fakeExecProcess{
id: t.Name(),
pid: uint32(os.Getpid()),
}
attachDoneCh := make(chan struct{})
time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
assert.Equal(t, []string{"Delete"}, ep.actionEvents)
})
}
type fakeExecProcess struct {
id string
pid uint32
actionEvents []string
}
// ID of the process
func (p *fakeExecProcess) ID() string {
return p.id
}
// Pid is the system specific process id
func (p *fakeExecProcess) Pid() uint32 {
return p.pid
}
// Start starts the process executing the user's defined binary
func (p *fakeExecProcess) Start(context.Context) error {
p.actionEvents = append(p.actionEvents, "Start")
return nil
}
// Delete removes the process and any resources allocated returning the exit status
func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
p.actionEvents = append(p.actionEvents, "Delete")
return nil, nil
}
// Kill sends the provided signal to the process
func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
p.actionEvents = append(p.actionEvents, "Kill")
return nil
}
// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
p.actionEvents = append(p.actionEvents, "Wait")
return nil, nil
}
// CloseIO allows various pipes to be closed on the process
func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
p.actionEvents = append(p.actionEvents, "CloseIO")
return nil
}
// Resize changes the width and height of the process's terminal
func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
p.actionEvents = append(p.actionEvents, "Resize")
return nil
}
// IO returns the io set for the process
func (p *fakeExecProcess) IO() cio.IO {
p.actionEvents = append(p.actionEvents, "IO")
return nil
}
// Status returns the executing status of the process
func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
p.actionEvents = append(p.actionEvents, "Status")
return containerd.Status{}, nil
}