sandbox: make a independent shim plugin

Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
Abel Feng 2024-02-22 16:05:45 +08:00
parent f8fbdfdd6f
commit a12acedfad
6 changed files with 388 additions and 301 deletions

View File

@ -19,19 +19,23 @@ package v2
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/containerd/v2/core/mount" "github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/internal/cleanup" "github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/pkg/namespaces" "github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/timeout" "github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/errdefs"
"github.com/containerd/log"
) )
func (m *ShimManager) loadExistingTasks(ctx context.Context) error { // LoadExistingShims loads existing shims from the path specified by stateDir
nsDirs, err := os.ReadDir(m.state) // rootDir is for cleaning up the unused paths of removed shims.
func (m *ShimManager) LoadExistingShims(ctx context.Context, stateDir string, rootDir string) error {
nsDirs, err := os.ReadDir(stateDir)
if err != nil { if err != nil {
return err return err
} }
@ -45,11 +49,11 @@ func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
continue continue
} }
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { if err := m.loadShims(namespaces.WithNamespace(ctx, ns), stateDir); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue continue
} }
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), rootDir); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue continue
} }
@ -57,14 +61,14 @@ func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
return nil return nil
} }
func (m *ShimManager) loadShims(ctx context.Context) error { func (m *ShimManager) loadShims(ctx context.Context, stateDir string) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
} }
ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns)) ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns))
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) shimDirs, err := os.ReadDir(filepath.Join(stateDir, ns))
if err != nil { if err != nil {
return err return err
} }
@ -77,7 +81,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
if len(id) > 0 && id[0] == '.' { if len(id) > 0 && id[0] == '.' {
continue continue
} }
bundle, err := LoadBundle(ctx, m.state, id) bundle, err := LoadBundle(ctx, stateDir, id)
if err != nil { if err != nil {
// fine to return error here, it is a programmer error if the context // fine to return error here, it is a programmer error if the context
// does not have a namespace // does not have a namespace
@ -102,78 +106,87 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
bundle.Delete() bundle.Delete()
continue continue
} }
if err := m.loadShim(ctx, bundle); err != nil {
var ( log.G(ctx).WithError(err).Errorf("failed to load shim %s", bundle.Path)
runtime string
)
// If we're on 1.6+ and specified custom path to the runtime binary, path will be saved in 'shim-binary-path' file.
if data, err := os.ReadFile(filepath.Join(bundle.Path, "shim-binary-path")); err == nil {
runtime = string(data)
} else if err != nil && !os.IsNotExist(err) {
log.G(ctx).WithError(err).Error("failed to read `runtime` path from bundle")
}
// Query runtime name from metadata store
if runtime == "" {
container, err := m.containers.Get(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountRecursive(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Errorf("failed to unmount of rootfs %s", id)
}
bundle.Delete()
continue
}
runtime = container.Runtime.Name
}
runtime, err = m.resolveRuntimePath(runtime)
if err != nil {
bundle.Delete() bundle.Delete()
log.G(ctx).WithError(err).Error("failed to resolve runtime path")
continue continue
} }
binaryCall := shimBinary(bundle, }
shimBinaryConfig{ return nil
runtime: runtime, }
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShimTask(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall) func (m *ShimManager) loadShim(ctx context.Context, bundle *Bundle) error {
// Remove self from the runtime task list. var (
m.shims.Delete(ctx, id) runtime string
}) id = bundle.ID
)
// If we're on 1.6+ and specified custom path to the runtime binary, path will be saved in 'shim-binary-path' file.
if data, err := os.ReadFile(filepath.Join(bundle.Path, "shim-binary-path")); err == nil {
runtime = string(data)
} else if err != nil && !os.IsNotExist(err) {
log.G(ctx).WithError(err).Error("failed to read `runtime` path from bundle")
}
// Query runtime name from metadata store
if runtime == "" {
container, err := m.containers.Get(ctx, id)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Errorf("unable to load shim %q", id) log.G(ctx).WithError(err).Errorf("loading container %s", id)
cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) if err := mount.UnmountRecursive(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
continue log.G(ctx).WithError(err).Errorf("failed to unmount of rootfs %s", id)
}
return err
} }
runtime = container.Runtime.Name
}
// There are 3 possibilities for the loaded shim here: runtime, err := m.resolveRuntimePath(runtime)
// 1. It could be a shim that is running a task. if err != nil {
// 2. It could be a sandbox shim. bundle.Delete()
// 3. Or it could be a shim that was created for running a task but
// something happened (probably a containerd crash) and the task was never
// created. This shim process should be cleaned up here. Look at
// containerd/containerd#6860 for further details.
_, sgetErr := m.sandboxStore.Get(ctx, id) return fmt.Errorf("failed to resolve runtime path: %w", err)
pInfo, pidErr := shim.Pids(ctx) }
if sgetErr != nil && errors.Is(sgetErr, errdefs.ErrNotFound) && (len(pInfo) == 0 || errors.Is(pidErr, errdefs.ErrNotFound)) {
log.G(ctx).WithField("id", id).Info("cleaning leaked shim process") binaryCall := shimBinary(bundle,
// We are unable to get Pids from the shim and it's not a sandbox shimBinaryConfig{
// shim. We should clean it up her. runtime: runtime,
// No need to do anything for removeTask since we never added this shim. address: m.containerdAddress,
shim.delete(ctx, false, func(ctx context.Context, id string) {}) ttrpcAddress: m.containerdTTRPCAddress,
} else { schedCore: m.schedCore,
m.shims.Add(ctx, shim.ShimInstance) })
} // TODO: It seems we can only call loadShim here if it is a sandbox shim?
shim, err := loadShimTask(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall)
// Remove self from the runtime task list.
m.shims.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall)
return fmt.Errorf("unable to load shim %q: %w", id, err)
}
// There are 3 possibilities for the loaded shim here:
// 1. It could be a shim that is running a task.
// 2. It could be a sandbox shim.
// 3. Or it could be a shim that was created for running a task but
// something happened (probably a containerd crash) and the task was never
// created. This shim process should be cleaned up here. Look at
// containerd/containerd#6860 for further details.
_, sgetErr := m.sandboxStore.Get(ctx, id)
pInfo, pidErr := shim.Pids(ctx)
if sgetErr != nil && errors.Is(sgetErr, errdefs.ErrNotFound) && (len(pInfo) == 0 || errors.Is(pidErr, errdefs.ErrNotFound)) {
log.G(ctx).WithField("id", id).Info("cleaning leaked shim process")
// We are unable to get Pids from the shim and it's not a sandbox
// shim. We should clean it up her.
// No need to do anything for removeTask since we never added this shim.
shim.delete(ctx, false, func(ctx context.Context, id string) {})
} else {
m.shims.Add(ctx, shim.ShimInstance)
} }
return nil return nil
} }
@ -198,13 +211,13 @@ func loadShimTask(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimT
return s, nil return s, nil
} }
func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { func (m *ShimManager) cleanupWorkDirs(ctx context.Context, rootDir string) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
} }
f, err := os.Open(filepath.Join(m.root, ns)) f, err := os.Open(filepath.Join(rootDir, ns))
if err != nil { if err != nil {
return err return err
} }
@ -220,7 +233,7 @@ func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
// this can happen on a reboot where /run for the bundle state is cleaned up // this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left // but that persistent working dir is left
if _, err := m.shims.Get(ctx, dir); err != nil { if _, err := m.shims.Get(ctx, dir); err != nil {
path := filepath.Join(m.root, ns, dir) path := filepath.Join(rootDir, ns, dir)
if err := os.RemoveAll(path); err != nil { if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
} }

View File

@ -17,7 +17,6 @@
package v2 package v2
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -27,7 +26,11 @@ import (
"strings" "strings"
"sync" "sync"
apitypes "github.com/containerd/containerd/v2/api/types" "github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"github.com/containerd/containerd/v2/core/containers" "github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/core/events/exchange" "github.com/containerd/containerd/v2/core/events/exchange"
"github.com/containerd/containerd/v2/core/metadata" "github.com/containerd/containerd/v2/core/metadata"
@ -39,15 +42,10 @@ import (
"github.com/containerd/containerd/v2/pkg/timeout" "github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/protobuf" "github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/containerd/v2/protobuf/proto" "github.com/containerd/containerd/v2/version"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
) )
// Config for the v2 runtime // Config for the shim
type Config struct { type Config struct {
// Supported platforms // Supported platforms
Platforms []string `toml:"platforms"` Platforms []string `toml:"platforms"`
@ -56,9 +54,12 @@ type Config struct {
} }
func init() { func init() {
// ShimManager is not only for TaskManager,
// the "shim" sandbox controller also use it to manage shims,
// so we make it an independent plugin
registry.Register(&plugin.Registration{ registry.Register(&plugin.Registration{
Type: plugins.RuntimePluginV2, Type: plugins.ShimPlugin,
ID: "task", ID: "shim",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugins.EventPlugin, plugins.EventPlugin,
plugins.MetadataPlugin, plugins.MetadataPlugin,
@ -72,7 +73,6 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ic.Meta.Platforms = supportedPlatforms ic.Meta.Platforms = supportedPlatforms
m, err := ic.GetSingle(plugins.MetadataPlugin) m, err := ic.GetSingle(plugins.MetadataPlugin)
@ -83,13 +83,10 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
events := ep.(*exchange.Exchange)
cs := metadata.NewContainerStore(m.(*metadata.DB)) cs := metadata.NewContainerStore(m.(*metadata.DB))
ss := metadata.NewSandboxStore(m.(*metadata.DB)) ss := metadata.NewSandboxStore(m.(*metadata.DB))
events := ep.(*exchange.Exchange) return NewShimManager(&ManagerConfig{
shimManager, err := NewShimManager(ic.Context, &ManagerConfig{
Root: ic.Properties[plugins.PropertyRootDir],
State: ic.Properties[plugins.PropertyStateDir],
Address: ic.Properties[plugins.PropertyGRPCAddress], Address: ic.Properties[plugins.PropertyGRPCAddress],
TTRPCAddress: ic.Properties[plugins.PropertyTTRPCAddress], TTRPCAddress: ic.Properties[plugins.PropertyTTRPCAddress],
Events: events, Events: events,
@ -97,36 +94,27 @@ func init() {
SchedCore: config.SchedCore, SchedCore: config.SchedCore,
SandboxStore: ss, SandboxStore: ss,
}) })
if err != nil {
return nil, err
}
return NewTaskManager(shimManager), nil
}, },
}) ConfigMigration: func(ctx context.Context, configVersion int, pluginConfigs map[string]interface{}) error {
// Migrate configurations from io.containerd.runtime.v2.task
// Task manager uses shim manager as a dependency to manage shim instances. // if the configVersion >= 3 please make sure the config is under io.containerd.shim.v1.shim.
// However, due to time limits and to avoid migration steps in 1.6 release, if configVersion >= version.ConfigVersion {
// use the following workaround. return nil
// This expected to be removed in 1.7.
registry.Register(&plugin.Registration{
Type: plugins.RuntimePluginV2,
ID: "shim",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
taskManagerI, err := ic.GetByID(plugins.RuntimePluginV2, "task")
if err != nil {
return nil, err
} }
const originalPluginName = string(plugins.RuntimePluginV2) + ".task"
taskManager := taskManagerI.(*TaskManager) original, ok := pluginConfigs[originalPluginName]
return taskManager.manager, nil if !ok {
return nil
}
const newPluginName = string(plugins.ShimPlugin) + ".shim"
pluginConfigs[originalPluginName] = nil
pluginConfigs[newPluginName] = original
return nil
}, },
}) })
} }
type ManagerConfig struct { type ManagerConfig struct {
Root string
State string
Store containers.Store Store containers.Store
Events *exchange.Exchange Events *exchange.Exchange
Address string Address string
@ -136,16 +124,8 @@ type ManagerConfig struct {
} }
// NewShimManager creates a manager for v2 shims // NewShimManager creates a manager for v2 shims
func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) { func NewShimManager(config *ManagerConfig) (*ShimManager, error) {
for _, d := range []string{config.Root, config.State} {
if err := os.MkdirAll(d, 0711); err != nil {
return nil, err
}
}
m := &ShimManager{ m := &ShimManager{
root: config.Root,
state: config.State,
containerdAddress: config.Address, containerdAddress: config.Address,
containerdTTRPCAddress: config.TTRPCAddress, containerdTTRPCAddress: config.TTRPCAddress,
shims: runtime.NewNSMap[ShimInstance](), shims: runtime.NewNSMap[ShimInstance](),
@ -155,10 +135,6 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e
sandboxStore: config.SandboxStore, sandboxStore: config.SandboxStore,
} }
if err := m.loadExistingTasks(ctx); err != nil {
return nil, err
}
return m, nil return m, nil
} }
@ -167,8 +143,6 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e
// The manager is unaware of the underlying services shim provides and lets higher level services consume them, // The manager is unaware of the underlying services shim provides and lets higher level services consume them,
// but don't care about lifecycle management. // but don't care about lifecycle management.
type ShimManager struct { type ShimManager struct {
root string
state string
containerdAddress string containerdAddress string
containerdTTRPCAddress string containerdTTRPCAddress string
schedCore bool schedCore bool
@ -182,21 +156,11 @@ type ShimManager struct {
// ID of the shim manager // ID of the shim manager
func (m *ShimManager) ID() string { func (m *ShimManager) ID() string {
return plugins.RuntimePluginV2.String() + ".shim" return plugins.ShimPlugin.String() + ".shim"
} }
// Start launches a new shim instance // Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
if err != nil {
return nil, err
}
defer func() {
if retErr != nil {
bundle.Delete()
}
}()
// This container belongs to sandbox which supposed to be already started via sandbox API. // This container belongs to sandbox which supposed to be already started via sandbox API.
if opts.SandboxID != "" { if opts.SandboxID != "" {
process, err := m.Get(ctx, opts.SandboxID) process, err := m.Get(ctx, opts.SandboxID)
@ -209,7 +173,7 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
return nil, err return nil, err
} }
params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), opts.SandboxID)) params, err := restoreBootstrapParams(process.Bundle())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -423,148 +387,3 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error {
return err return err
} }
// TaskManager wraps task service client on top of shim manager.
type TaskManager struct {
manager *ShimManager
}
// NewTaskManager creates a new task manager instance.
func NewTaskManager(shims *ShimManager) *TaskManager {
return &TaskManager{
manager: shims,
}
}
// ID of the task manager
func (m *TaskManager) ID() string {
return plugins.RuntimePluginV2.String() + ".task"
}
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
shim, err := m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}
// Cast to shim task and call task service to create a new container task instance.
// This will not be required once shim service / client implemented.
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
t, err := shimTask.Create(ctx, opts)
if err != nil {
// NOTE: ctx contains required namespace information.
m.manager.shims.Delete(ctx, taskID)
dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
sandboxed := opts.SandboxID != ""
_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
}
shimTask.Shutdown(dctx)
shimTask.Close()
}
return nil, fmt.Errorf("failed to create shim task: %w", err)
}
return t, nil
}
// Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
shim, err := m.manager.shims.Get(ctx, id)
if err != nil {
return nil, err
}
return newShimTask(shim)
}
// Tasks lists all tasks
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
shims, err := m.manager.shims.GetAll(ctx, all)
if err != nil {
return nil, err
}
out := make([]runtime.Task, len(shims))
for i := range shims {
newClient, err := newShimTask(shims[i])
if err != nil {
return nil, err
}
out[i] = newClient
}
return out, nil
}
// Delete deletes the task and shim instance
func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) {
shim, err := m.manager.shims.Get(ctx, taskID)
if err != nil {
return nil, err
}
container, err := m.manager.containers.Get(ctx, taskID)
if err != nil {
return nil, err
}
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
sandboxed := container.SandboxID != ""
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})
if err != nil {
return nil, fmt.Errorf("failed to delete task: %w", err)
}
return exit, nil
}
func (m *TaskManager) PluginInfo(ctx context.Context, request interface{}) (interface{}, error) {
req, ok := request.(*apitypes.RuntimeRequest)
if !ok {
return nil, fmt.Errorf("unknown request type %T: %w", request, errdefs.ErrNotImplemented)
}
runtimePath, err := m.manager.resolveRuntimePath(req.RuntimePath)
if err != nil {
return nil, fmt.Errorf("failed to resolve runtime path: %w", err)
}
var optsB []byte
if req.Options != nil {
optsB, err = proto.Marshal(req.Options)
if err != nil {
return nil, fmt.Errorf("failed to marshal %s: %w", req.Options.TypeUrl, err)
}
}
var stderr bytes.Buffer
cmd := exec.CommandContext(ctx, runtimePath, "-info")
cmd.Stdin = bytes.NewReader(optsB)
cmd.Stderr = &stderr
stdout, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("failed to run %v: %w (stderr: %q)", cmd.Args, err, stderr.String())
}
var info apitypes.RuntimeInfo
if err = proto.Unmarshal(stdout, &info); err != nil {
return nil, fmt.Errorf("failed to unmarshal stdout from %v into %T: %w", cmd.Args, &info, err)
}
return &info, nil
}

