From 7c4ead285dbab56357c3c5411612b5032661f1f5 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Thu, 26 Aug 2021 11:56:46 -0700 Subject: [PATCH] Add task manager Signed-off-by: Maksym Pavlenko --- integration/client/container_linux_test.go | 5 ++- runtime/runtime.go | 2 - runtime/v2/manager.go | 51 ++++++++++++++++++++++ services/tasks/local.go | 6 +-- 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/integration/client/container_linux_test.go b/integration/client/container_linux_test.go index 4de3f4477..40023bbb2 100644 --- a/integration/client/container_linux_test.go +++ b/integration/client/container_linux_test.go @@ -41,7 +41,10 @@ import ( "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/sys" + "github.com/containerd/typeurl" + "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" exec "golang.org/x/sys/execabs" "golang.org/x/sys/unix" ) @@ -430,7 +433,7 @@ func getLogDirPath(runtimeVersion, id string) string { case "v1": return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id) case "v2": - return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id) + return filepath.Join(defaultState, "io.containerd.runtime.v2.shim", testNamespace, id) default: panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion)) } diff --git a/runtime/runtime.go b/runtime/runtime.go index 52974a3a9..b4bed75cf 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -69,8 +69,6 @@ type PlatformRuntime interface { // Tasks returns all the current tasks for the runtime. // Any container runs at most one task at a time. Tasks(ctx context.Context, all bool) ([]Task, error) - // Add adds a task into runtime. - Add(ctx context.Context, task Task) error // Delete remove a task. Delete(ctx context.Context, taskID string) (*Exit, error) } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index d7b23f050..5cb5b9cd2 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -92,6 +92,20 @@ func init() { }) }, }) + + plugin.Register(&plugin.Registration{ + Type: plugin.RuntimePluginV2, + ID: "task", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + shimInstance, err := ic.GetByID(plugin.RuntimePluginV2, "shim") + if err != nil { + return nil, err + } + + shimManager := shimInstance.(*ShimManager) + return NewTaskManager(shimManager), nil + }, + }) } type ManagerConfig struct { @@ -400,3 +414,40 @@ func parsePlatforms(platformStr []string) ([]ocispec.Platform, error) { } return p, nil } + +// TaskManager wraps task service client on top of shim manager. +type TaskManager struct { + shims *ShimManager +} + +// NewTaskManager creates a new task manager instance. +func NewTaskManager(shims *ShimManager) *TaskManager { + return &TaskManager{ + shims: shims, + } +} + +// ID of the task manager +func (m *TaskManager) ID() string { + return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") +} + +// Create launches new shim instance and creates new task +func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { + return m.shims.Create(ctx, taskID, opts) +} + +// Get a specific task +func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { + return m.shims.Get(ctx, id) +} + +// Tasks lists all tasks +func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { + return m.shims.Tasks(ctx, all) +} + +// Delete deletes the task and shim instance +func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { + return m.shims.Delete(ctx, taskID) +} diff --git a/services/tasks/local.go b/services/tasks/local.go index edee5616c..54c758fbf 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -81,7 +81,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { return nil, err } - v2r, err := ic.Get(plugin.RuntimePluginV2) + v2r, err := ic.GetByID(plugin.RuntimePluginV2, "task") if err != nil { return nil, err } @@ -111,7 +111,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { store: db.ContentStore(), publisher: ep.(events.Publisher), monitor: monitor.(runtime.TaskMonitor), - v2Runtime: v2r.(*v2.ShimManager), + v2Runtime: v2r.(*v2.TaskManager), } for _, r := range runtimes { tasks, err := r.Tasks(ic.Context, true) @@ -139,7 +139,7 @@ type local struct { publisher events.Publisher monitor runtime.TaskMonitor - v2Runtime *v2.ShimManager + v2Runtime *v2.TaskManager } func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {