diff --git a/runtime/nsmap.go b/runtime/nsmap.go new file mode 100644 index 000000000..ed172adcc --- /dev/null +++ b/runtime/nsmap.go @@ -0,0 +1,142 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package runtime + +import ( + "context" + "fmt" + "sync" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" +) + +type object interface { + ID() string +} + +// NSMap extends Map type with a notion of namespaces passed via Context. +type NSMap[T object] struct { + mu sync.Mutex + objects map[string]map[string]T +} + +// NewNSMap returns a new NSMap +func NewNSMap[T object]() *NSMap[T] { + return &NSMap[T]{ + objects: make(map[string]map[string]T), + } +} + +// Get a task +func (m *NSMap[T]) Get(ctx context.Context, id string) (T, error) { + m.mu.Lock() + defer m.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + var t T + if err != nil { + return t, err + } + tasks, ok := m.objects[namespace] + if !ok { + return t, errdefs.ErrNotFound + } + t, ok = tasks[id] + if !ok { + return t, errdefs.ErrNotFound + } + return t, nil +} + +// GetAll objects under a namespace +func (m *NSMap[T]) GetAll(ctx context.Context, noNS bool) ([]T, error) { + m.mu.Lock() + defer m.mu.Unlock() + var o []T + if noNS { + for ns := range m.objects { + for _, t := range m.objects[ns] { + o = append(o, t) + } + } + return o, nil + } + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + tasks, ok := m.objects[namespace] + if !ok { + return o, nil + } + for _, t := range tasks { + o = append(o, t) + } + return o, nil +} + +// Add a task +func (m *NSMap[T]) Add(ctx context.Context, t T) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + return m.AddWithNamespace(namespace, t) +} + +// AddWithNamespace adds a task with the provided namespace +func (m *NSMap[T]) AddWithNamespace(namespace string, t T) error { + m.mu.Lock() + defer m.mu.Unlock() + + id := t.ID() + if _, ok := m.objects[namespace]; !ok { + m.objects[namespace] = make(map[string]T) + } + if _, ok := m.objects[namespace][id]; ok { + return fmt.Errorf("%s: %w", id, errdefs.ErrAlreadyExists) + } + m.objects[namespace][id] = t + return nil +} + +// Delete a task +func (m *NSMap[T]) Delete(ctx context.Context, id string) { + m.mu.Lock() + defer m.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return + } + tasks, ok := m.objects[namespace] + if ok { + delete(tasks, id) + } +} + +func (m *NSMap[T]) IsEmpty() bool { + m.mu.Lock() + defer m.mu.Unlock() + + for ns := range m.objects { + if len(m.objects[ns]) > 0 { + return false + } + } + + return true +} diff --git a/runtime/task_list.go b/runtime/task_list.go deleted file mode 100644 index f24d689f1..000000000 --- a/runtime/task_list.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package runtime - -import ( - "context" - "errors" - "fmt" - "sync" - - "github.com/containerd/containerd/namespaces" -) - -var ( - // ErrTaskNotExists is returned when a task does not exist - ErrTaskNotExists = errors.New("task does not exist") - // ErrTaskAlreadyExists is returned when a task already exists - ErrTaskAlreadyExists = errors.New("task already exists") -) - -// NewTaskList returns a new TaskList -func NewTaskList() *TaskList { - return &TaskList{ - tasks: make(map[string]map[string]Task), - } -} - -// TaskList holds and provides locking around tasks -type TaskList struct { - mu sync.Mutex - tasks map[string]map[string]Task -} - -// Get a task -func (l *TaskList) Get(ctx context.Context, id string) (Task, error) { - l.mu.Lock() - defer l.mu.Unlock() - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - tasks, ok := l.tasks[namespace] - if !ok { - return nil, ErrTaskNotExists - } - t, ok := tasks[id] - if !ok { - return nil, ErrTaskNotExists - } - return t, nil -} - -// GetAll tasks under a namespace -func (l *TaskList) GetAll(ctx context.Context, noNS bool) ([]Task, error) { - l.mu.Lock() - defer l.mu.Unlock() - var o []Task - if noNS { - for ns := range l.tasks { - for _, t := range l.tasks[ns] { - o = append(o, t) - } - } - return o, nil - } - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - tasks, ok := l.tasks[namespace] - if !ok { - return o, nil - } - for _, t := range tasks { - o = append(o, t) - } - return o, nil -} - -// Add a task -func (l *TaskList) Add(ctx context.Context, t Task) error { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - return l.AddWithNamespace(namespace, t) -} - -// AddWithNamespace adds a task with the provided namespace -func (l *TaskList) AddWithNamespace(namespace string, t Task) error { - l.mu.Lock() - defer l.mu.Unlock() - - id := t.ID() - if _, ok := l.tasks[namespace]; !ok { - l.tasks[namespace] = make(map[string]Task) - } - if _, ok := l.tasks[namespace][id]; ok { - return fmt.Errorf("%s: %w", id, ErrTaskAlreadyExists) - } - l.tasks[namespace][id] = t - return nil -} - -// Delete a task -func (l *TaskList) Delete(ctx context.Context, id string) { - l.mu.Lock() - defer l.mu.Unlock() - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return - } - tasks, ok := l.tasks[namespace] - if ok { - delete(tasks, id) - } -} - -func (l *TaskList) IsEmpty() bool { - l.mu.Lock() - defer l.mu.Unlock() - - for ns := range l.tasks { - if len(l.tasks[ns]) > 0 { - return false - } - } - - return true -} diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index f4b7be161..4d94d21f7 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -124,7 +124,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { r := &Runtime{ root: ic.Root, state: ic.State, - tasks: runtime.NewTaskList(), + tasks: runtime.NewNSMap[runtime.Task](), containers: metadata.NewContainerStore(m.(*metadata.DB)), address: ic.Address, events: ep.(*exchange.Exchange), @@ -148,7 +148,7 @@ type Runtime struct { state string address string - tasks *runtime.TaskList + tasks *runtime.NSMap[runtime.Task] containers containers.Store events *exchange.Exchange diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go index c2fbc8b2a..5c8c4c294 100644 --- a/runtime/v1/linux/task.go +++ b/runtime/v1/linux/task.go @@ -48,11 +48,11 @@ type Task struct { namespace string cg cgroups.Cgroup events *exchange.Exchange - tasks *runtime.TaskList + tasks *runtime.NSMap[runtime.Task] bundle *bundle } -func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) { +func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.NSMap[runtime.Task], bundle *bundle) (*Task, error) { var ( err error cg cgroups.Cgroup diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 17f39fbfc..fd2717c0d 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -25,7 +25,6 @@ import ( "strings" "sync" - "github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" @@ -143,7 +142,7 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, - shims: runtime.NewTaskList(), + shims: runtime.NewNSMap[ShimInstance](), events: config.Events, containers: config.Store, schedCore: config.SchedCore, @@ -167,7 +166,7 @@ type ShimManager struct { containerdAddress string containerdTTRPCAddress string schedCore bool - shims *runtime.TaskList + shims *runtime.NSMap[ShimInstance] events *exchange.Exchange containers containers.Store // runtimePaths is a cache of `runtime names` -> `resolved fs path` @@ -181,7 +180,7 @@ func (m *ShimManager) ID() string { } // Start launches a new shim instance -func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimProcess, retErr error) { +func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec) if err != nil { return nil, err @@ -236,18 +235,11 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO } }() - // NOTE: temporarily keep this wrapper around until containerd's task service depends on it. - // This will no longer be required once we migrate to client side task management. - shimTask := &shimTask{ - shim: shim, - task: task.NewTaskClient(shim.client), - } - - if err := m.shims.Add(ctx, shimTask); err != nil { + if err := m.shims.Add(ctx, shim); err != nil { return nil, fmt.Errorf("failed to add task: %w", err) } - return shimTask, nil + return shim, nil } func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { @@ -372,29 +364,22 @@ func (m *ShimManager) cleanupShim(shim *shim) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _ = shim.delete(dctx) + _ = shim.Delete(dctx) m.shims.Delete(dctx, shim.ID()) } -func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) { - proc, err := m.shims.Get(ctx, id) - if err != nil { - return nil, err - } - - shimTask := proc.(*shimTask) - return shimTask, nil +func (m *ShimManager) Get(ctx context.Context, id string) (ShimInstance, error) { + return m.shims.Get(ctx, id) } // Delete a runtime task func (m *ShimManager) Delete(ctx context.Context, id string) error { - proc, err := m.shims.Get(ctx, id) + shim, err := m.shims.Get(ctx, id) if err != nil { return err } - shimTask := proc.(*shimTask) - err = shimTask.shim.delete(ctx) + err = shim.Delete(ctx) m.shims.Delete(ctx, id) return err @@ -431,15 +416,15 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - process, err := m.manager.Start(ctx, taskID, opts) + shim, err := m.manager.Start(ctx, taskID, opts) if err != nil { return nil, fmt.Errorf("failed to start shim: %w", err) } // Cast to shim task and call task service to create a new container task instance. // This will not be required once shim service / client implemented. - shim := process.(*shimTask) - t, err := shim.Create(ctx, opts) + shimTask := newShimTask(shim) + t, err := shimTask.Create(ctx, opts) if err != nil { // NOTE: ctx contains required namespace information. m.manager.shims.Delete(ctx, taskID) @@ -448,15 +433,15 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr defer cancel() sandboxed := opts.SandboxID != "" - _, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {}) + _, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() } - shim.Shutdown(dctx) - shim.Close() + shimTask.Shutdown(dctx) + shimTask.Client().Close() } return nil, fmt.Errorf("failed to create shim task: %w", err) @@ -467,17 +452,29 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr // Get a specific task func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.manager.shims.Get(ctx, id) + shim, err := m.manager.shims.Get(ctx, id) + if err != nil { + return nil, err + } + return newShimTask(shim), nil } // Tasks lists all tasks func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.manager.shims.GetAll(ctx, all) + shims, err := m.manager.shims.GetAll(ctx, all) + if err != nil { + return nil, err + } + out := make([]runtime.Task, len(shims)) + for i := range shims { + out[i] = newShimTask(shims[i]) + } + return out, nil } // Delete deletes the task and shim instance func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { - item, err := m.manager.shims.Get(ctx, taskID) + shim, err := m.manager.shims.Get(ctx, taskID) if err != nil { return nil, err } @@ -487,8 +484,11 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, return nil, err } - sandboxed := container.SandboxID != "" - shimTask := item.(*shimTask) + var ( + sandboxed = container.SandboxID != "" + shimTask = newShimTask(shim) + ) + exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { m.manager.shims.Delete(ctx, id) }) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index b0e59c084..88c6aa1bf 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -63,7 +63,7 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) { +func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err @@ -117,24 +117,21 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, client.Close() } }() - s := &shimTask{ - shim: &shim{ - bundle: bundle, - client: client, - }, - task: task.NewTaskClient(client), + shim := &shim{ + bundle: bundle, + client: client, } ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() - - // Check connectivity + // Check connectivity, TaskService is the only required service, so create a temp one to check connection. + s := newShimTask(shim) if _, err := s.PID(ctx); err != nil { return nil, err } - return s, nil + return shim, nil } -func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) { +func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) { ctx = namespaces.WithNamespace(ctx, ns) ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) defer cancel() @@ -186,10 +183,8 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi }) } -// ShimProcess represents a shim instance managed by the shim service. -type ShimProcess interface { - runtime.Process - +// ShimInstance represents running shim process managed by ShimManager. +type ShimInstance interface { // ID of the shim. ID() string // Namespace of this shim. @@ -198,6 +193,8 @@ type ShimProcess interface { Bundle() string // Client returns the underlying TTRPC client for this shim. Client() *ttrpc.Client + // Delete will close the client and remove bundle from disk. + Delete(ctx context.Context) error } type shim struct { @@ -205,6 +202,8 @@ type shim struct { client *ttrpc.Client } +var _ ShimInstance = (*shim)(nil) + // ID of the shim/task func (s *shim) ID() string { return s.bundle.ID @@ -218,16 +217,16 @@ func (s *shim) Bundle() string { return s.bundle.Path } -func (s *shim) Close() error { - return s.client.Close() +func (s *shim) Client() *ttrpc.Client { + return s.client } -func (s *shim) delete(ctx context.Context) error { +func (s *shim) Delete(ctx context.Context) error { var ( result *multierror.Error ) - if err := s.Close(); err != nil { + if err := s.client.Close(); err != nil { result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) } @@ -247,12 +246,15 @@ var _ runtime.Task = &shimTask{} // shimTask wraps shim process and adds task service client for compatibility with existing shim manager. type shimTask struct { - *shim + ShimInstance task task.TaskService } -func (s *shimTask) Client() *ttrpc.Client { - return s.client +func newShimTask(shim ShimInstance) *shimTask { + return &shimTask{ + ShimInstance: shim, + task: task.NewTaskClient(shim.Client()), + } } func (s *shimTask) Shutdown(ctx context.Context) error { @@ -319,7 +321,7 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c } } - if err := s.shim.delete(ctx); err != nil { + if err := s.ShimInstance.Delete(ctx); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete shim") } @@ -345,7 +347,7 @@ func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime } request := &task.CreateTaskRequest{ ID: s.ID(), - Bundle: s.bundle.Path, + Bundle: s.Bundle(), Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 760a6a7f2..9047c3eca 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -130,7 +130,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { ttrpcAddress: m.containerdTTRPCAddress, schedCore: m.schedCore, }) - shim, err := loadShim(ctx, bundle, func() { + instance, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall) @@ -141,6 +141,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall) continue } + shim := newShimTask(instance) // There are 3 possibilities for the loaded shim here: // 1. It could be a shim that is running a task. @@ -159,7 +160,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { // No need to do anything for removeTask since we never added this shim. shim.delete(ctx, false, func(ctx context.Context, id string) {}) } else { - m.shims.Add(ctx, shim) + m.shims.Add(ctx, shim.ShimInstance) } } return nil diff --git a/services/tasks/local.go b/services/tasks/local.go index 914512c33..663c764a8 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -228,7 +228,7 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. return nil, err } _, err = rtime.Get(ctx, r.ContainerID) - if err != nil && err != runtime.ErrTaskNotExists { + if err != nil && !errdefs.IsNotFound(err) { return nil, errdefs.ToGRPC(err) } if err == nil {