runtime/v2: cleanup dead shim before delete bundle
The shim delete action needs the bundle information to clean up the resources created by the shim. If cleanup of the dead shim runs after the bundle has already been deleted, some of those resources can leak. The ttrpc client's UserOnCloseWait() makes sure resources are cleaned up before the bundle is deleted, synchronizing task deletion with dead-shim cleanup. This might slow down task deletion, but it guarantees the resources are released and avoids the EBUSY-on-umount case. For example, a sandbox container such as Kata or Firecracker may have mount points over the rootfs; if containerd handled task deletion and dead-shim cleanup in parallel, task deletion could hit EBUSY during umount and fail to clean up the bundle, which makes things worse.

Also update the shim-disconnected handling so that cleanupAfterDeadShim is always called once the shim has disconnected. In some cases the shim fails to call runc-create for some reason, but runc-create has already moved runc-init into the ready state. If containerd never calls shim delete, the runc-init process leaks and keeps holding the cgroup, which leaves the pod stuck in Terminating :(.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
parent fabebe5d55
commit 4b05d03903
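The ordering the message describes can be summarized in a short sketch. This is illustrative only, not containerd's code: deleteTaskAndBundle and bundleDir are invented names, and the real cleanup in shim.Delete() does more work. The point is that the bundle is removed only after ttrpc's UserOnCloseWait() reports that the on-close callback (the dead-shim cleanup) has finished.

package example // illustrative sketch, not containerd code

import (
	"context"
	"os"

	"github.com/containerd/ttrpc"
)

// deleteTaskAndBundle is a hypothetical helper mirroring the ordering this
// commit enforces: close the shim connection, wait for its on-close cleanup
// callback to finish, and only then remove the bundle directory.
func deleteTaskAndBundle(ctx context.Context, client *ttrpc.Client, bundleDir string) error {
	// Closing the client eventually runs the on-close callback registered
	// on the connection (cleanupAfterDeadShim in containerd).
	client.Close()

	// Block until that callback has finished, so any cleanup that still
	// needs the bundle contents (e.g. unmounting a sandbox rootfs) is done.
	if err := client.UserOnCloseWait(ctx); err != nil {
		return err
	}

	// Now the removal no longer races with the unmount, so it will not
	// fail with EBUSY.
	return os.RemoveAll(bundleDir)
}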
@@ -138,12 +138,8 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
 	b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
 	shim, err := b.Start(ctx, topts, func() {
 		log.G(ctx).WithField("id", id).Info("shim disconnected")
-		_, err := m.tasks.Get(ctx, id)
-		if err != nil {
-			// Task was never started or was already successfully deleted
-			return
-		}
-		cleanupAfterDeadShim(context.Background(), id, ns, m.events, b)
+
+		cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
 		// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
 		// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
 		// disconnect and there is no chance to remove this dead task from runtime task lists.
@@ -266,17 +262,13 @@ func (m *TaskManager) loadTasks(ctx context.Context) error
 		binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
 		shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
 			log.G(ctx).WithField("id", id).Info("shim disconnected")
-			_, err := m.tasks.Get(ctx, id)
-			if err != nil {
-				// Task was never started or was already successfully deleted
-				return
-			}
-			cleanupAfterDeadShim(context.Background(), id, ns, m.events, binaryCall)
+
+			cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
 			// Remove self from the runtime task list.
 			m.tasks.Delete(ctx, id)
 		})
 		if err != nil {
-			cleanupAfterDeadShim(ctx, id, ns, m.events, binaryCall)
+			cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall)
 			continue
 		}
 		m.tasks.Add(ctx, shim)
@@ -121,7 +121,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
 	return s, nil
 }
 
-func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.Exchange, binaryCall *binary) {
+func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) {
 	ctx = namespaces.WithNamespace(ctx, ns)
 	ctx, cancel := timeout.WithContext(ctx, cleanupTimeout)
 	defer cancel()
@@ -138,6 +138,12 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, events *exchange.E
 		}).Warn("failed to clean up after shim disconnected")
 	}
 
+	if _, err := rt.Get(ctx, id); err != nil {
+		// Task was never started or was already successfully deleted
+		// No need to publish events
+		return
+	}
+
 	var (
 		pid        uint32
 		exitStatus uint32
@@ -234,13 +240,15 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
 			}
 		}
 	}
-	// remove self from the runtime task list
-	// this seems dirty but it cleans up the API across runtimes, tasks, and the service
-	s.rtTasks.Delete(ctx, s.ID())
 	if err := s.waitShutdown(ctx); err != nil {
 		log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
 	}
 	s.Close()
+	s.client.UserOnCloseWait(ctx)
+
+	// remove self from the runtime task list
+	// this seems dirty but it cleans up the API across runtimes, tasks, and the service
+	s.rtTasks.Delete(ctx, s.ID())
 	if err := s.bundle.Delete(); err != nil {
 		log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
 	}
@@ -8,7 +8,7 @@ github.com/containerd/continuity efbc4488d8fe1bdc16bde3b2d299
 github.com/containerd/fifo                          f15a3290365b9d2627d189e619ab4008e0069caf
 github.com/containerd/go-runc                       7016d3ce2328dd2cb1192b2076ebd565c4e8df0c
 github.com/containerd/nri                           0afc7f031eaf9c7d9c1a381b7ab5462e89c998fc
-github.com/containerd/ttrpc                         v1.0.1
+github.com/containerd/ttrpc                         v1.0.2
 github.com/containerd/typeurl                       v1.0.1
 github.com/coreos/go-systemd/v22                    v22.1.0
 github.com/cpuguy83/go-md2man/v2                    v2.0.0
vendor/github.com/containerd/ttrpc/client.go (generated, vendored; 37 changed lines)
@@ -47,8 +47,9 @@ type Client struct {
 	ctx    context.Context
 	closed func()
 
-	closeOnce     sync.Once
-	userCloseFunc func()
+	closeOnce       sync.Once
+	userCloseFunc   func()
+	userCloseWaitCh chan struct{}
 
 	errOnce sync.Once
 	err     error
@@ -75,14 +76,15 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
 func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
 	ctx, cancel := context.WithCancel(context.Background())
 	c := &Client{
-		codec:         codec{},
-		conn:          conn,
-		channel:       newChannel(conn),
-		calls:         make(chan *callRequest),
-		closed:        cancel,
-		ctx:           ctx,
-		userCloseFunc: func() {},
-		interceptor:   defaultClientInterceptor,
+		codec:           codec{},
+		conn:            conn,
+		channel:         newChannel(conn),
+		calls:           make(chan *callRequest),
+		closed:          cancel,
+		ctx:             ctx,
+		userCloseFunc:   func() {},
+		userCloseWaitCh: make(chan struct{}),
+		interceptor:     defaultClientInterceptor,
 	}
 
 	for _, o := range opts {
@@ -175,6 +177,17 @@ func (c *Client) Close() error {
 	return nil
 }
 
+// UserOnCloseWait is used to blocks untils the user's on-close callback
+// finishes.
+func (c *Client) UserOnCloseWait(ctx context.Context) error {
+	select {
+	case <-c.userCloseWaitCh:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
 type message struct {
 	messageHeader
 	p []byte
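UserOnCloseWait pairs with the client's on-close callback (registered when the client is created, e.g. via ttrpc.WithOnClose): the callback runs once the connection goes down, and UserOnCloseWait lets the caller wait, bounded by a context, for it to finish. A minimal hedged sketch, with invented helper names and a placeholder cleanup body:

package example // illustrative sketch, not ttrpc or containerd code

import (
	"context"
	"net"
	"time"

	"github.com/containerd/ttrpc"
)

// newShimClient registers a cleanup callback that runs once the connection
// goes down (for example when the remote process dies).
func newShimClient(conn net.Conn, cleanup func()) *ttrpc.Client {
	return ttrpc.NewClient(conn, ttrpc.WithOnClose(cleanup))
}

// closeAndWait closes the client and then waits, bounded by a timeout, until
// the registered on-close callback has finished.
func closeAndWait(client *ttrpc.Client) error {
	client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return client.UserOnCloseWait(ctx)
}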
@@ -251,6 +264,7 @@ func (c *Client) run() {
	defer func() {
		c.conn.Close()
		c.userCloseFunc()
+		close(c.userCloseWaitCh)
	}()
 
	for {
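The deferred close(c.userCloseWaitCh) runs after the user's on-close callback returns; because receiving from a closed channel never blocks, that single close releases every current and future UserOnCloseWait caller. A standalone illustration of the idiom (plain Go, not ttrpc code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup

	// Several waiters block on the same channel.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // unblocks in every goroutine once done is closed
			fmt.Println("waiter", id, "released")
		}(i)
	}

	close(done) // one close releases all waiters, past and future
	wg.Wait()
}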
@@ -339,7 +353,8 @@ func filterCloseErr(err error) error {
 		return ErrClosed
 	default:
 		// if we have an epipe on a write or econnreset on a read , we cast to errclosed
-		if oerr, ok := err.(*net.OpError); ok && (oerr.Op == "write" || oerr.Op == "read") {
+		var oerr *net.OpError
+		if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") {
 			serr, sok := oerr.Err.(*os.SyscallError)
 			if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") ||
 				(serr.Err == syscall.ECONNRESET && oerr.Op == "read")) {
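The switch from a type assertion to errors.As matters when the *net.OpError arrives wrapped (for example via fmt.Errorf's %w verb): the assertion only inspects the outermost error, while errors.As walks the wrap chain. A standalone illustration (not ttrpc code):

package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"syscall"
)

func main() {
	base := &net.OpError{Op: "write", Err: &os.SyscallError{Syscall: "write", Err: syscall.EPIPE}}
	wrapped := fmt.Errorf("send failed: %w", base)

	// A direct type assertion misses the wrapped error.
	_, ok := wrapped.(*net.OpError)
	fmt.Println("type assertion matched:", ok) // false

	// errors.As unwraps until it finds a *net.OpError.
	var oerr *net.OpError
	fmt.Println("errors.As matched:", errors.As(wrapped, &oerr)) // true
}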