diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index b847a6a8b..01584ab5a 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -60,7 +60,7 @@ type binary struct { bundle *Bundle } -func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shimTask, err error) { +func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { args := []string{"-id", b.bundle.ID} switch logrus.GetLevel() { case logrus.DebugLevel, logrus.TraceLevel: @@ -128,12 +128,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ f.Close() } client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) - return &shimTask{ - shim: &shim{ - bundle: b.bundle, - client: client, - }, - task: task.NewTaskClient(client), + return &shim{ + bundle: b.bundle, + client: client, }, nil } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 5cb5b9cd2..7baba194f 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -33,6 +33,7 @@ import ( "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/v2/task" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -163,8 +164,8 @@ func (m *ShimManager) ID() string { return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim") } -// Create a new task -func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { +// Start launches a new shim instance +func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ *shimTask, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err @@ -185,19 +186,21 @@ func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.Create } }() - t, err := shim.Create(ctx, opts) - if err != nil { - return nil, errors.Wrap(err, "failed to create shim") + // NOTE: temporarily keep this wrapper around until containerd's task service depends on it. + // This will no longer be required once we migrate to client side task management. + shimTask := &shimTask{ + shim: shim, + task: task.NewTaskClient(shim.client), } - if err := m.list.Add(ctx, t); err != nil { + if err := m.list.Add(ctx, shimTask); err != nil { return nil, errors.Wrap(err, "failed to add task") } - return t, nil + return shimTask, nil } -func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shimTask, error) { +func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -232,50 +235,35 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, } // cleanupShim attempts to properly delete and cleanup shim after error -func (m *ShimManager) cleanupShim(shim *shimTask) { +func (m *ShimManager) cleanupShim(shim *shim) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _, errShim := shim.delete(dctx, m.list.Delete) - if errShim != nil { - if errdefs.IsDeadlineExceeded(errShim) { - dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) - defer cancel() - } - shim.Shutdown(dctx) - shim.Close() + _ = shim.delete(dctx) + m.list.Delete(dctx, shim.ID()) +} + +func (m *ShimManager) Get(ctx context.Context, id string) (*shim, error) { + item, err := m.list.Get(ctx, id) + if err != nil { + return nil, err } -} -// Get a specific task -func (m *ShimManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.list.Get(ctx, id) -} - -// Add a runtime task -func (m *ShimManager) Add(ctx context.Context, task runtime.Task) error { - return m.list.Add(ctx, task) + shimTask := item.(*shimTask) + return shimTask.shim, nil } // Delete a runtime task -func (m *ShimManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { - task, err := m.list.Get(ctx, id) +func (m *ShimManager) Delete(ctx context.Context, id string) error { + shim, err := m.Get(ctx, id) if err != nil { - return nil, err + return err } - shim := task.(*shimTask) - exit, err := shim.delete(ctx, m.list.Delete) - if err != nil { - return nil, err - } + err = shim.delete(ctx) + m.list.Delete(ctx, id) - return exit, err -} - -// Tasks lists all tasks -func (m *ShimManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.list.GetAll(ctx, all) + return err } func (m *ShimManager) loadExistingTasks(ctx context.Context) error { @@ -434,20 +422,61 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - return m.shims.Create(ctx, taskID, opts) + shim, err := m.shims.Start(ctx, taskID, opts) + if err != nil { + return nil, errors.Wrap(err, "failed to start shim") + } + + t, err := shim.Create(ctx, opts) + if err != nil { + dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) + defer cancel() + + _, errShim := shim.delete(dctx, func(ctx context.Context, id string) { + m.shims.list.Delete(ctx, id) + }) + + if errShim != nil { + if errdefs.IsDeadlineExceeded(errShim) { + dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) + defer cancel() + } + + shim.Shutdown(dctx) + shim.Close() + } + + return nil, errors.Wrap(err, "failed to create shim task") + } + + return t, nil } // Get a specific task func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.shims.Get(ctx, id) + return m.shims.list.Get(ctx, id) } // Tasks lists all tasks func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.shims.Tasks(ctx, all) + return m.shims.list.GetAll(ctx, all) } // Delete deletes the task and shim instance func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { - return m.shims.Delete(ctx, taskID) + item, err := m.shims.list.Get(ctx, taskID) + if err != nil { + return nil, err + } + + shimTask := item.(*shimTask) + exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) { + m.shims.list.Delete(ctx, id) + }) + + if err != nil { + return nil, fmt.Errorf("failed to delete task: %w", err) + } + + return exit, nil } diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 2d9cb27c2..df3a2b097 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -18,6 +18,7 @@ package v2 import ( "context" + "fmt" "io" "os" "path/filepath" @@ -37,6 +38,7 @@ import ( "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/ttrpc" ptypes "github.com/gogo/protobuf/types" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -202,6 +204,27 @@ func (s *shim) Close() error { return s.client.Close() } +func (s *shim) delete(ctx context.Context) error { + var ( + result *multierror.Error + ) + + if err := s.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) + } + + if err := s.client.UserOnCloseWait(ctx); err != nil { + result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) + } + + if err := s.bundle.Delete(); err != nil { + log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") + result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err)) + } + + return result.ErrorOrNil() +} + var _ runtime.Task = &shimTask{} // shimTask wraps shim process and adds task service client for compatibility with existing shim manager. @@ -267,20 +290,21 @@ func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Conte } if err := s.waitShutdown(ctx); err != nil { - log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") + log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task") + } + + if err := s.shim.delete(ctx); err != nil { + log.G(ctx).WithField("id", s.ID()).WithError(err).Errorf("failed to delete shim") } - s.Close() - s.client.UserOnCloseWait(ctx) // remove self from the runtime task list // this seems dirty but it cleans up the API across runtimes, tasks, and the service removeTask(ctx, s.ID()) - if err := s.bundle.Delete(); err != nil { - log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") - } + if shimErr != nil { return nil, shimErr } + return &runtime.Exit{ Status: response.ExitStatus, Timestamp: response.ExitedAt,