From ff65fc2d0e656d435c6d36700301b681c17341e7 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Wed, 10 Aug 2022 14:02:53 -0700 Subject: [PATCH] Make TaskList generic Signed-off-by: Maksym Pavlenko --- runtime/nsmap.go | 142 +++++++++++++++++++++++++++++++++++ runtime/task_list.go | 144 ------------------------------------ runtime/v1/linux/runtime.go | 4 +- runtime/v1/linux/task.go | 4 +- runtime/v2/manager.go | 4 +- runtime/v2/shim.go | 2 +- services/tasks/local.go | 2 +- 7 files changed, 150 insertions(+), 152 deletions(-) create mode 100644 runtime/nsmap.go delete mode 100644 runtime/task_list.go diff --git a/runtime/nsmap.go b/runtime/nsmap.go new file mode 100644 index 000000000..ed172adcc --- /dev/null +++ b/runtime/nsmap.go @@ -0,0 +1,142 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package runtime + +import ( + "context" + "fmt" + "sync" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" +) + +type object interface { + ID() string +} + +// NSMap extends Map type with a notion of namespaces passed via Context. +type NSMap[T object] struct { + mu sync.Mutex + objects map[string]map[string]T +} + +// NewNSMap returns a new NSMap +func NewNSMap[T object]() *NSMap[T] { + return &NSMap[T]{ + objects: make(map[string]map[string]T), + } +} + +// Get a task +func (m *NSMap[T]) Get(ctx context.Context, id string) (T, error) { + m.mu.Lock() + defer m.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + var t T + if err != nil { + return t, err + } + tasks, ok := m.objects[namespace] + if !ok { + return t, errdefs.ErrNotFound + } + t, ok = tasks[id] + if !ok { + return t, errdefs.ErrNotFound + } + return t, nil +} + +// GetAll objects under a namespace +func (m *NSMap[T]) GetAll(ctx context.Context, noNS bool) ([]T, error) { + m.mu.Lock() + defer m.mu.Unlock() + var o []T + if noNS { + for ns := range m.objects { + for _, t := range m.objects[ns] { + o = append(o, t) + } + } + return o, nil + } + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + tasks, ok := m.objects[namespace] + if !ok { + return o, nil + } + for _, t := range tasks { + o = append(o, t) + } + return o, nil +} + +// Add a task +func (m *NSMap[T]) Add(ctx context.Context, t T) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + return m.AddWithNamespace(namespace, t) +} + +// AddWithNamespace adds a task with the provided namespace +func (m *NSMap[T]) AddWithNamespace(namespace string, t T) error { + m.mu.Lock() + defer m.mu.Unlock() + + id := t.ID() + if _, ok := m.objects[namespace]; !ok { + m.objects[namespace] = make(map[string]T) + } + if _, ok := m.objects[namespace][id]; ok { + return fmt.Errorf("%s: %w", id, errdefs.ErrAlreadyExists) + } + m.objects[namespace][id] = t + return nil +} + +// Delete a task +func (m *NSMap[T]) Delete(ctx context.Context, id string) { + m.mu.Lock() + defer m.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return + } + tasks, ok := m.objects[namespace] + if ok { + delete(tasks, id) + } +} + +func (m *NSMap[T]) IsEmpty() bool { + m.mu.Lock() + defer m.mu.Unlock() + + for ns := range m.objects { + if len(m.objects[ns]) > 0 { + return false + } + } + + return true +} diff --git a/runtime/task_list.go b/runtime/task_list.go deleted file mode 100644 index f24d689f1..000000000 --- a/runtime/task_list.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package runtime - -import ( - "context" - "errors" - "fmt" - "sync" - - "github.com/containerd/containerd/namespaces" -) - -var ( - // ErrTaskNotExists is returned when a task does not exist - ErrTaskNotExists = errors.New("task does not exist") - // ErrTaskAlreadyExists is returned when a task already exists - ErrTaskAlreadyExists = errors.New("task already exists") -) - -// NewTaskList returns a new TaskList -func NewTaskList() *TaskList { - return &TaskList{ - tasks: make(map[string]map[string]Task), - } -} - -// TaskList holds and provides locking around tasks -type TaskList struct { - mu sync.Mutex - tasks map[string]map[string]Task -} - -// Get a task -func (l *TaskList) Get(ctx context.Context, id string) (Task, error) { - l.mu.Lock() - defer l.mu.Unlock() - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - tasks, ok := l.tasks[namespace] - if !ok { - return nil, ErrTaskNotExists - } - t, ok := tasks[id] - if !ok { - return nil, ErrTaskNotExists - } - return t, nil -} - -// GetAll tasks under a namespace -func (l *TaskList) GetAll(ctx context.Context, noNS bool) ([]Task, error) { - l.mu.Lock() - defer l.mu.Unlock() - var o []Task - if noNS { - for ns := range l.tasks { - for _, t := range l.tasks[ns] { - o = append(o, t) - } - } - return o, nil - } - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - tasks, ok := l.tasks[namespace] - if !ok { - return o, nil - } - for _, t := range tasks { - o = append(o, t) - } - return o, nil -} - -// Add a task -func (l *TaskList) Add(ctx context.Context, t Task) error { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - return l.AddWithNamespace(namespace, t) -} - -// AddWithNamespace adds a task with the provided namespace -func (l *TaskList) AddWithNamespace(namespace string, t Task) error { - l.mu.Lock() - defer l.mu.Unlock() - - id := t.ID() - if _, ok := l.tasks[namespace]; !ok { - l.tasks[namespace] = make(map[string]Task) - } - if _, ok := l.tasks[namespace][id]; ok { - return fmt.Errorf("%s: %w", id, ErrTaskAlreadyExists) - } - l.tasks[namespace][id] = t - return nil -} - -// Delete a task -func (l *TaskList) Delete(ctx context.Context, id string) { - l.mu.Lock() - defer l.mu.Unlock() - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return - } - tasks, ok := l.tasks[namespace] - if ok { - delete(tasks, id) - } -} - -func (l *TaskList) IsEmpty() bool { - l.mu.Lock() - defer l.mu.Unlock() - - for ns := range l.tasks { - if len(l.tasks[ns]) > 0 { - return false - } - } - - return true -} diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index f4b7be161..4d94d21f7 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -124,7 +124,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { r := &Runtime{ root: ic.Root, state: ic.State, - tasks: runtime.NewTaskList(), + tasks: runtime.NewNSMap[runtime.Task](), containers: metadata.NewContainerStore(m.(*metadata.DB)), address: ic.Address, events: ep.(*exchange.Exchange), @@ -148,7 +148,7 @@ type Runtime struct { state string address string - tasks *runtime.TaskList + tasks *runtime.NSMap[runtime.Task] containers containers.Store events *exchange.Exchange diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go index c2fbc8b2a..5c8c4c294 100644 --- a/runtime/v1/linux/task.go +++ b/runtime/v1/linux/task.go @@ -48,11 +48,11 @@ type Task struct { namespace string cg cgroups.Cgroup events *exchange.Exchange - tasks *runtime.TaskList + tasks *runtime.NSMap[runtime.Task] bundle *bundle } -func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) { +func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.NSMap[runtime.Task], bundle *bundle) (*Task, error) { var ( err error cg cgroups.Cgroup diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 17f39fbfc..e9218b891 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -143,7 +143,7 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, - shims: runtime.NewTaskList(), + shims: runtime.NewNSMap[runtime.Task](), events: config.Events, containers: config.Store, schedCore: config.SchedCore, @@ -167,7 +167,7 @@ type ShimManager struct { containerdAddress string containerdTTRPCAddress string schedCore bool - shims *runtime.TaskList + shims *runtime.NSMap[runtime.Task] events *exchange.Exchange containers containers.Store // runtimePaths is a cache of `runtime names` -> `resolved fs path` diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index b0e59c084..fb7d68f27 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -134,7 +134,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, return s, nil } -func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) { +func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[runtime.Task], events *exchange.Exchange, binaryCall *binary) { ctx = namespaces.WithNamespace(ctx, ns) ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) defer cancel() diff --git a/services/tasks/local.go b/services/tasks/local.go index 914512c33..663c764a8 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -228,7 +228,7 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. return nil, err } _, err = rtime.Get(ctx, r.ContainerID) - if err != nil && err != runtime.ErrTaskNotExists { + if err != nil && !errdefs.IsNotFound(err) { return nil, errdefs.ToGRPC(err) } if err == nil {