Merge pull request #4784 from fuweid/fix-4769

runtime: should not send duplicate task exit event
This commit is contained in:
Akihiro Suda 2020-12-02 15:26:57 +09:00 committed by GitHub
commit 7126310a09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 125 additions and 1 deletions

View File

@ -36,18 +36,128 @@ import (
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2" cgroupsv2 "github.com/containerd/cgroups/v2"
apievents "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers" "github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log/logtest"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci" "github.com/containerd/containerd/oci"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/sys" "github.com/containerd/containerd/sys"
"github.com/containerd/typeurl"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
// TestRegressionIssue4769 verifies the number of task exit events.
//
// Issue: https://github.com/containerd/containerd/issues/4769.
func TestRegressionIssue4769(t *testing.T) {
t.Parallel()
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
// use unique namespace to get unique task events
id := t.Name()
ns := fmt.Sprintf("%s-%s", testNamespace, id)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = namespaces.WithNamespace(ctx, ns)
ctx = logtest.WithT(ctx, t)
image, err := client.Pull(ctx, testImage, WithPullUnpack)
if err != nil {
t.Fatal(err)
}
defer client.ImageService().Delete(ctx, testImage, images.SynchronousDelete())
container, err := client.NewContainer(ctx, id,
WithNewSnapshot(id, image),
WithNewSpec(oci.WithImageConfig(image), withTrue()),
WithRuntime(client.runtime, nil),
)
if err != nil {
t.Fatal(err)
}
defer container.Delete(ctx, WithSnapshotCleanup)
task, err := container.NewTask(ctx, empty())
if err != nil {
t.Fatal(err)
}
defer task.Delete(ctx)
statusC, err := task.Wait(ctx)
if err != nil {
t.Fatal(err)
}
eventStream, errC := client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/exit|")
if err := task.Start(ctx); err != nil {
t.Fatal(err)
}
var timeout = 3 * time.Second
select {
case et := <-statusC:
if got := et.ExitCode(); got != 0 {
t.Fatal(errors.Errorf("expect zero exit status, but got %v", got))
}
case <-time.After(timeout):
t.Fatal(fmt.Errorf("failed to get exit event in time"))
}
// start to check events
select {
case et := <-eventStream:
if et.Event == nil {
t.Fatal(errors.Errorf("unexpect empty event: %+v", et))
}
v, err := typeurl.UnmarshalAny(et.Event)
if err != nil {
t.Fatal(errors.Wrap(err, "failed to unmarshal event"))
}
if e, ok := v.(*apievents.TaskExit); !ok {
t.Fatal(errors.Errorf("unexpect event type: %+v", v))
} else if e.ExitStatus != 0 {
t.Fatal(errors.Errorf("expect zero exit status, but got %v", e.ExitStatus))
}
case err := <-errC:
t.Fatal(errors.Wrap(err, "unexpected error from event service"))
case <-time.After(timeout):
t.Fatal(fmt.Errorf("failed to get exit event in time"))
}
if _, err := task.Delete(ctx); err != nil {
t.Fatal(err)
}
// check duplicate event should not show up
select {
case event := <-eventStream:
t.Fatal(errors.Errorf("unexpected exit event: %+v", event))
case err := <-errC:
t.Fatal(errors.Wrap(err, "unexpected error from event service"))
case <-time.After(timeout):
}
}
func TestTaskUpdate(t *testing.T) { func TestTaskUpdate(t *testing.T) {
t.Parallel() t.Parallel()

View File

@ -30,7 +30,6 @@ import (
"testing" "testing"
"time" "time"
// Register the typeurl
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers" "github.com/containerd/containerd/containers"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"

View File

@ -240,6 +240,21 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
} }
} }
} }
// NOTE: If the shim has been killed and ttrpc connection has been
// closed, the shimErr will not be nil. For this case, the event
// subscriber, like moby/moby, might have received the exit or delete
// events. Just in case, we should allow ttrpc-callback-on-close to
// send the exit and delete events again. And the exit status will
// depend on result of shimV2.Delete.
//
// If not, the shim has been delivered the exit and delete events.
// So we should remove the record and prevent duplicate events from
// ttrpc-callback-on-close.
if shimErr == nil {
s.rtTasks.Delete(ctx, s.ID())
}
if err := s.waitShutdown(ctx); err != nil { if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
} }