Merge pull request #5918 from mxpv/shim-manager

Decouple task manager
This commit is contained in:
Maksym Pavlenko 2021-11-04 16:29:52 +02:00 committed by GitHub
commit b5b83e0512
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 461 additions and 257 deletions

View File

@ -41,7 +41,7 @@ import (
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/sys"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/opencontainers/runtime-spec/specs-go"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
@ -430,7 +430,7 @@ func getLogDirPath(runtimeVersion, id string) string {
case "v1":
return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id)
case "v2":
return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
return filepath.Join(defaultState, "io.containerd.runtime-shim.v2.shim", testNamespace, id)
default:
panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion))
}

View File

@ -57,6 +57,8 @@ const (
RuntimePlugin Type = "io.containerd.runtime.v1"
// RuntimePluginV2 implements a runtime v2
RuntimePluginV2 Type = "io.containerd.runtime.v2"
// RuntimeShimPlugin implements the shim manager for runtime v2.
RuntimeShimPlugin Type = "io.containerd.runtime-shim.v2"
// ServicePlugin implements a internal service
ServicePlugin Type = "io.containerd.service.v1"
// GRPCPlugin implements a grpc service

View File

@ -69,8 +69,6 @@ type PlatformRuntime interface {
// Tasks returns all the current tasks for the runtime.
// Any container runs at most one task at a time.
Tasks(ctx context.Context, all bool) ([]Task, error)
// Add adds a task into runtime.
Add(ctx context.Context, task Task) error
// Delete remove a task.
Delete(ctx context.Context, taskID string) (*Exit, error)
}

View File

@ -128,3 +128,16 @@ func (l *TaskList) Delete(ctx context.Context, id string) {
delete(tasks, id)
}
}
func (l *TaskList) IsEmpty() bool {
l.mu.Lock()
defer l.mu.Unlock()
for ns := range l.tasks {
if len(l.tasks[ns]) > 0 {
return false
}
}
return true
}

View File

@ -131,7 +131,6 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
return &shim{
bundle: b.bundle,
client: client,
task: task.NewTaskClient(client),
}, nil
}

View File

