From 4b05d039035b8f62c83e292c6451e0e6dd26930a Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 7 Sep 2020 18:23:01 +0800 Subject: [PATCH] runtime/v2: cleanup dead shim before delete bundle The shim delete action needs bundle information to cleanup resources created by shim. If the cleanup dead shim is called after delete bundle, the part of resources maybe leaky. The ttrpc client UserOnCloseWait() can make sure that resources are cleanup before delete bundle, which synchronizes task deletion and cleanup deadshim. It might slow down the task deletion, but it can make sure that resources can be cleanup and avoid EBUSY umount case. For example, the sandbox container like Kata/Firecracker might have mount points over the rootfs. If containerd handles task deletion and cleanup deadshim parallelly, the task deletion will meet EBUSY during umount and fail to cleanup bundle, which makes case worse. And also update cleanupAfterDeadshim, which makes sure that cleanupAfterDeadshim must be called after shim disconnected. In some case, shim fails to call runc-create for some reason, but the runc-create already makes runc-init into ready state. If containerd doesn't call shim deletion, the runc-init process will be leaky and hold the cgroup, which makes pod terminating :(. Signed-off-by: Wei Fu --- runtime/v2/manager.go | 18 +++------- runtime/v2/shim.go | 16 ++++++--- vendor.conf | 2 +- vendor/github.com/containerd/ttrpc/client.go | 37 ++++++++++++++------ 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 9ae6d31b2..085d77b51 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -138,12 +138,8 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) shim, err := b.Start(ctx, topts, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - _, err := m.tasks.Get(ctx, id) - if err != nil { - // Task was never started or was already successfully deleted - return - } - cleanupAfterDeadShim(context.Background(), id, ns, m.events, b) + + cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // would publish taskExit event, but the shim.Delete() would always failed with ttrpc // disconnect and there is no chance to remove this dead task from runtime task lists. @@ -266,17 +262,13 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - _, err := m.tasks.Get(ctx, id) - if err != nil { - // Task was never started or was already successfully deleted - return - } - cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall) + + cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) // Remove self from the runtime task list. m.tasks.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall) continue } m.tasks.Add(ctx, shim) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 152a1e85b..cde1bf688 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -121,7 +121,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt return s, nil } -func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) { +func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) { ctx = namespaces.WithNamespace(ctx, ns) ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) defer cancel() @@ -138,6 +138,12 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.E }).Warn("failed to clean up after shim disconnected") } + if _, err := rt.Get(ctx, id); err != nil { + // Task was never started or was already successfully deleted + // No need to publish events + return + } + var ( pid uint32 exitStatus uint32 @@ -234,13 +240,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { } } } - // remove self from the runtime task list - // this seems dirty but it cleans up the API across runtimes, tasks, and the service - s.rtTasks.Delete(ctx, s.ID()) if err := s.waitShutdown(ctx); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") } s.Close() + s.client.UserOnCloseWait(ctx) + + // remove self from the runtime task list + // this seems dirty but it cleans up the API across runtimes, tasks, and the service + s.rtTasks.Delete(ctx, s.ID()) if err := s.bundle.Delete(); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") } diff --git a/vendor.conf b/vendor.conf index ad72fb6d2..1c5208e4f 100644 --- a/vendor.conf +++ b/vendor.conf @@ -8,7 +8,7 @@ github.com/containerd/continuity efbc4488d8fe1bdc16bde3b2d299 github.com/containerd/fifo f15a3290365b9d2627d189e619ab4008e0069caf github.com/containerd/go-runc 7016d3ce2328dd2cb1192b2076ebd565c4e8df0c github.com/containerd/nri 0afc7f031eaf9c7d9c1a381b7ab5462e89c998fc -github.com/containerd/ttrpc v1.0.1 +github.com/containerd/ttrpc v1.0.2 github.com/containerd/typeurl v1.0.1 github.com/coreos/go-systemd/v22 v22.1.0 github.com/cpuguy83/go-md2man/v2 v2.0.0 diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index e81694138..30c9b73f3 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -47,8 +47,9 @@ type Client struct { ctx context.Context closed func() - closeOnce sync.Once - userCloseFunc func() + closeOnce sync.Once + userCloseFunc func() + userCloseWaitCh chan struct{} errOnce sync.Once err error @@ -75,14 +76,15 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts { func NewClient(conn net.Conn, opts ...ClientOpts) *Client { ctx, cancel := context.WithCancel(context.Background()) c := &Client{ - codec: codec{}, - conn: conn, - channel: newChannel(conn), - calls: make(chan *callRequest), - closed: cancel, - ctx: ctx, - userCloseFunc: func() {}, - interceptor: defaultClientInterceptor, + codec: codec{}, + conn: conn, + channel: newChannel(conn), + calls: make(chan *callRequest), + closed: cancel, + ctx: ctx, + userCloseFunc: func() {}, + userCloseWaitCh: make(chan struct{}), + interceptor: defaultClientInterceptor, } for _, o := range opts { @@ -175,6 +177,17 @@ func (c *Client) Close() error { return nil } +// UserOnCloseWait is used to blocks untils the user's on-close callback +// finishes. +func (c *Client) UserOnCloseWait(ctx context.Context) error { + select { + case <-c.userCloseWaitCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + type message struct { messageHeader p []byte @@ -251,6 +264,7 @@ func (c *Client) run() { defer func() { c.conn.Close() c.userCloseFunc() + close(c.userCloseWaitCh) }() for { @@ -339,7 +353,8 @@ func filterCloseErr(err error) error { return ErrClosed default: // if we have an epipe on a write or econnreset on a read , we cast to errclosed - if oerr, ok := err.(*net.OpError); ok && (oerr.Op == "write" || oerr.Op == "read") { + var oerr *net.OpError + if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") { serr, sok := oerr.Err.(*os.SyscallError) if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") || (serr.Err == syscall.ECONNRESET && oerr.Op == "read")) {