Move taskList to the runtime package

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2017-07-13 13:47:42 +02:00
parent 8eadcb8c28
commit 88c1db5ca5
No known key found for this signature in database
GPG Key ID: 40CF16616B361216
3 changed files with 104 additions and 98 deletions

View File

@ -1,75 +0,0 @@
// +build linux
package linux
import (
"context"
"sync"
"github.com/containerd/containerd/namespaces"
)
func newTaskList() *taskList {
return &taskList{
tasks: make(map[string]map[string]*Task),
}
}
type taskList struct {
mu sync.Mutex
tasks map[string]map[string]*Task
}
func (l *taskList) get(ctx context.Context, id string) (*Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
tasks, ok := l.tasks[namespace]
if !ok {
return nil, ErrTaskNotExists
}
t, ok := tasks[id]
if !ok {
return nil, ErrTaskNotExists
}
return t, nil
}
func (l *taskList) add(ctx context.Context, t *Task) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return l.addWithNamespace(namespace, t)
}
func (l *taskList) addWithNamespace(namespace string, t *Task) error {
l.mu.Lock()
defer l.mu.Unlock()
id := t.id
if _, ok := l.tasks[namespace]; !ok {
l.tasks[namespace] = make(map[string]*Task)
}
if _, ok := l.tasks[namespace][id]; ok {
return ErrTaskAlreadyExists
}
l.tasks[namespace][id] = t
return nil
}
func (l *taskList) delete(ctx context.Context, t *Task) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return
}
tasks, ok := l.tasks[namespace]
if ok {
delete(tasks, t.id)
}
}

View File

@ -29,10 +29,8 @@ import (
)
var (
ErrTaskNotExists = errors.New("task does not exist")
ErrTaskAlreadyExists = errors.New("task already exists")
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
empty = &google_protobuf.Empty{}
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
empty = &google_protobuf.Empty{}
)
const (
@ -90,7 +88,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
shimDebug: cfg.ShimDebug,
runtime: cfg.Runtime,
monitor: monitor.(runtime.TaskMonitor),
tasks: newTaskList(),
tasks: runtime.NewTaskList(),
db: m.(*bolt.DB),
address: ic.Address,
}
@ -99,7 +97,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
for _, t := range tasks {
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
return nil, err
}
}
@ -115,7 +113,7 @@ type Runtime struct {
address string
monitor runtime.TaskMonitor
tasks *taskList
tasks *runtime.TaskList
db *bolt.DB
}
@ -175,7 +173,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.New(grpc.ErrorDesc(err))
}
t := newTask(id, namespace, s)
if err := r.tasks.add(ctx, t); err != nil {
if err := r.tasks.Add(ctx, t); err != nil {
return nil, err
}
// after the task is created, add it to the monitor
@ -204,7 +202,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
if err := lc.shim.KillShim(ctx); err != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
r.tasks.delete(ctx, lc)
r.tasks.Delete(ctx, lc)
bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace)
if err := bundle.Delete(); err != nil {
@ -218,19 +216,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
}
func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var o []runtime.Task
tasks, ok := r.tasks.tasks[namespace]
if !ok {
return o, nil
}
for _, t := range tasks {
o = append(o, t)
}
return o, nil
return r.tasks.GetAll(ctx)
}
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
@ -255,7 +241,7 @@ func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
}
func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
return r.tasks.get(ctx, id)
return r.tasks.Get(ctx, id)
}
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {

95
runtime/task_list.go Normal file
View File

@ -0,0 +1,95 @@
package runtime
import (
"context"
"errors"
"sync"
"github.com/containerd/containerd/namespaces"
)
var (
ErrTaskNotExists = errors.New("task does not exist")
ErrTaskAlreadyExists = errors.New("task already exists")
)
func NewTaskList() *TaskList {
return &TaskList{
tasks: make(map[string]map[string]Task),
}
}
type TaskList struct {
mu sync.Mutex
tasks map[string]map[string]Task
}
func (l *TaskList) Get(ctx context.Context, id string) (Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
tasks, ok := l.tasks[namespace]
if !ok {
return nil, ErrTaskNotExists
}
t, ok := tasks[id]
if !ok {
return nil, ErrTaskNotExists
}
return t, nil
}
func (l *TaskList) GetAll(ctx context.Context) ([]Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var o []Task
tasks, ok := l.tasks[namespace]
if !ok {
return o, nil
}
for _, t := range tasks {
o = append(o, t)
}
return o, nil
}
func (l *TaskList) Add(ctx context.Context, t Task) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return l.AddWithNamespace(namespace, t)
}
func (l *TaskList) AddWithNamespace(namespace string, t Task) error {
l.mu.Lock()
defer l.mu.Unlock()
id := t.ID()
if _, ok := l.tasks[namespace]; !ok {
l.tasks[namespace] = make(map[string]Task)
}
if _, ok := l.tasks[namespace][id]; ok {
return ErrTaskAlreadyExists
}
l.tasks[namespace][id] = t
return nil
}
func (l *TaskList) Delete(ctx context.Context, t Task) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return
}
tasks, ok := l.tasks[namespace]
if ok {
delete(tasks, t.ID())
}
}