Merge pull request #9853 from abel-von/make-shim-independent
sandbox: make an independent shim plugin
This commit is contained in:
		| @@ -19,19 +19,23 @@ package v2 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
|  | ||||
| 	"github.com/containerd/errdefs" | ||||
| 	"github.com/containerd/log" | ||||
|  | ||||
| 	"github.com/containerd/containerd/v2/core/mount" | ||||
| 	"github.com/containerd/containerd/v2/internal/cleanup" | ||||
| 	"github.com/containerd/containerd/v2/pkg/namespaces" | ||||
| 	"github.com/containerd/containerd/v2/pkg/timeout" | ||||
| 	"github.com/containerd/errdefs" | ||||
| 	"github.com/containerd/log" | ||||
| ) | ||||
|  | ||||
| func (m *ShimManager) loadExistingTasks(ctx context.Context) error { | ||||
| 	nsDirs, err := os.ReadDir(m.state) | ||||
| // LoadExistingShims loads existing shims from the path specified by stateDir | ||||
| // rootDir is for cleaning up the unused paths of removed shims. | ||||
| func (m *ShimManager) LoadExistingShims(ctx context.Context, stateDir string, rootDir string) error { | ||||
| 	nsDirs, err := os.ReadDir(stateDir) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -45,11 +49,11 @@ func (m *ShimManager) loadExistingTasks(ctx context.Context) error { | ||||
| 			continue | ||||
| 		} | ||||
| 		log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") | ||||
| 		if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { | ||||
| 		if err := m.loadShims(namespaces.WithNamespace(ctx, ns), stateDir); err != nil { | ||||
| 			log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") | ||||
| 			continue | ||||
| 		} | ||||
| 		if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil { | ||||
| 		if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns), rootDir); err != nil { | ||||
| 			log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace") | ||||
| 			continue | ||||
| 		} | ||||
| @@ -57,14 +61,14 @@ func (m *ShimManager) loadExistingTasks(ctx context.Context) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *ShimManager) loadShims(ctx context.Context) error { | ||||
| func (m *ShimManager) loadShims(ctx context.Context, stateDir string) error { | ||||
| 	ns, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns)) | ||||
|  | ||||
| 	shimDirs, err := os.ReadDir(filepath.Join(m.state, ns)) | ||||
| 	shimDirs, err := os.ReadDir(filepath.Join(stateDir, ns)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -77,7 +81,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { | ||||
| 		if len(id) > 0 && id[0] == '.' { | ||||
| 			continue | ||||
| 		} | ||||
| 		bundle, err := LoadBundle(ctx, m.state, id) | ||||
| 		bundle, err := LoadBundle(ctx, stateDir, id) | ||||
| 		if err != nil { | ||||
| 			// fine to return error here, it is a programmer error if the context | ||||
| 			// does not have a namespace | ||||
| @@ -102,78 +106,87 @@ func (m *ShimManager) loadShims(ctx context.Context) error { | ||||
| 			bundle.Delete() | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		var ( | ||||
| 			runtime string | ||||
| 		) | ||||
|  | ||||
| 		// If we're on 1.6+ and specified custom path to the runtime binary, path will be saved in 'shim-binary-path' file. | ||||
| 		if data, err := os.ReadFile(filepath.Join(bundle.Path, "shim-binary-path")); err == nil { | ||||
| 			runtime = string(data) | ||||
| 		} else if err != nil && !os.IsNotExist(err) { | ||||
| 			log.G(ctx).WithError(err).Error("failed to read `runtime` path from bundle") | ||||
| 		} | ||||
|  | ||||
| 		// Query runtime name from metadata store | ||||
| 		if runtime == "" { | ||||
| 			container, err := m.containers.Get(ctx, id) | ||||
| 			if err != nil { | ||||
| 				log.G(ctx).WithError(err).Errorf("loading container %s", id) | ||||
| 				if err := mount.UnmountRecursive(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { | ||||
| 					log.G(ctx).WithError(err).Errorf("failed to unmount of rootfs %s", id) | ||||
| 				} | ||||
| 				bundle.Delete() | ||||
| 				continue | ||||
| 			} | ||||
| 			runtime = container.Runtime.Name | ||||
| 		} | ||||
|  | ||||
| 		runtime, err = m.resolveRuntimePath(runtime) | ||||
| 		if err != nil { | ||||
| 		if err := m.loadShim(ctx, bundle); err != nil { | ||||
| 			log.G(ctx).WithError(err).Errorf("failed to load shim %s", bundle.Path) | ||||
| 			bundle.Delete() | ||||
| 			log.G(ctx).WithError(err).Error("failed to resolve runtime path") | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		binaryCall := shimBinary(bundle, | ||||
| 			shimBinaryConfig{ | ||||
| 				runtime:      runtime, | ||||
| 				address:      m.containerdAddress, | ||||
| 				ttrpcAddress: m.containerdTTRPCAddress, | ||||
| 				schedCore:    m.schedCore, | ||||
| 			}) | ||||
| 		shim, err := loadShimTask(ctx, bundle, func() { | ||||
| 			log.G(ctx).WithField("id", id).Info("shim disconnected") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| 			cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall) | ||||
| 			// Remove self from the runtime task list. | ||||
| 			m.shims.Delete(ctx, id) | ||||
| 		}) | ||||
| func (m *ShimManager) loadShim(ctx context.Context, bundle *Bundle) error { | ||||
| 	var ( | ||||
| 		runtime string | ||||
| 		id      = bundle.ID | ||||
| 	) | ||||
|  | ||||
| 	// If we're on 1.6+ and specified custom path to the runtime binary, path will be saved in 'shim-binary-path' file. | ||||
| 	if data, err := os.ReadFile(filepath.Join(bundle.Path, "shim-binary-path")); err == nil { | ||||
| 		runtime = string(data) | ||||
| 	} else if err != nil && !os.IsNotExist(err) { | ||||
| 		log.G(ctx).WithError(err).Error("failed to read `runtime` path from bundle") | ||||
| 	} | ||||
|  | ||||
| 	// Query runtime name from metadata store | ||||
| 	if runtime == "" { | ||||
| 		container, err := m.containers.Get(ctx, id) | ||||
| 		if err != nil { | ||||
| 			log.G(ctx).WithError(err).Errorf("unable to load shim %q", id) | ||||
| 			cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) | ||||
| 			continue | ||||
| 			log.G(ctx).WithError(err).Errorf("loading container %s", id) | ||||
| 			if err := mount.UnmountRecursive(filepath.Join(bundle.Path, "rootfs"), 0); err != nil { | ||||
| 				log.G(ctx).WithError(err).Errorf("failed to unmount of rootfs %s", id) | ||||
| 			} | ||||
| 			return err | ||||
| 		} | ||||
| 		runtime = container.Runtime.Name | ||||
| 	} | ||||
|  | ||||
| 		// There are 3 possibilities for the loaded shim here: | ||||
| 		// 1. It could be a shim that is running a task. | ||||
| 		// 2. It could be a sandbox shim. | ||||
| 		// 3. Or it could be a shim that was created for running a task but | ||||
| 		// something happened (probably a containerd crash) and the task was never | ||||
| 		// created. This shim process should be cleaned up here. Look at | ||||
| 		// containerd/containerd#6860 for further details. | ||||
| 	runtime, err := m.resolveRuntimePath(runtime) | ||||
| 	if err != nil { | ||||
| 		bundle.Delete() | ||||
|  | ||||
| 		_, sgetErr := m.sandboxStore.Get(ctx, id) | ||||
| 		pInfo, pidErr := shim.Pids(ctx) | ||||
| 		if sgetErr != nil && errors.Is(sgetErr, errdefs.ErrNotFound) && (len(pInfo) == 0 || errors.Is(pidErr, errdefs.ErrNotFound)) { | ||||
| 			log.G(ctx).WithField("id", id).Info("cleaning leaked shim process") | ||||
| 			// We are unable to get Pids from the shim and it's not a sandbox | ||||
| 			// shim. We should clean it up her. | ||||
| 			// No need to do anything for removeTask since we never added this shim. | ||||
| 			shim.delete(ctx, false, func(ctx context.Context, id string) {}) | ||||
| 		} else { | ||||
| 			m.shims.Add(ctx, shim.ShimInstance) | ||||
| 		} | ||||
| 		return fmt.Errorf("failed to resolve runtime path: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	binaryCall := shimBinary(bundle, | ||||
| 		shimBinaryConfig{ | ||||
| 			runtime:      runtime, | ||||
| 			address:      m.containerdAddress, | ||||
| 			ttrpcAddress: m.containerdTTRPCAddress, | ||||
| 			schedCore:    m.schedCore, | ||||
| 		}) | ||||
| 	// TODO: It seems we can only call loadShim here if it is a sandbox shim? | ||||
| 	shim, err := loadShimTask(ctx, bundle, func() { | ||||
| 		log.G(ctx).WithField("id", id).Info("shim disconnected") | ||||
|  | ||||
| 		cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, binaryCall) | ||||
| 		// Remove self from the runtime task list. | ||||
| 		m.shims.Delete(ctx, id) | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) | ||||
| 		return fmt.Errorf("unable to load shim %q: %w", id, err) | ||||
| 	} | ||||
|  | ||||
| 	// There are 3 possibilities for the loaded shim here: | ||||
| 	// 1. It could be a shim that is running a task. | ||||
| 	// 2. It could be a sandbox shim. | ||||
| 	// 3. Or it could be a shim that was created for running a task but | ||||
| 	// something happened (probably a containerd crash) and the task was never | ||||
| 	// created. This shim process should be cleaned up here. Look at | ||||
| 	// containerd/containerd#6860 for further details. | ||||
|  | ||||
| 	_, sgetErr := m.sandboxStore.Get(ctx, id) | ||||
| 	pInfo, pidErr := shim.Pids(ctx) | ||||
| 	if sgetErr != nil && errors.Is(sgetErr, errdefs.ErrNotFound) && (len(pInfo) == 0 || errors.Is(pidErr, errdefs.ErrNotFound)) { | ||||
| 		log.G(ctx).WithField("id", id).Info("cleaning leaked shim process") | ||||
| 		// We are unable to get Pids from the shim and it's not a sandbox | ||||
| 		// shim. We should clean it up her. | ||||
| 		// No need to do anything for removeTask since we never added this shim. | ||||
| 		shim.delete(ctx, false, func(ctx context.Context, id string) {}) | ||||
| 	} else { | ||||
| 		m.shims.Add(ctx, shim.ShimInstance) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @@ -198,13 +211,13 @@ func loadShimTask(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimT | ||||
| 	return s, nil | ||||
| } | ||||
|  | ||||
| func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { | ||||
| func (m *ShimManager) cleanupWorkDirs(ctx context.Context, rootDir string) error { | ||||
| 	ns, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	f, err := os.Open(filepath.Join(m.root, ns)) | ||||
| 	f, err := os.Open(filepath.Join(rootDir, ns)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -220,7 +233,7 @@ func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { | ||||
| 		// this can happen on a reboot where /run for the bundle state is cleaned up | ||||
| 		// but that persistent working dir is left | ||||
| 		if _, err := m.shims.Get(ctx, dir); err != nil { | ||||
| 			path := filepath.Join(m.root, ns, dir) | ||||
| 			path := filepath.Join(rootDir, ns, dir) | ||||
| 			if err := os.RemoveAll(path); err != nil { | ||||
| 				log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) | ||||
| 			} | ||||
|   | ||||
| @@ -17,7 +17,6 @@ | ||||
| package v2 | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| @@ -27,7 +26,11 @@ import ( | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 
 | ||||
| 	apitypes "github.com/containerd/containerd/v2/api/types" | ||||
| 	"github.com/containerd/log" | ||||
| 	"github.com/containerd/platforms" | ||||
| 	"github.com/containerd/plugin" | ||||
| 	"github.com/containerd/plugin/registry" | ||||
| 
 | ||||
| 	"github.com/containerd/containerd/v2/core/containers" | ||||
| 	"github.com/containerd/containerd/v2/core/events/exchange" | ||||
| 	"github.com/containerd/containerd/v2/core/metadata" | ||||
| @@ -39,15 +42,10 @@ import ( | ||||
| 	"github.com/containerd/containerd/v2/pkg/timeout" | ||||
| 	"github.com/containerd/containerd/v2/plugins" | ||||
| 	"github.com/containerd/containerd/v2/protobuf" | ||||
| 	"github.com/containerd/containerd/v2/protobuf/proto" | ||||
| 	"github.com/containerd/errdefs" | ||||
| 	"github.com/containerd/log" | ||||
| 	"github.com/containerd/platforms" | ||||
| 	"github.com/containerd/plugin" | ||||
| 	"github.com/containerd/plugin/registry" | ||||
| 	"github.com/containerd/containerd/v2/version" | ||||
| ) | ||||
| 
 | ||||
| // Config for the v2 runtime | ||||
| // Config for the shim | ||||
| type Config struct { | ||||
| 	// Supported platforms | ||||
| 	Platforms []string `toml:"platforms"` | ||||
| @@ -56,9 +54,12 @@ type Config struct { | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	// ShimManager is not only for TaskManager, | ||||
| 	// the "shim" sandbox controller also use it to manage shims, | ||||
| 	// so we make it an independent plugin | ||||
| 	registry.Register(&plugin.Registration{ | ||||
| 		Type: plugins.RuntimePluginV2, | ||||
| 		ID:   "task", | ||||
| 		Type: plugins.ShimPlugin, | ||||
| 		ID:   "shim", | ||||
| 		Requires: []plugin.Type{ | ||||
| 			plugins.EventPlugin, | ||||
| 			plugins.MetadataPlugin, | ||||
| @@ -72,7 +73,6 @@ func init() { | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 
 | ||||
| 			ic.Meta.Platforms = supportedPlatforms | ||||
| 
 | ||||
| 			m, err := ic.GetSingle(plugins.MetadataPlugin) | ||||
| @@ -83,13 +83,10 @@ func init() { | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			events := ep.(*exchange.Exchange) | ||||
| 			cs := metadata.NewContainerStore(m.(*metadata.DB)) | ||||
| 			ss := metadata.NewSandboxStore(m.(*metadata.DB)) | ||||
| 			events := ep.(*exchange.Exchange) | ||||
| 
 | ||||
| 			shimManager, err := NewShimManager(ic.Context, &ManagerConfig{ | ||||
| 				Root:         ic.Properties[plugins.PropertyRootDir], | ||||
| 				State:        ic.Properties[plugins.PropertyStateDir], | ||||
| 			return NewShimManager(&ManagerConfig{ | ||||
| 				Address:      ic.Properties[plugins.PropertyGRPCAddress], | ||||
| 				TTRPCAddress: ic.Properties[plugins.PropertyTTRPCAddress], | ||||
| 				Events:       events, | ||||
| @@ -97,36 +94,27 @@ func init() { | ||||
| 				SchedCore:    config.SchedCore, | ||||
| 				SandboxStore: ss, | ||||
| 			}) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 
 | ||||
| 			return NewTaskManager(shimManager), nil | ||||
| 		}, | ||||
| 	}) | ||||
| 
 | ||||
| 	// Task manager uses shim manager as a dependency to manage shim instances. | ||||
| 	// However, due to time limits and to avoid migration steps in 1.6 release, | ||||
| 	// use the following workaround. | ||||
| 	// This expected to be removed in 1.7. | ||||
| 	registry.Register(&plugin.Registration{ | ||||
| 		Type: plugins.RuntimePluginV2, | ||||
| 		ID:   "shim", | ||||
| 		InitFn: func(ic *plugin.InitContext) (interface{}, error) { | ||||
| 			taskManagerI, err := ic.GetByID(plugins.RuntimePluginV2, "task") | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 		ConfigMigration: func(ctx context.Context, configVersion int, pluginConfigs map[string]interface{}) error { | ||||
| 			// Migrate configurations from io.containerd.runtime.v2.task | ||||
| 			// if the configVersion >= 3 please make sure the config is under io.containerd.shim.v1.shim. | ||||
| 			if configVersion >= version.ConfigVersion { | ||||
| 				return nil | ||||
| 			} | ||||
| 
 | ||||
| 			taskManager := taskManagerI.(*TaskManager) | ||||
| 			return taskManager.manager, nil | ||||
| 			const originalPluginName = string(plugins.RuntimePluginV2) + ".task" | ||||
| 			original, ok := pluginConfigs[originalPluginName] | ||||
| 			if !ok { | ||||
| 				return nil | ||||
| 			} | ||||
| 			const newPluginName = string(plugins.ShimPlugin) + ".shim" | ||||
| 			pluginConfigs[originalPluginName] = nil | ||||
| 			pluginConfigs[newPluginName] = original | ||||
| 			return nil | ||||
| 		}, | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| type ManagerConfig struct { | ||||
| 	Root         string | ||||
| 	State        string | ||||
| 	Store        containers.Store | ||||
| 	Events       *exchange.Exchange | ||||
| 	Address      string | ||||
| @@ -136,16 +124,8 @@ type ManagerConfig struct { | ||||
| } | ||||
| 
 | ||||
| // NewShimManager creates a manager for v2 shims | ||||
| func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) { | ||||
| 	for _, d := range []string{config.Root, config.State} { | ||||
| 		if err := os.MkdirAll(d, 0711); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| func NewShimManager(config *ManagerConfig) (*ShimManager, error) { | ||||
| 	m := &ShimManager{ | ||||
| 		root:                   config.Root, | ||||
| 		state:                  config.State, | ||||
| 		containerdAddress:      config.Address, | ||||
| 		containerdTTRPCAddress: config.TTRPCAddress, | ||||
| 		shims:                  runtime.NewNSMap[ShimInstance](), | ||||
| @@ -155,10 +135,6 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e | ||||
| 		sandboxStore:           config.SandboxStore, | ||||
| 	} | ||||
| 
 | ||||
| 	if err := m.loadExistingTasks(ctx); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return m, nil | ||||
| } | ||||
| 
 | ||||
| @@ -167,8 +143,6 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e | ||||
| // The manager is unaware of the underlying services shim provides and lets higher level services consume them, | ||||
| // but don't care about lifecycle management. | ||||
| type ShimManager struct { | ||||
| 	root                   string | ||||
| 	state                  string | ||||
| 	containerdAddress      string | ||||
| 	containerdTTRPCAddress string | ||||
| 	schedCore              bool | ||||
| @@ -182,21 +156,11 @@ type ShimManager struct { | ||||
| 
 | ||||
| // ID of the shim manager | ||||
| func (m *ShimManager) ID() string { | ||||
| 	return plugins.RuntimePluginV2.String() + ".shim" | ||||
| 	return plugins.ShimPlugin.String() + ".shim" | ||||
| } | ||||
| 
 | ||||
| // Start launches a new shim instance | ||||
| func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { | ||||
| 	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if retErr != nil { | ||||
| 			bundle.Delete() | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { | ||||
| 	// This container belongs to sandbox which supposed to be already started via sandbox API. | ||||
| 	if opts.SandboxID != "" { | ||||
| 		process, err := m.Get(ctx, opts.SandboxID) | ||||
| @@ -209,7 +173,7 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), opts.SandboxID)) | ||||
| 		params, err := restoreBootstrapParams(process.Bundle()) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| @@ -423,148 +387,3 @@ func (m *ShimManager) Delete(ctx context.Context, id string) error { | ||||
| 
 | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // TaskManager wraps task service client on top of shim manager. | ||||
| type TaskManager struct { | ||||
| 	manager *ShimManager | ||||
| } | ||||
| 
 | ||||
| // NewTaskManager creates a new task manager instance. | ||||
| func NewTaskManager(shims *ShimManager) *TaskManager { | ||||
| 	return &TaskManager{ | ||||
| 		manager: shims, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // ID of the task manager | ||||
| func (m *TaskManager) ID() string { | ||||
| 	return plugins.RuntimePluginV2.String() + ".task" | ||||
| } | ||||
| 
 | ||||
| // Create launches new shim instance and creates new task | ||||
| func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { | ||||
| 	shim, err := m.manager.Start(ctx, taskID, opts) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to start shim: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Cast to shim task and call task service to create a new container task instance. | ||||
| 	// This will not be required once shim service / client implemented. | ||||
| 	shimTask, err := newShimTask(shim) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	t, err := shimTask.Create(ctx, opts) | ||||
| 	if err != nil { | ||||
| 		// NOTE: ctx contains required namespace information. | ||||
| 		m.manager.shims.Delete(ctx, taskID) | ||||
| 
 | ||||
| 		dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) | ||||
| 		defer cancel() | ||||
| 
 | ||||
| 		sandboxed := opts.SandboxID != "" | ||||
| 		_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) | ||||
| 		if errShim != nil { | ||||
| 			if errdefs.IsDeadlineExceeded(errShim) { | ||||
| 				dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) | ||||
| 				defer cancel() | ||||
| 			} | ||||
| 
 | ||||
| 			shimTask.Shutdown(dctx) | ||||
| 			shimTask.Close() | ||||
| 		} | ||||
| 
 | ||||
| 		return nil, fmt.Errorf("failed to create shim task: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return t, nil | ||||
| } | ||||
| 
 | ||||
| // Get a specific task | ||||
| func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { | ||||
| 	shim, err := m.manager.shims.Get(ctx, id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newShimTask(shim) | ||||
| } | ||||
| 
 | ||||
| // Tasks lists all tasks | ||||
| func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { | ||||
| 	shims, err := m.manager.shims.GetAll(ctx, all) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	out := make([]runtime.Task, len(shims)) | ||||
| 	for i := range shims { | ||||
| 		newClient, err := newShimTask(shims[i]) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		out[i] = newClient | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
| 
 | ||||
| // Delete deletes the task and shim instance | ||||
| func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { | ||||
| 	shim, err := m.manager.shims.Get(ctx, taskID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	container, err := m.manager.containers.Get(ctx, taskID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	shimTask, err := newShimTask(shim) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	sandboxed := container.SandboxID != "" | ||||
| 
 | ||||
| 	exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { | ||||
| 		m.manager.shims.Delete(ctx, id) | ||||
| 	}) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to delete task: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return exit, nil | ||||
| } | ||||
| 
 | ||||
| func (m *TaskManager) PluginInfo(ctx context.Context, request interface{}) (interface{}, error) { | ||||
| 	req, ok := request.(*apitypes.RuntimeRequest) | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("unknown request type %T: %w", request, errdefs.ErrNotImplemented) | ||||
| 	} | ||||
| 
 | ||||
| 	runtimePath, err := m.manager.resolveRuntimePath(req.RuntimePath) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to resolve runtime path: %w", err) | ||||
| 	} | ||||
| 	var optsB []byte | ||||
| 	if req.Options != nil { | ||||
| 		optsB, err = proto.Marshal(req.Options) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to marshal %s: %w", req.Options.TypeUrl, err) | ||||
| 		} | ||||
| 	} | ||||
| 	var stderr bytes.Buffer | ||||
| 	cmd := exec.CommandContext(ctx, runtimePath, "-info") | ||||
| 	cmd.Stdin = bytes.NewReader(optsB) | ||||
| 	cmd.Stderr = &stderr | ||||
| 	stdout, err := cmd.Output() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to run %v: %w (stderr: %q)", cmd.Args, err, stderr.String()) | ||||
| 	} | ||||
| 	var info apitypes.RuntimeInfo | ||||
| 	if err = proto.Unmarshal(stdout, &info); err != nil { | ||||
| 		return nil, fmt.Errorf("failed to unmarshal stdout from %v into %T: %w", cmd.Args, &info, err) | ||||
| 	} | ||||
| 	return &info, nil | ||||
| } | ||||
							
								
								
									
										226
									
								
								core/runtime/v2/task_manager.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										226
									
								
								core/runtime/v2/task_manager.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,226 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package v2 | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"os/exec" | ||||
|  | ||||
| 	"github.com/containerd/errdefs" | ||||
| 	"github.com/containerd/plugin" | ||||
| 	"github.com/containerd/plugin/registry" | ||||
|  | ||||
| 	apitypes "github.com/containerd/containerd/v2/api/types" | ||||
| 	"github.com/containerd/containerd/v2/core/runtime" | ||||
| 	"github.com/containerd/containerd/v2/internal/cleanup" | ||||
| 	"github.com/containerd/containerd/v2/pkg/timeout" | ||||
| 	"github.com/containerd/containerd/v2/plugins" | ||||
| 	"github.com/containerd/containerd/v2/protobuf/proto" | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	registry.Register(&plugin.Registration{ | ||||
| 		Type: plugins.RuntimePluginV2, | ||||
| 		ID:   "task", | ||||
| 		Requires: []plugin.Type{ | ||||
| 			plugins.ShimPlugin, | ||||
| 		}, | ||||
| 		InitFn: func(ic *plugin.InitContext) (interface{}, error) { | ||||
| 			shimManagerI, err := ic.GetByID(plugins.ShimPlugin, "shim") | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			shimManager := shimManagerI.(*ShimManager) | ||||
| 			root, state := ic.Properties[plugins.PropertyRootDir], ic.Properties[plugins.PropertyStateDir] | ||||
| 			for _, d := range []string{root, state} { | ||||
| 				if err := os.MkdirAll(d, 0711); err != nil { | ||||
| 					return nil, err | ||||
| 				} | ||||
| 			} | ||||
| 			return NewTaskManager(ic.Context, root, state, shimManager) | ||||
| 		}, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // TaskManager wraps task service client on top of shim manager. | ||||
| type TaskManager struct { | ||||
| 	root    string | ||||
| 	state   string | ||||
| 	manager *ShimManager | ||||
| } | ||||
|  | ||||
| // NewTaskManager creates a new task manager instance. | ||||
| // root is the rootDir of TaskManager plugin to store persistent data | ||||
| // state is the stateDir of TaskManager plugin to store transient data | ||||
| // shims is  ShimManager for TaskManager to create/delete shims | ||||
| func NewTaskManager(ctx context.Context, root, state string, shims *ShimManager) (*TaskManager, error) { | ||||
| 	if err := shims.LoadExistingShims(ctx, state, root); err != nil { | ||||
| 		return nil, fmt.Errorf("failed to load existing shims for task manager") | ||||
| 	} | ||||
| 	m := &TaskManager{ | ||||
| 		root:    root, | ||||
| 		state:   state, | ||||
| 		manager: shims, | ||||
| 	} | ||||
| 	return m, nil | ||||
| } | ||||
|  | ||||
| // ID of the task manager | ||||
| func (m *TaskManager) ID() string { | ||||
| 	return plugins.RuntimePluginV2.String() + ".task" | ||||
| } | ||||
|  | ||||
| // Create launches new shim instance and creates new task | ||||
| func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { | ||||
| 	bundle, err := NewBundle(ctx, m.root, m.state, taskID, opts.Spec) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if retErr != nil { | ||||
| 			bundle.Delete() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	shim, err := m.manager.Start(ctx, taskID, bundle, opts) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to start shim: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	// Cast to shim task and call task service to create a new container task instance. | ||||
| 	// This will not be required once shim service / client implemented. | ||||
| 	shimTask, err := newShimTask(shim) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	t, err := shimTask.Create(ctx, opts) | ||||
| 	if err != nil { | ||||
| 		// NOTE: ctx contains required namespace information. | ||||
| 		m.manager.shims.Delete(ctx, taskID) | ||||
|  | ||||
| 		dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) | ||||
| 		defer cancel() | ||||
|  | ||||
| 		sandboxed := opts.SandboxID != "" | ||||
| 		_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) | ||||
| 		if errShim != nil { | ||||
| 			if errdefs.IsDeadlineExceeded(errShim) { | ||||
| 				dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout) | ||||
| 				defer cancel() | ||||
| 			} | ||||
|  | ||||
| 			shimTask.Shutdown(dctx) | ||||
| 			shimTask.Close() | ||||
| 		} | ||||
|  | ||||
| 		return nil, fmt.Errorf("failed to create shim task: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return t, nil | ||||
| } | ||||
|  | ||||
| // Get a specific task | ||||
| func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { | ||||
| 	shim, err := m.manager.shims.Get(ctx, id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newShimTask(shim) | ||||
| } | ||||
|  | ||||
| // Tasks lists all tasks | ||||
| func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { | ||||
| 	shims, err := m.manager.shims.GetAll(ctx, all) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	out := make([]runtime.Task, len(shims)) | ||||
| 	for i := range shims { | ||||
| 		newClient, err := newShimTask(shims[i]) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		out[i] = newClient | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
|  | ||||
| // Delete deletes the task and shim instance | ||||
| func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { | ||||
| 	shim, err := m.manager.shims.Get(ctx, taskID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	container, err := m.manager.containers.Get(ctx, taskID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	shimTask, err := newShimTask(shim) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	sandboxed := container.SandboxID != "" | ||||
|  | ||||
| 	exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { | ||||
| 		m.manager.shims.Delete(ctx, id) | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to delete task: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return exit, nil | ||||
| } | ||||
|  | ||||
| func (m *TaskManager) PluginInfo(ctx context.Context, request interface{}) (interface{}, error) { | ||||
| 	req, ok := request.(*apitypes.RuntimeRequest) | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("unknown request type %T: %w", request, errdefs.ErrNotImplemented) | ||||
| 	} | ||||
|  | ||||
| 	runtimePath, err := m.manager.resolveRuntimePath(req.RuntimePath) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to resolve runtime path: %w", err) | ||||
| 	} | ||||
| 	var optsB []byte | ||||
| 	if req.Options != nil { | ||||
| 		optsB, err = proto.Marshal(req.Options) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to marshal %s: %w", req.Options.TypeUrl, err) | ||||
| 		} | ||||
| 	} | ||||
| 	var stderr bytes.Buffer | ||||
| 	cmd := exec.CommandContext(ctx, runtimePath, "-info") | ||||
| 	cmd.Stdin = bytes.NewReader(optsB) | ||||
| 	cmd.Stderr = &stderr | ||||
| 	stdout, err := cmd.Output() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to run %v: %w (stderr: %q)", cmd.Args, err, stderr.String()) | ||||
| 	} | ||||
| 	var info apitypes.RuntimeInfo | ||||
| 	if err = proto.Unmarshal(stdout, &info); err != nil { | ||||
| 		return nil, fmt.Errorf("failed to unmarshal stdout from %v into %T: %w", cmd.Args, &info, err) | ||||
| 	} | ||||
| 	return &info, nil | ||||
| } | ||||
| @@ -19,8 +19,16 @@ package sandbox | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/containerd/errdefs" | ||||
| 	"github.com/containerd/log" | ||||
| 	"github.com/containerd/platforms" | ||||
| 	"github.com/containerd/plugin" | ||||
| 	"github.com/containerd/plugin/registry" | ||||
| 	"google.golang.org/protobuf/types/known/anypb" | ||||
|  | ||||
| 	runtimeAPI "github.com/containerd/containerd/v2/api/runtime/sandbox/v1" | ||||
| 	"github.com/containerd/containerd/v2/api/types" | ||||
| 	"github.com/containerd/containerd/v2/core/events" | ||||
| @@ -30,13 +38,6 @@ import ( | ||||
| 	v2 "github.com/containerd/containerd/v2/core/runtime/v2" | ||||
| 	"github.com/containerd/containerd/v2/core/sandbox" | ||||
| 	"github.com/containerd/containerd/v2/plugins" | ||||
| 	"github.com/containerd/errdefs" | ||||
| 	"github.com/containerd/log" | ||||
| 	"github.com/containerd/platforms" | ||||
| 	"github.com/containerd/plugin" | ||||
| 	"github.com/containerd/plugin/registry" | ||||
|  | ||||
| 	"google.golang.org/protobuf/types/known/anypb" | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| @@ -44,11 +45,11 @@ func init() { | ||||
| 		Type: plugins.SandboxControllerPlugin, | ||||
| 		ID:   "shim", | ||||
| 		Requires: []plugin.Type{ | ||||
| 			plugins.RuntimePluginV2, | ||||
| 			plugins.ShimPlugin, | ||||
| 			plugins.EventPlugin, | ||||
| 		}, | ||||
| 		InitFn: func(ic *plugin.InitContext) (interface{}, error) { | ||||
| 			shimPlugin, err := ic.GetByID(plugins.RuntimePluginV2, "shim") | ||||
| 			shimPlugin, err := ic.GetByID(plugins.ShimPlugin, "shim") | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| @@ -62,16 +63,32 @@ func init() { | ||||
| 				shims     = shimPlugin.(*v2.ShimManager) | ||||
| 				publisher = exchangePlugin.(*exchange.Exchange) | ||||
| 			) | ||||
| 			state := ic.Properties[plugins.PropertyStateDir] | ||||
| 			root := ic.Properties[plugins.PropertyRootDir] | ||||
| 			for _, d := range []string{root, state} { | ||||
| 				if err := os.MkdirAll(d, 0711); err != nil { | ||||
| 					return nil, err | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			return &controllerLocal{ | ||||
| 			if err := shims.LoadExistingShims(ic.Context, root, state); err != nil { | ||||
| 				return nil, fmt.Errorf("failed to load existing shim sandboxes, %v", err) | ||||
| 			} | ||||
|  | ||||
| 			c := &controllerLocal{ | ||||
| 				root:      root, | ||||
| 				state:     state, | ||||
| 				shims:     shims, | ||||
| 				publisher: publisher, | ||||
| 			}, nil | ||||
| 			} | ||||
| 			return c, nil | ||||
| 		}, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| type controllerLocal struct { | ||||
| 	root      string | ||||
| 	state     string | ||||
| 	shims     *v2.ShimManager | ||||
| 	publisher events.Publisher | ||||
| } | ||||
| @@ -97,7 +114,7 @@ func (c *controllerLocal) cleanupShim(ctx context.Context, sandboxID string, svc | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) error { | ||||
| func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts ...sandbox.CreateOpt) (retErr error) { | ||||
| 	var coptions sandbox.CreateOptions | ||||
| 	sandboxID := info.ID | ||||
| 	for _, opt := range opts { | ||||
| @@ -108,7 +125,17 @@ func (c *controllerLocal) Create(ctx context.Context, info sandbox.Sandbox, opts | ||||
| 		return fmt.Errorf("sandbox %s already running: %w", sandboxID, errdefs.ErrAlreadyExists) | ||||
| 	} | ||||
|  | ||||
| 	shim, err := c.shims.Start(ctx, sandboxID, runtime.CreateOpts{ | ||||
| 	bundle, err := v2.NewBundle(ctx, c.root, c.state, sandboxID, info.Spec) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if retErr != nil { | ||||
| 			bundle.Delete() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	shim, err := c.shims.Start(ctx, sandboxID, bundle, runtime.CreateOpts{ | ||||
| 		Spec:           info.Spec, | ||||
| 		RuntimeOptions: info.Runtime.Options, | ||||
| 		Runtime:        info.Runtime.Name, | ||||
|   | ||||
| @@ -71,6 +71,8 @@ const ( | ||||
| 	WarningPlugin plugin.Type = "io.containerd.warning.v1" | ||||
| 	// CRIServicePlugin implements a cri service | ||||
| 	CRIServicePlugin plugin.Type = "io.containerd.cri.v1" | ||||
| 	// ShimPlugin implements a shim service | ||||
| 	ShimPlugin plugin.Type = "io.containerd.shim.v1" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Maksym Pavlenko
					Maksym Pavlenko