From 82c0f4ff86dc7b14b2e1b5eb98b4531ea9fd7681 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Sun, 18 Dec 2022 17:27:12 +0800 Subject: [PATCH 01/10] pkg/cri/server: add timeout to drain exec io By default, the child processes spawned by exec process will inherit standard io file descriptors. The shim server creates a pipe as data channel. Both exec process and its children write data into the write end of the pipe. And the shim server will read data from the pipe. If the write end is still open, the shim server will continue to wait for data from the pipe. So, if the exec command is like `bash -c "sleep 365d &"`, the exec process is bash and quits after creating `sleep 365d`. But the `sleep 365d` will hold the write end of the pipe for a year! It doesn't make sense that the CRI plugin should wait for it. For this case, we should use a timeout to drain the exec process's io instead of waiting for it. Fixes: #7802 Signed-off-by: Wei Fu --- integration/container_exec_test.go | 62 ++++++++++++++++++++++++++++ pkg/cri/server/container_execsync.go | 55 +++++++++++++++++++++--- 2 files changed, 112 insertions(+), 5 deletions(-) create mode 100644 integration/container_exec_test.go diff --git a/integration/container_exec_test.go b/integration/container_exec_test.go new file mode 100644 index 000000000..553faec6e --- /dev/null +++ b/integration/container_exec_test.go @@ -0,0 +1,62 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package integration + +import ( + "testing" + "time" + + "github.com/containerd/containerd/integration/images" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestContainerDrainExecIOAfterExit(t *testing.T) { + t.Log("Create a sandbox") + sb, sbConfig := PodSandboxConfigWithCleanup(t, "sandbox", "container-exec-drain-io-after-exit") + + var ( + testImage = images.Get(images.BusyBox) + containerName = "test-container-exec" + ) + + EnsureImageExists(t, testImage) + + t.Log("Create a container") + cnConfig := ContainerConfig( + containerName, + testImage, + WithCommand("sh", "-c", "sleep 365d"), + ) + + cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + + t.Log("Start the container") + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + + t.Log("Exec in container") + _, _, err = runtimeService.ExecSync(cn, []string{"sh", "-c", "sleep 365d &"}, 5*time.Second) + require.ErrorContains(t, err, "failed to drain exec process") +} diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 51bacd163..158ee9f23 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -37,6 +37,23 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) +// defaultDrainExecIOTimeout is used to drain exec io after exec process exits. +// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. 
+// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make senses that CRI plugin +// should wait for it. +// +// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec +// process to stop copy io in shim side. +const defaultDrainExecIOTimeout = 15 * time.Second + type cappedWriter struct { w io.WriteCloser remain int @@ -159,7 +176,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont defer func() { deferCtx, deferCancel := util.DeferContext() defer deferCancel() - if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil { + if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id) } }() @@ -206,8 +223,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont exitRes := <-exitCh log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) + } + return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err()) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -215,8 +235,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + return nil, fmt.Errorf("failed to drain 
exec process %q io: %w", execID, err) + } return &code, nil } } @@ -249,3 +271,26 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } + +func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error { + timer := time.NewTimer(defaultDrainExecIOTimeout) + defer timer.Stop() + + select { + case <-timer.C: + + case <-attachDone: + log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) + return nil + } + + log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. Trying to delete exec process to release io", execProcess.ID()) + _, err := execProcess.Delete(ctx, containerd.WithProcessKill) + if err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("failed to release exec io by deleting exec process %q: %w", + execProcess.ID(), err) + } + } + return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID()) +} From 04dfd6275e6e607a4a2013bc9b5dd7746555b811 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Sun, 18 Dec 2022 17:28:50 +0800 Subject: [PATCH 02/10] pkg/cri/sbserver: add timeout to drain exec io Signed-off-by: Wei Fu --- pkg/cri/sbserver/container_execsync.go | 53 ++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index 9dddabbf5..0016155ed 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -37,6 +37,23 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) +// defaultDrainExecIOTimeout is used to drain exec io after exec process exits. +// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. 
Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. +// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make senses that CRI plugin +// should wait for it. +// +// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec +// process to stop copy io in shim side. +const defaultDrainExecIOTimeout = 15 * time.Second + type cappedWriter struct { w io.WriteCloser remain int @@ -206,8 +223,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont exitRes := <-exitCh log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) + } + return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err()) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -215,8 +235,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err) + } return &code, nil } } @@ -249,3 +271,26 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } + +func drainExecIO(ctx 
context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error { + timer := time.NewTimer(defaultDrainExecIOTimeout) + defer timer.Stop() + + select { + case <-timer.C: + + case <-attachDone: + log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) + return nil + } + + log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. Trying to delete exec process to release io", execProcess.ID()) + _, err := execProcess.Delete(ctx, containerd.WithProcessKill) + if err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("failed to release exec io by deleting exec process %q: %w", + execProcess.ID(), err) + } + } + return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID()) +} From a9cbddd65d3eb1c5caa47c9fbc2c34ecce7c7d26 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 2 Mar 2023 21:57:43 +0800 Subject: [PATCH 03/10] *: fix typo and skip exec-io-drain-testcase in win Signed-off-by: Wei Fu --- integration/container_exec_test.go | 6 ++++++ pkg/cri/sbserver/container_execsync.go | 4 ++-- pkg/cri/server/container_execsync.go | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/integration/container_exec_test.go b/integration/container_exec_test.go index 553faec6e..1fcd1d765 100644 --- a/integration/container_exec_test.go +++ b/integration/container_exec_test.go @@ -17,6 +17,7 @@ package integration import ( + "runtime" "testing" "time" @@ -27,6 +28,11 @@ import ( ) func TestContainerDrainExecIOAfterExit(t *testing.T) { + // FIXME(fuweid): support it for windows container. + if runtime.GOOS == "windows" { + t.Skip("it seems that windows platform doesn't support detached process. 
skip it") + } + t.Log("Create a sandbox") sb, sbConfig := PodSandboxConfigWithCleanup(t, "sandbox", "container-exec-drain-io-after-exit") diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index 0016155ed..90ff79d7f 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -284,7 +284,7 @@ func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone return nil } - log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. Trying to delete exec process to release io", execProcess.ID()) + log.G(ctx).Debugf("Exec process %q exits but the io is still held by other processes. Trying to delete exec process to release io", execProcess.ID()) _, err := execProcess.Delete(ctx, containerd.WithProcessKill) if err != nil { if !errdefs.IsNotFound(err) { @@ -292,5 +292,5 @@ func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone execProcess.ID(), err) } } - return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID()) + return fmt.Errorf("failed to drain exec process %q io because io is still held by other processes", execProcess.ID()) } diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 158ee9f23..c9d10cc6b 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -284,7 +284,7 @@ func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone return nil } - log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. Trying to delete exec process to release io", execProcess.ID()) + log.G(ctx).Debugf("Exec process %q exits but the io is still held by other processes. 
Trying to delete exec process to release io", execProcess.ID()) _, err := execProcess.Delete(ctx, containerd.WithProcessKill) if err != nil { if !errdefs.IsNotFound(err) { @@ -292,5 +292,5 @@ func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone execProcess.ID(), err) } } - return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID()) + return fmt.Errorf("failed to drain exec process %q io because io is still held by other processes", execProcess.ID()) } From 3c18decea741d86a01def544c51675268afd3936 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 3 Mar 2023 00:14:43 +0800 Subject: [PATCH 04/10] *: add DrainExecSyncIOTimeout config and disable it by default Signed-off-by: Wei Fu --- pkg/cri/config/config.go | 14 +++ pkg/cri/config/config_unix.go | 1 + pkg/cri/config/config_windows.go | 1 + pkg/cri/sbserver/container_execsync.go | 52 ++++++----- pkg/cri/sbserver/container_execsync_test.go | 98 +++++++++++++++++++++ pkg/cri/server/container_execsync.go | 52 ++++++----- pkg/cri/server/container_execsync_test.go | 98 +++++++++++++++++++++ script/test/utils.sh | 16 +++- 8 files changed, 284 insertions(+), 48 deletions(-) diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 946baa9bc..33f81958c 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -351,6 +351,13 @@ type PluginConfig struct { // The string is in the golang duration format, see: // https://golang.org/pkg/time/#ParseDuration ImagePullProgressTimeout string `toml:"image_pull_progress_timeout" json:"imagePullProgressTimeout"` + // DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync + // API's IO EOF event after exec init process exits. A zero value means + // there is no timeout. 
+ // + // The string is in the golang duration format, see: + // https://golang.org/pkg/time/#ParseDuration + DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"` } // X509KeyPairStreaming contains the x509 configuration for streaming @@ -509,5 +516,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) error { return fmt.Errorf("invalid image pull progress timeout: %w", err) } } + + // Validation for drain_exec_sync_io_timeout + if c.DrainExecSyncIOTimeout != "" { + if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil { + return fmt.Errorf("invalid drain exec sync io timeout: %w", err) + } + } return nil } diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index 0e8b393af..c131ce865 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -110,5 +110,6 @@ func DefaultConfig() PluginConfig { EnableCDI: false, CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"}, ImagePullProgressTimeout: time.Minute.String(), + DrainExecSyncIOTimeout: "0", } } diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index 55db40328..19fd020b7 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -86,5 +86,6 @@ func DefaultConfig() PluginConfig { KeyModel: KeyModelNode, }, ImagePullProgressTimeout: time.Minute.String(), + DrainExecSyncIOTimeout: "0", } } diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index 90ff79d7f..af0c15842 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -37,23 +37,6 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) -// defaultDrainExecIOTimeout is used to drain exec io after exec process exits. -// -// By default, the child processes spawned by exec process will inherit standard -// io file descriptors. The shim server creates a pipe as data channel. 
Both -// exec process and its children write data into the write end of the pipe. -// And the shim server will read data from the pipe. If the write end is still -// open, the shim server will continue to wait for data from pipe. -// -// If the exec command is like `bash -c "sleep 365d &"`, the exec process -// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold -// the write end of the pipe for a year! It doesn't make senses that CRI plugin -// should wait for it. -// -// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec -// process to stop copy io in shim side. -const defaultDrainExecIOTimeout = 15 * time.Second - type cappedWriter struct { w io.WriteCloser remain int @@ -134,6 +117,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont ctx, cancel := context.WithCancel(ctx) defer cancel() + drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout) + if err != nil { + return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err) + } + spec, err := container.Spec(ctx) if err != nil { return nil, fmt.Errorf("failed to get container spec: %w", err) @@ -224,7 +212,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - if err := drainExecIO(ctx, process, attachDone); err != nil { + if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil { log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) } @@ -236,7 +224,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - if err := drainExecIO(ctx, process, attachDone); err != nil { + if err := drainExecSyncIO(ctx, process, 
drainExecSyncIOTimeout, attachDone); err != nil { return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err) } return &code, nil @@ -272,12 +260,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } -func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error { - timer := time.NewTimer(defaultDrainExecIOTimeout) - defer timer.Stop() +// drainExecSyncIO drains process IO with timeout after exec init process exits. +// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. +// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make senses that CRI plugin +// should wait for it. 
+func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error { + var timerCh <-chan time.Time + + if drainExecIOTimeout != 0 { + timer := time.NewTimer(drainExecIOTimeout) + defer timer.Stop() + + timerCh = timer.C + } select { - case <-timer.C: + case <-timerCh: case <-attachDone: log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) diff --git a/pkg/cri/sbserver/container_execsync_test.go b/pkg/cri/sbserver/container_execsync_test.go index 3b23aa677..3f3ef274d 100644 --- a/pkg/cri/sbserver/container_execsync_test.go +++ b/pkg/cri/sbserver/container_execsync_test.go @@ -18,8 +18,14 @@ package sbserver import ( "bytes" + "context" + "os" + "syscall" "testing" + "time" + "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" cioutil "github.com/containerd/containerd/pkg/ioutil" "github.com/stretchr/testify/assert" ) @@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) { err := cw.Close() assert.NoError(t, err) } + +func TestDrainExecSyncIO(t *testing.T) { + ctx := context.TODO() + + t.Run("NoTimeout", func(t *testing.T) { + ep := &fakeExecProcess{ + id: t.Name(), + pid: uint32(os.Getpid()), + } + + attachDoneCh := make(chan struct{}) + time.AfterFunc(2*time.Second, func() { close(attachDoneCh) }) + assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh)) + assert.Equal(t, 0, len(ep.actionEvents)) + }) + + t.Run("With3Seconds", func(t *testing.T) { + ep := &fakeExecProcess{ + id: t.Name(), + pid: uint32(os.Getpid()), + } + + attachDoneCh := make(chan struct{}) + time.AfterFunc(100*time.Second, func() { close(attachDoneCh) }) + assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh)) + assert.Equal(t, []string{"Delete"}, ep.actionEvents) + }) +} + +type fakeExecProcess struct { + id string + pid uint32 + actionEvents []string +} + +// ID of the process +func (p *fakeExecProcess) ID() string { + return p.id +} + +// Pid is the 
system specific process id +func (p *fakeExecProcess) Pid() uint32 { + return p.pid +} + +// Start starts the process executing the user's defined binary +func (p *fakeExecProcess) Start(context.Context) error { + p.actionEvents = append(p.actionEvents, "Start") + return nil +} + +// Delete removes the process and any resources allocated returning the exit status +func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) { + p.actionEvents = append(p.actionEvents, "Delete") + return nil, nil +} + +// Kill sends the provided signal to the process +func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error { + p.actionEvents = append(p.actionEvents, "Kill") + return nil +} + +// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel +func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) { + p.actionEvents = append(p.actionEvents, "Wait") + return nil, nil +} + +// CloseIO allows various pipes to be closed on the process +func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error { + p.actionEvents = append(p.actionEvents, "CloseIO") + return nil +} + +// Resize changes the width and height of the process's terminal +func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error { + p.actionEvents = append(p.actionEvents, "Resize") + return nil +} + +// IO returns the io set for the process +func (p *fakeExecProcess) IO() cio.IO { + p.actionEvents = append(p.actionEvents, "IO") + return nil +} + +// Status returns the executing status of the process +func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) { + p.actionEvents = append(p.actionEvents, "Status") + return containerd.Status{}, nil +} diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index c9d10cc6b..a5884a644 100644 --- 
a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -37,23 +37,6 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) -// defaultDrainExecIOTimeout is used to drain exec io after exec process exits. -// -// By default, the child processes spawned by exec process will inherit standard -// io file descriptors. The shim server creates a pipe as data channel. Both -// exec process and its children write data into the write end of the pipe. -// And the shim server will read data from the pipe. If the write end is still -// open, the shim server will continue to wait for data from pipe. -// -// If the exec command is like `bash -c "sleep 365d &"`, the exec process -// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold -// the write end of the pipe for a year! It doesn't make senses that CRI plugin -// should wait for it. -// -// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec -// process to stop copy io in shim side. 
-const defaultDrainExecIOTimeout = 15 * time.Second - type cappedWriter struct { w io.WriteCloser remain int @@ -134,6 +117,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont ctx, cancel := context.WithCancel(ctx) defer cancel() + drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout) + if err != nil { + return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err) + } + spec, err := container.Spec(ctx) if err != nil { return nil, fmt.Errorf("failed to get container spec: %w", err) @@ -224,7 +212,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - if err := drainExecIO(ctx, process, attachDone); err != nil { + if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil { log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) } @@ -236,7 +224,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - if err := drainExecIO(ctx, process, attachDone); err != nil { + if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil { return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err) } return &code, nil @@ -272,12 +260,30 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } -func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error { - timer := time.NewTimer(defaultDrainExecIOTimeout) - defer timer.Stop() +// drainExecSyncIO drains process IO with timeout after exec init process exits. 
+// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. +// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make senses that CRI plugin +// should wait for it. +func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error { + var timerCh <-chan time.Time + + if drainExecIOTimeout != 0 { + timer := time.NewTimer(drainExecIOTimeout) + defer timer.Stop() + + timerCh = timer.C + } select { - case <-timer.C: + case <-timerCh: case <-attachDone: log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) diff --git a/pkg/cri/server/container_execsync_test.go b/pkg/cri/server/container_execsync_test.go index 989909649..48f72abef 100644 --- a/pkg/cri/server/container_execsync_test.go +++ b/pkg/cri/server/container_execsync_test.go @@ -18,8 +18,14 @@ package server import ( "bytes" + "context" + "os" + "syscall" "testing" + "time" + "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" cioutil "github.com/containerd/containerd/pkg/ioutil" "github.com/stretchr/testify/assert" ) @@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) { err := cw.Close() assert.NoError(t, err) } + +func TestDrainExecSyncIO(t *testing.T) { + ctx := context.TODO() + + t.Run("NoTimeout", func(t *testing.T) { + ep := &fakeExecProcess{ + id: t.Name(), + pid: uint32(os.Getpid()), + } + + attachDoneCh := make(chan struct{}) + time.AfterFunc(2*time.Second, func() { close(attachDoneCh) }) + assert.NoError(t, 
drainExecSyncIO(ctx, ep, 0, attachDoneCh)) + assert.Equal(t, 0, len(ep.actionEvents)) + }) + + t.Run("With3Seconds", func(t *testing.T) { + ep := &fakeExecProcess{ + id: t.Name(), + pid: uint32(os.Getpid()), + } + + attachDoneCh := make(chan struct{}) + time.AfterFunc(100*time.Second, func() { close(attachDoneCh) }) + assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh)) + assert.Equal(t, []string{"Delete"}, ep.actionEvents) + }) +} + +type fakeExecProcess struct { + id string + pid uint32 + actionEvents []string +} + +// ID of the process +func (p *fakeExecProcess) ID() string { + return p.id +} + +// Pid is the system specific process id +func (p *fakeExecProcess) Pid() uint32 { + return p.pid +} + +// Start starts the process executing the user's defined binary +func (p *fakeExecProcess) Start(context.Context) error { + p.actionEvents = append(p.actionEvents, "Start") + return nil +} + +// Delete removes the process and any resources allocated returning the exit status +func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) { + p.actionEvents = append(p.actionEvents, "Delete") + return nil, nil +} + +// Kill sends the provided signal to the process +func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error { + p.actionEvents = append(p.actionEvents, "Kill") + return nil +} + +// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel +func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) { + p.actionEvents = append(p.actionEvents, "Wait") + return nil, nil +} + +// CloseIO allows various pipes to be closed on the process +func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error { + p.actionEvents = append(p.actionEvents, "CloseIO") + return nil +} + +// Resize changes the width and height of the process's terminal +func (p *fakeExecProcess) 
Resize(ctx context.Context, w, h uint32) error { + p.actionEvents = append(p.actionEvents, "Resize") + return nil +} + +// IO returns the io set for the process +func (p *fakeExecProcess) IO() cio.IO { + p.actionEvents = append(p.actionEvents, "IO") + return nil +} + +// Status returns the executing status of the process +func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) { + p.actionEvents = append(p.actionEvents, "Status") + return containerd.Status{}, nil +} diff --git a/script/test/utils.sh b/script/test/utils.sh index f3a009816..8eedc040e 100755 --- a/script/test/utils.sh +++ b/script/test/utils.sh @@ -39,11 +39,23 @@ CONTAINERD_RUNTIME=${CONTAINERD_RUNTIME:-""} if [ -z "${CONTAINERD_CONFIG_FILE}" ]; then config_file="${CONTAINERD_CONFIG_DIR}/containerd-config-cri.toml" truncate --size 0 "${config_file}" - echo "version=2" >> ${config_file} + # TODO(fuweid): if the config.Imports supports patch update, it will be easy + # to write the integration test case with different configuration, like: + # + # 1. write configuration into importable containerd config path. + # 2. restart containerd + # 3. verify the behaviour + # 4. delete the configuration + # 5. restart containerd + cat >>${config_file} </dev/null 2>&1; then cat >>${config_file} < Date: Fri, 3 Mar 2023 09:15:31 +0800 Subject: [PATCH 05/10] *: update drainExecSyncIO docs and validate the timeout We should validate the drainExecSyncIO timeout at the beginning and raise the error for any invalid input. 
Signed-off-by: Wei Fu --- docs/cri/config.md | 10 ++++++++++ pkg/cri/config/config.go | 2 ++ pkg/cri/config/config_unix.go | 2 +- pkg/cri/config/config_windows.go | 2 +- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/cri/config.md b/docs/cri/config.md index 67c25d114..19a0276f1 100644 --- a/docs/cri/config.md +++ b/docs/cri/config.md @@ -227,6 +227,16 @@ version = 2 # https://github.com/container-orchestrated-devices/container-device-interface#containerd-configuration cdi_spec_dirs = ["/etc/cdi", "/var/run/cdi"] + # drain_exec_sync_io_timeout is the maximum duration to wait for ExecSync API's + # IO EOF event after exec init process exits. A zero value means there is no + # timeout. + # + # The string is in the golang duration format, see: + # https://golang.org/pkg/time/#ParseDuration + # + # For example, the value can be '5h', '2h30m', '10s'. + drain_exec_sync_io_timeout = "0s" + # 'plugins."io.containerd.grpc.v1.cri".containerd' contains config related to containerd [plugins."io.containerd.grpc.v1.cri".containerd] diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 33f81958c..7f0403c30 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -357,6 +357,8 @@ type PluginConfig struct { // // The string is in the golang duration format, see: // https://golang.org/pkg/time/#ParseDuration + // + // For example, the value can be '5h', '2h30m', '10s'.
DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"` } diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index c131ce865..72348ca3f 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -110,6 +110,6 @@ func DefaultConfig() PluginConfig { EnableCDI: false, CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"}, ImagePullProgressTimeout: time.Minute.String(), - DrainExecSyncIOTimeout: "0", + DrainExecSyncIOTimeout: "0s", } } diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index 19fd020b7..63bb2decd 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -86,6 +86,6 @@ func DefaultConfig() PluginConfig { KeyModel: KeyModelNode, }, ImagePullProgressTimeout: time.Minute.String(), - DrainExecSyncIOTimeout: "0", + DrainExecSyncIOTimeout: "0s", } } From ffebcb1223ec88f2cb32df60b956e3a09e92bcf1 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 3 Mar 2023 09:19:22 +0800 Subject: [PATCH 06/10] cri: disable drain-exec-IO if it is empty timeout Signed-off-by: Wei Fu --- pkg/cri/sbserver/container_execsync.go | 15 +++++++++++---- pkg/cri/server/container_execsync.go | 15 +++++++++++---- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index af0c15842..6edd96da7 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -117,9 +117,15 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont ctx, cancel := context.WithCancel(ctx) defer cancel() - drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout) - if err != nil { - return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err) + var drainExecSyncIOTimeout time.Duration + var err error + + if c.config.DrainExecSyncIOTimeout != "" { + 
drainExecSyncIOTimeout, err = time.ParseDuration(c.config.DrainExecSyncIOTimeout) + if err != nil { + return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", + c.config.DrainExecSyncIOTimeout, err) + } } spec, err := container.Spec(ctx) @@ -298,5 +304,6 @@ func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainE execProcess.ID(), err) } } - return fmt.Errorf("failed to drain exec process %q io because io is still held by other processes", execProcess.ID()) + return fmt.Errorf("failed to drain exec process %q io in %s because io is still held by other processes", + execProcess.ID(), drainExecIOTimeout) } diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index a5884a644..63ba72512 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -117,9 +117,15 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont ctx, cancel := context.WithCancel(ctx) defer cancel() - drainExecSyncIOTimeout, err := time.ParseDuration(c.config.DrainExecSyncIOTimeout) - if err != nil { - return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", c.config.DrainExecSyncIOTimeout, err) + var drainExecSyncIOTimeout time.Duration + var err error + + if c.config.DrainExecSyncIOTimeout != "" { + drainExecSyncIOTimeout, err = time.ParseDuration(c.config.DrainExecSyncIOTimeout) + if err != nil { + return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", + c.config.DrainExecSyncIOTimeout, err) + } } spec, err := container.Spec(ctx) @@ -298,5 +304,6 @@ func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainE execProcess.ID(), err) } } - return fmt.Errorf("failed to drain exec process %q io because io is still held by other processes", execProcess.ID()) + return fmt.Errorf("failed to drain exec process %q io in %s because io is still held by other processes", + execProcess.ID(), drainExecIOTimeout) 
} From 55e25f16440eb5a79c1abfc1149ce9feaded4158 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 3 Mar 2023 09:20:02 +0800 Subject: [PATCH 07/10] integration: add testcase to drain exec IO in time Signed-off-by: Wei Fu --- integration/container_exec_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integration/container_exec_test.go b/integration/container_exec_test.go index 1fcd1d765..be2635f68 100644 --- a/integration/container_exec_test.go +++ b/integration/container_exec_test.go @@ -65,4 +65,8 @@ func TestContainerDrainExecIOAfterExit(t *testing.T) { t.Log("Exec in container") _, _, err = runtimeService.ExecSync(cn, []string{"sh", "-c", "sleep 365d &"}, 5*time.Second) require.ErrorContains(t, err, "failed to drain exec process") + + t.Log("Exec in container") + _, _, err = runtimeService.ExecSync(cn, []string{"sh", "-c", "sleep 2s &"}, 10*time.Second) + require.NoError(t, err, "should drain IO in time") } From 01671e9fc56ccb5323072af674aa81e75b36d3d4 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 3 Mar 2023 09:58:00 +0800 Subject: [PATCH 08/10] cri: add config ut for invalid drain io timeout value Signed-off-by: Wei Fu --- pkg/cri/config/config_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/cri/config/config_test.go b/pkg/cri/config/config_test.go index 92b93fc76..620d4906c 100644 --- a/pkg/cri/config/config_test.go +++ b/pkg/cri/config/config_test.go @@ -383,6 +383,20 @@ func TestValidateConfig(t *testing.T) { }, expectedErr: "`privileged_without_host_devices_all_devices_allowed` requires `privileged_without_host_devices` to be enabled", }, + "invalid drain_exec_sync_io_timeout input": { + config: &PluginConfig{ + ContainerdConfig: ContainerdConfig{ + DefaultRuntimeName: RuntimeDefault, + Runtimes: map[string]Runtime{ + RuntimeDefault: { + Type: "default", + }, + }, + }, + DrainExecSyncIOTimeout: "10", + }, + expectedErr: "invalid drain exec sync io timeout: time: missing unit in duration \"10\"", + }, } { 
t.Run(desc, func(t *testing.T) { err := ValidatePluginConfig(context.Background(), test.config) From 98cb6d7eb8c95de40e8d908a3285e94fb16a44e3 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 3 Mar 2023 12:19:40 +0800 Subject: [PATCH 09/10] cri/sbserver: ignore the NOT_FOUND error in exec cleanup Signed-off-by: Wei Fu --- pkg/cri/sbserver/container_execsync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index 6edd96da7..cd67037ac 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -170,7 +170,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont defer func() { deferCtx, deferCancel := util.DeferContext() defer deferCancel() - if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil { + if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id) } }() From 5946c1051ed84b4bc3a5e3bb98f516b1dd648f3d Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 3 Mar 2023 17:51:03 +0800 Subject: [PATCH 10/10] *: fix code style issue 1. it's easy to check wrong input if using drain_exec_sync_io_timeout in error 2. avoid to use full error message, as part of error generated by go stdlib would be changed in the future 3. 
delete the extra empty line Signed-off-by: Wei Fu --- pkg/cri/config/config.go | 2 +- pkg/cri/config/config_test.go | 2 +- pkg/cri/sbserver/container_execsync.go | 1 - pkg/cri/server/container_execsync.go | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index 7f0403c30..ad16bbe20 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -522,7 +522,7 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) error { // Validation for drain_exec_sync_io_timeout if c.DrainExecSyncIOTimeout != "" { if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil { - return fmt.Errorf("invalid drain exec sync io timeout: %w", err) + return fmt.Errorf("invalid `drain_exec_sync_io_timeout`: %w", err) } } return nil diff --git a/pkg/cri/config/config_test.go b/pkg/cri/config/config_test.go index 620d4906c..4c2a80419 100644 --- a/pkg/cri/config/config_test.go +++ b/pkg/cri/config/config_test.go @@ -395,7 +395,7 @@ func TestValidateConfig(t *testing.T) { }, DrainExecSyncIOTimeout: "10", }, - expectedErr: "invalid drain exec sync io timeout: time: missing unit in duration \"10\"", + expectedErr: "invalid `drain_exec_sync_io_timeout`", }, } { t.Run(desc, func(t *testing.T) { diff --git a/pkg/cri/sbserver/container_execsync.go b/pkg/cri/sbserver/container_execsync.go index cd67037ac..3e4090f3e 100644 --- a/pkg/cri/sbserver/container_execsync.go +++ b/pkg/cri/sbserver/container_execsync.go @@ -290,7 +290,6 @@ func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainE select { case <-timerCh: - case <-attachDone: log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) return nil diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 63ba72512..23885471e 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -290,7 +290,6 @@ func drainExecSyncIO(ctx 
context.Context, execProcess containerd.Process, drainE select { case <-timerCh: - case <-attachDone: log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) return nil