@ -20,19 +20,18 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/task"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@ -47,8 +46,8 @@ type Config struct {
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePluginV2,
ID: "task",
Type: plugin.RuntimeShimPlugin,
ID: "shim",
Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin,
@ -64,12 +63,7 @@ func init() {
}
ic.Meta.Platforms = supportedPlatforms
if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err
}
if err := os.MkdirAll(ic.State, 0711); err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
@ -81,7 +75,7 @@ func init() {
cs := metadata.NewContainerStore(m.(*metadata.DB))
events := ep.(*exchange.Exchange)
return New(ic.Context, &ManagerConfig{
return NewShimManager(ic.Context, &ManagerConfig{
Root: ic.Root,
State: ic.State,
Address: ic.Address,
@ -92,6 +86,68 @@ func init() {
})
},
})
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePluginV2,
ID: "task",
Requires: []plugin.Type{
plugin.RuntimeShimPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
shimManagerInterface, err := ic.Get(plugin.RuntimeShimPlugin)
if err != nil {
return nil, err
}
shimManager := shimManagerInterface.(*ShimManager)
// From now on task manager works via shim manager, which has different home directory.
// Check if there are any leftovers from previous containerd versions and migrate home directory,
// so we can properly restore existing tasks as well.
if err := migrateTasks(ic, shimManager); err != nil {
log.G(ic.Context).WithError(err).Error("unable to migrate tasks")
}
return NewTaskManager(shimManager), nil
},
})
}
func migrateTasks(ic *plugin.InitContext, shimManager *ShimManager) error {
if !shimManager.shims.IsEmpty() {
return nil
}
// Rename below will fail is target directory exists.
// `Root` and `State` dirs expected to be empty at this point (we check that there are no shims loaded above).
// If for some they are not empty, these remove calls will fail (`os.Remove` requires a dir to be empty to succeed).
if err := os.Remove(shimManager.root); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove `root` dir: %w", err)
}
if err := os.Remove(shimManager.state); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove `state` dir: %w", err)
}
if err := os.Rename(ic.Root, shimManager.root); err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("failed to migrate task `root` directory: %w", err)
}
if err := os.Rename(ic.State, shimManager.state); err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("failed to migrate task `state` directory: %w", err)
}
if err := shimManager.loadExistingTasks(ic.Context); err != nil {
return fmt.Errorf("failed to load tasks after migration: %w", err)
}
return nil
}
type ManagerConfig struct {
@ -104,49 +160,54 @@ type ManagerConfig struct {
SchedCore bool
}
// New task manager for v2 shims
func New(ctx context.Context, config *ManagerConfig) (*TaskManager, error) {
// NewShimManager creates a manager for v2 shims
func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) {
for _, d := range []string{config.Root, config.State} {
if err := os.MkdirAll(d, 0711); err != nil {
return nil, err
}
}
m := &TaskManager{
m := &ShimManager{
root: config.Root,
state: config.State,
containerdAddress: config.Address,
containerdTTRPCAddress: config.TTRPCAddress,
schedCore: config.SchedCore,
tasks: runtime.NewTaskList(),
shims: runtime.NewTaskList(),
events: config.Events,
containers: config.Store,
schedCore: config.SchedCore,
}
if err := m.loadExistingTasks(ctx); err != nil {
return nil, err
}
return m, nil
}
// TaskManager manages v2 shim's and their tasks
type TaskManager struct {
// ShimManager manages currently running shim processes.
// It is mainly responsible for launching new shims and for proper shutdown and cleanup of existing instances.
// The manager is unaware of the underlying services shim provides and lets higher level services consume them,
// but don't care about lifecycle management.
type ShimManager struct {
root string
state string
containerdAddress string
containerdTTRPCAddress string
schedCore bool
tasks *runtime.TaskList
shims *runtime.TaskList
events *exchange.Exchange
containers containers.Store
}
// ID of the task manager
func (m *TaskManager) ID() string {
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task")
// ID of the shim manager
func (m *ShimManager) ID() string {
return fmt.Sprintf("%s.%s", plugin.RuntimeShimPlugin, "shim")
}
// Create a new task
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimProcess, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
if err != nil {
return nil, err
@ -163,23 +224,25 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
}
defer func() {
if retErr != nil {
m.deleteShim(shim)
m.cleanupShim(shim)
}
}()
t, err := shim.Create(ctx, opts)
if err != nil {
return nil, errors.Wrap(err, "failed to create shim")
// NOTE: temporarily keep this wrapper around until containerd's task service depends on it.
// This will no longer be required once we migrate to client side task management.
shimTask := &shimTask{
shim: shim,
task: task.NewTaskClient(shim.client),
}
if err := m.tasks.Add(ctx, t); err != nil {
if err := m.shims.Add(ctx, shimTask); err != nil {
return nil, errors.Wrap(err, "failed to add task")
}
return t, nil
return shimTask, nil
}
func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
@ -199,12 +262,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
shim, err := b.Start(ctx, topts, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, b)
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
// disconnect and there is no chance to remove this dead task from runtime task lists.
// Thus it's better to delete it here.
m.tasks.Delete(ctx, id)
m.shims.Delete(ctx, id)
})
if err != nil {
return nil, errors.Wrap(err, "start failed")
@ -213,176 +276,36 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
return shim, nil
}
// deleteShim attempts to properly delete and cleanup shim after error
func (m *TaskManager) deleteShim(shim *shim) {
// cleanupShim attempts to properly delete and cleanup shim after error
func (m *ShimManager) cleanupShim(shim *shim) {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
_, errShim := shim.delete(dctx, m.tasks.Delete)
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
}
shim.Shutdown(dctx)
shim.Close()
}
_ = shim.delete(dctx)
m.shims.Delete(dctx, shim.ID())
}
// Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
return m.tasks.Get(ctx, id)
}
func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) {
proc, err := m.shims.Get(ctx, id)
if err != nil {
return nil, err
}
// Add a runtime task
func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
return m.tasks.Add(ctx, task)
return proc, nil
}
// Delete a runtime task
func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := m.tasks.Get(ctx, id)
if err != nil {
return nil, err
}
shim := task.(*shim)
exit, err := shim.delete(ctx, m.tasks.Delete)
if err != nil {
return nil, err
}
return exit, err
}
// Tasks lists all tasks
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
return m.tasks.GetAll(ctx, all)
}
func (m *TaskManager) loadExistingTasks(ctx context.Context) error {
nsDirs, err := os.ReadDir(m.state)
func (m *ShimManager) Delete(ctx context.Context, id string) error {
proc, err := m.shims.Get(ctx, id)
if err != nil {
return err
}
for _, nsd := range nsDirs {
if !nsd.IsDir() {
continue
}
ns := nsd.Name()
// skip hidden directories
if len(ns) > 0 && ns[0] == '.' {
continue
}
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := m.loadTasks(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue
}
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue
}
}
return nil
}
func (m *TaskManager) loadTasks(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil {
return err
}
for _, sd := range shimDirs {
if !sd.IsDir() {
continue
}
id := sd.Name()
// skip hidden directories
if len(id) > 0 && id[0] == '.' {
continue
}
bundle, err := LoadBundle(ctx, m.state, id)
if err != nil {
// fine to return error here, it is a programmer error if the context
// does not have a namespace
return err
}
// fast path
bf, err := os.ReadDir(bundle.Path)
if err != nil {
bundle.Delete()
log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path)
continue
}
if len(bf) == 0 {
bundle.Delete()
continue
}
container, err := m.container(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
}
bundle.Delete()
continue
}
binaryCall := shimBinary(bundle,
shimBinaryConfig{
runtime: container.Runtime.Name,
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
shimTask := proc.(*shimTask)
err = shimTask.shim.delete(ctx)
m.shims.Delete(ctx, id)
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
// Remove self from the runtime task list.
m.tasks.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall)
continue
}
m.tasks.Add(ctx, shim)
}
return nil
}
func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) {
container, err := m.containers.Get(ctx, id)
if err != nil {
return nil, err
}
return &container, nil
}
func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
dirs, err := os.ReadDir(filepath.Join(m.root, ns))
if err != nil {
return err
}
for _, d := range dirs {
// if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left
if _, err := m.tasks.Get(ctx, d.Name()); err != nil {
path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
}
}
}
return nil
}
func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) {
@ -396,3 +319,84 @@ func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) {
}
return p, nil
}
// TaskManager wraps task service client on top of shim manager.
type TaskManager struct {
manager *ShimManager
}
// NewTaskManager creates a new task manager instance.
func NewTaskManager(shims *ShimManager) *TaskManager {
return &TaskManager{
manager: shims,
}
}
// ID of the task manager
func (m *TaskManager) ID() string {
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task")
}
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
process, err := m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, errors.Wrap(err, "failed to start shim")
}
// Cast to shim task and call task service to create a new container task instance.
// This will not be required once shim service / client implemented.
shim := process.(*shimTask)
t, err := shim.Create(ctx, opts)
if err != nil {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
_, errShim := shim.delete(dctx, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()
}
shim.Shutdown(dctx)
shim.Close()
}
return nil, errors.Wrap(err, "failed to create shim task")
}
return t, nil
}
// Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) {
return m.manager.shims.Get(ctx, id)
}
// Tasks lists all tasks
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
return m.manager.shims.GetAll(ctx, all)
}
// Delete deletes the task and shim instance
func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) {
item, err := m.manager.shims.Get(ctx, taskID)
if err != nil {
return nil, err
}
shimTask := item.(*shimTask)
exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})
if err != nil {
return nil, fmt.Errorf("failed to delete task: %w", err)
}
return exit, nil
}

