diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 9ae6d31b2..085d77b51 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -138,12 +138,8 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) shim, err := b.Start(ctx, topts, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - _, err := m.tasks.Get(ctx, id) - if err != nil { - // Task was never started or was already successfully deleted - return - } - cleanupAfterDeadShim(context.Background(), id, ns, m.events, b) + + cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // would publish taskExit event, but the shim.Delete() would always failed with ttrpc // disconnect and there is no chance to remove this dead task from runtime task lists. @@ -266,17 +262,13 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - _, err := m.tasks.Get(ctx, id) - if err != nil { - // Task was never started or was already successfully deleted - return - } - cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall) + + cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) // Remove self from the runtime task list. m.tasks.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall) continue } m.tasks.Add(ctx, shim) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 152a1e85b..cde1bf688 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -121,7 +121,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt return s, nil } -func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) { +func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) { ctx = namespaces.WithNamespace(ctx, ns) ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) defer cancel() @@ -138,6 +138,12 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.E }).Warn("failed to clean up after shim disconnected") } + if _, err := rt.Get(ctx, id); err != nil { + // Task was never started or was already successfully deleted + // No need to publish events + return + } + var ( pid uint32 exitStatus uint32 @@ -234,13 +240,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { } } } - // remove self from the runtime task list - // this seems dirty but it cleans up the API across runtimes, tasks, and the service - s.rtTasks.Delete(ctx, s.ID()) if err := s.waitShutdown(ctx); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") } s.Close() + s.client.UserOnCloseWait(ctx) + + // remove self from the runtime task list + // this seems dirty but it cleans up the API across runtimes, tasks, and the service + s.rtTasks.Delete(ctx, s.ID()) if err := s.bundle.Delete(); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") } diff --git a/vendor.conf b/vendor.conf index dbc240084..04c877576 100644 --- a/vendor.conf +++ b/vendor.conf @@ -8,7 +8,7 @@ github.com/containerd/continuity efbc4488d8fe1bdc16bde3b2d299 github.com/containerd/fifo f15a3290365b9d2627d189e619ab4008e0069caf github.com/containerd/go-runc 7016d3ce2328dd2cb1192b2076ebd565c4e8df0c github.com/containerd/nri 0afc7f031eaf9c7d9c1a381b7ab5462e89c998fc -github.com/containerd/ttrpc v1.0.1 +github.com/containerd/ttrpc v1.0.2 github.com/containerd/typeurl v1.0.1 github.com/coreos/go-systemd/v22 v22.1.0 github.com/cpuguy83/go-md2man/v2 v2.0.0 diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index e81694138..30c9b73f3 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -47,8 +47,9 @@ type Client struct { ctx context.Context closed func() - closeOnce sync.Once - userCloseFunc func() + closeOnce sync.Once + userCloseFunc func() + userCloseWaitCh chan struct{} errOnce sync.Once err error @@ -75,14 +76,15 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts { func NewClient(conn net.Conn, opts ...ClientOpts) *Client { ctx, cancel := context.WithCancel(context.Background()) c := &Client{ - codec: codec{}, - conn: conn, - channel: newChannel(conn), - calls: make(chan *callRequest), - closed: cancel, - ctx: ctx, - userCloseFunc: func() {}, - interceptor: defaultClientInterceptor, + codec: codec{}, + conn: conn, + channel: newChannel(conn), + calls: make(chan *callRequest), + closed: cancel, + ctx: ctx, + userCloseFunc: func() {}, + userCloseWaitCh: make(chan struct{}), + interceptor: defaultClientInterceptor, } for _, o := range opts { @@ -175,6 +177,17 @@ func (c *Client) Close() error { return nil } +// UserOnCloseWait is used to blocks untils the user's on-close callback +// finishes. +func (c *Client) UserOnCloseWait(ctx context.Context) error { + select { + case <-c.userCloseWaitCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + type message struct { messageHeader p []byte @@ -251,6 +264,7 @@ func (c *Client) run() { defer func() { c.conn.Close() c.userCloseFunc() + close(c.userCloseWaitCh) }() for { @@ -339,7 +353,8 @@ func filterCloseErr(err error) error { return ErrClosed default: // if we have an epipe on a write or econnreset on a read , we cast to errclosed - if oerr, ok := err.(*net.OpError); ok && (oerr.Op == "write" || oerr.Op == "read") { + var oerr *net.OpError + if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") { serr, sok := oerr.Err.(*os.SyscallError) if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") || (serr.Err == syscall.ECONNRESET && oerr.Op == "read")) {