Rework task create and cleanup flow

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko
2021-08-26 16:35:56 -07:00
parent 7c4ead285d
commit fb5f6ce3c9
3 changed files with 107 additions and 57 deletions

View File

@@ -33,6 +33,7 @@ import (
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/task"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@@ -163,8 +164,8 @@ func (m *ShimManager) ID() string {
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim")
}
// Create a new task
func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ *shimTask, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
if err != nil {
return nil, err
@@ -185,19 +186,21 @@ func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.Create
}
}()
t, err := shim.Create(ctx, opts)
if err != nil {
return nil, errors.Wrap(err, "failed to create shim")
// NOTE: temporarily keep this wrapper around until containerd's task service depends on it.
// This will no longer be required once we migrate to client side task management.
shimTask := &shimTask{
shim: shim,
task: task.NewTaskClient(shim.client),
}
if err := m.list.Add(ctx, t); err != nil {
if err := m.list.Add(ctx, shimTask); err != nil {
return nil, errors.Wrap(err, "failed to add task")
}
return t, nil
return shimTask, nil
}
func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shimTask, error) {
func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
@@ -232,50 +235,35 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
}
// cleanupShim attempts to properly delete and cleanup shim after error
func (m *ShimManager) cleanupShim(shim *shimTask) {
func (m *ShimManager) cleanupShim(shim *shim) {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
_, errShim := shim.delete(dctx, m.list.Delete)
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
}
shim.Shutdown(dctx)
shim.Close()
_ = shim.delete(dctx)
m.list.Delete(dctx, shim.ID())
}
func (m *ShimManager) Get(ctx context.Context, id string) (*shim, error) {
item, err := m.list.Get(ctx, id)
if err != nil {
return nil, err
}
}
// Get a specific task
func (m *ShimManager) Get(ctx context.Context, id string) (runtime.Task, error) {
return m.list.Get(ctx, id)
}
// Add a runtime task
func (m *ShimManager) Add(ctx context.Context, task runtime.Task) error {
return m.list.Add(ctx, task)
shimTask := item.(*shimTask)
return shimTask.shim, nil
}
// Delete a runtime task
func (m *ShimManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := m.list.Get(ctx, id)
func (m *ShimManager) Delete(ctx context.Context, id string) error {
shim, err := m.Get(ctx, id)
if err != nil {
return nil, err
return err
}
shim := task.(*shimTask)
exit, err := shim.delete(ctx, m.list.Delete)
if err != nil {
return nil, err
}
err = shim.delete(ctx)
m.list.Delete(ctx, id)
return exit, err
}
// Tasks lists all tasks
func (m *ShimManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
return m.list.GetAll(ctx, all)
return err
}
func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
@@ -434,20 +422,61 @@ func (m *TaskManager) ID() string {
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
return m.shims.Create(ctx, taskID, opts)
shim, err := m.shims.Start(ctx, taskID, opts)
if err != nil {
return nil, errors.Wrap(err, "failed to start shim")
}
t, err := shim.Create(ctx, opts)
if err != nil {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
_, errShim := shim.delete(dctx, func(ctx context.Context, id string) {
m.shims.list.Delete(ctx, id)
})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
}
shim.Shutdown(dctx)
shim.Close()
}
return nil, errors.Wrap(err, "failed to create shim task")
}
return t, nil
}
// Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
return m.shims.Get(ctx, id)
return m.shims.list.Get(ctx, id)
}
// Tasks lists all tasks
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
return m.shims.Tasks(ctx, all)
return m.shims.list.GetAll(ctx, all)
}
// Delete deletes the task and shim instance
func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) {
return m.shims.Delete(ctx, taskID)
item, err := m.shims.list.Get(ctx, taskID)
if err != nil {
return nil, err
}
shimTask := item.(*shimTask)
exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) {
m.shims.list.Delete(ctx, id)
})
if err != nil {
return nil, fmt.Errorf("failed to delete task: %w", err)
}
return exit, nil
}