View File

@ -29,7 +29,7 @@ import (
type process struct {
id string
shim *shim
shim *shimTask
}
func (p *process) ID() string {

View File

@ -18,6 +18,7 @@ package v2
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
@ -37,6 +38,7 @@ import (
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
ptypes "github.com/gogo/protobuf/types"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -61,7 +63,7 @@ func loadAddress(path string) (string, error) {
return string(data), nil
}
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) {
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) {
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
if err != nil {
return nil, err
@ -115,10 +117,12 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err
client.Close()
}
}()
s := &shim{
client: client,
task: task.NewTaskClient(client),
s := &shimTask{
shim: &shim{
bundle: bundle,
client: client,
},
task: task.NewTaskClient(client),
}
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel()
@ -182,28 +186,19 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
})
}
var _ runtime.Task = &shim{}
// ShimProcess represents a shim instance managed by the shim service.
type ShimProcess interface {
runtime.Process
// ID of the shim.
ID() string
// Namespace of this shim.
Namespace() string
}
type shim struct {
bundle *Bundle
client *ttrpc.Client
task task.TaskService
}
func (s *shim) Shutdown(ctx context.Context) error {
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
ID: s.ID(),
})
if err != nil && !errors.Is(err, ttrpc.ErrClosed) {
return errdefs.FromGRPC(err)
}
return nil
}
func (s *shim) waitShutdown(ctx context.Context) error {
ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
defer cancel()
return s.Shutdown(ctx)
}
// ID of the shim/task
@ -211,18 +206,6 @@ func (s *shim) ID() string {
return s.bundle.ID
}
// PID of the task
func (s *shim) PID(ctx context.Context) (uint32, error) {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return response.TaskPid, nil
}
func (s *shim) Namespace() string {
return s.bundle.Namespace
}
@ -231,7 +214,64 @@ func (s *shim) Close() error {
return s.client.Close()
}
func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
func (s *shim) delete(ctx context.Context) error {
var (
result *multierror.Error
)
if err := s.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err))
}
if err := s.client.UserOnCloseWait(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("close wait error: %w", err))
}
if err := s.bundle.Delete(); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err))
}
return result.ErrorOrNil()
}
var _ runtime.Task = &shimTask{}
// shimTask wraps shim process and adds task service client for compatibility with existing shim manager.
type shimTask struct {
*shim
task task.TaskService
}
func (s *shimTask) Shutdown(ctx context.Context) error {
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
ID: s.ID(),
})
if err != nil && !errors.Is(err, ttrpc.ErrClosed) {
return errdefs.FromGRPC(err)
}
return nil
}
func (s *shimTask) waitShutdown(ctx context.Context) error {
ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
defer cancel()
return s.Shutdown(ctx)
}
// PID of the task
func (s *shimTask) PID(ctx context.Context) (uint32, error) {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return response.TaskPid, nil
}
func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(),
})
@ -260,20 +300,21 @@ func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context,
}
if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim")
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
}
if err := s.shim.delete(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Errorf("failed to delete shim")
}
s.Close()
s.client.UserOnCloseWait(ctx)
// remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
removeTask(ctx, s.ID())
if err := s.bundle.Delete(); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
}
if shimErr != nil {
return nil, shimErr
}
return &runtime.Exit{
Status: response.ExitStatus,
Timestamp: response.ExitedAt,
@ -281,7 +322,7 @@ func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context,
}, nil
}
func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
topts := opts.TaskOptions
if topts == nil {
topts = opts.RuntimeOptions
@ -312,7 +353,7 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
return s, nil
}
func (s *shim) Pause(ctx context.Context) error {
func (s *shimTask) Pause(ctx context.Context) error {
if _, err := s.task.Pause(ctx, &task.PauseRequest{
ID: s.ID(),
}); err != nil {
@ -321,7 +362,7 @@ func (s *shim) Pause(ctx context.Context) error {
return nil
}
func (s *shim) Resume(ctx context.Context) error {
func (s *shimTask) Resume(ctx context.Context) error {
if _, err := s.task.Resume(ctx, &task.ResumeRequest{
ID: s.ID(),
}); err != nil {
@ -330,7 +371,7 @@ func (s *shim) Resume(ctx context.Context) error {
return nil
}
func (s *shim) Start(ctx context.Context) error {
func (s *shimTask) Start(ctx context.Context) error {
_, err := s.task.Start(ctx, &task.StartRequest{
ID: s.ID(),
})
@ -340,7 +381,7 @@ func (s *shim) Start(ctx context.Context) error {
return nil
}
func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
func (s *shimTask) Kill(ctx context.Context, signal uint32, all bool) error {
if _, err := s.task.Kill(ctx, &task.KillRequest{
ID: s.ID(),
Signal: signal,
@ -351,7 +392,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
return nil
}
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
func (s *shimTask) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id %s", id)
}
@ -373,7 +414,7 @@ func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt
}, nil
}
func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
func (s *shimTask) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
resp, err := s.task.Pids(ctx, &task.PidsRequest{
ID: s.ID(),
})
@ -390,7 +431,7 @@ func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
return processList, nil
}
func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
func (s *shimTask) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{
ID: s.ID(),
Width: size.Width,
@ -402,7 +443,7 @@ func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
return nil
}
func (s *shim) CloseIO(ctx context.Context) error {
func (s *shimTask) CloseIO(ctx context.Context) error {
_, err := s.task.CloseIO(ctx, &task.CloseIORequest{
ID: s.ID(),
Stdin: true,
@ -413,7 +454,7 @@ func (s *shim) CloseIO(ctx context.Context) error {
return nil
}
func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
func (s *shimTask) Wait(ctx context.Context) (*runtime.Exit, error) {
taskPid, err := s.PID(ctx)
if err != nil {
return nil, err
@ -431,7 +472,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
}, nil
}
func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
func (s *shimTask) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
request := &task.CheckpointTaskRequest{
ID: s.ID(),
Path: path,
@ -443,7 +484,7 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any)
return nil
}
func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error {
func (s *shimTask) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error {
if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{
ID: s.ID(),
Resources: resources,
@ -454,7 +495,7 @@ func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations ma
return nil
}
func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
func (s *shimTask) Stats(ctx context.Context) (*ptypes.Any, error) {
response, err := s.task.Stats(ctx, &task.StatsRequest{
ID: s.ID(),
})
@ -464,7 +505,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
return response.Stats, nil
}
func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
func (s *shimTask) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &process{
id: id,
shim: s,
@ -475,7 +516,7 @@ func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, err
return p, nil
}
func (s *shim) State(ctx context.Context) (runtime.State, error) {
func (s *shimTask) State(ctx context.Context) (runtime.State, error) {
response, err := s.task.State(ctx, &task.StateRequest{
ID: s.ID(),
})

144
runtime/v2/shim_load.go Normal file
View File

@ -0,0 +1,144 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2
import (
"context"
"os"
"path/filepath"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
)
func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
nsDirs, err := os.ReadDir(m.state)
if err != nil {
return err
}
for _, nsd := range nsDirs {
if !nsd.IsDir() {
continue
}
ns := nsd.Name()
// skip hidden directories
if len(ns) > 0 && ns[0] == '.' {
continue
}
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue
}
if err := m.cleanupWorkDirs(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("cleanup working directory in namespace")
continue
}
}
return nil
}
func (m *ShimManager) loadShims(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
shimDirs, err := os.ReadDir(filepath.Join(m.state, ns))
if err != nil {
return err
}
for _, sd := range shimDirs {
if !sd.IsDir() {
continue
}
id := sd.Name()
// skip hidden directories
if len(id) > 0 && id[0] == '.' {
continue
}
bundle, err := LoadBundle(ctx, m.state, id)
if err != nil {
// fine to return error here, it is a programmer error if the context
// does not have a namespace
return err
}
// fast path
bf, err := os.ReadDir(bundle.Path)
if err != nil {
bundle.Delete()
log.G(ctx).WithError(err).Errorf("fast path read bundle path for %s", bundle.Path)
continue
}
if len(bf) == 0 {
bundle.Delete()
continue
}
container, err := m.containers.Get(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading container %s", id)
if err := mount.UnmountAll(filepath.Join(bundle.Path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Errorf("forceful unmount of rootfs %s", id)
}
bundle.Delete()
continue
}
binaryCall := shimBinary(bundle,
shimBinaryConfig{
runtime: container.Runtime.Name,
address: m.containerdAddress,
ttrpcAddress: m.containerdTTRPCAddress,
schedCore: m.schedCore,
})
shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall)
// Remove self from the runtime task list.
m.shims.Delete(ctx, id)
})
if err != nil {
cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall)
continue
}
m.shims.Add(ctx, shim)
}
return nil
}
func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
dirs, err := os.ReadDir(filepath.Join(m.root, ns))
if err != nil {
return err
}
for _, d := range dirs {
// if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left
if _, err := m.shims.Get(ctx, d.Name()); err != nil {
path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)
}
}
}
return nil
}

View File

@ -81,7 +81,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
v2r, err := ic.Get(plugin.RuntimePluginV2)
v2r, err := ic.GetByID(plugin.RuntimePluginV2, "task")
if err != nil {
return nil, err
}

View File

@ -24,6 +24,7 @@ import (
var tasksServiceRequires = []plugin.Type{
plugin.EventPlugin,
plugin.RuntimePluginV2,
plugin.RuntimeShimPlugin,
plugin.MetadataPlugin,
plugin.TaskMonitorPlugin,
}

View File

@ -30,6 +30,7 @@ var tasksServiceRequires = []plugin.Type{
plugin.EventPlugin,
plugin.RuntimePlugin,
plugin.RuntimePluginV2,
plugin.RuntimeShimPlugin,
plugin.MetadataPlugin,
plugin.TaskMonitorPlugin,
}

View File

@ -24,6 +24,7 @@ import (
var tasksServiceRequires = []plugin.Type{
plugin.EventPlugin,
plugin.RuntimePluginV2,
plugin.RuntimeShimPlugin,
plugin.MetadataPlugin,
plugin.TaskMonitorPlugin,
}