From 3c18decea741d86a01def544c51675268afd3936 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Fri, 3 Mar 2023 00:14:43 +0800
Subject: [PATCH] *: add DrainExecSyncIOTimeout config and disable it by default

Signed-off-by: Wei Fu
---
 pkg/cri/config/config.go                    | 14 +++
 pkg/cri/config/config_unix.go               |  1 +
 pkg/cri/config/config_windows.go            |  1 +
 pkg/cri/sbserver/container_execsync.go      | 52 ++++++-----
 pkg/cri/sbserver/container_execsync_test.go | 98 +++++++++++++++++++++
 pkg/cri/server/container_execsync.go        | 52 ++++++-----
 pkg/cri/server/container_execsync_test.go   | 98 +++++++++++++++++++++
 script/test/utils.sh                        | 16 +++-
 8 files changed, 284 insertions(+), 48 deletions(-)
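Usage note (illustrative, not part of the diff; `git am` ignores text between
the stat summary and the first hunk): with this patch applied, the timeout is
read from the CRI plugin section of containerd's config. The key name and the
duration format follow the toml tag and time.ParseDuration handling below. The
"30s" value is an arbitrary example; the default "0" disables the timeout
entirely, so the plugin waits for the exec IO EOF indefinitely.

    # /etc/containerd/config.toml (version 2 layout)
    version = 2

    [plugins."io.containerd.grpc.v1.cri"]
      # Stop draining ExecSync IO this long after the exec init process exits.
      drain_exec_sync_io_timeout = "30s"
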
diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go
index 946baa9bc..33f81958c 100644
--- a/pkg/cri/config/config.go
+++ b/pkg/cri/config/config.go
@@ -351,6 +351,13 @@ type PluginConfig struct {
 	// The string is in the golang duration format, see:
 	// https://golang.org/pkg/time/#ParseDuration
 	ImagePullProgressTimeout string `toml:"image_pull_progress_timeout" json:"imagePullProgressTimeout"`
+	// DrainExecSyncIOTimeout is the maximum duration to wait for the ExecSync
+	// API's IO EOF event after the exec init process exits. A zero value
+	// means there is no timeout.
+	//
+	// The string is in the golang duration format, see:
+	// https://golang.org/pkg/time/#ParseDuration
+	DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
 }
 
 // X509KeyPairStreaming contains the x509 configuration for streaming
@@ -509,5 +516,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) error {
 			return fmt.Errorf("invalid image pull progress timeout: %w", err)
 		}
 	}
+
+	// Validation for drain_exec_sync_io_timeout
+	if c.DrainExecSyncIOTimeout != "" {
+		if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
+			return fmt.Errorf("invalid drain exec sync io timeout: %w", err)
+		}
+	}
 	return nil
 }
diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go
index 0e8b393af..c131ce865 100644
--- a/pkg/cri/config/config_unix.go
+++ b/pkg/cri/config/config_unix.go
@@ -110,5 +110,6 @@ func DefaultConfig() PluginConfig {
 		EnableCDI:                false,
 		CDISpecDirs:              []string{"/etc/cdi", "/var/run/cdi"},
 		ImagePullProgressTimeout: time.Minute.String(),
+		DrainExecSyncIOTimeout:   "0",
 	}
 }
diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go
index 55db40328..19fd020b7 100644
--- a/pkg/cri/config/config_windows.go
+++ b/pkg/cri/config/config_windows.go
@@ -86,5 +86,6 @@ func DefaultConfig() PluginConfig {
 			KeyModel: KeyModelNode,
 		},
 		ImagePullProgressTimeout: time.Minute.String(),
+		DrainExecSyncIOTimeout:   "0",
 	}
 }
diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go
index 90ff79d7f..af0c15842 100644
--- a/pkg/cri/sbserver/container_execsync.go
+++ b/pkg/cri/sbserver/container_execsync.go
@@ -37,23 +37,6 @@ import (
 	cioutil "github.com/containerd/containerd/pkg/ioutil"
 )
 
-// defaultDrainExecIOTimeout is used to drain exec io after exec process exits.
-//
-// By default, the child processes spawned by exec process will inherit standard
-// io file descriptors. The shim server creates a pipe as data channel. Both
-// exec process and its children write data into the write end of the pipe.
-// And the shim server will read data from the pipe. If the write end is still
-// open, the shim server will continue to wait for data from pipe.
-//
-// If the exec command is like `bash -c "sleep 365d &"`, the exec process
-// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
-// the write end of the pipe for a year! It doesn't make senses that CRI plugin
-// should wait for it.
-//
-// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec
-// process to stop copy io in shim side.
-const defaultDrainExecIOTimeout = 15 * time.Second
-
 type cappedWriter struct {
 	w      io.WriteCloser
 	remain int
@@ -134,6 +117,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
+	drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err)
+	}
+
 	spec, err := container.Spec(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get container spec: %w", err)
@@ -224,7 +212,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
 			log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
 				execID, exitRes.ExitCode(), exitRes.Error())
 
-			if err := drainExecIO(ctx, process, attachDone); err != nil {
+			if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
 				log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
 			}
 
@@ -236,7 +224,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
 		return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
 	}
 
-	if err := drainExecIO(ctx, process, attachDone); err != nil {
+	if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
 		return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
 	}
 	return &code, nil
@@ -272,12 +260,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
 	return c.execInternal(ctx, cntr.Container, id, opts)
 }
 
-func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error {
-	timer := time.NewTimer(defaultDrainExecIOTimeout)
-	defer timer.Stop()
+// drainExecSyncIO drains the process IO with a timeout after the exec init process exits.
+//
+// By default, the child processes spawned by the exec process inherit its
+// standard IO file descriptors. The shim server creates a pipe as the data
+// channel: both the exec process and its children write into the write end,
+// and the shim server reads from the read end. As long as the write end is
+// still open, the shim server keeps waiting for data from the pipe.
+//
+// If the exec command is like `bash -c "sleep 365d &"`, the exec process is
+// bash, which exits right after spawning `sleep 365d`. But `sleep 365d` holds
+// the write end of the pipe for a year! It doesn't make sense for the CRI
+// plugin to wait for it.
+func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
+	var timerCh <-chan time.Time
+
+	if drainExecIOTimeout != 0 {
+		timer := time.NewTimer(drainExecIOTimeout)
+		defer timer.Stop()
+
+		timerCh = timer.C
+	}
 
 	select {
-	case <-timer.C:
+	case <-timerCh:
 	case <-attachDone:
 		log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
diff --git a/pkg/cri/sbserver/container_execsync_test.go b/pkg/cri/sbserver/container_execsync_test.go
index 3b23aa677..3f3ef274d 100644
--- a/pkg/cri/sbserver/container_execsync_test.go
+++ b/pkg/cri/sbserver/container_execsync_test.go
@@ -18,8 +18,14 @@ package sbserver
 
 import (
 	"bytes"
+	"context"
+	"os"
+	"syscall"
 	"testing"
+	"time"
 
+	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/cio"
 	cioutil "github.com/containerd/containerd/pkg/ioutil"
 	"github.com/stretchr/testify/assert"
 )
@@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
 	err := cw.Close()
 	assert.NoError(t, err)
 }
+
+func TestDrainExecSyncIO(t *testing.T) {
+	ctx := context.TODO()
+
+	t.Run("NoTimeout", func(t *testing.T) {
+		ep := &fakeExecProcess{
+			id:  t.Name(),
+			pid: uint32(os.Getpid()),
+		}
+
+		attachDoneCh := make(chan struct{})
+		time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
+		assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
+		assert.Equal(t, 0, len(ep.actionEvents))
+	})
+
+	t.Run("With3Seconds", func(t *testing.T) {
+		ep := &fakeExecProcess{
+			id:  t.Name(),
+			pid: uint32(os.Getpid()),
+		}
+
+		attachDoneCh := make(chan struct{})
+		time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
+		assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
+		assert.Equal(t, []string{"Delete"}, ep.actionEvents)
+	})
+}
+
+type fakeExecProcess struct {
+	id           string
+	pid          uint32
+	actionEvents []string
+}
+
+// ID of the process
+func (p *fakeExecProcess) ID() string {
+	return p.id
+}
+
+// Pid is the system specific process id
+func (p *fakeExecProcess) Pid() uint32 {
+	return p.pid
+}
+
+// Start starts the process executing the user's defined binary
+func (p *fakeExecProcess) Start(context.Context) error {
+	p.actionEvents = append(p.actionEvents, "Start")
+	return nil
+}
+
+// Delete removes the process and any resources allocated returning the exit status
+func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
+	p.actionEvents = append(p.actionEvents, "Delete")
+	return nil, nil
+}
+
+// Kill sends the provided signal to the process
+func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
+	p.actionEvents = append(p.actionEvents, "Kill")
+	return nil
+}
+
+// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
+func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
+	p.actionEvents = append(p.actionEvents, "Wait")
+	return nil, nil
+}
+
+// CloseIO allows various pipes to be closed on the process
+func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
+	p.actionEvents = append(p.actionEvents, "CloseIO")
+	return nil
+}
+
+// Resize changes the width and height of the process's terminal
+func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
+	p.actionEvents = append(p.actionEvents, "Resize")
+	return nil
+}
+
+// IO returns the io set for the process
+func (p *fakeExecProcess) IO() cio.IO {
+	p.actionEvents = append(p.actionEvents, "IO")
+	return nil
+}
+
+// Status returns the executing status of the process
+func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
+	p.actionEvents = append(p.actionEvents, "Status")
+	return containerd.Status{}, nil
+}
diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go
index c9d10cc6b..a5884a644 100644
--- a/pkg/cri/server/container_execsync.go
+++ b/pkg/cri/server/container_execsync.go
@@ -37,23 +37,6 @@ import (
 	cioutil "github.com/containerd/containerd/pkg/ioutil"
 )
 
-// defaultDrainExecIOTimeout is used to drain exec io after exec process exits.
-//
-// By default, the child processes spawned by exec process will inherit standard
-// io file descriptors. The shim server creates a pipe as data channel. Both
-// exec process and its children write data into the write end of the pipe.
-// And the shim server will read data from the pipe. If the write end is still
-// open, the shim server will continue to wait for data from pipe.
-//
-// If the exec command is like `bash -c "sleep 365d &"`, the exec process
-// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold
-// the write end of the pipe for a year! It doesn't make senses that CRI plugin
-// should wait for it.
-//
-// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec
-// process to stop copy io in shim side.
-const defaultDrainExecIOTimeout = 15 * time.Second
-
 type cappedWriter struct {
 	w      io.WriteCloser
 	remain int
@@ -134,6 +117,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
+	drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err)
+	}
+
 	spec, err := container.Spec(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get container spec: %w", err)
@@ -224,7 +212,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
 			log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
 				execID, exitRes.ExitCode(), exitRes.Error())
 
-			if err := drainExecIO(ctx, process, attachDone); err != nil {
+			if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
 				log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
 			}
 
@@ -236,7 +224,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
 		return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
 	}
 
-	if err := drainExecIO(ctx, process, attachDone); err != nil {
+	if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
 		return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
 	}
 	return &code, nil
@@ -272,12 +260,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
 	return c.execInternal(ctx, cntr.Container, id, opts)
 }
 
-func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error {
-	timer := time.NewTimer(defaultDrainExecIOTimeout)
-	defer timer.Stop()
+// drainExecSyncIO drains the process IO with a timeout after the exec init process exits.
+//
+// By default, the child processes spawned by the exec process inherit its
+// standard IO file descriptors. The shim server creates a pipe as the data
+// channel: both the exec process and its children write into the write end,
+// and the shim server reads from the read end. As long as the write end is
+// still open, the shim server keeps waiting for data from the pipe.
+//
+// If the exec command is like `bash -c "sleep 365d &"`, the exec process is
+// bash, which exits right after spawning `sleep 365d`. But `sleep 365d` holds
+// the write end of the pipe for a year! It doesn't make sense for the CRI
+// plugin to wait for it.
+func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
+	var timerCh <-chan time.Time
+
+	if drainExecIOTimeout != 0 {
+		timer := time.NewTimer(drainExecIOTimeout)
+		defer timer.Stop()
+
+		timerCh = timer.C
+	}
 
 	select {
-	case <-timer.C:
+	case <-timerCh:
 	case <-attachDone:
 		log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
diff --git a/pkg/cri/server/container_execsync_test.go b/pkg/cri/server/container_execsync_test.go
index 989909649..48f72abef 100644
--- a/pkg/cri/server/container_execsync_test.go
+++ b/pkg/cri/server/container_execsync_test.go
@@ -18,8 +18,14 @@ package server
 
 import (
 	"bytes"
+	"context"
+	"os"
+	"syscall"
 	"testing"
+	"time"
 
+	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/cio"
 	cioutil "github.com/containerd/containerd/pkg/ioutil"
 	"github.com/stretchr/testify/assert"
 )
@@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
 	err := cw.Close()
 	assert.NoError(t, err)
 }
+
+func TestDrainExecSyncIO(t *testing.T) {
+	ctx := context.TODO()
+
+	t.Run("NoTimeout", func(t *testing.T) {
+		ep := &fakeExecProcess{
+			id:  t.Name(),
+			pid: uint32(os.Getpid()),
+		}
+
+		attachDoneCh := make(chan struct{})
+		time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
+		assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
+		assert.Equal(t, 0, len(ep.actionEvents))
+	})
+
+	t.Run("With3Seconds", func(t *testing.T) {
+		ep := &fakeExecProcess{
+			id:  t.Name(),
+			pid: uint32(os.Getpid()),
+		}
+
+		attachDoneCh := make(chan struct{})
+		time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
+		assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
+		assert.Equal(t, []string{"Delete"}, ep.actionEvents)
+	})
+}
+
+type fakeExecProcess struct {
+	id           string
+	pid          uint32
+	actionEvents []string
+}
+
+// ID of the process
+func (p *fakeExecProcess) ID() string {
+	return p.id
+}
+
+// Pid is the system specific process id
+func (p *fakeExecProcess) Pid() uint32 {
+	return p.pid
+}
+
+// Start starts the process executing the user's defined binary
+func (p *fakeExecProcess) Start(context.Context) error {
+	p.actionEvents = append(p.actionEvents, "Start")
+	return nil
+}
+
+// Delete removes the process and any resources allocated returning the exit status
+func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
+	p.actionEvents = append(p.actionEvents, "Delete")
+	return nil, nil
+}
+
+// Kill sends the provided signal to the process
+func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
+	p.actionEvents = append(p.actionEvents, "Kill")
+	return nil
+}
+
+// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
+func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
+	p.actionEvents = append(p.actionEvents, "Wait")
+	return nil, nil
+}
+
+// CloseIO allows various pipes to be closed on the process
+func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
+	p.actionEvents = append(p.actionEvents, "CloseIO")
+	return nil
+}
+
+// Resize changes the width and height of the process's terminal
+func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error {
+	p.actionEvents = append(p.actionEvents, "Resize")
+	return nil
+}
+
+// IO returns the io set for the process
+func (p *fakeExecProcess) IO() cio.IO {
+	p.actionEvents = append(p.actionEvents, "IO")
+	return nil
+}
+
+// Status returns the executing status of the process
+func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
+	p.actionEvents = append(p.actionEvents, "Status")
+	return containerd.Status{}, nil
+}
diff --git a/script/test/utils.sh b/script/test/utils.sh
index f3a009816..8eedc040e 100755
--- a/script/test/utils.sh
+++ b/script/test/utils.sh
@@ -39,11 +39,23 @@ CONTAINERD_RUNTIME=${CONTAINERD_RUNTIME:-""}
 if [ -z "${CONTAINERD_CONFIG_FILE}" ]; then
   config_file="${CONTAINERD_CONFIG_DIR}/containerd-config-cri.toml"
   truncate --size 0 "${config_file}"
-  echo "version=2" >> ${config_file}
+  # TODO(fuweid): if the config.Imports supports patch update, it will be easy
+  # to write the integration test case with different configuration, like:
+  #
+  # 1. write configuration into importable containerd config path.
+  # 2. restart containerd
+  # 3. verify the behaviour
+  # 4. delete the configuration
+  # 5. restart containerd
+  cat >>${config_file} <<EOF
+version=2
+[plugins."io.containerd.grpc.v1.cri"]
+  drain_exec_sync_io_timeout = "10s"
+EOF
 
   if command -v sestatus >/dev/null 2>&1; then
     cat >>${config_file} <<EOF
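
Aside (a standalone sketch, not from the patch; waitWithOptionalTimeout is a
hypothetical name): drainExecSyncIO above treats a zero timeout as "wait
forever" because it only arms the timer for non-zero values, and a receive
from a nil channel blocks forever, so the select can then be woken only by
attachDone. A minimal, runnable Go illustration of that pattern:

package main

import (
	"fmt"
	"time"
)

// waitWithOptionalTimeout mirrors the select in drainExecSyncIO: with a zero
// timeout, timerCh stays nil and receiving from it blocks forever, so only
// the done channel can end the wait.
func waitWithOptionalTimeout(timeout time.Duration, done <-chan struct{}) error {
	var timerCh <-chan time.Time
	if timeout != 0 {
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		timerCh = timer.C
	}
	select {
	case <-timerCh:
		return fmt.Errorf("timed out after %v", timeout)
	case <-done:
		return nil
	}
}

func main() {
	done := make(chan struct{})
	time.AfterFunc(50*time.Millisecond, func() { close(done) })
	// Zero timeout: blocks until done is closed, then prints <nil>.
	fmt.Println(waitWithOptionalTimeout(0, done))

	neverDone := make(chan struct{}) // never closed
	// Non-zero timeout: the timer fires first and an error is returned.
	fmt.Println(waitWithOptionalTimeout(20*time.Millisecond, neverDone))
}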