From 2d5d3541e6260718e951f002fe31b4ae6d237199 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Wed, 25 Aug 2021 17:00:27 -0700 Subject: [PATCH 01/12] Rename task manager to shim manager Signed-off-by: Maksym Pavlenko --- runtime/v2/binary.go | 12 +++-- runtime/v2/manager.go | 94 ++++++++++++++++++---------------- runtime/v2/process.go | 2 +- runtime/v2/shim.go | 111 +++++++++++++++++++++------------------- services/tasks/local.go | 4 +- 5 files changed, 118 insertions(+), 105 deletions(-) diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index e16f8ac92..b847a6a8b 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -60,7 +60,7 @@ type binary struct { bundle *Bundle } -func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { +func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shimTask, err error) { args := []string{"-id", b.bundle.ID} switch logrus.GetLevel() { case logrus.DebugLevel, logrus.TraceLevel: @@ -128,10 +128,12 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ f.Close() } client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) - return &shim{ - bundle: b.bundle, - client: client, - task: task.NewTaskClient(client), + return &shimTask{ + shim: &shim{ + bundle: b.bundle, + client: client, + }, + task: task.NewTaskClient(client), }, nil } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 1a6e9b293..d7b23f050 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -48,7 +48,7 @@ type Config struct { func init() { plugin.Register(&plugin.Registration{ Type: plugin.RuntimePluginV2, - ID: "task", + ID: "shim", Requires: []plugin.Type{ plugin.EventPlugin, plugin.MetadataPlugin, @@ -81,7 +81,7 @@ func init() { cs := metadata.NewContainerStore(m.(*metadata.DB)) events := ep.(*exchange.Exchange) - return New(ic.Context, &ManagerConfig{ + return NewShimManager(ic.Context, &ManagerConfig{ Root: ic.Root, State: ic.State, Address: ic.Address, @@ -104,49 +104,53 @@ type ManagerConfig struct { SchedCore bool } -// New task manager for v2 shims -func New(ctx context.Context, config *ManagerConfig) (*TaskManager, error) { +// NewShimManager creates a manager for v2 shims +func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) { for _, d := range []string{config.Root, config.State} { if err := os.MkdirAll(d, 0711); err != nil { return nil, err } } - m := &TaskManager{ + + m := &ShimManager{ root: config.Root, state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, - schedCore: config.SchedCore, - tasks: runtime.NewTaskList(), + list: runtime.NewTaskList(), events: config.Events, containers: config.Store, } + if err := m.loadExistingTasks(ctx); err != nil { return nil, err } + return m, nil } -// TaskManager manages v2 shim's and their tasks -type TaskManager struct { +// ShimManager manages currently running shim processes. +// It is mainly responsible for launching new shims and for proper shutdown and cleanup of existing instances. +// The manager is unaware of the underlying services shim provides and lets higher level services consume them, +// but don't care about lifecycle management. +type ShimManager struct { root string state string containerdAddress string containerdTTRPCAddress string schedCore bool - - tasks *runtime.TaskList - events *exchange.Exchange - containers containers.Store + list *runtime.TaskList + events *exchange.Exchange + containers containers.Store } -// ID of the task manager -func (m *TaskManager) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") +// ID of the shim manager +func (m *ShimManager) ID() string { + return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim") } // Create a new task -func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { +func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err @@ -163,7 +167,7 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create } defer func() { if retErr != nil { - m.deleteShim(shim) + m.cleanupShim(shim) } }() @@ -172,14 +176,14 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create return nil, errors.Wrap(err, "failed to create shim") } - if err := m.tasks.Add(ctx, t); err != nil { + if err := m.list.Add(ctx, t); err != nil { return nil, errors.Wrap(err, "failed to add task") } return t, nil } -func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { +func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shimTask, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -199,12 +203,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, shim, err := b.Start(ctx, topts, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b) + cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // would publish taskExit event, but the shim.Delete() would always failed with ttrpc // disconnect and there is no chance to remove this dead task from runtime task lists. // Thus it's better to delete it here. - m.tasks.Delete(ctx, id) + m.list.Delete(ctx, id) }) if err != nil { return nil, errors.Wrap(err, "start failed") @@ -213,12 +217,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, return shim, nil } -// deleteShim attempts to properly delete and cleanup shim after error -func (m *TaskManager) deleteShim(shim *shim) { +// cleanupShim attempts to properly delete and cleanup shim after error +func (m *ShimManager) cleanupShim(shim *shimTask) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _, errShim := shim.delete(dctx, m.tasks.Delete) + _, errShim := shim.delete(dctx, m.list.Delete) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) @@ -230,24 +234,24 @@ func (m *TaskManager) deleteShim(shim *shim) { } // Get a specific task -func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.tasks.Get(ctx, id) +func (m *ShimManager) Get(ctx context.Context, id string) (runtime.Task, error) { + return m.list.Get(ctx, id) } // Add a runtime task -func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error { - return m.tasks.Add(ctx, task) +func (m *ShimManager) Add(ctx context.Context, task runtime.Task) error { + return m.list.Add(ctx, task) } // Delete a runtime task -func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { - task, err := m.tasks.Get(ctx, id) +func (m *ShimManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { + task, err := m.list.Get(ctx, id) if err != nil { return nil, err } - shim := task.(*shim) - exit, err := shim.delete(ctx, m.tasks.Delete) + shim := task.(*shimTask) + exit, err := shim.delete(ctx, m.list.Delete) if err != nil { return nil, err } @@ -256,11 +260,11 @@ func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, err } // Tasks lists all tasks -func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.tasks.GetAll(ctx, all) +func (m *ShimManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { + return m.list.GetAll(ctx, all) } -func (m *TaskManager) loadExistingTasks(ctx context.Context) error { +func (m *ShimManager) loadExistingTasks(ctx context.Context) error { nsDirs, err := os.ReadDir(m.state) if err != nil { return err @@ -275,7 +279,7 @@ func (m *TaskManager) loadExistingTasks(ctx context.Context) error { continue } log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := m.loadTasks(namespaces.WithNamespace(ctx, ns)); err != nil { + if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") continue } @@ -287,7 +291,7 @@ func (m *TaskManager) loadExistingTasks(ctx context.Context) error { return nil } -func (m *TaskManager) loadTasks(ctx context.Context) error { +func (m *ShimManager) loadShims(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -341,20 +345,20 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { shim, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) + cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) // Remove self from the runtime task list. - m.tasks.Delete(ctx, id) + m.list.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) continue } - m.tasks.Add(ctx, shim) + m.list.Add(ctx, shim) } return nil } -func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) { +func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) { container, err := m.containers.Get(ctx, id) if err != nil { return nil, err @@ -362,7 +366,7 @@ func (m *TaskManager) container(ctx context.Context, id string) (*containers.Con return &container, nil } -func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error { +func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -375,7 +379,7 @@ func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error { // if the task was not loaded, cleanup and empty working directory // this can happen on a reboot where /run for the bundle state is cleaned up // but that persistent working dir is left - if _, err := m.tasks.Get(ctx, d.Name()); err != nil { + if _, err := m.list.Get(ctx, d.Name()); err != nil { path := filepath.Join(m.root, ns, d.Name()) if err := os.RemoveAll(path); err != nil { log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) diff --git a/runtime/v2/process.go b/runtime/v2/process.go index 903a4282b..497219ae5 100644 --- a/runtime/v2/process.go +++ b/runtime/v2/process.go @@ -29,7 +29,7 @@ import ( type process struct { id string - shim *shim + shim *shimTask } func (p *process) ID() string { diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 56e347563..2d9cb27c2 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -61,7 +61,7 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) { +func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err @@ -115,10 +115,12 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err client.Close() } }() - s := &shim{ - client: client, - task: task.NewTaskClient(client), - bundle: bundle, + s := &shimTask{ + shim: &shim{ + bundle: bundle, + client: client, + }, + task: task.NewTaskClient(client), } ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() @@ -182,28 +184,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi }) } -var _ runtime.Task = &shim{} - type shim struct { bundle *Bundle client *ttrpc.Client - task task.TaskService -} - -func (s *shim) Shutdown(ctx context.Context) error { - _, err := s.task.Shutdown(ctx, &task.ShutdownRequest{ - ID: s.ID(), - }) - if err != nil && !errors.Is(err, ttrpc.ErrClosed) { - return errdefs.FromGRPC(err) - } - return nil -} - -func (s *shim) waitShutdown(ctx context.Context) error { - ctx, cancel := timeout.WithContext(ctx, shutdownTimeout) - defer cancel() - return s.Shutdown(ctx) } // ID of the shim/task @@ -211,18 +194,6 @@ func (s *shim) ID() string { return s.bundle.ID } -// PID of the task -func (s *shim) PID(ctx context.Context) (uint32, error) { - response, err := s.task.Connect(ctx, &task.ConnectRequest{ - ID: s.ID(), - }) - if err != nil { - return 0, errdefs.FromGRPC(err) - } - - return response.TaskPid, nil -} - func (s *shim) Namespace() string { return s.bundle.Namespace } @@ -231,7 +202,43 @@ func (s *shim) Close() error { return s.client.Close() } -func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { +var _ runtime.Task = &shimTask{} + +// shimTask wraps shim process and adds task service client for compatibility with existing shim manager. +type shimTask struct { + *shim + task task.TaskService +} + +func (s *shimTask) Shutdown(ctx context.Context) error { + _, err := s.task.Shutdown(ctx, &task.ShutdownRequest{ + ID: s.ID(), + }) + if err != nil && !errors.Is(err, ttrpc.ErrClosed) { + return errdefs.FromGRPC(err) + } + return nil +} + +func (s *shimTask) waitShutdown(ctx context.Context) error { + ctx, cancel := timeout.WithContext(ctx, shutdownTimeout) + defer cancel() + return s.Shutdown(ctx) +} + +// PID of the task +func (s *shimTask) PID(ctx context.Context) (uint32, error) { + response, err := s.task.Connect(ctx, &task.ConnectRequest{ + ID: s.ID(), + }) + if err != nil { + return 0, errdefs.FromGRPC(err) + } + + return response.TaskPid, nil +} + +func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ ID: s.ID(), }) @@ -281,7 +288,7 @@ func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, }, nil } -func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { +func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { topts := opts.TaskOptions if topts == nil { topts = opts.RuntimeOptions @@ -312,7 +319,7 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas return s, nil } -func (s *shim) Pause(ctx context.Context) error { +func (s *shimTask) Pause(ctx context.Context) error { if _, err := s.task.Pause(ctx, &task.PauseRequest{ ID: s.ID(), }); err != nil { @@ -321,7 +328,7 @@ func (s *shim) Pause(ctx context.Context) error { return nil } -func (s *shim) Resume(ctx context.Context) error { +func (s *shimTask) Resume(ctx context.Context) error { if _, err := s.task.Resume(ctx, &task.ResumeRequest{ ID: s.ID(), }); err != nil { @@ -330,7 +337,7 @@ func (s *shim) Resume(ctx context.Context) error { return nil } -func (s *shim) Start(ctx context.Context) error { +func (s *shimTask) Start(ctx context.Context) error { _, err := s.task.Start(ctx, &task.StartRequest{ ID: s.ID(), }) @@ -340,7 +347,7 @@ func (s *shim) Start(ctx context.Context) error { return nil } -func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { +func (s *shimTask) Kill(ctx context.Context, signal uint32, all bool) error { if _, err := s.task.Kill(ctx, &task.KillRequest{ ID: s.ID(), Signal: signal, @@ -351,7 +358,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { return nil } -func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { +func (s *shimTask) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { if err := identifiers.Validate(id); err != nil { return nil, errors.Wrapf(err, "invalid exec id %s", id) } @@ -373,7 +380,7 @@ func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt }, nil } -func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { +func (s *shimTask) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { resp, err := s.task.Pids(ctx, &task.PidsRequest{ ID: s.ID(), }) @@ -390,7 +397,7 @@ func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { return processList, nil } -func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { +func (s *shimTask) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { _, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{ ID: s.ID(), Width: size.Width, @@ -402,7 +409,7 @@ func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { return nil } -func (s *shim) CloseIO(ctx context.Context) error { +func (s *shimTask) CloseIO(ctx context.Context) error { _, err := s.task.CloseIO(ctx, &task.CloseIORequest{ ID: s.ID(), Stdin: true, @@ -413,7 +420,7 @@ func (s *shim) CloseIO(ctx context.Context) error { return nil } -func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { +func (s *shimTask) Wait(ctx context.Context) (*runtime.Exit, error) { taskPid, err := s.PID(ctx) if err != nil { return nil, err @@ -431,7 +438,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { }, nil } -func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { +func (s *shimTask) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { request := &task.CheckpointTaskRequest{ ID: s.ID(), Path: path, @@ -443,7 +450,7 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) return nil } -func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error { +func (s *shimTask) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error { if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{ ID: s.ID(), Resources: resources, @@ -454,7 +461,7 @@ func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations ma return nil } -func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { +func (s *shimTask) Stats(ctx context.Context) (*ptypes.Any, error) { response, err := s.task.Stats(ctx, &task.StatsRequest{ ID: s.ID(), }) @@ -464,7 +471,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { return response.Stats, nil } -func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { +func (s *shimTask) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { p := &process{ id: id, shim: s, @@ -475,7 +482,7 @@ func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, err return p, nil } -func (s *shim) State(ctx context.Context) (runtime.State, error) { +func (s *shimTask) State(ctx context.Context) (runtime.State, error) { response, err := s.task.State(ctx, &task.StateRequest{ ID: s.ID(), }) diff --git a/services/tasks/local.go b/services/tasks/local.go index 067a5b05c..edee5616c 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -111,7 +111,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { store: db.ContentStore(), publisher: ep.(events.Publisher), monitor: monitor.(runtime.TaskMonitor), - v2Runtime: v2r.(*v2.TaskManager), + v2Runtime: v2r.(*v2.ShimManager), } for _, r := range runtimes { tasks, err := r.Tasks(ic.Context, true) @@ -139,7 +139,7 @@ type local struct { publisher events.Publisher monitor runtime.TaskMonitor - v2Runtime *v2.TaskManager + v2Runtime *v2.ShimManager } func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) { From 7c4ead285dbab56357c3c5411612b5032661f1f5 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Thu, 26 Aug 2021 11:56:46 -0700 Subject: [PATCH 02/12] Add task manager Signed-off-by: Maksym Pavlenko --- integration/client/container_linux_test.go | 5 ++- runtime/runtime.go | 2 - runtime/v2/manager.go | 51 ++++++++++++++++++++++ services/tasks/local.go | 6 +-- 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/integration/client/container_linux_test.go b/integration/client/container_linux_test.go index 4de3f4477..40023bbb2 100644 --- a/integration/client/container_linux_test.go +++ b/integration/client/container_linux_test.go @@ -41,7 +41,10 @@ import ( "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/sys" + "github.com/containerd/typeurl" + "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" exec "golang.org/x/sys/execabs" "golang.org/x/sys/unix" ) @@ -430,7 +433,7 @@ func getLogDirPath(runtimeVersion, id string) string { case "v1": return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id) case "v2": - return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id) + return filepath.Join(defaultState, "io.containerd.runtime.v2.shim", testNamespace, id) default: panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion)) } diff --git a/runtime/runtime.go b/runtime/runtime.go index 52974a3a9..b4bed75cf 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -69,8 +69,6 @@ type PlatformRuntime interface { // Tasks returns all the current tasks for the runtime. // Any container runs at most one task at a time. Tasks(ctx context.Context, all bool) ([]Task, error) - // Add adds a task into runtime. - Add(ctx context.Context, task Task) error // Delete remove a task. Delete(ctx context.Context, taskID string) (*Exit, error) } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index d7b23f050..5cb5b9cd2 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -92,6 +92,20 @@ func init() { }) }, }) + + plugin.Register(&plugin.Registration{ + Type: plugin.RuntimePluginV2, + ID: "task", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + shimInstance, err := ic.GetByID(plugin.RuntimePluginV2, "shim") + if err != nil { + return nil, err + } + + shimManager := shimInstance.(*ShimManager) + return NewTaskManager(shimManager), nil + }, + }) } type ManagerConfig struct { @@ -400,3 +414,40 @@ func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) { } return p, nil } + +// TaskManager wraps task service client on top of shim manager. +type TaskManager struct { + shims *ShimManager +} + +// NewTaskManager creates a new task manager instance. +func NewTaskManager(shims *ShimManager) *TaskManager { + return &TaskManager{ + shims: shims, + } +} + +// ID of the task manager +func (m *TaskManager) ID() string { + return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") +} + +// Create launches new shim instance and creates new task +func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { + return m.shims.Create(ctx, taskID, opts) +} + +// Get a specific task +func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { + return m.shims.Get(ctx, id) +} + +// Tasks lists all tasks +func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { + return m.shims.Tasks(ctx, all) +} + +// Delete deletes the task and shim instance +func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { + return m.shims.Delete(ctx, taskID) +} diff --git a/services/tasks/local.go b/services/tasks/local.go index edee5616c..54c758fbf 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -81,7 +81,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { return nil, err } - v2r, err := ic.Get(plugin.RuntimePluginV2) + v2r, err := ic.GetByID(plugin.RuntimePluginV2, "task") if err != nil { return nil, err } @@ -111,7 +111,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { store: db.ContentStore(), publisher: ep.(events.Publisher), monitor: monitor.(runtime.TaskMonitor), - v2Runtime: v2r.(*v2.ShimManager), + v2Runtime: v2r.(*v2.TaskManager), } for _, r := range runtimes { tasks, err := r.Tasks(ic.Context, true) @@ -139,7 +139,7 @@ type local struct { publisher events.Publisher monitor runtime.TaskMonitor - v2Runtime *v2.ShimManager + v2Runtime *v2.TaskManager } func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) { From fb5f6ce3c9f9fd8b7d18df55b066703fb1ab4ce2 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Thu, 26 Aug 2021 16:35:56 -0700 Subject: [PATCH 03/12] Rework task create and cleanup flow Signed-off-by: Maksym Pavlenko --- runtime/v2/binary.go | 11 ++-- runtime/v2/manager.go | 117 ++++++++++++++++++++++++++---------------- runtime/v2/shim.go | 36 ++++++++++--- 3 files changed, 107 insertions(+), 57 deletions(-) diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index b847a6a8b..01584ab5a 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -60,7 +60,7 @@ type binary struct { bundle *Bundle } -func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shimTask, err error) { +func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { args := []string{"-id", b.bundle.ID} switch logrus.GetLevel() { case logrus.DebugLevel, logrus.TraceLevel: @@ -128,12 +128,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ f.Close() } client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) - return &shimTask{ - shim: &shim{ - bundle: b.bundle, - client: client, - }, - task: task.NewTaskClient(client), + return &shim{ + bundle: b.bundle, + client: client, }, nil } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 5cb5b9cd2..7baba194f 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -33,6 +33,7 @@ import ( "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/v2/task" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -163,8 +164,8 @@ func (m *ShimManager) ID() string { return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim") } -// Create a new task -func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { +// Start launches a new shim instance +func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ *shimTask, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err @@ -185,19 +186,21 @@ func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.Create } }() - t, err := shim.Create(ctx, opts) - if err != nil { - return nil, errors.Wrap(err, "failed to create shim") + // NOTE: temporarily keep this wrapper around until containerd's task service depends on it. + // This will no longer be required once we migrate to client side task management. + shimTask := &shimTask{ + shim: shim, + task: task.NewTaskClient(shim.client), } - if err := m.list.Add(ctx, t); err != nil { + if err := m.list.Add(ctx, shimTask); err != nil { return nil, errors.Wrap(err, "failed to add task") } - return t, nil + return shimTask, nil } -func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shimTask, error) { +func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -232,50 +235,35 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, } // cleanupShim attempts to properly delete and cleanup shim after error -func (m *ShimManager) cleanupShim(shim *shimTask) { +func (m *ShimManager) cleanupShim(shim *shim) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _, errShim := shim.delete(dctx, m.list.Delete) - if errShim != nil { - if errdefs.IsDeadlineExceeded(errShim) { - dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) - defer cancel() - } - shim.Shutdown(dctx) - shim.Close() + _ = shim.delete(dctx) + m.list.Delete(dctx, shim.ID()) +} + +func (m *ShimManager) Get(ctx context.Context, id string) (*shim, error) { + item, err := m.list.Get(ctx, id) + if err != nil { + return nil, err } -} -// Get a specific task -func (m *ShimManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.list.Get(ctx, id) -} - -// Add a runtime task -func (m *ShimManager) Add(ctx context.Context, task runtime.Task) error { - return m.list.Add(ctx, task) + shimTask := item.(*shimTask) + return shimTask.shim, nil } // Delete a runtime task -func (m *ShimManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { - task, err := m.list.Get(ctx, id) +func (m *ShimManager) Delete(ctx context.Context, id string) error { + shim, err := m.Get(ctx, id) if err != nil { - return nil, err + return err } - shim := task.(*shimTask) - exit, err := shim.delete(ctx, m.list.Delete) - if err != nil { - return nil, err - } + err = shim.delete(ctx) + m.list.Delete(ctx, id) - return exit, err -} - -// Tasks lists all tasks -func (m *ShimManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.list.GetAll(ctx, all) + return err } func (m *ShimManager) loadExistingTasks(ctx context.Context) error { @@ -434,20 +422,61 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - return m.shims.Create(ctx, taskID, opts) + shim, err := m.shims.Start(ctx, taskID, opts) + if err != nil { + return nil, errors.Wrap(err, "failed to start shim") + } + + t, err := shim.Create(ctx, opts) + if err != nil { + dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) + defer cancel() + + _, errShim := shim.delete(dctx, func(ctx context.Context, id string) { + m.shims.list.Delete(ctx, id) + }) + + if errShim != nil { + if errdefs.IsDeadlineExceeded(errShim) { + dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) + defer cancel() + } + + shim.Shutdown(dctx) + shim.Close() + } + + return nil, errors.Wrap(err, "failed to create shim task") + } + + return t, nil } // Get a specific task func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.shims.Get(ctx, id) + return m.shims.list.Get(ctx, id) } // Tasks lists all tasks func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.shims.Tasks(ctx, all) + return m.shims.list.GetAll(ctx, all) } // Delete deletes the task and shim instance func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { - return m.shims.Delete(ctx, taskID) + item, err := m.shims.list.Get(ctx, taskID) + if err != nil { + return nil, err + } + + shimTask := item.(*shimTask) + exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) { + m.shims.list.Delete(ctx, id) + }) + + if err != nil { + return nil, fmt.Errorf("failed to delete task: %w", err) + } + + return exit, nil } diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 2d9cb27c2..df3a2b097 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -18,6 +18,7 @@ package v2 import ( "context" + "fmt" "io" "os" "path/filepath" @@ -37,6 +38,7 @@ import ( "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/ttrpc" ptypes "github.com/gogo/protobuf/types" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -202,6 +204,27 @@ func (s *shim) Close() error { return s.client.Close() } +func (s *shim) delete(ctx context.Context) error { + var ( + result *multierror.Error + ) + + if err := s.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) + } + + if err := s.client.UserOnCloseWait(ctx); err != nil { + result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) + } + + if err := s.bundle.Delete(); err != nil { + log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") + result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err)) + } + + return result.ErrorOrNil() +} + var _ runtime.Task = &shimTask{} // shimTask wraps shim process and adds task service client for compatibility with existing shim manager. @@ -267,20 +290,21 @@ func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Conte } if err := s.waitShutdown(ctx); err != nil { - log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim") + log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task") + } + + if err := s.shim.delete(ctx); err != nil { + log.G(ctx).WithField("id", s.ID()).WithError(err).Errorf("failed to delete shim") } - s.Close() - s.client.UserOnCloseWait(ctx) // remove self from the runtime task list // this seems dirty but it cleans up the API across runtimes, tasks, and the service removeTask(ctx, s.ID()) - if err := s.bundle.Delete(); err != nil { - log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") - } + if shimErr != nil { return nil, shimErr } + return &runtime.Exit{ Status: response.ExitStatus, Timestamp: response.ExitedAt, From 33786ee4d279864b42e639056e64d18bfcd6da40 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Mon, 30 Aug 2021 12:59:38 -0700 Subject: [PATCH 04/12] Add plugin dependency between shim and shim services Signed-off-by: Maksym Pavlenko --- plugin/plugin.go | 2 ++ runtime/v2/manager.go | 7 +++++-- services/tasks/local.go | 2 +- services/tasks/local_freebsd.go | 1 + services/tasks/local_unix.go | 1 + services/tasks/local_windows.go | 1 + 6 files changed, 11 insertions(+), 3 deletions(-) diff --git a/plugin/plugin.go b/plugin/plugin.go index f24a3c77b..73fcd643d 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -57,6 +57,8 @@ const ( RuntimePlugin Type = "io.containerd.runtime.v1" // RuntimePluginV2 implements a runtime v2 RuntimePluginV2 Type = "io.containerd.runtime.v2" + // RuntimePluginV2Service is a shim provided service implemented on top of runtime v2 plugins. + RuntimePluginV2Service Type = "io.containerd.runtime.v2.service" // ServicePlugin implements a internal service ServicePlugin Type = "io.containerd.service.v1" // GRPCPlugin implements a grpc service diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 7baba194f..be7427a56 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -95,8 +95,11 @@ func init() { }) plugin.Register(&plugin.Registration{ - Type: plugin.RuntimePluginV2, + Type: plugin.RuntimePluginV2Service, ID: "task", + Requires: []plugin.Type{ + plugin.RuntimePluginV2, + }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { shimInstance, err := ic.GetByID(plugin.RuntimePluginV2, "shim") if err != nil { @@ -417,7 +420,7 @@ func NewTaskManager(shims *ShimManager) *TaskManager { // ID of the task manager func (m *TaskManager) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") + return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2Service, "task") } // Create launches new shim instance and creates new task diff --git a/services/tasks/local.go b/services/tasks/local.go index 54c758fbf..d06688477 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -81,7 +81,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { return nil, err } - v2r, err := ic.GetByID(plugin.RuntimePluginV2, "task") + v2r, err := ic.GetByID(plugin.RuntimePluginV2Service, "task") if err != nil { return nil, err } diff --git a/services/tasks/local_freebsd.go b/services/tasks/local_freebsd.go index c0b57edfa..014948f47 100644 --- a/services/tasks/local_freebsd.go +++ b/services/tasks/local_freebsd.go @@ -24,6 +24,7 @@ import ( var tasksServiceRequires = []plugin.Type{ plugin.EventPlugin, plugin.RuntimePluginV2, + plugin.RuntimePluginV2Service, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, } diff --git a/services/tasks/local_unix.go b/services/tasks/local_unix.go index 50a29f612..acf1d8c64 100644 --- a/services/tasks/local_unix.go +++ b/services/tasks/local_unix.go @@ -30,6 +30,7 @@ var tasksServiceRequires = []plugin.Type{ plugin.EventPlugin, plugin.RuntimePlugin, plugin.RuntimePluginV2, + plugin.RuntimePluginV2Service, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, } diff --git a/services/tasks/local_windows.go b/services/tasks/local_windows.go index 90b1ed9dd..6fd482e7f 100644 --- a/services/tasks/local_windows.go +++ b/services/tasks/local_windows.go @@ -24,6 +24,7 @@ import ( var tasksServiceRequires = []plugin.Type{ plugin.EventPlugin, plugin.RuntimePluginV2, + plugin.RuntimePluginV2Service, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, } From a3d298193c816386b7a939d6cf7259e0afa3001d Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Tue, 5 Oct 2021 14:51:12 -0700 Subject: [PATCH 05/12] Fix backward compatibility with old task shims Signed-off-by: Maksym Pavlenko --- integration/client/container_linux_test.go | 2 +- plugin/plugin.go | 4 +- runtime/v2/manager.go | 48 ++++++++++++++++------ services/tasks/local.go | 2 +- services/tasks/local_freebsd.go | 2 +- services/tasks/local_unix.go | 2 +- services/tasks/local_windows.go | 2 +- 7 files changed, 43 insertions(+), 19 deletions(-) diff --git a/integration/client/container_linux_test.go b/integration/client/container_linux_test.go index 40023bbb2..af8b6f099 100644 --- a/integration/client/container_linux_test.go +++ b/integration/client/container_linux_test.go @@ -433,7 +433,7 @@ func getLogDirPath(runtimeVersion, id string) string { case "v1": return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id) case "v2": - return filepath.Join(defaultState, "io.containerd.runtime.v2.shim", testNamespace, id) + return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id) default: panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion)) } diff --git a/plugin/plugin.go b/plugin/plugin.go index 73fcd643d..6c94e960c 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -57,8 +57,8 @@ const ( RuntimePlugin Type = "io.containerd.runtime.v1" // RuntimePluginV2 implements a runtime v2 RuntimePluginV2 Type = "io.containerd.runtime.v2" - // RuntimePluginV2Service is a shim provided service implemented on top of runtime v2 plugins. - RuntimePluginV2Service Type = "io.containerd.runtime.v2.service" + // RuntimeShimPlugin implements the shim manager for runtime v2. + RuntimeShimPlugin Type = "io.containerd.runtime-shim.v2" // ServicePlugin implements a internal service ServicePlugin Type = "io.containerd.service.v1" // GRPCPlugin implements a grpc service diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index be7427a56..7a4796743 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -48,7 +48,7 @@ type Config struct { func init() { plugin.Register(&plugin.Registration{ - Type: plugin.RuntimePluginV2, + Type: plugin.RuntimeShimPlugin, ID: "shim", Requires: []plugin.Type{ plugin.EventPlugin, @@ -65,12 +65,7 @@ func init() { } ic.Meta.Platforms = supportedPlatforms - if err := os.MkdirAll(ic.Root, 0711); err != nil { - return nil, err - } - if err := os.MkdirAll(ic.State, 0711); err != nil { - return nil, err - } + m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err @@ -95,18 +90,47 @@ func init() { }) plugin.Register(&plugin.Registration{ - Type: plugin.RuntimePluginV2Service, + Type: plugin.RuntimePluginV2, ID: "task", Requires: []plugin.Type{ - plugin.RuntimePluginV2, + plugin.RuntimeShimPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - shimInstance, err := ic.GetByID(plugin.RuntimePluginV2, "shim") + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + ep, err := ic.GetByID(plugin.EventPlugin, "exchange") + if err != nil { + return nil, err + } + cs := metadata.NewContainerStore(m.(*metadata.DB)) + events := ep.(*exchange.Exchange) + + shimManager, err := NewShimManager(ic.Context, &ManagerConfig{ + Root: ic.Root, + State: ic.State, + Address: ic.Address, + TTRPCAddress: ic.TTRPCAddress, + Events: events, + Store: cs, + SchedCore: false, + }) if err != nil { return nil, err } - shimManager := shimInstance.(*ShimManager) + // Internally task manager relies on shim manager to launch task shims. + // It's also possible to use shim manager independently and launch other types of shims. + // + // Ideally task manager should depend on shim instance we registered above, however it'll use + // different home directory (`io.containerd.runtime.v2.task` vs `io.containerd.runtime.v2.shim`), + // which will break backward compatibility when upgrading containerd to the new version. + // + // For now, we create another instance of shim manager with the "old" home directory, so shim tasks + // are properly restored, but will work independently. + // + // See more context https://github.com/containerd/containerd/pull/5918#discussion_r705434412 return NewTaskManager(shimManager), nil }, }) @@ -420,7 +444,7 @@ func NewTaskManager(shims *ShimManager) *TaskManager { // ID of the task manager func (m *TaskManager) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2Service, "task") + return fmt.Sprintf("%s.%s", plugin.RuntimeShimPlugin, "task") } // Create launches new shim instance and creates new task diff --git a/services/tasks/local.go b/services/tasks/local.go index d06688477..54c758fbf 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -81,7 +81,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { return nil, err } - v2r, err := ic.GetByID(plugin.RuntimePluginV2Service, "task") + v2r, err := ic.GetByID(plugin.RuntimePluginV2, "task") if err != nil { return nil, err } diff --git a/services/tasks/local_freebsd.go b/services/tasks/local_freebsd.go index 014948f47..aa081529a 100644 --- a/services/tasks/local_freebsd.go +++ b/services/tasks/local_freebsd.go @@ -24,7 +24,7 @@ import ( var tasksServiceRequires = []plugin.Type{ plugin.EventPlugin, plugin.RuntimePluginV2, - plugin.RuntimePluginV2Service, + plugin.RuntimeShimPlugin, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, } diff --git a/services/tasks/local_unix.go b/services/tasks/local_unix.go index acf1d8c64..badd3bc7a 100644 --- a/services/tasks/local_unix.go +++ b/services/tasks/local_unix.go @@ -30,7 +30,7 @@ var tasksServiceRequires = []plugin.Type{ plugin.EventPlugin, plugin.RuntimePlugin, plugin.RuntimePluginV2, - plugin.RuntimePluginV2Service, + plugin.RuntimeShimPlugin, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, } diff --git a/services/tasks/local_windows.go b/services/tasks/local_windows.go index 6fd482e7f..c494f4981 100644 --- a/services/tasks/local_windows.go +++ b/services/tasks/local_windows.go @@ -24,7 +24,7 @@ import ( var tasksServiceRequires = []plugin.Type{ plugin.EventPlugin, plugin.RuntimePluginV2, - plugin.RuntimePluginV2Service, + plugin.RuntimeShimPlugin, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, } From b554b577b05b1663931fcc3c792d60fb1a157020 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Tue, 5 Oct 2021 10:01:41 -0700 Subject: [PATCH 06/12] Move shim restore to a separate file Signed-off-by: Maksym Pavlenko --- runtime/v2/shim_load.go | 151 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 runtime/v2/shim_load.go diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go new file mode 100644 index 000000000..9dcfccf09 --- /dev/null +++ b/runtime/v2/shim_load.go @@ -0,0 +1,151 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package v2 + +import ( + "context" + "os" + "path/filepath" + + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/events/exchange" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/runtime" +) + +func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store) error { + var ( + ctx = ic.Context + state = ic.State + root = ic.Root + containerdAddress = ic.Address + containerdTTRPCAddress = ic.TTRPCAddress + ) + + nsDirs, err := os.ReadDir(state) + if err != nil { + return err + } + for _, nsd := range nsDirs { + if !nsd.IsDir() { + continue + } + ns := nsd.Name() + // skip hidden directories + if len(ns) > 0 && ns[0] == '.' { + continue + } + log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") + if err := loadShims(namespaces.WithNamespace(ctx, ns), state, list, events, containers, containerdAddress, containerdTTRPCAddress); err != nil { + log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") + continue + } + if err := cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), root, list); err != nil { + log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") + continue + } + } + + return nil +} + +func loadShims(ctx context.Context, state string, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store, containerdAddress string, containerdTTRPCAddress string) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + shimDirs, err := os.ReadDir(filepath.Join(state, ns)) + if err != nil { + return err + } + for _, sd := range shimDirs { + if !sd.IsDir() { + continue + } + id := sd.Name() + // skip hidden directories + if len(id) > 0 && id[0] == '.' { + continue + } + bundle, err := LoadBundle(ctx, state, id) + if err != nil { + // fine to return error here, it is a programmer error if the context + // does not have a namespace + return err + } + // fast path + bf, err := os.ReadDir(bundle.Path) + if err != nil { + bundle.Delete() + log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path) + continue + } + if len(bf) == 0 { + bundle.Delete() + continue + } + container, err := containers.Get(ctx, id) + if err != nil { + log.G(ctx).WithError(err).Errorf("loading container %s", id) + if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { + log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id) + } + bundle.Delete() + continue + } + binaryCall := shimBinary(bundle, container.Runtime.Name, containerdAddress, containerdTTRPCAddress) + shim, err := loadShim(ctx, bundle, func() { + log.G(ctx).WithField("id", id).Info("shim disconnected") + + cleanupAfterDeadShim(context.Background(), id, ns, list, events, binaryCall) + // Remove self from the runtime task list. + list.Delete(ctx, id) + }) + if err != nil { + cleanupAfterDeadShim(ctx, id, ns, list, events, binaryCall) + continue + } + list.Add(ctx, shim) + } + return nil +} + +func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + dirs, err := os.ReadDir(filepath.Join(root, ns)) + if err != nil { + return err + } + for _, d := range dirs { + // if the task was not loaded, cleanup and empty working directory + // this can happen on a reboot where /run for the bundle state is cleaned up + // but that persistent working dir is left + if _, err := list.Get(ctx, d.Name()); err != nil { + path := filepath.Join(root, ns, d.Name()) + if err := os.RemoveAll(path); err != nil { + log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) + } + } + } + return nil +} From df8c206a927986a6953586c556b01f594b951821 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Thu, 7 Oct 2021 13:53:12 -0700 Subject: [PATCH 07/12] Cleanup shim loading Signed-off-by: Maksym Pavlenko --- runtime/v2/manager.go | 133 ++-------------------------------------- runtime/v2/shim_load.go | 53 +++++++--------- 2 files changed, 29 insertions(+), 157 deletions(-) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 7a4796743..76477dc89 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -20,14 +20,12 @@ import ( "context" "fmt" "os" - "path/filepath" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/platforms" @@ -94,6 +92,8 @@ func init() { ID: "task", Requires: []plugin.Type{ plugin.RuntimeShimPlugin, + plugin.EventPlugin, + plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { m, err := ic.Get(plugin.MetadataPlugin) @@ -120,6 +120,10 @@ func init() { return nil, err } + if err := shimManager.loadExistingTasks(ic.Context); err != nil { + return nil, err + } + // Internally task manager relies on shim manager to launch task shims. // It's also possible to use shim manager independently and launch other types of shims. // @@ -293,131 +297,6 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error { return err } -func (m *ShimManager) loadExistingTasks(ctx context.Context) error { - nsDirs, err := os.ReadDir(m.state) - if err != nil { - return err - } - for _, nsd := range nsDirs { - if !nsd.IsDir() { - continue - } - ns := nsd.Name() - // skip hidden directories - if len(ns) > 0 && ns[0] == '.' { - continue - } - log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { - log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") - continue - } - if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { - log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") - continue - } - } - return nil -} - -func (m *ShimManager) loadShims(ctx context.Context) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) - if err != nil { - return err - } - for _, sd := range shimDirs { - if !sd.IsDir() { - continue - } - id := sd.Name() - // skip hidden directories - if len(id) > 0 && id[0] == '.' { - continue - } - bundle, err := LoadBundle(ctx, m.state, id) - if err != nil { - // fine to return error here, it is a programmer error if the context - // does not have a namespace - return err - } - // fast path - bf, err := os.ReadDir(bundle.Path) - if err != nil { - bundle.Delete() - log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path) - continue - } - if len(bf) == 0 { - bundle.Delete() - continue - } - container, err := m.container(ctx, id) - if err != nil { - log.G(ctx).WithError(err).Errorf("loading container %s", id) - if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { - log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id) - } - bundle.Delete() - continue - } - binaryCall := shimBinary(bundle, - shimBinaryConfig{ - runtime: container.Runtime.Name, - address: m.containerdAddress, - ttrpcAddress: m.containerdTTRPCAddress, - schedCore: m.schedCore, - }) - shim, err := loadShim(ctx, bundle, func() { - log.G(ctx).WithField("id", id).Info("shim disconnected") - - cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) - // Remove self from the runtime task list. - m.list.Delete(ctx, id) - }) - if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) - continue - } - m.list.Add(ctx, shim) - } - return nil -} - -func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) { - container, err := m.containers.Get(ctx, id) - if err != nil { - return nil, err - } - return &container, nil -} - -func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - dirs, err := os.ReadDir(filepath.Join(m.root, ns)) - if err != nil { - return err - } - for _, d := range dirs { - // if the task was not loaded, cleanup and empty working directory - // this can happen on a reboot where /run for the bundle state is cleaned up - // but that persistent working dir is left - if _, err := m.list.Get(ctx, d.Name()); err != nil { - path := filepath.Join(m.root, ns, d.Name()) - if err := os.RemoveAll(path); err != nil { - log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) - } - } - } - return nil -} - func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) { p := make([]ocispec.Platform, len(platformStr)) for i, v := range platformStr { diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 9dcfccf09..8eed0f467 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -21,25 +21,13 @@ import ( "os" "path/filepath" - "github.com/containerd/containerd/containers" - "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/runtime" ) -func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store) error { - var ( - ctx = ic.Context - state = ic.State - root = ic.Root - containerdAddress = ic.Address - containerdTTRPCAddress = ic.TTRPCAddress - ) - - nsDirs, err := os.ReadDir(state) +func (m *ShimManager) loadExistingTasks(ctx context.Context) error { + nsDirs, err := os.ReadDir(m.state) if err != nil { return err } @@ -53,25 +41,24 @@ func loadExistingTasks(ic *plugin.InitContext, list *runtime.TaskList, events *e continue } log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := loadShims(namespaces.WithNamespace(ctx, ns), state, list, events, containers, containerdAddress, containerdTTRPCAddress); err != nil { + if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") continue } - if err := cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), root, list); err != nil { + if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") continue } } - return nil } -func loadShims(ctx context.Context, state string, list *runtime.TaskList, events *exchange.Exchange, containers containers.Store, containerdAddress string, containerdTTRPCAddress string) error { +func (m *ShimManager) loadShims(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } - shimDirs, err := os.ReadDir(filepath.Join(state, ns)) + shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) if err != nil { return err } @@ -84,7 +71,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events if len(id) > 0 && id[0] == '.' { continue } - bundle, err := LoadBundle(ctx, state, id) + bundle, err := LoadBundle(ctx, m.state, id) if err != nil { // fine to return error here, it is a programmer error if the context // does not have a namespace @@ -101,7 +88,7 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events bundle.Delete() continue } - container, err := containers.Get(ctx, id) + container, err := m.containers.Get(ctx, id) if err != nil { log.G(ctx).WithError(err).Errorf("loading container %s", id) if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { @@ -110,29 +97,35 @@ func loadShims(ctx context.Context, state string, list *runtime.TaskList, events bundle.Delete() continue } - binaryCall := shimBinary(bundle, container.Runtime.Name, containerdAddress, containerdTTRPCAddress) + binaryCall := shimBinary(bundle, + shimBinaryConfig{ + runtime: container.Runtime.Name, + address: m.containerdAddress, + ttrpcAddress: m.containerdTTRPCAddress, + schedCore: m.schedCore, + }) shim, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, list, events, binaryCall) + cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) // Remove self from the runtime task list. - list.Delete(ctx, id) + m.list.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, list, events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) continue } - list.Add(ctx, shim) + m.list.Add(ctx, shim) } return nil } -func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) error { +func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } - dirs, err := os.ReadDir(filepath.Join(root, ns)) + dirs, err := os.ReadDir(filepath.Join(m.root, ns)) if err != nil { return err } @@ -140,8 +133,8 @@ func cleanupWorkDirs(ctx context.Context, root string, list *runtime.TaskList) e // if the task was not loaded, cleanup and empty working directory // this can happen on a reboot where /run for the bundle state is cleaned up // but that persistent working dir is left - if _, err := list.Get(ctx, d.Name()); err != nil { - path := filepath.Join(root, ns, d.Name()) + if _, err := m.list.Get(ctx, d.Name()); err != nil { + path := filepath.Join(m.root, ns, d.Name()) if err := os.RemoveAll(path); err != nil { log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) } From 733519677f9b266314db5d9c20df821120477769 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Fri, 8 Oct 2021 12:26:23 -0700 Subject: [PATCH 08/12] Fix after rebase Signed-off-by: Maksym Pavlenko --- runtime/v2/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 76477dc89..749931373 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -166,6 +166,7 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e list: runtime.NewTaskList(), events: config.Events, containers: config.Store, + schedCore: config.SchedCore, } if err := m.loadExistingTasks(ctx); err != nil { From 8b788d9dfee9e330e65949f8e8e6704bbf9049e0 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Fri, 8 Oct 2021 21:34:10 -0700 Subject: [PATCH 09/12] Expose shim process interface Signed-off-by: Maksym Pavlenko --- runtime/v2/manager.go | 17 +++++++++-------- runtime/v2/shim.go | 10 ++++++++++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 749931373..7a7bfe6a9 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -197,7 +197,7 @@ func (m *ShimManager) ID() string { } // Start launches a new shim instance -func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ *shimTask, retErr error) { +func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimProcess, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err @@ -275,24 +275,24 @@ func (m *ShimManager) cleanupShim(shim *shim) { m.list.Delete(dctx, shim.ID()) } -func (m *ShimManager) Get(ctx context.Context, id string) (*shim, error) { - item, err := m.list.Get(ctx, id) +func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) { + proc, err := m.list.Get(ctx, id) if err != nil { return nil, err } - shimTask := item.(*shimTask) - return shimTask.shim, nil + return proc, nil } // Delete a runtime task func (m *ShimManager) Delete(ctx context.Context, id string) error { - shim, err := m.Get(ctx, id) + proc, err := m.list.Get(ctx, id) if err != nil { return err } - err = shim.delete(ctx) + shimTask := proc.(*shimTask) + err = shimTask.shim.delete(ctx) m.list.Delete(ctx, id) return err @@ -329,11 +329,12 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - shim, err := m.shims.Start(ctx, taskID, opts) + process, err := m.shims.Start(ctx, taskID, opts) if err != nil { return nil, errors.Wrap(err, "failed to start shim") } + shim := process.(*shimTask) t, err := shim.Create(ctx, opts) if err != nil { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index df3a2b097..e356e14d9 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -186,6 +186,16 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi }) } +// ShimProcess represents a shim instance managed by the shim service. +type ShimProcess interface { + runtime.Process + + // ID of the shim. + ID() string + // Namespace of this shim. + Namespace() string +} + type shim struct { bundle *Bundle client *ttrpc.Client From 2cec3a34b18a95205c7632f37b4686e982b1aa48 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Mon, 11 Oct 2021 11:50:57 -0700 Subject: [PATCH 10/12] Migrate task directory Signed-off-by: Maksym Pavlenko --- integration/client/container_linux_test.go | 2 +- runtime/task_list.go | 13 ++++ runtime/v2/manager.go | 84 ++++++++++++---------- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/integration/client/container_linux_test.go b/integration/client/container_linux_test.go index af8b6f099..bbc842d2e 100644 --- a/integration/client/container_linux_test.go +++ b/integration/client/container_linux_test.go @@ -433,7 +433,7 @@ func getLogDirPath(runtimeVersion, id string) string { case "v1": return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id) case "v2": - return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id) + return filepath.Join(defaultState, "io.containerd.runtime-shim.v2.shim", testNamespace, id) default: panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion)) } diff --git a/runtime/task_list.go b/runtime/task_list.go index b92c6e01c..5b969e5a5 100644 --- a/runtime/task_list.go +++ b/runtime/task_list.go @@ -128,3 +128,16 @@ func (l *TaskList) Delete(ctx context.Context, id string) { delete(tasks, id) } } + +func (l *TaskList) IsEmpty() bool { + l.mu.Lock() + defer l.mu.Unlock() + + for ns := range l.tasks { + if len(l.tasks[ns]) > 0 { + return false + } + } + + return true +} diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 7a7bfe6a9..e11d1640b 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -92,54 +92,64 @@ func init() { ID: "task", Requires: []plugin.Type{ plugin.RuntimeShimPlugin, - plugin.EventPlugin, - plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - m, err := ic.Get(plugin.MetadataPlugin) - if err != nil { - return nil, err - } - ep, err := ic.GetByID(plugin.EventPlugin, "exchange") - if err != nil { - return nil, err - } - cs := metadata.NewContainerStore(m.(*metadata.DB)) - events := ep.(*exchange.Exchange) - - shimManager, err := NewShimManager(ic.Context, &ManagerConfig{ - Root: ic.Root, - State: ic.State, - Address: ic.Address, - TTRPCAddress: ic.TTRPCAddress, - Events: events, - Store: cs, - SchedCore: false, - }) + shimManagerInterface, err := ic.Get(plugin.RuntimeShimPlugin) if err != nil { return nil, err } - if err := shimManager.loadExistingTasks(ic.Context); err != nil { - return nil, err + shimManager := shimManagerInterface.(*ShimManager) + + // From now on task manager works via shim manager, which has different home directory. + // Check if there are any leftovers from previous containerd versions and migrate home directory, + // so we can properly restore existing tasks as well. + if err := migrateTasks(ic, shimManager); err != nil { + log.G(ic.Context).WithError(err).Error("unable to migrate tasks") } - // Internally task manager relies on shim manager to launch task shims. - // It's also possible to use shim manager independently and launch other types of shims. - // - // Ideally task manager should depend on shim instance we registered above, however it'll use - // different home directory (`io.containerd.runtime.v2.task` vs `io.containerd.runtime.v2.shim`), - // which will break backward compatibility when upgrading containerd to the new version. - // - // For now, we create another instance of shim manager with the "old" home directory, so shim tasks - // are properly restored, but will work independently. - // - // See more context https://github.com/containerd/containerd/pull/5918#discussion_r705434412 return NewTaskManager(shimManager), nil }, }) } +func migrateTasks(ic *plugin.InitContext, shimManager *ShimManager) error { + if !shimManager.list.IsEmpty() { + return nil + } + + // Rename below will fail is target directory exists. + // `Root` and `State` dirs expected to be empty at this point (we check that there are no shims loaded above). + // If for some they are not empty, these remove calls will fail (`os.Remove` requires a dir to be empty to succeed). + if err := os.Remove(shimManager.root); err != nil && !os.IsNotExist(err) { + return err + } + + if err := os.Remove(shimManager.state); err != nil && !os.IsNotExist(err) { + return err + } + + if err := os.Rename(ic.Root, shimManager.root); err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to migrate task `root` directory") + } + + if err := os.Rename(ic.State, shimManager.state); err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to migrate task `state` directory") + } + + if err := shimManager.loadExistingTasks(ic.Context); err != nil { + return fmt.Errorf("failed to load tasks after migration") + } + + return nil +} + type ManagerConfig struct { Root string State string @@ -193,7 +203,7 @@ type ShimManager struct { // ID of the shim manager func (m *ShimManager) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim") + return fmt.Sprintf("%s.%s", plugin.RuntimeShimPlugin, "shim") } // Start launches a new shim instance @@ -324,7 +334,7 @@ func NewTaskManager(shims *ShimManager) *TaskManager { // ID of the task manager func (m *TaskManager) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimeShimPlugin, "task") + return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") } // Create launches new shim instance and creates new task From 6fa1bb4a5c48fbb971084d38ce4dea43995ad3e9 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Mon, 1 Nov 2021 09:22:50 -0700 Subject: [PATCH 11/12] Fix build after rebase Signed-off-by: Maksym Pavlenko --- integration/client/container_linux_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/integration/client/container_linux_test.go b/integration/client/container_linux_test.go index bbc842d2e..586260102 100644 --- a/integration/client/container_linux_test.go +++ b/integration/client/container_linux_test.go @@ -41,10 +41,7 @@ import ( "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/sys" - "github.com/containerd/typeurl" "github.com/opencontainers/runtime-spec/specs-go" - specs "github.com/opencontainers/runtime-spec/specs-go" - "github.com/pkg/errors" exec "golang.org/x/sys/execabs" "golang.org/x/sys/unix" ) From d022fbe789b22d7e2773eeb1e01949f356b32efe Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Tue, 2 Nov 2021 11:19:43 -0700 Subject: [PATCH 12/12] Address PR comments Signed-off-by: Maksym Pavlenko --- runtime/v2/manager.go | 48 +++++++++++++++++++++-------------------- runtime/v2/shim_load.go | 10 ++++----- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index e11d1640b..326980583 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -114,7 +114,7 @@ func init() { } func migrateTasks(ic *plugin.InitContext, shimManager *ShimManager) error { - if !shimManager.list.IsEmpty() { + if !shimManager.shims.IsEmpty() { return nil } @@ -122,29 +122,29 @@ func migrateTasks(ic *plugin.InitContext, shimManager *ShimManager) error { // `Root` and `State` dirs expected to be empty at this point (we check that there are no shims loaded above). // If for some they are not empty, these remove calls will fail (`os.Remove` requires a dir to be empty to succeed). if err := os.Remove(shimManager.root); err != nil && !os.IsNotExist(err) { - return err + return fmt.Errorf("failed to remove `root` dir: %w", err) } if err := os.Remove(shimManager.state); err != nil && !os.IsNotExist(err) { - return err + return fmt.Errorf("failed to remove `state` dir: %w", err) } if err := os.Rename(ic.Root, shimManager.root); err != nil { if os.IsNotExist(err) { return nil } - return fmt.Errorf("failed to migrate task `root` directory") + return fmt.Errorf("failed to migrate task `root` directory: %w", err) } if err := os.Rename(ic.State, shimManager.state); err != nil { if os.IsNotExist(err) { return nil } - return fmt.Errorf("failed to migrate task `state` directory") + return fmt.Errorf("failed to migrate task `state` directory: %w", err) } if err := shimManager.loadExistingTasks(ic.Context); err != nil { - return fmt.Errorf("failed to load tasks after migration") + return fmt.Errorf("failed to load tasks after migration: %w", err) } return nil @@ -173,7 +173,7 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, - list: runtime.NewTaskList(), + shims: runtime.NewTaskList(), events: config.Events, containers: config.Store, schedCore: config.SchedCore, @@ -196,7 +196,7 @@ type ShimManager struct { containerdAddress string containerdTTRPCAddress string schedCore bool - list *runtime.TaskList + shims *runtime.TaskList events *exchange.Exchange containers containers.Store } @@ -235,7 +235,7 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO task: task.NewTaskClient(shim.client), } - if err := m.list.Add(ctx, shimTask); err != nil { + if err := m.shims.Add(ctx, shimTask); err != nil { return nil, errors.Wrap(err, "failed to add task") } @@ -262,12 +262,12 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, shim, err := b.Start(ctx, topts, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, b) + cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // would publish taskExit event, but the shim.Delete() would always failed with ttrpc // disconnect and there is no chance to remove this dead task from runtime task lists. // Thus it's better to delete it here. - m.list.Delete(ctx, id) + m.shims.Delete(ctx, id) }) if err != nil { return nil, errors.Wrap(err, "start failed") @@ -282,11 +282,11 @@ func (m *ShimManager) cleanupShim(shim *shim) { defer cancel() _ = shim.delete(dctx) - m.list.Delete(dctx, shim.ID()) + m.shims.Delete(dctx, shim.ID()) } func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) { - proc, err := m.list.Get(ctx, id) + proc, err := m.shims.Get(ctx, id) if err != nil { return nil, err } @@ -296,14 +296,14 @@ func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) { // Delete a runtime task func (m *ShimManager) Delete(ctx context.Context, id string) error { - proc, err := m.list.Get(ctx, id) + proc, err := m.shims.Get(ctx, id) if err != nil { return err } shimTask := proc.(*shimTask) err = shimTask.shim.delete(ctx) - m.list.Delete(ctx, id) + m.shims.Delete(ctx, id) return err } @@ -322,13 +322,13 @@ func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) { // TaskManager wraps task service client on top of shim manager. type TaskManager struct { - shims *ShimManager + manager *ShimManager } // NewTaskManager creates a new task manager instance. func NewTaskManager(shims *ShimManager) *TaskManager { return &TaskManager{ - shims: shims, + manager: shims, } } @@ -339,11 +339,13 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - process, err := m.shims.Start(ctx, taskID, opts) + process, err := m.manager.Start(ctx, taskID, opts) if err != nil { return nil, errors.Wrap(err, "failed to start shim") } + // Cast to shim task and call task service to create a new container task instance. + // This will not be required once shim service / client implemented. shim := process.(*shimTask) t, err := shim.Create(ctx, opts) if err != nil { @@ -351,7 +353,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr defer cancel() _, errShim := shim.delete(dctx, func(ctx context.Context, id string) { - m.shims.list.Delete(ctx, id) + m.manager.shims.Delete(ctx, id) }) if errShim != nil { @@ -372,24 +374,24 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr // Get a specific task func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.shims.list.Get(ctx, id) + return m.manager.shims.Get(ctx, id) } // Tasks lists all tasks func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.shims.list.GetAll(ctx, all) + return m.manager.shims.GetAll(ctx, all) } // Delete deletes the task and shim instance func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { - item, err := m.shims.list.Get(ctx, taskID) + item, err := m.manager.shims.Get(ctx, taskID) if err != nil { return nil, err } shimTask := item.(*shimTask) exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) { - m.shims.list.Delete(ctx, id) + m.manager.shims.Delete(ctx, id) }) if err != nil { diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 8eed0f467..2def62d02 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -107,15 +107,15 @@ func (m *ShimManager) loadShims(ctx context.Context) error { shim, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) + cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall) // Remove self from the runtime task list. - m.list.Delete(ctx, id) + m.shims.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall) continue } - m.list.Add(ctx, shim) + m.shims.Add(ctx, shim) } return nil } @@ -133,7 +133,7 @@ func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { // if the task was not loaded, cleanup and empty working directory // this can happen on a reboot where /run for the bundle state is cleaned up // but that persistent working dir is left - if _, err := m.list.Get(ctx, d.Name()); err != nil { + if _, err := m.shims.Get(ctx, d.Name()); err != nil { path := filepath.Join(m.root, ns, d.Name()) if err := os.RemoveAll(path); err != nil { log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)