View File

@ -0,0 +1,226 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"github.com/containerd/errdefs"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
apitypes "github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/runtime"
"github.com/containerd/containerd/v2/internal/cleanup"
"github.com/containerd/containerd/v2/pkg/timeout"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/protobuf/proto"
)
func init() {
registry.Register(&plugin.Registration{
Type: plugins.RuntimePluginV2,
ID: "task",
Requires: []plugin.Type{
plugins.ShimPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
shimManagerI, err := ic.GetByID(plugins.ShimPlugin, "shim")
if err != nil {
return nil, err
}
shimManager := shimManagerI.(*ShimManager)
root, state := ic.Properties[plugins.PropertyRootDir], ic.Properties[plugins.PropertyStateDir]
for _, d := range []string{root, state} {
if err := os.MkdirAll(d, 0711); err != nil {
return nil, err
}
}
return NewTaskManager(ic.Context, root, state, shimManager)
},
})
}
// TaskManager wraps task service client on top of shim manager.
type TaskManager struct {
root string
state string
manager *ShimManager
}
// NewTaskManager creates a new task manager instance.
// root is the rootDir of TaskManager plugin to store persistent data
// state is the stateDir of TaskManager plugin to store transient data
// shims is ShimManager for TaskManager to create/delete shims
func NewTaskManager(ctx context.Context, root, state string, shims *ShimManager) (*TaskManager, error) {
if err := shims.LoadExistingShims(ctx, state, root); err != nil {
return nil, fmt.Errorf("failed to load existing shims for task manager")
}
m := &TaskManager{
root: root,
state: state,
manager: shims,
}
return m, nil
}
// ID of the task manager
func (m *TaskManager) ID() string {
return plugins.RuntimePluginV2.String() + ".task"
}
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, taskID, opts.Spec)
if err != nil {
return nil, err
}
defer func() {
if retErr != nil {
bundle.Delete()
}
}()
shim, err := m.manager.Start(ctx, taskID, bundle, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}
// Cast to shim task and call task service to create a new container task instance.
// This will not be required once shim service / client implemented.
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
t, err := shimTask.Create(ctx, opts)
if err != nil {
// NOTE: ctx contains required namespace information.
m.manager.shims.Delete(ctx, taskID)
dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
sandboxed := opts.SandboxID != ""
_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
}
shimTask.Shutdown(dctx)
shimTask.Close()
}
return nil, fmt.Errorf("failed to create shim task: %w", err)
}
return t, nil
}
// Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
shim, err := m.manager.shims.Get(ctx, id)
if err != nil {
return nil, err
}
return newShimTask(shim)
}
// Tasks lists all tasks
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
shims, err := m.manager.shims.GetAll(ctx, all)
if err != nil {
return nil, err
}
out := make([]runtime.Task, len(shims))
for i := range shims {
newClient, err := newShimTask(shims[i])
if err != nil {
return nil, err
}
out[i] = newClient
}
return out, nil
}
// Delete deletes the task and shim instance
func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) {
shim, err := m.manager.shims.Get(ctx, taskID)
if err != nil {
return nil, err
}
container, err := m.manager.containers.Get(ctx, taskID)
if err != nil {
return nil, err
}
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
sandboxed := container.SandboxID != ""
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})
if err != nil {
return nil, fmt.Errorf("failed to delete task: %w", err)
}
return exit, nil
}
func (m *TaskManager) PluginInfo(ctx context.Context, request interface{}) (interface{}, error) {
req, ok := request.(*apitypes.RuntimeRequest)
if !ok {
return nil, fmt.Errorf("unknown request type %T: %w", request, errdefs.ErrNotImplemented)
}
runtimePath, err := m.manager.resolveRuntimePath(req.RuntimePath)
if err != nil {
return nil, fmt.Errorf("failed to resolve runtime path: %w", err)
}
var optsB []byte
if req.Options != nil {
optsB, err = proto.Marshal(req.Options)
if err != nil {
return nil, fmt.Errorf("failed to marshal %s: %w", req.Options.TypeUrl, err)
}
}
var stderr bytes.Buffer
cmd := exec.CommandContext(ctx, runtimePath, "-info")
cmd.Stdin = bytes.NewReader(optsB)
cmd.Stderr = &stderr
stdout, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("failed to run %v: %w (stderr: %q)", cmd.Args, err, stderr.String())
}
var info apitypes.RuntimeInfo
if err = proto.Unmarshal(stdout, &info); err != nil {
return nil, fmt.Errorf("failed to unmarshal stdout from %v into %T: %w", cmd.Args, &info, err)
}
return &info, nil
}

