diff --git a/integration/container_exec_test.go b/integration/container_exec_test.go new file mode 100644 index 000000000..553faec6e --- /dev/null +++ b/integration/container_exec_test.go @@ -0,0 +1,62 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package integration + +import ( + "testing" + "time" + + "github.com/containerd/containerd/integration/images" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestContainerDrainExecIOAfterExit(t *testing.T) { + t.Log("Create a sandbox") + sb, sbConfig := PodSandboxConfigWithCleanup(t, "sandbox", "container-exec-drain-io-after-exit") + + var ( + testImage = images.Get(images.BusyBox) + containerName = "test-container-exec" + ) + + EnsureImageExists(t, testImage) + + t.Log("Create a container") + cnConfig := ContainerConfig( + containerName, + testImage, + WithCommand("sh", "-c", "sleep 365d"), + ) + + cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + + t.Log("Start the container") + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + + t.Log("Exec in container") + _, _, err = runtimeService.ExecSync(cn, []string{"sh", "-c", "sleep 365d &"}, 5*time.Second) + require.ErrorContains(t, err, "failed to drain exec process") +} diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 51bacd163..158ee9f23 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -37,6 +37,23 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) +// defaultDrainExecIOTimeout is used to drain exec io after exec process exits. +// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. +// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make senses that CRI plugin +// should wait for it. +// +// So, CRI plugin uses 15 seconds to drain the exec io and then deletes exec +// process to stop copy io in shim side. +const defaultDrainExecIOTimeout = 15 * time.Second + type cappedWriter struct { w io.WriteCloser remain int @@ -159,7 +176,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont defer func() { deferCtx, deferCancel := util.DeferContext() defer deferCancel() - if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil { + if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id) } }() @@ -206,8 +223,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont exitRes := <-exitCh log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) + } + return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err()) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -215,8 +235,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecIO(ctx, process, attachDone); err != nil { + return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err) + } return &code, nil } } @@ -249,3 +271,26 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } + +func drainExecIO(ctx context.Context, execProcess containerd.Process, attachDone <-chan struct{}) error { + timer := time.NewTimer(defaultDrainExecIOTimeout) + defer timer.Stop() + + select { + case <-timer.C: + + case <-attachDone: + log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) + return nil + } + + log.G(ctx).Debugf("Exec process %q exits but the io is still hold by other processes. Trying to delete exec process to release io", execProcess.ID()) + _, err := execProcess.Delete(ctx, containerd.WithProcessKill) + if err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("failed to release exec io by deleting exec process %q: %w", + execProcess.ID(), err) + } + } + return fmt.Errorf("failed to drain exec process %q io because io is still hold by other processes", execProcess.ID()) +}