From a12acedfad8e61cc518f550403b4ff2a4bcd0853 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Thu, 22 Feb 2024 16:05:45 +0800 Subject: [PATCH] sandbox: make a independent shim plugin Signed-off-by: Abel Feng --- core/runtime/v2/shim_load.go | 163 ++++++------ .../v2/{manager.go => shim_manager.go} | 245 +++--------------- core/runtime/v2/task_manager.go | 226 ++++++++++++++++ .../{manager_test.go => task_manager_test.go} | 0 plugins/sandbox/controller.go | 53 +++- plugins/types.go | 2 + 6 files changed, 388 insertions(+), 301 deletions(-) rename core/runtime/v2/{manager.go => shim_manager.go} (64%) create mode 100644 core/runtime/v2/task_manager.go rename core/runtime/v2/{manager_test.go => task_manager_test.go} (100%) diff --git a/core/runtime/v2/shim_load.go b/core/runtime/v2/shim_load.go index 12eff4f3c..951ab2c6f 100644 --- a/core/runtime/v2/shim_load.go +++ b/core/runtime/v2/shim_load.go @@ -19,19 +19,23 @@ package v2 import ( "context" "errors" + "fmt" "os" "path/filepath" + "github.com/containerd/errdefs" + "github.com/containerd/log" + "github.com/containerd/containerd/v2/core/mount" "github.com/containerd/containerd/v2/internal/cleanup" "github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/timeout" - "github.com/containerd/errdefs" - "github.com/containerd/log" ) -func (m *ShimManager) loadExistingTasks(ctx context.Context) error { - nsDirs, err := os.ReadDir(m.state) +// LoadExistingShims loads existing shims from the path specified by stateDir +// rootDir is for cleaning up the unused paths of removed shims. 
+func (m *ShimManager) LoadExistingShims(ctx context.Context, stateDir string, rootDir string) error { + nsDirs, err := os.ReadDir(stateDir) if err != nil { return err } @@ -45,11 +49,11 @@ func (m *ShimManager) loadExistingTasks(ctx context.Context) error { continue } log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { + if err := m.loadShims(namespaces.WithNamespace(ctx, ns), stateDir); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") continue } - if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { + if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), rootDir); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") continue } @@ -57,14 +61,14 @@ func (m *ShimManager) loadExistingTasks(ctx context.Context) error { return nil } -func (m *ShimManager) loadShims(ctx context.Context) error { +func (m *ShimManager) loadShims(ctx context.Context, stateDir string) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns)) - shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) + shimDirs, err := os.ReadDir(filepath.Join(stateDir, ns)) if err != nil { return err } @@ -77,7 +81,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { if len(id) > 0 && id[0] == '.' { continue } - bundle, err := LoadBundle(ctx, m.state, id) + bundle, err := LoadBundle(ctx, stateDir, id) if err != nil { // fine to return error here, it is a programmer error if the context // does not have a namespace @@ -102,78 +106,87 @@ func (m *ShimManager) loadShims(ctx context.Context) error { bundle.Delete() continue } - - var ( - runtime string - ) - - // If we're on 1.6+ and specified custom path to the runtime binary, path will be saved in 'shim-binary-path' file. 
- if data, err := os.ReadFile(filepath.Join(bundle.Path, "shim-binary-path")); err == nil { - runtime = string(data) - } else if err != nil && !os.IsNotExist(err) { - log.G(ctx).WithError(err).Error("failed to read `runtime` path from bundle") - } - - // Query runtime name from metadata store - if runtime == "" { - container, err := m.containers.Get(ctx, id) - if err != nil { - log.G(ctx).WithError(err).Errorf("loading container %s", id) - if err := mount.UnmountRecursive(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { - log.G(ctx).WithError(err).Errorf("failed to unmount of rootfs %s", id) - } - bundle.Delete() - continue - } - runtime = container.Runtime.Name - } - - runtime, err = m.resolveRuntimePath(runtime) - if err != nil { + if err := m.loadShim(ctx, bundle); err != nil { + log.G(ctx).WithError(err).Errorf("failed to load shim %s", bundle.Path) bundle.Delete() - log.G(ctx).WithError(err).Error("failed to resolve runtime path") continue } - binaryCall := shimBinary(bundle, - shimBinaryConfig{ - runtime: runtime, - address: m.containerdAddress, - ttrpcAddress: m.containerdTTRPCAddress, - schedCore: m.schedCore, - }) - shim, err := loadShimTask(ctx, bundle, func() { - log.G(ctx).WithField("id", id).Info("shim disconnected") + } + return nil +} - cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall) - // Remove self from the runtime task list. - m.shims.Delete(ctx, id) - }) +func (m *ShimManager) loadShim(ctx context.Context, bundle *Bundle) error { + var ( + runtime string + id = bundle.ID + ) + + // If we're on 1.6+ and specified custom path to the runtime binary, path will be saved in 'shim-binary-path' file. 
+ if data, err := os.ReadFile(filepath.Join(bundle.Path, "shim-binary-path")); err == nil { + runtime = string(data) + } else if err != nil && !os.IsNotExist(err) { + log.G(ctx).WithError(err).Error("failed to read `runtime` path from bundle") + } + + // Query runtime name from metadata store + if runtime == "" { + container, err := m.containers.Get(ctx, id) if err != nil { - log.G(ctx).WithError(err).Errorf("unable to load shim %q", id) - cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) - continue + log.G(ctx).WithError(err).Errorf("loading container %s", id) + if err := mount.UnmountRecursive(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { + log.G(ctx).WithError(err).Errorf("failed to unmount of rootfs %s", id) + } + return err } + runtime = container.Runtime.Name + } - // There are 3 possibilities for the loaded shim here: - // 1. It could be a shim that is running a task. - // 2. It could be a sandbox shim. - // 3. Or it could be a shim that was created for running a task but - // something happened (probably a containerd crash) and the task was never - // created. This shim process should be cleaned up here. Look at - // containerd/containerd#6860 for further details. + runtime, err := m.resolveRuntimePath(runtime) + if err != nil { + bundle.Delete() - _, sgetErr := m.sandboxStore.Get(ctx, id) - pInfo, pidErr := shim.Pids(ctx) - if sgetErr != nil && errors.Is(sgetErr, errdefs.ErrNotFound) && (len(pInfo) == 0 || errors.Is(pidErr, errdefs.ErrNotFound)) { - log.G(ctx).WithField("id", id).Info("cleaning leaked shim process") - // We are unable to get Pids from the shim and it's not a sandbox - // shim. We should clean it up her. - // No need to do anything for removeTask since we never added this shim. 
- shim.delete(ctx, false, func(ctx context.Context, id string) {}) - } else { - m.shims.Add(ctx, shim.ShimInstance) - } + return fmt.Errorf("failed to resolve runtime path: %w", err) + } + + binaryCall := shimBinary(bundle, + shimBinaryConfig{ + runtime: runtime, + address: m.containerdAddress, + ttrpcAddress: m.containerdTTRPCAddress, + schedCore: m.schedCore, + }) + // TODO: It seems we can only call loadShim here if it is a sandbox shim? + shim, err := loadShimTask(ctx, bundle, func() { + log.G(ctx).WithField("id", id).Info("shim disconnected") + + cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall) + // Remove self from the runtime task list. + m.shims.Delete(ctx, id) + }) + if err != nil { + cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) + return fmt.Errorf("unable to load shim %q: %w", id, err) + } + + // There are 3 possibilities for the loaded shim here: + // 1. It could be a shim that is running a task. + // 2. It could be a sandbox shim. + // 3. Or it could be a shim that was created for running a task but + // something happened (probably a containerd crash) and the task was never + // created. This shim process should be cleaned up here. Look at + // containerd/containerd#6860 for further details. + + _, sgetErr := m.sandboxStore.Get(ctx, id) + pInfo, pidErr := shim.Pids(ctx) + if sgetErr != nil && errors.Is(sgetErr, errdefs.ErrNotFound) && (len(pInfo) == 0 || errors.Is(pidErr, errdefs.ErrNotFound)) { + log.G(ctx).WithField("id", id).Info("cleaning leaked shim process") + // We are unable to get Pids from the shim and it's not a sandbox + // shim. We should clean it up here. + // No need to do anything for removeTask since we never added this shim. 
+ shim.delete(ctx, false, func(ctx context.Context, id string) {}) + } else { + m.shims.Add(ctx, shim.ShimInstance) } return nil } @@ -198,13 +211,13 @@ func loadShimTask(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimT return s, nil } -func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { +func (m *ShimManager) cleanupWorkDirs(ctx context.Context, rootDir string) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err } - f, err := os.Open(filepath.Join(m.root, ns)) + f, err := os.Open(filepath.Join(rootDir, ns)) if err != nil { return err } @@ -220,7 +233,7 @@ func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { // this can happen on a reboot where /run for the bundle state is cleaned up // but that persistent working dir is left if _, err := m.shims.Get(ctx, dir); err != nil { - path := filepath.Join(m.root, ns, dir) + path := filepath.Join(rootDir, ns, dir) if err := os.RemoveAll(path); err != nil { log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) } diff --git a/core/runtime/v2/manager.go b/core/runtime/v2/shim_manager.go similarity index 64% rename from core/runtime/v2/manager.go rename to core/runtime/v2/shim_manager.go index 3e12f3799..8e6b5b91a 100644 --- a/core/runtime/v2/manager.go +++ b/core/runtime/v2/shim_manager.go @@ -17,7 +17,6 @@ package v2 import ( - "bytes" "context" "errors" "fmt" @@ -27,7 +26,11 @@ import ( "strings" "sync" - apitypes "github.com/containerd/containerd/v2/api/types" + "github.com/containerd/log" + "github.com/containerd/platforms" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + "github.com/containerd/containerd/v2/core/containers" "github.com/containerd/containerd/v2/core/events/exchange" "github.com/containerd/containerd/v2/core/metadata" @@ -39,15 +42,10 @@ import ( "github.com/containerd/containerd/v2/pkg/timeout" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/protobuf" - 
"github.com/containerd/containerd/v2/protobuf/proto" - "github.com/containerd/errdefs" - "github.com/containerd/log" - "github.com/containerd/platforms" - "github.com/containerd/plugin" - "github.com/containerd/plugin/registry" + "github.com/containerd/containerd/v2/version" ) -// Config for the v2 runtime +// Config for the shim type Config struct { // Supported platforms Platforms []string `toml:"platforms"` @@ -56,9 +54,12 @@ type Config struct { } func init() { + // ShimManager is not only for TaskManager, + // the "shim" sandbox controller also use it to manage shims, + // so we make it an independent plugin registry.Register(&plugin.Registration{ - Type: plugins.RuntimePluginV2, - ID: "task", + Type: plugins.ShimPlugin, + ID: "shim", Requires: []plugin.Type{ plugins.EventPlugin, plugins.MetadataPlugin, @@ -72,7 +73,6 @@ func init() { if err != nil { return nil, err } - ic.Meta.Platforms = supportedPlatforms m, err := ic.GetSingle(plugins.MetadataPlugin) @@ -83,13 +83,10 @@ func init() { if err != nil { return nil, err } + events := ep.(*exchange.Exchange) cs := metadata.NewContainerStore(m.(*metadata.DB)) ss := metadata.NewSandboxStore(m.(*metadata.DB)) - events := ep.(*exchange.Exchange) - - shimManager, err := NewShimManager(ic.Context, &ManagerConfig{ - Root: ic.Properties[plugins.PropertyRootDir], - State: ic.Properties[plugins.PropertyStateDir], + return NewShimManager(&ManagerConfig{ Address: ic.Properties[plugins.PropertyGRPCAddress], TTRPCAddress: ic.Properties[plugins.PropertyTTRPCAddress], Events: events, @@ -97,36 +94,27 @@ func init() { SchedCore: config.SchedCore, SandboxStore: ss, }) - if err != nil { - return nil, err - } - - return NewTaskManager(shimManager), nil }, - }) - - // Task manager uses shim manager as a dependency to manage shim instances. - // However, due to time limits and to avoid migration steps in 1.6 release, - // use the following workaround. - // This expected to be removed in 1.7. 
- registry.Register(&plugin.Registration{ - Type: plugins.RuntimePluginV2, - ID: "shim", - InitFn: func(ic *plugin.InitContext) (interface{}, error) { - taskManagerI, err := ic.GetByID(plugins.RuntimePluginV2, "task") - if err != nil { - return nil, err + ConfigMigration: func(ctx context.Context, configVersion int, pluginConfigs map[string]interface{}) error { + // Migrate configurations from io.containerd.runtime.v2.task + // if the configVersion >= 3 please make sure the config is under io.containerd.shim.v1.shim. + if configVersion >= version.ConfigVersion { + return nil } - - taskManager := taskManagerI.(*TaskManager) - return taskManager.manager, nil + const originalPluginName = string(plugins.RuntimePluginV2) + ".task" + original, ok := pluginConfigs[originalPluginName] + if !ok { + return nil + } + const newPluginName = string(plugins.ShimPlugin) + ".shim" + pluginConfigs[originalPluginName] = nil + pluginConfigs[newPluginName] = original + return nil }, }) } type ManagerConfig struct { - Root string - State string Store containers.Store Events *exchange.Exchange Address string @@ -136,16 +124,8 @@ type ManagerConfig struct { } // NewShimManager creates a manager for v2 shims -func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) { - for _, d := range []string{config.Root, config.State} { - if err := os.MkdirAll(d, 0711); err != nil { - return nil, err - } - } - +func NewShimManager(config *ManagerConfig) (*ShimManager, error) { m := &ShimManager{ - root: config.Root, - state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, shims: runtime.NewNSMap[ShimInstance](), @@ -155,10 +135,6 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e sandboxStore: config.SandboxStore, } - if err := m.loadExistingTasks(ctx); err != nil { - return nil, err - } - return m, nil } @@ -167,8 +143,6 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) 
(*ShimManager, e // The manager is unaware of the underlying services shim provides and lets higher level services consume them, // but don't care about lifecycle management. type ShimManager struct { - root string - state string containerdAddress string containerdTTRPCAddress string schedCore bool @@ -182,21 +156,11 @@ type ShimManager struct { // ID of the shim manager func (m *ShimManager) ID() string { - return plugins.RuntimePluginV2.String() + ".shim" + return plugins.ShimPlugin.String() + ".shim" } // Start launches a new shim instance -func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { - bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec) - if err != nil { - return nil, err - } - defer func() { - if retErr != nil { - bundle.Delete() - } - }() - +func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { // This container belongs to sandbox which supposed to be already started via sandbox API. if opts.SandboxID != "" { process, err := m.Get(ctx, opts.SandboxID) @@ -209,7 +173,7 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO return nil, err } - params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), opts.SandboxID)) + params, err := restoreBootstrapParams(process.Bundle()) if err != nil { return nil, err } @@ -423,148 +387,3 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error { return err } - -// TaskManager wraps task service client on top of shim manager. -type TaskManager struct { - manager *ShimManager -} - -// NewTaskManager creates a new task manager instance. 
-func NewTaskManager(shims *ShimManager) *TaskManager { - return &TaskManager{ - manager: shims, - } -} - -// ID of the task manager -func (m *TaskManager) ID() string { - return plugins.RuntimePluginV2.String() + ".task" -} - -// Create launches new shim instance and creates new task -func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - shim, err := m.manager.Start(ctx, taskID, opts) - if err != nil { - return nil, fmt.Errorf("failed to start shim: %w", err) - } - - // Cast to shim task and call task service to create a new container task instance. - // This will not be required once shim service / client implemented. - shimTask, err := newShimTask(shim) - if err != nil { - return nil, err - } - - t, err := shimTask.Create(ctx, opts) - if err != nil { - // NOTE: ctx contains required namespace information. - m.manager.shims.Delete(ctx, taskID) - - dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) - defer cancel() - - sandboxed := opts.SandboxID != "" - _, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) - if errShim != nil { - if errdefs.IsDeadlineExceeded(errShim) { - dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) - defer cancel() - } - - shimTask.Shutdown(dctx) - shimTask.Close() - } - - return nil, fmt.Errorf("failed to create shim task: %w", err) - } - - return t, nil -} - -// Get a specific task -func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - shim, err := m.manager.shims.Get(ctx, id) - if err != nil { - return nil, err - } - return newShimTask(shim) -} - -// Tasks lists all tasks -func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - shims, err := m.manager.shims.GetAll(ctx, all) - if err != nil { - return nil, err - } - out := make([]runtime.Task, len(shims)) - for i := range shims { - newClient, err := newShimTask(shims[i]) - if err != nil { - 
return nil, err - } - out[i] = newClient - } - return out, nil -} - -// Delete deletes the task and shim instance -func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { - shim, err := m.manager.shims.Get(ctx, taskID) - if err != nil { - return nil, err - } - - container, err := m.manager.containers.Get(ctx, taskID) - if err != nil { - return nil, err - } - - shimTask, err := newShimTask(shim) - if err != nil { - return nil, err - } - - sandboxed := container.SandboxID != "" - - exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { - m.manager.shims.Delete(ctx, id) - }) - - if err != nil { - return nil, fmt.Errorf("failed to delete task: %w", err) - } - - return exit, nil -} - -func (m *TaskManager) PluginInfo(ctx context.Context, request interface{}) (interface{}, error) { - req, ok := request.(*apitypes.RuntimeRequest) - if !ok { - return nil, fmt.Errorf("unknown request type %T: %w", request, errdefs.ErrNotImplemented) - } - - runtimePath, err := m.manager.resolveRuntimePath(req.RuntimePath) - if err != nil { - return nil, fmt.Errorf("failed to resolve runtime path: %w", err) - } - var optsB []byte - if req.Options != nil { - optsB, err = proto.Marshal(req.Options) - if err != nil { - return nil, fmt.Errorf("failed to marshal %s: %w", req.Options.TypeUrl, err) - } - } - var stderr bytes.Buffer - cmd := exec.CommandContext(ctx, runtimePath, "-info") - cmd.Stdin = bytes.NewReader(optsB) - cmd.Stderr = &stderr - stdout, err := cmd.Output() - if err != nil { - return nil, fmt.Errorf("failed to run %v: %w (stderr: %q)", cmd.Args, err, stderr.String()) - } - var info apitypes.RuntimeInfo - if err = proto.Unmarshal(stdout, &info); err != nil { - return nil, fmt.Errorf("failed to unmarshal stdout from %v into %T: %w", cmd.Args, &info, err) - } - return &info, nil -} diff --git a/core/runtime/v2/task_manager.go b/core/runtime/v2/task_manager.go new file mode 100644 index 000000000..3921053e3 --- /dev/null 
+++ b/core/runtime/v2/task_manager.go @@ -0,0 +1,226 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package v2 + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + + "github.com/containerd/errdefs" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + + apitypes "github.com/containerd/containerd/v2/api/types" + "github.com/containerd/containerd/v2/core/runtime" + "github.com/containerd/containerd/v2/internal/cleanup" + "github.com/containerd/containerd/v2/pkg/timeout" + "github.com/containerd/containerd/v2/plugins" + "github.com/containerd/containerd/v2/protobuf/proto" +) + +func init() { + registry.Register(&plugin.Registration{ + Type: plugins.RuntimePluginV2, + ID: "task", + Requires: []plugin.Type{ + plugins.ShimPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + shimManagerI, err := ic.GetByID(plugins.ShimPlugin, "shim") + if err != nil { + return nil, err + } + shimManager := shimManagerI.(*ShimManager) + root, state := ic.Properties[plugins.PropertyRootDir], ic.Properties[plugins.PropertyStateDir] + for _, d := range []string{root, state} { + if err := os.MkdirAll(d, 0711); err != nil { + return nil, err + } + } + return NewTaskManager(ic.Context, root, state, shimManager) + }, + }) +} + +// TaskManager wraps task service client on top of shim manager. 
+type TaskManager struct { + root string + state string + manager *ShimManager +} + +// NewTaskManager creates a new task manager instance. +// root is the rootDir of TaskManager plugin to store persistent data +// state is the stateDir of TaskManager plugin to store transient data +// shims is ShimManager for TaskManager to create/delete shims +func NewTaskManager(ctx context.Context, root, state string, shims *ShimManager) (*TaskManager, error) { + if err := shims.LoadExistingShims(ctx, state, root); err != nil { + return nil, fmt.Errorf("failed to load existing shims for task manager: %w", err) + } + m := &TaskManager{ + root: root, + state: state, + manager: shims, + } + return m, nil +} + +// ID of the task manager +func (m *TaskManager) ID() string { + return plugins.RuntimePluginV2.String() + ".task" +} + +// Create launches new shim instance and creates new task +func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { + bundle, err := NewBundle(ctx, m.root, m.state, taskID, opts.Spec) + if err != nil { + return nil, err + } + defer func() { + if retErr != nil { + bundle.Delete() + } + }() + + shim, err := m.manager.Start(ctx, taskID, bundle, opts) + if err != nil { + return nil, fmt.Errorf("failed to start shim: %w", err) + } + + // Cast to shim task and call task service to create a new container task instance. + // This will not be required once shim service / client implemented. + shimTask, err := newShimTask(shim) + if err != nil { + return nil, err + } + + t, err := shimTask.Create(ctx, opts) + if err != nil { + // NOTE: ctx contains required namespace information. 
+ m.manager.shims.Delete(ctx, taskID) + + dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) + defer cancel() + + sandboxed := opts.SandboxID != "" + _, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) + if errShim != nil { + if errdefs.IsDeadlineExceeded(errShim) { + dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) + defer cancel() + } + + shimTask.Shutdown(dctx) + shimTask.Close() + } + + return nil, fmt.Errorf("failed to create shim task: %w", err) + } + + return t, nil +} + +// Get a specific task +func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { + shim, err := m.manager.shims.Get(ctx, id) + if err != nil { + return nil, err + } + return newShimTask(shim) +} + +// Tasks lists all tasks +func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { + shims, err := m.manager.shims.GetAll(ctx, all) + if err != nil { + return nil, err + } + out := make([]runtime.Task, len(shims)) + for i := range shims { + newClient, err := newShimTask(shims[i]) + if err != nil { + return nil, err + } + out[i] = newClient + } + return out, nil +} + +// Delete deletes the task and shim instance +func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { + shim, err := m.manager.shims.Get(ctx, taskID) + if err != nil { + return nil, err + } + + container, err := m.manager.containers.Get(ctx, taskID) + if err != nil { + return nil, err + } + + shimTask, err := newShimTask(shim) + if err != nil { + return nil, err + } + + sandboxed := container.SandboxID != "" + + exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { + m.manager.shims.Delete(ctx, id) + }) + + if err != nil { + return nil, fmt.Errorf("failed to delete task: %w", err) + } + + return exit, nil +} + +func (m *TaskManager) PluginInfo(ctx context.Context, request interface{}) (interface{}, error) { + req, ok := 
request.(*apitypes.RuntimeRequest) + if !ok { + return nil, fmt.Errorf("unknown request type %T: %w", request, errdefs.ErrNotImplemented) + } + + runtimePath, err := m.manager.resolveRuntimePath(req.RuntimePath) + if err != nil { + return nil, fmt.Errorf("failed to resolve runtime path: %w", err) + } + var optsB []byte + if req.Options != nil { + optsB, err = proto.Marshal(req.Options) + if err != nil { + return nil, fmt.Errorf("failed to marshal %s: %w", req.Options.TypeUrl, err) + } + } + var stderr bytes.Buffer + cmd := exec.CommandContext(ctx, runtimePath, "-info") + cmd.Stdin = bytes.NewReader(optsB) + cmd.Stderr = &stderr + stdout, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("failed to run %v: %w (stderr: %q)", cmd.Args, err, stderr.String()) + } + var info apitypes.RuntimeInfo + if err = proto.Unmarshal(stdout, &info); err != nil { + return nil, fmt.Errorf("failed to unmarshal stdout from %v into %T: %w", cmd.Args, &info, err) + } + return &info, nil +} diff --git a/core/runtime/v2/manager_test.go b/core/runtime/v2/task_manager_test.go similarity index 100% rename from core/runtime/v2/manager_test.go rename to core/runtime/v2/task_manager_test.go diff --git a/plugins/sandbox/controller.go b/plugins/sandbox/controller.go index bc8b8e237..40c77cfc0 100644 --- a/plugins/sandbox/controller.go +++ b/plugins/sandbox/controller.go @@ -19,8 +19,16 @@ package sandbox import ( "context" "fmt" + "os" "time" + "github.com/containerd/errdefs" + "github.com/containerd/log" + "github.com/containerd/platforms" + "github.com/containerd/plugin" + "github.com/containerd/plugin/registry" + "google.golang.org/protobuf/types/known/anypb" + runtimeAPI "github.com/containerd/containerd/v2/api/runtime/sandbox/v1" "github.com/containerd/containerd/v2/api/types" "github.com/containerd/containerd/v2/core/events" @@ -30,13 +38,6 @@ import ( v2 "github.com/containerd/containerd/v2/core/runtime/v2" "github.com/containerd/containerd/v2/core/sandbox" 
"github.com/containerd/containerd/v2/plugins" - "github.com/containerd/errdefs" - "github.com/containerd/log" - "github.com/containerd/platforms" - "github.com/containerd/plugin" - "github.com/containerd/plugin/registry" - - "google.golang.org/protobuf/types/known/anypb" ) func init() { @@ -44,11 +45,11 @@ func init() { Type: plugins.SandboxControllerPlugin, ID: "shim", Requires: []plugin.Type{ - plugins.RuntimePluginV2, + plugins.ShimPlugin, plugins.EventPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - shimPlugin, err := ic.GetByID(plugins.RuntimePluginV2, "shim") + shimPlugin, err := ic.GetByID(plugins.ShimPlugin, "shim") if err != nil { return nil, err } @@ -62,16 +63,32 @@ func init() { shims = shimPlugin.(*v2.ShimManager) publisher = exchangePlugin.(*exchange.Exchange) ) + state := ic.Properties[plugins.PropertyStateDir] + root := ic.Properties[plugins.PropertyRootDir] + for _, d := range []string{root, state} { + if err := os.MkdirAll(d, 0711); err != nil { + return nil, err + } + } - return &controllerLocal{ + if err := shims.LoadExistingShims(ic.Context, state, root); err != nil { + return nil, fmt.Errorf("failed to load existing shim sandboxes, %w", err) + } + + c := &controllerLocal{ + root: root, + state: state, shims: shims, publisher: publisher, - }, nil + } + return c, nil }, }) } type controllerLocal struct { + root string + state string shims *v2.ShimManager publisher events.Publisher } @@ -97,7 +114,7 @@ func (c *controllerLocal) cleanupShim(ctx context.Context, sandboxID string, svc } } -func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error { +func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) (retErr error) { var coptions sandbox.CreateOptions sandboxID := info.ID for _, opt := range opts { @@ -108,7 +125,17 @@ func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts return fmt.Errorf("sandbox 
%s already running: %w", sandboxID, errdefs.ErrAlreadyExists) } - shim, err := c.shims.Start(ctx, sandboxID, runtime.CreateOpts{ + bundle, err := v2.NewBundle(ctx, c.root, c.state, sandboxID, info.Spec) + if err != nil { + return err + } + defer func() { + if retErr != nil { + bundle.Delete() + } + }() + + shim, err := c.shims.Start(ctx, sandboxID, bundle, runtime.CreateOpts{ Spec: info.Spec, RuntimeOptions: info.Runtime.Options, Runtime: info.Runtime.Name, diff --git a/plugins/types.go b/plugins/types.go index d18ffbf2d..3be356ade 100644 --- a/plugins/types.go +++ b/plugins/types.go @@ -71,6 +71,8 @@ const ( WarningPlugin plugin.Type = "io.containerd.warning.v1" // CRIServicePlugin implements a cri service CRIServicePlugin plugin.Type = "io.containerd.cri.v1" + // ShimPlugin implements a shim service + ShimPlugin plugin.Type = "io.containerd.shim.v1" ) const (