diff --git a/linux/list.go b/linux/list.go deleted file mode 100644 index 0b436f0c5..000000000 --- a/linux/list.go +++ /dev/null @@ -1,75 +0,0 @@ -// +build linux - -package linux - -import ( - "context" - "sync" - - "github.com/containerd/containerd/namespaces" -) - -func newTaskList() *taskList { - return &taskList{ - tasks: make(map[string]map[string]*Task), - } -} - -type taskList struct { - mu sync.Mutex - tasks map[string]map[string]*Task -} - -func (l *taskList) get(ctx context.Context, id string) (*Task, error) { - l.mu.Lock() - defer l.mu.Unlock() - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - tasks, ok := l.tasks[namespace] - if !ok { - return nil, ErrTaskNotExists - } - t, ok := tasks[id] - if !ok { - return nil, ErrTaskNotExists - } - return t, nil -} - -func (l *taskList) add(ctx context.Context, t *Task) error { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - return l.addWithNamespace(namespace, t) -} - -func (l *taskList) addWithNamespace(namespace string, t *Task) error { - l.mu.Lock() - defer l.mu.Unlock() - - id := t.id - if _, ok := l.tasks[namespace]; !ok { - l.tasks[namespace] = make(map[string]*Task) - } - if _, ok := l.tasks[namespace][id]; ok { - return ErrTaskAlreadyExists - } - l.tasks[namespace][id] = t - return nil -} - -func (l *taskList) delete(ctx context.Context, t *Task) { - l.mu.Lock() - defer l.mu.Unlock() - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return - } - tasks, ok := l.tasks[namespace] - if ok { - delete(tasks, t.id) - } -} diff --git a/linux/runtime.go b/linux/runtime.go index 4a7147e6e..cdc6f4af8 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -29,10 +29,8 @@ import ( ) var ( - ErrTaskNotExists = errors.New("task does not exist") - ErrTaskAlreadyExists = errors.New("task already exists") - pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux") - empty = &google_protobuf.Empty{} + pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux") + empty = &google_protobuf.Empty{} ) const ( @@ -90,7 +88,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { shimDebug: cfg.ShimDebug, runtime: cfg.Runtime, monitor: monitor.(runtime.TaskMonitor), - tasks: newTaskList(), + tasks: runtime.NewTaskList(), db: m.(*bolt.DB), address: ic.Address, } @@ -99,7 +97,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { return nil, err } for _, t := range tasks { - if err := r.tasks.addWithNamespace(t.namespace, t); err != nil { + if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil { return nil, err } } @@ -115,7 +113,7 @@ type Runtime struct { address string monitor runtime.TaskMonitor - tasks *taskList + tasks *runtime.TaskList db *bolt.DB } @@ -175,7 +173,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, errors.New(grpc.ErrorDesc(err)) } t := newTask(id, namespace, s) - if err := r.tasks.add(ctx, t); err != nil { + if err := r.tasks.Add(ctx, t); err != nil { return nil, err } // after the task is created, add it to the monitor @@ -204,7 +202,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er if err := lc.shim.KillShim(ctx); err != nil { log.G(ctx).WithError(err).Error("failed to kill shim") } - r.tasks.delete(ctx, lc) + r.tasks.Delete(ctx, lc) bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace) if err := bundle.Delete(); err != nil { @@ -218,19 +216,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er } func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - var o []runtime.Task - tasks, ok := r.tasks.tasks[namespace] - if !ok { - return o, nil - } - for _, t := range tasks { - o = append(o, t) - } - return o, nil + return r.tasks.GetAll(ctx) } func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) { @@ -255,7 +241,7 @@ func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) { } func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) { - return r.tasks.get(ctx, id) + return r.tasks.Get(ctx, id) } func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { diff --git a/runtime/task_list.go b/runtime/task_list.go new file mode 100644 index 000000000..e5d5c0e53 --- /dev/null +++ b/runtime/task_list.go @@ -0,0 +1,95 @@ +package runtime + +import ( + "context" + "errors" + "sync" + + "github.com/containerd/containerd/namespaces" +) + +var ( + ErrTaskNotExists = errors.New("task does not exist") + ErrTaskAlreadyExists = errors.New("task already exists") +) + +func NewTaskList() *TaskList { + return &TaskList{ + tasks: make(map[string]map[string]Task), + } +} + +type TaskList struct { + mu sync.Mutex + tasks map[string]map[string]Task +} + +func (l *TaskList) Get(ctx context.Context, id string) (Task, error) { + l.mu.Lock() + defer l.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + tasks, ok := l.tasks[namespace] + if !ok { + return nil, ErrTaskNotExists + } + t, ok := tasks[id] + if !ok { + return nil, ErrTaskNotExists + } + return t, nil +} + +func (l *TaskList) GetAll(ctx context.Context) ([]Task, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + var o []Task + tasks, ok := l.tasks[namespace] + if !ok { + return o, nil + } + for _, t := range tasks { + o = append(o, t) + } + return o, nil +} + +func (l *TaskList) Add(ctx context.Context, t Task) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + return l.AddWithNamespace(namespace, t) +} + +func (l *TaskList) AddWithNamespace(namespace string, t Task) error { + l.mu.Lock() + defer l.mu.Unlock() + + id := t.ID() + if _, ok := l.tasks[namespace]; !ok { + l.tasks[namespace] = make(map[string]Task) + } + if _, ok := l.tasks[namespace][id]; ok { + return ErrTaskAlreadyExists + } + l.tasks[namespace][id] = t + return nil +} + +func (l *TaskList) Delete(ctx context.Context, t Task) { + l.mu.Lock() + defer l.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return + } + tasks, ok := l.tasks[namespace] + if ok { + delete(tasks, t.ID()) + } +}