diff --git a/container_linux_test.go b/container_linux_test.go
index 2db040b51..ec02042ad 100644
--- a/container_linux_test.go
+++ b/container_linux_test.go
@@ -36,18 +36,128 @@ import (
 
 	"github.com/containerd/cgroups"
 	cgroupsv2 "github.com/containerd/cgroups/v2"
+	apievents "github.com/containerd/containerd/api/events"
 	"github.com/containerd/containerd/cio"
 	"github.com/containerd/containerd/containers"
 	"github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/images"
+	"github.com/containerd/containerd/log/logtest"
+	"github.com/containerd/containerd/namespaces"
 	"github.com/containerd/containerd/oci"
 	"github.com/containerd/containerd/plugin"
 	"github.com/containerd/containerd/runtime/linux/runctypes"
 	"github.com/containerd/containerd/runtime/v2/runc/options"
 	"github.com/containerd/containerd/sys"
+	"github.com/containerd/typeurl"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
+	"github.com/pkg/errors"
 	"golang.org/x/sys/unix"
 )
 
+// TestRegressionIssue4769 verifies that a task exit is published as a single
+// /tasks/exit event, i.e. that no duplicate shows up after the task is deleted.
+// Issue: https://github.com/containerd/containerd/issues/4769.
+func TestRegressionIssue4769(t *testing.T) {
+	t.Parallel()
+
+	client, err := newClient(t, address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	// Use a namespace unique to this test so that only its own task events are observed.
+	id := t.Name()
+	ns := fmt.Sprintf("%s-%s", testNamespace, id)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ctx = namespaces.WithNamespace(ctx, ns)
+	ctx = logtest.WithT(ctx, t)
+
+	image, err := client.Pull(ctx, testImage, WithPullUnpack)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.ImageService().Delete(ctx, testImage, images.SynchronousDelete())
+
+	container, err := client.NewContainer(ctx, id,
+		WithNewSnapshot(id, image),
+		WithNewSpec(oci.WithImageConfig(image), withTrue()),
+		WithRuntime(client.runtime, nil),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer container.Delete(ctx, WithSnapshotCleanup)
+
+	task, err := container.NewTask(ctx, empty())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer task.Delete(ctx) // cleanup on early failure; the explicit Delete below is the one under test
+
+	statusC, err := task.Wait(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	eventStream, errC := client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/exit|")
+
+	if err := task.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	const timeout = 3 * time.Second
+
+	select {
+	case et := <-statusC:
+		if got := et.ExitCode(); got != 0 {
+			t.Fatalf("expect zero exit status, but got %v", got)
+		}
+	case <-time.After(timeout):
+		t.Fatal("failed to get task exit status in time")
+	}
+
+	// The task exit must also be published as a TaskExit event with a zero exit status.
+	select {
+	case et := <-eventStream:
+		if et.Event == nil {
+			t.Fatalf("unexpected empty event: %+v", et)
+		}
+
+		v, err := typeurl.UnmarshalAny(et.Event)
+		if err != nil {
+			t.Fatal(errors.Wrap(err, "failed to unmarshal event"))
+		}
+
+		if e, ok := v.(*apievents.TaskExit); !ok {
+			t.Fatalf("unexpected event type: %+v", v)
+		} else if e.ExitStatus != 0 {
+			t.Fatalf("expect zero exit status, but got %v", e.ExitStatus)
+		}
+	case err := <-errC:
+		t.Fatalf("unexpected error from event service: %v", err) // Fatalf keeps this context even if err is nil
+
+	case <-time.After(timeout):
+		t.Fatal("failed to get exit event in time")
+	}
+
+	if _, err := task.Delete(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	// Deleting the task must not publish a duplicate exit event; reaching the timeout means success.
+	select {
+	case event := <-eventStream:
+		t.Fatalf("unexpected exit event: %+v", event)
+	case err := <-errC:
+		t.Fatalf("unexpected error from event service: %v", err) // Fatalf keeps this context even if err is nil
+	case <-time.After(timeout):
+	}
+}
+
 func TestTaskUpdate(t *testing.T) {
 	t.Parallel()
 
diff --git a/container_test.go b/container_test.go
index 530245a33..111dab1bd 100644
--- a/container_test.go
+++ b/container_test.go
@@ -30,7 +30,6 @@ import (
 	"testing"
 	"time"
 
-	// Register the typeurl
 	"github.com/containerd/containerd/cio"
 	"github.com/containerd/containerd/containers"
 	"github.com/containerd/containerd/namespaces"
diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go
index 05618b1eb..9a6b0ec70 100644
--- a/runtime/v2/shim.go
+++ b/runtime/v2/shim.go
@@ -240,6 +240,21 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
 			}
 		}
 	}
+
+	// NOTE: If the shim has been killed and the ttrpc connection has been
+	// closed, shimErr will not be nil. In this case, the event
+	// subscriber, like moby/moby, might have received the exit or delete
+	// events. Just in case, we should allow ttrpc-callback-on-close to
+	// send the exit and delete events again. And the exit status will
+	// depend on the result of shimV2.Delete.
+	//
+	// Otherwise, the shim has already delivered the exit and delete events.
+	// So we should remove the record and prevent duplicate events from
+	// ttrpc-callback-on-close.
+	if shimErr == nil {
+		s.rtTasks.Delete(ctx, s.ID())
+	}
+
 	if err := s.waitShutdown(ctx); err != nil {
 		log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
 	}