Merge pull request #1183 from mlaventure/move-tasklist-to-runtime
Move taskList to the runtime package
This commit is contained in:
commit
d50e4bcdf3
@ -1,75 +0,0 @@
|
|||||||
// +build linux
|
|
||||||
|
|
||||||
package linux
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
|
||||||
)
|
|
||||||
|
|
||||||
func newTaskList() *taskList {
|
|
||||||
return &taskList{
|
|
||||||
tasks: make(map[string]map[string]*Task),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskList struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
tasks map[string]map[string]*Task
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *taskList) get(ctx context.Context, id string) (*Task, error) {
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
tasks, ok := l.tasks[namespace]
|
|
||||||
if !ok {
|
|
||||||
return nil, ErrTaskNotExists
|
|
||||||
}
|
|
||||||
t, ok := tasks[id]
|
|
||||||
if !ok {
|
|
||||||
return nil, ErrTaskNotExists
|
|
||||||
}
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *taskList) add(ctx context.Context, t *Task) error {
|
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return l.addWithNamespace(namespace, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *taskList) addWithNamespace(namespace string, t *Task) error {
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
|
|
||||||
id := t.id
|
|
||||||
if _, ok := l.tasks[namespace]; !ok {
|
|
||||||
l.tasks[namespace] = make(map[string]*Task)
|
|
||||||
}
|
|
||||||
if _, ok := l.tasks[namespace][id]; ok {
|
|
||||||
return ErrTaskAlreadyExists
|
|
||||||
}
|
|
||||||
l.tasks[namespace][id] = t
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *taskList) delete(ctx context.Context, t *Task) {
|
|
||||||
l.mu.Lock()
|
|
||||||
defer l.mu.Unlock()
|
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
tasks, ok := l.tasks[namespace]
|
|
||||||
if ok {
|
|
||||||
delete(tasks, t.id)
|
|
||||||
}
|
|
||||||
}
|
|
@ -29,10 +29,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrTaskNotExists = errors.New("task does not exist")
|
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
|
||||||
ErrTaskAlreadyExists = errors.New("task already exists")
|
empty = &google_protobuf.Empty{}
|
||||||
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
|
|
||||||
empty = &google_protobuf.Empty{}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -90,7 +88,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
shimDebug: cfg.ShimDebug,
|
shimDebug: cfg.ShimDebug,
|
||||||
runtime: cfg.Runtime,
|
runtime: cfg.Runtime,
|
||||||
monitor: monitor.(runtime.TaskMonitor),
|
monitor: monitor.(runtime.TaskMonitor),
|
||||||
tasks: newTaskList(),
|
tasks: runtime.NewTaskList(),
|
||||||
db: m.(*bolt.DB),
|
db: m.(*bolt.DB),
|
||||||
address: ic.Address,
|
address: ic.Address,
|
||||||
}
|
}
|
||||||
@ -99,7 +97,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, t := range tasks {
|
for _, t := range tasks {
|
||||||
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
|
if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,7 +113,7 @@ type Runtime struct {
|
|||||||
address string
|
address string
|
||||||
|
|
||||||
monitor runtime.TaskMonitor
|
monitor runtime.TaskMonitor
|
||||||
tasks *taskList
|
tasks *runtime.TaskList
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,7 +173,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
|
|||||||
return nil, errors.New(grpc.ErrorDesc(err))
|
return nil, errors.New(grpc.ErrorDesc(err))
|
||||||
}
|
}
|
||||||
t := newTask(id, namespace, s)
|
t := newTask(id, namespace, s)
|
||||||
if err := r.tasks.add(ctx, t); err != nil {
|
if err := r.tasks.Add(ctx, t); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// after the task is created, add it to the monitor
|
// after the task is created, add it to the monitor
|
||||||
@ -204,7 +202,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
|
|||||||
if err := lc.shim.KillShim(ctx); err != nil {
|
if err := lc.shim.KillShim(ctx); err != nil {
|
||||||
log.G(ctx).WithError(err).Error("failed to kill shim")
|
log.G(ctx).WithError(err).Error("failed to kill shim")
|
||||||
}
|
}
|
||||||
r.tasks.delete(ctx, lc)
|
r.tasks.Delete(ctx, lc)
|
||||||
|
|
||||||
bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace)
|
bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace)
|
||||||
if err := bundle.Delete(); err != nil {
|
if err := bundle.Delete(); err != nil {
|
||||||
@ -218,19 +216,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
|
func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
return r.tasks.GetAll(ctx)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var o []runtime.Task
|
|
||||||
tasks, ok := r.tasks.tasks[namespace]
|
|
||||||
if !ok {
|
|
||||||
return o, nil
|
|
||||||
}
|
|
||||||
for _, t := range tasks {
|
|
||||||
o = append(o, t)
|
|
||||||
}
|
|
||||||
return o, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
|
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
|
||||||
@ -255,7 +241,7 @@ func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
|
func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
|
||||||
return r.tasks.get(ctx, id)
|
return r.tasks.Get(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
|
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
|
||||||
|
95
runtime/task_list.go
Normal file
95
runtime/task_list.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
package runtime
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/namespaces"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrTaskNotExists = errors.New("task does not exist")
|
||||||
|
ErrTaskAlreadyExists = errors.New("task already exists")
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewTaskList() *TaskList {
|
||||||
|
return &TaskList{
|
||||||
|
tasks: make(map[string]map[string]Task),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type TaskList struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
tasks map[string]map[string]Task
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *TaskList) Get(ctx context.Context, id string) (Task, error) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tasks, ok := l.tasks[namespace]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrTaskNotExists
|
||||||
|
}
|
||||||
|
t, ok := tasks[id]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrTaskNotExists
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *TaskList) GetAll(ctx context.Context) ([]Task, error) {
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var o []Task
|
||||||
|
tasks, ok := l.tasks[namespace]
|
||||||
|
if !ok {
|
||||||
|
return o, nil
|
||||||
|
}
|
||||||
|
for _, t := range tasks {
|
||||||
|
o = append(o, t)
|
||||||
|
}
|
||||||
|
return o, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *TaskList) Add(ctx context.Context, t Task) error {
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return l.AddWithNamespace(namespace, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *TaskList) AddWithNamespace(namespace string, t Task) error {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
|
id := t.ID()
|
||||||
|
if _, ok := l.tasks[namespace]; !ok {
|
||||||
|
l.tasks[namespace] = make(map[string]Task)
|
||||||
|
}
|
||||||
|
if _, ok := l.tasks[namespace][id]; ok {
|
||||||
|
return ErrTaskAlreadyExists
|
||||||
|
}
|
||||||
|
l.tasks[namespace][id] = t
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *TaskList) Delete(ctx context.Context, t Task) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tasks, ok := l.tasks[namespace]
|
||||||
|
if ok {
|
||||||
|
delete(tasks, t.ID())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user