View File

@ -19,8 +19,16 @@ package sandbox
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"time" "time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"google.golang.org/protobuf/types/known/anypb"
runtimeAPI "github.com/containerd/containerd/v2/api/runtime/sandbox/v1" runtimeAPI "github.com/containerd/containerd/v2/api/runtime/sandbox/v1"
"github.com/containerd/containerd/v2/api/types" "github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/events" "github.com/containerd/containerd/v2/core/events"
@ -30,13 +38,6 @@ import (
v2 "github.com/containerd/containerd/v2/core/runtime/v2" v2 "github.com/containerd/containerd/v2/core/runtime/v2"
"github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/platforms"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"google.golang.org/protobuf/types/known/anypb"
) )
func init() { func init() {
@ -44,11 +45,11 @@ func init() {
Type: plugins.SandboxControllerPlugin, Type: plugins.SandboxControllerPlugin,
ID: "shim", ID: "shim",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugins.RuntimePluginV2, plugins.ShimPlugin,
plugins.EventPlugin, plugins.EventPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
shimPlugin, err := ic.GetByID(plugins.RuntimePluginV2, "shim") shimPlugin, err := ic.GetByID(plugins.ShimPlugin, "shim")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -62,16 +63,32 @@ func init() {
shims = shimPlugin.(*v2.ShimManager) shims = shimPlugin.(*v2.ShimManager)
publisher = exchangePlugin.(*exchange.Exchange) publisher = exchangePlugin.(*exchange.Exchange)
) )
state := ic.Properties[plugins.PropertyStateDir]
root := ic.Properties[plugins.PropertyRootDir]
for _, d := range []string{root, state} {
if err := os.MkdirAll(d, 0711); err != nil {
return nil, err
}
}
return &controllerLocal{ if err := shims.LoadExistingShims(ic.Context, root, state); err != nil {
return nil, fmt.Errorf("failed to load existing shim sandboxes, %v", err)
}
c := &controllerLocal{
root: root,
state: state,
shims: shims, shims: shims,
publisher: publisher, publisher: publisher,
}, nil }
return c, nil
}, },
}) })
} }
type controllerLocal struct { type controllerLocal struct {
root string
state string
shims *v2.ShimManager shims *v2.ShimManager
publisher events.Publisher publisher events.Publisher
} }
@ -97,7 +114,7 @@ func (c *controllerLocal) cleanupShim(ctx context.Context, sandboxID string, svc
} }
} }
func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error { func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) (retErr error) {
var coptions sandbox.CreateOptions var coptions sandbox.CreateOptions
sandboxID := info.ID sandboxID := info.ID
for _, opt := range opts { for _, opt := range opts {
@ -108,7 +125,17 @@ func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts
return fmt.Errorf("sandbox %s already running: %w", sandboxID, errdefs.ErrAlreadyExists) return fmt.Errorf("sandbox %s already running: %w", sandboxID, errdefs.ErrAlreadyExists)
} }
shim, err := c.shims.Start(ctx, sandboxID, runtime.CreateOpts{ bundle, err := v2.NewBundle(ctx, c.root, c.state, sandboxID, info.Spec)
if err != nil {
return err
}
defer func() {
if retErr != nil {
bundle.Delete()
}
}()
shim, err := c.shims.Start(ctx, sandboxID, bundle, runtime.CreateOpts{
Spec: info.Spec, Spec: info.Spec,
RuntimeOptions: info.Runtime.Options, RuntimeOptions: info.Runtime.Options,
Runtime: info.Runtime.Name, Runtime: info.Runtime.Name,

View File

@ -71,6 +71,8 @@ const (
WarningPlugin plugin.Type = "io.containerd.warning.v1" WarningPlugin plugin.Type = "io.containerd.warning.v1"
// CRIServicePlugin implements a cri service // CRIServicePlugin implements a cri service
CRIServicePlugin plugin.Type = "io.containerd.cri.v1" CRIServicePlugin plugin.Type = "io.containerd.cri.v1"
// ShimPlugin implements a shim service
ShimPlugin plugin.Type = "io.containerd.shim.v1"
) )
const ( const (