diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index eedecba28..6c0407f58 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -52,7 +52,7 @@ type binary struct { rtTasks *runtime.TaskList } -func (b *binary) Start(ctx context.Context) (_ *shim, err error) { +func (b *binary) Start(ctx context.Context, onClose func()) (_ *shim, err error) { args := []string{"-id", b.bundle.ID} if logrus.GetLevel() == logrus.DebugLevel { args = append(args, "-debug") @@ -96,7 +96,7 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) { if err != nil { return nil, err } - client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() })) + client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) return &shim{ bundle: b.bundle, client: client, @@ -147,9 +147,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) { if err := b.bundle.Delete(); err != nil { return nil, err } - // remove self from the runtime task list - // this seems dirty but it cleans up the API across runtimes, tasks, and the service - b.rtTasks.Delete(ctx, b.bundle.ID) return &runtime.Exit{ Status: response.ExitStatus, Timestamp: response.ExitedAt, diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index e4b21284c..19daefdba 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -113,6 +113,10 @@ func (m *TaskManager) ID() string { // Create a new task func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err @@ -123,7 +127,15 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create } }() b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks) - shim, err := b.Start(ctx) + shim, err := b.Start(ctx, func() { + log.G(ctx).WithField("id", id).Info("shim disconnected") + _, err := m.tasks.Get(ctx, id) + if err != nil { + // Task was never started or was already successfully deleted + return + } + cleanupAfterDeadShim(context.Background(), id, ns, m.events, b) + }) if err != nil { return nil, err } @@ -219,23 +231,27 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { bundle.Delete() continue } - shim, err := loadShim(ctx, bundle, m.events, m.tasks) + container, err := m.container(ctx, id) if err != nil { - log.G(ctx).WithError(err).Errorf("cleanup dead shim %s", id) - container, err := m.container(ctx, id) + log.G(ctx).WithError(err).Errorf("loading container %s", id) + if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { + log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id) + } + bundle.Delete() + continue + } + binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks) + shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() { + log.G(ctx).WithField("id", id).Info("shim disconnected") + _, err := m.tasks.Get(ctx, id) if err != nil { - log.G(ctx).WithError(err).Errorf("loading dead container %s", id) - if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { - log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id) - } - bundle.Delete() - continue - } - binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks) - if _, err := binaryCall.Delete(ctx); err != nil { - log.G(ctx).WithError(err).Errorf("binary call to delete for %s", id) - continue + // Task was never started or was already successfully deleted + return } + cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall) + }) + if err != nil { + cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall) continue } m.tasks.Add(ctx, shim) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 703168697..edf7e83df 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -24,18 +24,21 @@ import ( "path/filepath" "time" + eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types" tasktypes "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/runtime" client "github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/ttrpc" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) func loadAddress(path string) (string, error) { @@ -46,7 +49,7 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList) (_ *shim, err error) { +func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err @@ -55,6 +58,11 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt if err != nil { return nil, err } + defer func() { + if err != nil { + conn.Close() + } + }() f, err := openShimLog(ctx, bundle) if err != nil { return nil, errors.Wrap(err, "open shim log pipe") @@ -74,7 +82,12 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt } }() - client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() })) + client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) + defer func() { + if err != nil { + client.Close() + } + }() s := &shim{ client: client, task: task.NewTaskClient(client), @@ -88,6 +101,52 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt return s, nil } +func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) { + ctx = namespaces.WithNamespace(ctx, ns) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + log.G(ctx).WithFields(logrus.Fields{ + "id": id, + "namespace": ns, + }).Warn("cleaning up after shim disconnected") + response, err := binaryCall.Delete(ctx) + if err != nil { + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": ns, + }).Warn("failed to clean up after shim disconnected") + } + + var ( + pid uint32 + exitStatus uint32 + exitedAt time.Time + ) + if response != nil { + pid = response.Pid + exitStatus = response.Status + exitedAt = response.Timestamp + } else { + exitStatus = 255 + exitedAt = time.Now() + } + events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{ + ContainerID: id, + ID: id, + Pid: pid, + ExitStatus: exitStatus, + ExitedAt: exitedAt, + }) + + events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ + ContainerID: id, + Pid: pid, + ExitStatus: exitStatus, + ExitedAt: exitedAt, + }) +} + type shim struct { bundle *Bundle client *ttrpc.Client @@ -119,19 +178,9 @@ func (s *shim) Shutdown(ctx context.Context) error { } func (s *shim) waitShutdown(ctx context.Context) error { - dead := make(chan struct{}) - go func() { - if err := s.Shutdown(ctx); err != nil { - log.G(ctx).WithError(err).Error("shim shutdown error") - } - close(dead) - }() - select { - case <-time.After(3 * time.Second): - return errors.New("failed to shutdown shim in time") - case <-dead: - return nil - } + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + return s.Shutdown(ctx) } // ID of the shim/task @@ -154,15 +203,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { if err != nil { return nil, errdefs.FromGRPC(err) } - if err := s.waitShutdown(ctx); err != nil { - return nil, err - } - if err := s.bundle.Delete(); err != nil { - return nil, err - } // remove self from the runtime task list // this seems dirty but it cleans up the API across runtimes, tasks, and the service s.rtTasks.Delete(ctx, s.ID()) + if err := s.waitShutdown(ctx); err != nil { + log.G(ctx).WithError(err).Error("failed to shutdown shim") + } + if err := s.bundle.Delete(); err != nil { + log.G(ctx).WithError(err).Error("failed to delete bundle") + } return &runtime.Exit{ Status: response.ExitStatus, Timestamp: response.ExitedAt,