runtime: should not send duplicate task exit event
If the shim has been killed and ttrpc connection has been closed, the shimErr will not be nil. For this case, the event subscriber, like moby/moby, might have received the exit or delete events. Just in case, we should allow ttrpc-callback-on-close to send the exit and delete events again. And the exit status will depend on result of shimV2.Delete. If not, the shim has been delivered the exit and delete events. So we should remove the task record and prevent duplicate events from ttrpc-callback-on-close. Fix: #4769 Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
parent
cb2dcb04fc
commit
faec5d4ffd
@ -36,18 +36,128 @@ import (
|
||||
|
||||
"github.com/containerd/cgroups"
|
||||
cgroupsv2 "github.com/containerd/cgroups/v2"
|
||||
apievents "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log/logtest"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/oci"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/runtime/linux/runctypes"
|
||||
"github.com/containerd/containerd/runtime/v2/runc/options"
|
||||
"github.com/containerd/containerd/sys"
|
||||
"github.com/containerd/typeurl"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// TestRegressionIssue4769 verifies the number of task exit events.
|
||||
//
|
||||
// Issue: https://github.com/containerd/containerd/issues/4769.
|
||||
func TestRegressionIssue4769(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// use unique namespace to get unique task events
|
||||
id := t.Name()
|
||||
ns := fmt.Sprintf("%s-%s", testNamespace, id)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ctx = namespaces.WithNamespace(ctx, ns)
|
||||
ctx = logtest.WithT(ctx, t)
|
||||
|
||||
image, err := client.Pull(ctx, testImage, WithPullUnpack)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.ImageService().Delete(ctx, testImage, images.SynchronousDelete())
|
||||
|
||||
container, err := client.NewContainer(ctx, id,
|
||||
WithNewSnapshot(id, image),
|
||||
WithNewSpec(oci.WithImageConfig(image), withTrue()),
|
||||
WithRuntime(client.runtime, nil),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||
|
||||
task, err := container.NewTask(ctx, empty())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
statusC, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
eventStream, errC := client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/exit|")
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var timeout = 3 * time.Second
|
||||
|
||||
select {
|
||||
case et := <-statusC:
|
||||
if got := et.ExitCode(); got != 0 {
|
||||
t.Fatal(errors.Errorf("expect zero exit status, but got %v", got))
|
||||
}
|
||||
case <-time.After(timeout):
|
||||
t.Fatal(fmt.Errorf("failed to get exit event in time"))
|
||||
}
|
||||
|
||||
// start to check events
|
||||
select {
|
||||
case et := <-eventStream:
|
||||
if et.Event == nil {
|
||||
t.Fatal(errors.Errorf("unexpect empty event: %+v", et))
|
||||
}
|
||||
|
||||
v, err := typeurl.UnmarshalAny(et.Event)
|
||||
if err != nil {
|
||||
t.Fatal(errors.Wrap(err, "failed to unmarshal event"))
|
||||
}
|
||||
|
||||
if e, ok := v.(*apievents.TaskExit); !ok {
|
||||
t.Fatal(errors.Errorf("unexpect event type: %+v", v))
|
||||
} else if e.ExitStatus != 0 {
|
||||
t.Fatal(errors.Errorf("expect zero exit status, but got %v", e.ExitStatus))
|
||||
}
|
||||
case err := <-errC:
|
||||
t.Fatal(errors.Wrap(err, "unexpected error from event service"))
|
||||
|
||||
case <-time.After(timeout):
|
||||
t.Fatal(fmt.Errorf("failed to get exit event in time"))
|
||||
}
|
||||
|
||||
if _, err := task.Delete(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// check duplicate event should not show up
|
||||
select {
|
||||
case event := <-eventStream:
|
||||
t.Fatal(errors.Errorf("unexpected exit event: %+v", event))
|
||||
case err := <-errC:
|
||||
t.Fatal(errors.Wrap(err, "unexpected error from event service"))
|
||||
case <-time.After(timeout):
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
// Register the typeurl
|
||||
"github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
|
@ -240,6 +240,21 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: If the shim has been killed and ttrpc connection has been
|
||||
// closed, the shimErr will not be nil. For this case, the event
|
||||
// subscriber, like moby/moby, might have received the exit or delete
|
||||
// events. Just in case, we should allow ttrpc-callback-on-close to
|
||||
// send the exit and delete events again. And the exit status will
|
||||
// depend on result of shimV2.Delete.
|
||||
//
|
||||
// If not, the shim has been delivered the exit and delete events.
|
||||
// So we should remove the record and prevent duplicate events from
|
||||
// ttrpc-callback-on-close.
|
||||
if shimErr == nil {
|
||||
s.rtTasks.Delete(ctx, s.ID())
|
||||
}
|
||||
|
||||
if err := s.waitShutdown(ctx); err != nil {
|
||||
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user