diff --git a/Vagrantfile b/Vagrantfile
index d90f352ba..42a96c2ca 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -80,6 +80,7 @@ Vagrant.configure("2") do |config|
             libselinux-devel \
             lsof \
             make \
+            strace \
             ${INSTALL_PACKAGES}
     SHELL
   end
diff --git a/integration/issue7496_linux_test.go b/integration/issue7496_linux_test.go
new file mode 100644
index 000000000..7cca02c98
--- /dev/null
+++ b/integration/issue7496_linux_test.go
@@ -0,0 +1,172 @@
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package integration
+
+import (
+    "bufio"
+    "context"
+    "io"
+    "net"
+    "os"
+    "strconv"
+    "strings"
+    "syscall"
+    "testing"
+    "time"
+
+    apitask "github.com/containerd/containerd/api/runtime/task/v2"
+    "github.com/containerd/containerd/integration/images"
+    "github.com/containerd/containerd/namespaces"
+    "github.com/containerd/containerd/runtime/v2/shim"
+    "github.com/containerd/ttrpc"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    exec "golang.org/x/sys/execabs"
+)
+
+// TestIssue7496 is used to reproduce https://github.com/containerd/containerd/issues/7496
+//
+// NOTE: https://github.com/containerd/containerd/issues/8931 is the same issue.
+func TestIssue7496(t *testing.T) {
+    ctx := namespaces.WithNamespace(context.Background(), "k8s.io")
+
+    t.Logf("Create a pod config and run sandbox container")
+    sbConfig := PodSandboxConfig("sandbox", "issue7496")
+    sbID, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
+    require.NoError(t, err)
+
+    shimCli := connectToShim(ctx, t, sbID)
+
+    delayInSec := 12
+    t.Logf("[shim pid: %d]: Injecting %d seconds delay to umount2 syscall",
+        shimPid(ctx, t, shimCli),
+        delayInSec)
+
+    doneCh := injectDelayToUmount2(ctx, t, shimCli, delayInSec /* CRI plugin uses 10 seconds to delete task */)
+
+    t.Logf("Create a container config and run container in a pod")
+    pauseImage := images.Get(images.Pause)
+    EnsureImageExists(t, pauseImage)
+
+    containerConfig := ContainerConfig("pausecontainer", pauseImage)
+    cnID, err := runtimeService.CreateContainer(sbID, containerConfig, sbConfig)
+    require.NoError(t, err)
+    require.NoError(t, runtimeService.StartContainer(cnID))
+
+    t.Logf("Start to StopPodSandbox and RemovePodSandbox")
+    ctx, cancelFn := context.WithTimeout(ctx, 3*time.Minute)
+    defer cancelFn()
+    for {
+        select {
+        case <-ctx.Done():
+            require.NoError(t, ctx.Err(), "The StopPodSandbox should be done in time")
+        default:
+        }
+
+        err := runtimeService.StopPodSandbox(sbID)
+        if err != nil {
+            t.Logf("Failed to StopPodSandbox: %v", err)
+            continue
+        }
+
+        err = runtimeService.RemovePodSandbox(sbID)
+        if err == nil {
+            break
+        }
+        t.Logf("Failed to RemovePodSandbox: %v", err)
+        time.Sleep(1 * time.Second)
+    }
+
+    t.Logf("PodSandbox %s has been deleted, waiting for strace to exit", sbID)
+    select {
+    case <-time.After(15 * time.Second):
+        resp, err := shimCli.Connect(ctx, &apitask.ConnectRequest{})
+        assert.Error(t, err, "should fail to call shim connect API")
+
time") + + t.Logf("Cleanup the shim (pid: %d)", resp.GetShimPid()) + syscall.Kill(int(resp.GetShimPid()), syscall.SIGKILL) + <-doneCh + case <-doneCh: + } +} + +// injectDelayToUmount2 uses strace(1) to inject delay on umount2 syscall to +// simulate IO pressure because umount2 might force kernel to syncfs, for +// example, umount overlayfs rootfs which doesn't with volatile. +// +// REF: https://man7.org/linux/man-pages/man1/strace.1.html +func injectDelayToUmount2(ctx context.Context, t *testing.T, shimCli apitask.TaskService, delayInSec int) chan struct{} { + pid := shimPid(ctx, t, shimCli) + + doneCh := make(chan struct{}) + + cmd := exec.CommandContext(ctx, "strace", + "-p", strconv.Itoa(int(pid)), "-f", // attach to all the threads + "--detach-on=execve", // stop to attach runc child-processes + "--trace=umount2", // only trace umount2 syscall + "-e", "inject=umount2:delay_enter="+strconv.Itoa(delayInSec)+"s", + ) + cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL} + + pipeR, pipeW := io.Pipe() + cmd.Stdout = pipeW + cmd.Stderr = pipeW + + require.NoError(t, cmd.Start()) + + // ensure that strace has attached to the shim + readyCh := make(chan struct{}) + go func() { + defer close(doneCh) + + bufReader := bufio.NewReader(pipeR) + _, err := bufReader.Peek(1) + assert.NoError(t, err, "failed to ensure that strace has attached to shim") + + close(readyCh) + io.Copy(os.Stdout, bufReader) + t.Logf("Strace has exited") + }() + + go func() { + defer pipeW.Close() + assert.NoError(t, cmd.Wait(), "strace should exit with zero code") + }() + + <-readyCh + return doneCh +} + +func connectToShim(ctx context.Context, t *testing.T, id string) apitask.TaskService { + addr, err := shim.SocketAddress(ctx, containerdEndpoint, id) + require.NoError(t, err) + addr = strings.TrimPrefix(addr, "unix://") + + conn, err := net.Dial("unix", addr) + require.NoError(t, err) + + client := ttrpc.NewClient(conn) + return apitask.NewTaskClient(client) +} + +func shimPid(ctx context.Context, t *testing.T, shimCli apitask.TaskService) uint32 { + resp, err := shimCli.Connect(ctx, &apitask.ConnectRequest{}) + require.NoError(t, err) + return resp.GetShimPid() +} diff --git a/integration/issue7496_shutdown_linux_test.go b/integration/issue7496_shutdown_linux_test.go new file mode 100644 index 000000000..e9e1cdc58 --- /dev/null +++ b/integration/issue7496_shutdown_linux_test.go @@ -0,0 +1,65 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package integration + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + apitask "github.com/containerd/containerd/api/runtime/task/v2" + "github.com/containerd/containerd/namespaces" +) + +// TestIssue7496_ShouldRetryShutdown is based on https://github.com/containerd/containerd/issues/7496. +// +// Assume that the shim.Delete takes almost 10 seconds and returns successfully +// and there is no container in shim. However, the context is close to be +// canceled. It will fail to call Shutdown. 
+// If we ignore the Canceled error, the shim will be leaked. In order to
+// reproduce this, this case uses failpoint to inject an error into the
+// Shutdown API and then checks whether the shim is leaked.
+func TestIssue7496_ShouldRetryShutdown(t *testing.T) {
+    // TODO: re-enable if we can retry Shutdown API.
+    t.Skipf("Please re-enable me if we can retry Shutdown API")
+
+    ctx := namespaces.WithNamespace(context.Background(), "k8s.io")
+
+    t.Logf("Create a pod config with shutdown failpoint")
+    sbConfig := PodSandboxConfig("sandbox", "issue7496_shouldretryshutdown")
+    injectShimFailpoint(t, sbConfig, map[string]string{
+        "Shutdown": "1*error(please retry)",
+    })
+
+    t.Logf("RunPodSandbox")
+    sbID, err := runtimeService.RunPodSandbox(sbConfig, failpointRuntimeHandler)
+    require.NoError(t, err)
+
+    t.Logf("Connect to the shim %s", sbID)
+    shimCli := connectToShim(ctx, t, sbID)
+
+    t.Logf("Log shim %s's pid: %d", sbID, shimPid(ctx, t, shimCli))
+
+    t.Logf("StopPodSandbox and RemovePodSandbox")
+    require.NoError(t, runtimeService.StopPodSandbox(sbID))
+    require.NoError(t, runtimeService.RemovePodSandbox(sbID))
+
+    t.Logf("Check the shim connection")
+    _, err = shimCli.Connect(ctx, &apitask.ConnectRequest{})
+    require.Error(t, err, "should fail to call shim connect API")
+}
diff --git a/pkg/cri/sbserver/events.go b/pkg/cri/sbserver/events.go
index 413c84b96..5d8006e70 100644
--- a/pkg/cri/sbserver/events.go
+++ b/pkg/cri/sbserver/events.go
@@ -25,6 +25,7 @@ import (
 
     "github.com/containerd/containerd"
     eventtypes "github.com/containerd/containerd/api/events"
+    apitasks "github.com/containerd/containerd/api/services/tasks/v1"
     containerdio "github.com/containerd/containerd/cio"
     "github.com/containerd/containerd/errdefs"
     "github.com/containerd/containerd/events"
@@ -404,6 +405,51 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
             // Move on to make sure container status is updated.
         }
     }
+
+    // NOTE: Both the Container.Task and Task.Delete interfaces only report
+    // on the target task. They can return ErrNotFound, but that doesn't
+    // mean that the shim instance doesn't exist.
+    //
+    // There are two caches for a task in containerd:
+    //
+    // 1. io.containerd.service.v1.tasks-service
+    // 2. io.containerd.runtime.v2.task
+    //
+    // The first one maintains the shim connection and shuts the shim down
+    // in its Delete API. The second one maintains the lifecycle of the
+    // task in the shim server.
+    //
+    // So, if the shim instance is still running but the task has already
+    // been deleted in the shim server, Container.Task and Task.Delete will
+    // receive ErrNotFound. If we don't delete the shim instance in
+    // io.containerd.service.v1.tasks-service, the shim will be leaked.
+    //
+    // Based on issue containerd/containerd#7496, when the host is under IO
+    // pressure, the umount2 syscall can take more than 10 seconds, so the
+    // CRI plugin cancels this task.Delete call. However, the shim server
+    // isn't aware of that. After returning from the umount2 syscall, the
+    // shim server continues to delete the task record. The CRI plugin then
+    // retries to delete the task, receives ErrNotFound, and marks it as
+    // stopped. Therefore, the shim is leaked.
+    //
+    // It's hard to handle the lost-connection or request-canceled cases in
+    // the shim server. We should call the Delete API of
+    // io.containerd.service.v1.tasks-service to ensure that the shim
+    // instance is shut down.
+    //
+    // REF:
+    // 1. https://github.com/containerd/containerd/issues/7496#issuecomment-1671100968
+    // 2. https://github.com/containerd/containerd/issues/8931
+    if errdefs.IsNotFound(err) {
+        _, err = c.client.TaskService().Delete(ctx, &apitasks.DeleteTaskRequest{ContainerID: cntr.Container.ID()})
+        if err != nil {
+            err = errdefs.FromGRPC(err)
+            if !errdefs.IsNotFound(err) {
+                return fmt.Errorf("failed to cleanup container %s in task-service: %w", cntr.Container.ID(), err)
+            }
+        }
+        log.L.Infof("Ensure that container %s in task-service has been cleaned up successfully", cntr.Container.ID())
+    }
+
     err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
         if status.FinishedAt == 0 {
             status.Pid = 0
diff --git a/pkg/cri/sbserver/podsandbox/sandbox_delete.go b/pkg/cri/sbserver/podsandbox/sandbox_delete.go
index 4e388cd01..9b899f944 100644
--- a/pkg/cri/sbserver/podsandbox/sandbox_delete.go
+++ b/pkg/cri/sbserver/podsandbox/sandbox_delete.go
@@ -21,6 +21,7 @@ import (
     "fmt"
 
     "github.com/containerd/containerd"
+    apitasks "github.com/containerd/containerd/api/services/tasks/v1"
     "github.com/containerd/containerd/errdefs"
     "github.com/containerd/containerd/log"
 )
@@ -49,6 +50,10 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error {
 
     // Delete sandbox container.
     if sandbox.Container != nil {
+        if err := c.cleanupSandboxTask(ctx, sandbox.Container); err != nil {
+            return fmt.Errorf("failed to delete sandbox task %q: %w", sandboxID, err)
+        }
+
         if err := sandbox.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
             if !errdefs.IsNotFound(err) {
                 return fmt.Errorf("failed to delete sandbox container %q: %w", sandboxID, err)
@@ -59,3 +64,63 @@ func (c *Controller) Shutdown(ctx context.Context, sandboxID string) error {
 
     return nil
 }
+
+func (c *Controller) cleanupSandboxTask(ctx context.Context, sbCntr containerd.Container) error {
+    task, err := sbCntr.Task(ctx, nil)
+    if err != nil {
+        if !errdefs.IsNotFound(err) {
+            return fmt.Errorf("failed to load task for sandbox: %w", err)
+        }
+    } else {
+        if _, err = task.Delete(ctx, containerd.WithProcessKill); err != nil {
+            if !errdefs.IsNotFound(err) {
+                return fmt.Errorf("failed to stop sandbox: %w", err)
+            }
+        }
+    }
+
+    // NOTE: Both the Container.Task and Task.Delete interfaces only report
+    // on the target task. They can return ErrNotFound, but that doesn't
+    // mean that the shim instance doesn't exist.
+    //
+    // There are two caches for a task in containerd:
+    //
+    // 1. io.containerd.service.v1.tasks-service
+    // 2. io.containerd.runtime.v2.task
+    //
+    // The first one maintains the shim connection and shuts the shim down
+    // in its Delete API. The second one maintains the lifecycle of the
+    // task in the shim server.
+    //
+    // So, if the shim instance is still running but the task has already
+    // been deleted in the shim server, Container.Task and Task.Delete will
+    // receive ErrNotFound. If we don't delete the shim instance in
+    // io.containerd.service.v1.tasks-service, the shim will be leaked.
+    //
+    // Based on issue containerd/containerd#7496, when the host is under IO
+    // pressure, the umount2 syscall can take more than 10 seconds, so the
+    // CRI plugin cancels this task.Delete call. However, the shim server
+    // isn't aware of that. After returning from the umount2 syscall, the
+    // shim server continues to delete the task record. The CRI plugin then
+    // retries to delete the task, receives ErrNotFound, and marks it as
+    // stopped. Therefore, the shim is leaked.
+    //
+    // It's hard to handle the lost-connection or request-canceled cases in
+    // the shim server. We should call the Delete API of
+    // io.containerd.service.v1.tasks-service to ensure that the shim
+    // instance is shut down.
+    //
+    // REF:
+    // 1. https://github.com/containerd/containerd/issues/7496#issuecomment-1671100968
+    // 2. https://github.com/containerd/containerd/issues/8931
+    if errdefs.IsNotFound(err) {
+        _, err = c.client.TaskService().Delete(ctx, &apitasks.DeleteTaskRequest{ContainerID: sbCntr.ID()})
+        if err != nil {
+            err = errdefs.FromGRPC(err)
+            if !errdefs.IsNotFound(err) {
+                return fmt.Errorf("failed to cleanup sandbox %s in task-service: %w", sbCntr.ID(), err)
+            }
+        }
+        log.G(ctx).Infof("Ensure that sandbox %s in task-service has been cleaned up successfully", sbCntr.ID())
+    }
+    return nil
+}
diff --git a/pkg/cri/server/events.go b/pkg/cri/server/events.go
index e33e9e908..58d1c45fa 100644
--- a/pkg/cri/server/events.go
+++ b/pkg/cri/server/events.go
@@ -25,6 +25,7 @@ import (
 
     "github.com/containerd/containerd"
     eventtypes "github.com/containerd/containerd/api/events"
+    apitasks "github.com/containerd/containerd/api/services/tasks/v1"
     containerdio "github.com/containerd/containerd/cio"
     "github.com/containerd/containerd/errdefs"
     "github.com/containerd/containerd/events"
@@ -393,6 +394,51 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
             // Move on to make sure container status is updated.
         }
     }
+
+    // NOTE: Both the Container.Task and Task.Delete interfaces only report
+    // on the target task. They can return ErrNotFound, but that doesn't
+    // mean that the shim instance doesn't exist.
+    //
+    // There are two caches for a task in containerd:
+    //
+    // 1. io.containerd.service.v1.tasks-service
+    // 2. io.containerd.runtime.v2.task
+    //
+    // The first one maintains the shim connection and shuts the shim down
+    // in its Delete API. The second one maintains the lifecycle of the
+    // task in the shim server.
+    //
+    // So, if the shim instance is still running but the task has already
+    // been deleted in the shim server, Container.Task and Task.Delete will
+    // receive ErrNotFound. If we don't delete the shim instance in
+    // io.containerd.service.v1.tasks-service, the shim will be leaked.
+    //
+    // Based on issue containerd/containerd#7496, when the host is under IO
+    // pressure, the umount2 syscall can take more than 10 seconds, so the
+    // CRI plugin cancels this task.Delete call. However, the shim server
+    // isn't aware of that. After returning from the umount2 syscall, the
+    // shim server continues to delete the task record. The CRI plugin then
+    // retries to delete the task, receives ErrNotFound, and marks it as
+    // stopped. Therefore, the shim is leaked.
+    //
+    // It's hard to handle the lost-connection or request-canceled cases in
+    // the shim server. We should call the Delete API of
+    // io.containerd.service.v1.tasks-service to ensure that the shim
+    // instance is shut down.
+    //
+    // REF:
+    // 1. https://github.com/containerd/containerd/issues/7496#issuecomment-1671100968
+    // 2. https://github.com/containerd/containerd/issues/8931
+    if errdefs.IsNotFound(err) {
+        _, err = c.client.TaskService().Delete(ctx, &apitasks.DeleteTaskRequest{ContainerID: cntr.Container.ID()})
+        if err != nil {
+            err = errdefs.FromGRPC(err)
+            if !errdefs.IsNotFound(err) {
+                return fmt.Errorf("failed to cleanup container %s in task-service: %w", cntr.Container.ID(), err)
+            }
+        }
+        logrus.Infof("Ensure that container %s in task-service has been cleaned up successfully", cntr.Container.ID())
+    }
+
     err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
         if status.FinishedAt == 0 {
             status.Pid = 0
@@ -434,6 +480,50 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
             // Move on to make sure container status is updated.
         }
     }
+
+    // NOTE: Both the Container.Task and Task.Delete interfaces only report
+    // on the target task. They can return ErrNotFound, but that doesn't
+    // mean that the shim instance doesn't exist.
+    //
+    // There are two caches for a task in containerd:
+    //
+    // 1. io.containerd.service.v1.tasks-service
+    // 2. io.containerd.runtime.v2.task
+    //
+    // The first one maintains the shim connection and shuts the shim down
+    // in its Delete API. The second one maintains the lifecycle of the
+    // task in the shim server.
+    //
+    // So, if the shim instance is still running but the task has already
+    // been deleted in the shim server, Container.Task and Task.Delete will
+    // receive ErrNotFound. If we don't delete the shim instance in
+    // io.containerd.service.v1.tasks-service, the shim will be leaked.
+    //
+    // Based on issue containerd/containerd#7496, when the host is under IO
+    // pressure, the umount2 syscall can take more than 10 seconds, so the
+    // CRI plugin cancels this task.Delete call. However, the shim server
+    // isn't aware of that. After returning from the umount2 syscall, the
+    // shim server continues to delete the task record. The CRI plugin then
+    // retries to delete the task, receives ErrNotFound, and marks it as
+    // stopped. Therefore, the shim is leaked.
+    //
+    // It's hard to handle the lost-connection or request-canceled cases in
+    // the shim server. We should call the Delete API of
+    // io.containerd.service.v1.tasks-service to ensure that the shim
+    // instance is shut down.
+    //
+    // REF:
+    // 1. https://github.com/containerd/containerd/issues/7496#issuecomment-1671100968
+    // 2. https://github.com/containerd/containerd/issues/8931
+    if errdefs.IsNotFound(err) {
+        _, err = c.client.TaskService().Delete(ctx, &apitasks.DeleteTaskRequest{ContainerID: sb.Container.ID()})
+        if err != nil {
+            err = errdefs.FromGRPC(err)
+            if !errdefs.IsNotFound(err) {
+                return fmt.Errorf("failed to cleanup sandbox %s in task-service: %w", sb.Container.ID(), err)
+            }
+        }
+        logrus.Infof("Ensure that sandbox %s in task-service has been cleaned up successfully", sb.Container.ID())
+    }
     err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
         status.State = sandboxstore.StateNotReady
         status.Pid = 0
diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go
index 7524fe280..f4265c17f 100644
--- a/runtime/v2/shim.go
+++ b/runtime/v2/shim.go
@@ -458,6 +458,12 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c
     // If not, the shim has been delivered the exit and delete events.
     // So we should remove the record and prevent duplicate events from
     // ttrpc-callback-on-close.
+    //
+    // TODO: It's hard to guarantee that the event is unique and sent only once.
+    // moby/moby should not rely on the assumption that there is only one
+    // exit event; it should handle duplicate events.
+    //
+    // REF: https://github.com/containerd/containerd/issues/4769
     if shimErr == nil {
         removeTask(ctx, s.ID())
     }
@@ -466,7 +472,11 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c
     // Let controller decide when to shutdown.
     if !sandboxed {
         if err := s.waitShutdown(ctx); err != nil {
-            log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
+            // FIXME(fuweid):
+            //
+            // If the error is context canceled, should we use context.TODO()
+            // to wait for it?
+            log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task and the shim might be leaked")
         }
     }
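The four CRI hunks above repeat the same fallback, so a condensed, standalone sketch of the pattern may help readers: when the runtime cache already answers ErrNotFound for a task, still ask io.containerd.service.v1.tasks-service to delete it so that the shim instance is shut down rather than leaked. The package name, the cleanupTask helper, and the *containerd.Client parameter are illustrative assumptions and not part of the patch; the TaskService().Delete, errdefs.FromGRPC, and errdefs.IsNotFound calls are the ones the diff itself uses.

// Package shimcleanup is a hypothetical, self-contained illustration of the
// fallback added by the hunks above; it is not part of the patch.
package shimcleanup

import (
    "context"
    "fmt"

    "github.com/containerd/containerd"
    apitasks "github.com/containerd/containerd/api/services/tasks/v1"
    "github.com/containerd/containerd/errdefs"
)

// cleanupTask asks io.containerd.service.v1.tasks-service to delete the task
// record even when the runtime cache already reported ErrNotFound, so that
// the shim instance is shut down instead of being leaked.
func cleanupTask(ctx context.Context, client *containerd.Client, containerID string) error {
    _, err := client.TaskService().Delete(ctx, &apitasks.DeleteTaskRequest{
        ContainerID: containerID,
    })
    if err != nil {
        // The task may legitimately be gone already; only treat other
        // errors as failures.
        if err = errdefs.FromGRPC(err); !errdefs.IsNotFound(err) {
            return fmt.Errorf("failed to clean up task %s in tasks-service: %w", containerID, err)
        }
    }
    return nil
}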