Merge pull request #4538 from fuweid/update-shim-cleanup

runtime/v2: cleanup dead shim before delete bundle
Phil Estes 2020-09-21 13:32:40 -04:00 committed by GitHub
commit 68d97331be
4 changed files with 44 additions and 29 deletions

View File

@@ -138,12 +138,8 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
     b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
     shim, err := b.Start(ctx, topts, func() {
         log.G(ctx).WithField("id", id).Info("shim disconnected")
-        _, err := m.tasks.Get(ctx, id)
-        if err != nil {
-            // Task was never started or was already successfully deleted
-            return
-        }
-        cleanupAfterDeadShim(context.Background(), id, ns, m.events, b)
+
+        cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
         // Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
         // would publish taskExit event, but the shim.Delete() would always failed with ttrpc
         // disconnect and there is no chance to remove this dead task from runtime task lists.
@@ -266,17 +262,13 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
     binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
     shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
         log.G(ctx).WithField("id", id).Info("shim disconnected")
-        _, err := m.tasks.Get(ctx, id)
-        if err != nil {
-            // Task was never started or was already successfully deleted
-            return
-        }
-        cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall)
+
+        cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
         // Remove self from the runtime task list.
         m.tasks.Delete(ctx, id)
     })
     if err != nil {
-        cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall)
+        cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall)
         continue
     }
     m.tasks.Add(ctx, shim)

View File

@@ -121,7 +121,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
     return s, nil
 }
 
-func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
+func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) {
     ctx = namespaces.WithNamespace(ctx, ns)
     ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
     defer cancel()
@@ -138,6 +138,12 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.E
         }).Warn("failed to clean up after shim disconnected")
     }
 
+    if _, err := rt.Get(ctx, id); err != nil {
+        // Task was never started or was already successfully deleted
+        // No need to publish events
+        return
+    }
+
     var (
         pid        uint32
         exitStatus uint32
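With this change the task-exists check moves out of the disconnect callbacks and into cleanupAfterDeadShim itself: the dead-shim cleanup always runs, and only the event publication is skipped when the task was never registered. Below is a minimal, self-contained sketch of that pattern; the taskList type and the printed steps are illustrative stand-ins, not containerd code.

package main

import (
	"context"
	"errors"
	"fmt"
)

// taskList is a stand-in for containerd's runtime task list.
type taskList struct{ tasks map[string]bool }

func (l *taskList) Get(_ context.Context, id string) (bool, error) {
	if !l.tasks[id] {
		return false, errors.New("task not found")
	}
	return true, nil
}

// cleanupSketch mirrors the new ordering: always attempt shim/bundle cleanup,
// then publish an exit event only if the task is still in the list.
func cleanupSketch(ctx context.Context, id string, rt *taskList) {
	fmt.Printf("%s: running shim delete / bundle cleanup\n", id) // always runs

	if _, err := rt.Get(ctx, id); err != nil {
		// Task was never started or was already successfully deleted:
		// nothing to publish.
		return
	}
	fmt.Printf("%s: publishing TaskExit\n", id)
}

func main() {
	rt := &taskList{tasks: map[string]bool{"registered-task": true}}
	cleanupSketch(context.Background(), "registered-task", rt)
	cleanupSketch(context.Background(), "never-started-task", rt)
}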
@@ -234,13 +240,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
             }
         }
     }
-    // remove self from the runtime task list
-    // this seems dirty but it cleans up the API across runtimes, tasks, and the service
-    s.rtTasks.Delete(ctx, s.ID())
     if err := s.waitShutdown(ctx); err != nil {
         log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
     }
     s.Close()
+    s.client.UserOnCloseWait(ctx)
+
+    // remove self from the runtime task list
+    // this seems dirty but it cleans up the API across runtimes, tasks, and the service
+    s.rtTasks.Delete(ctx, s.ID())
     if err := s.bundle.Delete(); err != nil {
         log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
     }
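The reordering above means the runtime task-list entry and the bundle are only removed after the shim has shut down, the ttrpc client is closed, and the client's on-close callback has finished. A minimal sketch of why that last wait matters, using a plain sync.WaitGroup in place of UserOnCloseWait; none of this is containerd code, only an illustration of the ordering.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

func main() {
	// A throwaway directory standing in for the task bundle.
	bundle, err := os.MkdirTemp("", "bundle-")
	if err != nil {
		panic(err)
	}
	if err := os.WriteFile(filepath.Join(bundle, "config.json"), []byte("{}"), 0o644); err != nil {
		panic(err)
	}

	var callbackDone sync.WaitGroup
	callbackDone.Add(1)
	go func() { // stand-in for the ttrpc on-close callback (cleanupAfterDeadShim)
		defer callbackDone.Done()
		if _, err := os.Stat(filepath.Join(bundle, "config.json")); err != nil {
			fmt.Println("callback raced bundle deletion:", err)
			return
		}
		fmt.Println("callback saw an intact bundle")
	}()

	callbackDone.Wait() // analogous to s.client.UserOnCloseWait(ctx)
	if err := os.RemoveAll(bundle); err != nil {
		panic(err)
	}
	fmt.Println("bundle deleted only after the callback completed")
}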

View File

@@ -8,7 +8,7 @@ github.com/containerd/continuity efbc4488d8fe1bdc16bde3b2d299
 github.com/containerd/fifo f15a3290365b9d2627d189e619ab4008e0069caf
 github.com/containerd/go-runc 7016d3ce2328dd2cb1192b2076ebd565c4e8df0c
 github.com/containerd/nri 0afc7f031eaf9c7d9c1a381b7ab5462e89c998fc
-github.com/containerd/ttrpc v1.0.1
+github.com/containerd/ttrpc v1.0.2
 github.com/containerd/typeurl v1.0.1
 github.com/coreos/go-systemd/v22 v22.1.0
 github.com/cpuguy83/go-md2man/v2 v2.0.0

View File

@@ -47,8 +47,9 @@ type Client struct {
     ctx    context.Context
     closed func()
 
-    closeOnce     sync.Once
-    userCloseFunc func()
+    closeOnce       sync.Once
+    userCloseFunc   func()
+    userCloseWaitCh chan struct{}
 
     errOnce     sync.Once
     err         error
@@ -75,14 +76,15 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
 func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
     ctx, cancel := context.WithCancel(context.Background())
     c := &Client{
-        codec:         codec{},
-        conn:          conn,
-        channel:       newChannel(conn),
-        calls:         make(chan *callRequest),
-        closed:        cancel,
-        ctx:           ctx,
-        userCloseFunc: func() {},
-        interceptor:   defaultClientInterceptor,
+        codec:           codec{},
+        conn:            conn,
+        channel:         newChannel(conn),
+        calls:           make(chan *callRequest),
+        closed:          cancel,
+        ctx:             ctx,
+        userCloseFunc:   func() {},
+        userCloseWaitCh: make(chan struct{}),
+        interceptor:     defaultClientInterceptor,
     }
 
     for _, o := range opts {
@@ -175,6 +177,17 @@ func (c *Client) Close() error {
     return nil
 }
 
+// UserOnCloseWait is used to blocks untils the user's on-close callback
+// finishes.
+func (c *Client) UserOnCloseWait(ctx context.Context) error {
+    select {
+    case <-c.userCloseWaitCh:
+        return nil
+    case <-ctx.Done():
+        return ctx.Err()
+    }
+}
+
 type message struct {
     messageHeader
     p   []byte
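For context, here is a small usage sketch of the new API against github.com/containerd/ttrpc v1.0.2 or later. The in-memory pipe and the callback body are assumptions made for the example; NewClient, WithOnClose, and UserOnCloseWait are the ttrpc entry points involved.

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/containerd/ttrpc"
)

func main() {
	clientConn, serverConn := net.Pipe()

	c := ttrpc.NewClient(clientConn, ttrpc.WithOnClose(func() {
		// In containerd this is where cleanupAfterDeadShim runs.
		fmt.Println("on-close callback running")
		time.Sleep(50 * time.Millisecond)
	}))

	// Simulate the remote (shim) side going away.
	serverConn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := c.UserOnCloseWait(ctx); err != nil {
		fmt.Println("gave up waiting for the on-close callback:", err)
		return
	}
	fmt.Println("callback finished; safe to remove on-disk state")
}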
@@ -251,6 +264,7 @@ func (c *Client) run() {
     defer func() {
         c.conn.Close()
         c.userCloseFunc()
+        close(c.userCloseWaitCh)
     }()
 
     for {
@@ -339,7 +353,8 @@ func filterCloseErr(err error) error {
         return ErrClosed
     default:
         // if we have an epipe on a write or econnreset on a read , we cast to errclosed
-        if oerr, ok := err.(*net.OpError); ok && (oerr.Op == "write" || oerr.Op == "read") {
+        var oerr *net.OpError
+        if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") {
             serr, sok := oerr.Err.(*os.SyscallError)
             if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") ||
                 (serr.Err == syscall.ECONNRESET && oerr.Op == "read")) {
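The switch from a direct type assertion to errors.As means a *net.OpError that arrives wrapped (for example via fmt.Errorf with %w) is still recognized. A small standalone illustration; the wrapping below is contrived for the example.

package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"syscall"
)

func main() {
	base := &net.OpError{
		Op:  "write",
		Err: &os.SyscallError{Syscall: "write", Err: syscall.EPIPE},
	}
	wrapped := fmt.Errorf("rpc failed: %w", base)

	// A plain type assertion does not see through the wrapping.
	if _, ok := wrapped.(*net.OpError); !ok {
		fmt.Println("type assertion: no match on the wrapped error")
	}

	// errors.As walks the Unwrap chain and still finds the *net.OpError.
	var oerr *net.OpError
	if errors.As(wrapped, &oerr) && oerr.Op == "write" {
		fmt.Println("errors.As: matched, op =", oerr.Op)
	}
}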