From faec5d4ffd46997fda35e6808d230ab82b80e9e4 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 26 Nov 2020 20:14:18 +0800 Subject: [PATCH] runtime: should not send duplicate task exit event If the shim has been killed and ttrpc connection has been closed, the shimErr will not be nil. For this case, the event subscriber, like moby/moby, might have received the exit or delete events. Just in case, we should allow ttrpc-callback-on-close to send the exit and delete events again. And the exit status will depend on result of shimV2.Delete. If not, the shim has been delivered the exit and delete events. So we should remove the task record and prevent duplicate events from ttrpc-callback-on-close. Fix: #4769 Signed-off-by: Wei Fu --- container_linux_test.go | 110 ++++++++++++++++++++++++++++++++++++++++ container_test.go | 1 - runtime/v2/shim.go | 15 ++++++ 3 files changed, 125 insertions(+), 1 deletion(-) diff --git a/container_linux_test.go b/container_linux_test.go index 2db040b51..ec02042ad 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -36,18 +36,128 @@ import ( "github.com/containerd/cgroups" cgroupsv2 "github.com/containerd/cgroups/v2" + apievents "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log/logtest" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/oci" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/sys" + "github.com/containerd/typeurl" specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" "golang.org/x/sys/unix" ) +// TestRegressionIssue4769 verifies the number of task exit events. +// +// Issue: https://github.com/containerd/containerd/issues/4769. +func TestRegressionIssue4769(t *testing.T) { + t.Parallel() + + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + // use unique namespace to get unique task events + id := t.Name() + ns := fmt.Sprintf("%s-%s", testNamespace, id) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = namespaces.WithNamespace(ctx, ns) + ctx = logtest.WithT(ctx, t) + + image, err := client.Pull(ctx, testImage, WithPullUnpack) + if err != nil { + t.Fatal(err) + } + defer client.ImageService().Delete(ctx, testImage, images.SynchronousDelete()) + + container, err := client.NewContainer(ctx, id, + WithNewSnapshot(id, image), + WithNewSpec(oci.WithImageConfig(image), withTrue()), + WithRuntime(client.runtime, nil), + ) + if err != nil { + t.Fatal(err) + } + defer container.Delete(ctx, WithSnapshotCleanup) + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Fatal(err) + } + defer task.Delete(ctx) + + statusC, err := task.Wait(ctx) + if err != nil { + t.Fatal(err) + } + + eventStream, errC := client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/exit|") + + if err := task.Start(ctx); err != nil { + t.Fatal(err) + } + + var timeout = 3 * time.Second + + select { + case et := <-statusC: + if got := et.ExitCode(); got != 0 { + t.Fatal(errors.Errorf("expect zero exit status, but got %v", got)) + } + case <-time.After(timeout): + t.Fatal(fmt.Errorf("failed to get exit event in time")) + } + + // start to check events + select { + case et := <-eventStream: + if et.Event == nil { + t.Fatal(errors.Errorf("unexpect empty event: %+v", et)) + } + + v, err := typeurl.UnmarshalAny(et.Event) + if err != nil { + t.Fatal(errors.Wrap(err, "failed to unmarshal event")) + } + + if e, ok := v.(*apievents.TaskExit); !ok { + t.Fatal(errors.Errorf("unexpect event type: %+v", v)) + } else if e.ExitStatus != 0 { + t.Fatal(errors.Errorf("expect zero exit status, but got %v", e.ExitStatus)) + } + case err := <-errC: + t.Fatal(errors.Wrap(err, "unexpected error from event service")) + + case <-time.After(timeout): + t.Fatal(fmt.Errorf("failed to get exit event in time")) + } + + if _, err := task.Delete(ctx); err != nil { + t.Fatal(err) + } + + // check duplicate event should not show up + select { + case event := <-eventStream: + t.Fatal(errors.Errorf("unexpected exit event: %+v", event)) + case err := <-errC: + t.Fatal(errors.Wrap(err, "unexpected error from event service")) + case <-time.After(timeout): + } +} + func TestTaskUpdate(t *testing.T) { t.Parallel() diff --git a/container_test.go b/container_test.go index 530245a33..111dab1bd 100644 --- a/container_test.go +++ b/container_test.go @@ -30,7 +30,6 @@ import ( "testing" "time" - // Register the typeurl "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/namespaces" diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 05618b1eb..9a6b0ec70 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -240,6 +240,21 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { } } } + + // NOTE: If the shim has been killed and ttrpc connection has been + // closed, the shimErr will not be nil. For this case, the event + // subscriber, like moby/moby, might have received the exit or delete + // events. Just in case, we should allow ttrpc-callback-on-close to + // send the exit and delete events again. And the exit status will + // depend on result of shimV2.Delete. + // + // If not, the shim has been delivered the exit and delete events. + // So we should remove the record and prevent duplicate events from + // ttrpc-callback-on-close. + if shimErr == nil { + s.rtTasks.Delete(ctx, s.ID()) + } + if err := s.waitShutdown(ctx); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") }