Move task list to runtimes
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
a40f307e88
commit
588c11852b
124
linux/runtime.go
124
linux/runtime.go
@ -5,12 +5,14 @@ package linux
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/services/shim"
|
"github.com/containerd/containerd/api/services/shim"
|
||||||
@ -25,6 +27,11 @@ import (
|
|||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrTaskNotExists = errors.New("task does not exist")
|
||||||
|
ErrTaskAlreadyExists = errors.New("task already exists")
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
runtimeName = "linux"
|
runtimeName = "linux"
|
||||||
configFilename = "config.json"
|
configFilename = "config.json"
|
||||||
@ -54,6 +61,71 @@ type Config struct {
|
|||||||
NoShim bool `toml:"no_shim,omitempty"`
|
NoShim bool `toml:"no_shim,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTaskList() *taskList {
|
||||||
|
return &taskList{
|
||||||
|
tasks: make(map[string]map[string]*Task),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskList struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
tasks map[string]map[string]*Task
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *taskList) get(ctx context.Context, id string) (*Task, error) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tasks, ok := l.tasks[namespace]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrTaskNotExists
|
||||||
|
}
|
||||||
|
t, ok := tasks[id]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrTaskNotExists
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *taskList) add(ctx context.Context, t *Task) error {
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return l.addWithNamespace(namespace, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *taskList) addWithNamespace(namespace string, t *Task) error {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
|
||||||
|
id := t.containerID
|
||||||
|
if _, ok := l.tasks[namespace]; !ok {
|
||||||
|
l.tasks[namespace] = make(map[string]*Task)
|
||||||
|
}
|
||||||
|
if _, ok := l.tasks[namespace][id]; ok {
|
||||||
|
return ErrTaskAlreadyExists
|
||||||
|
}
|
||||||
|
l.tasks[namespace][id] = t
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *taskList) delete(ctx context.Context, t *Task) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tasks, ok := l.tasks[namespace]
|
||||||
|
if ok {
|
||||||
|
delete(tasks, t.containerID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func New(ic *plugin.InitContext) (interface{}, error) {
|
func New(ic *plugin.InitContext) (interface{}, error) {
|
||||||
path := filepath.Join(ic.State, runtimeName)
|
path := filepath.Join(ic.State, runtimeName)
|
||||||
if err := os.MkdirAll(path, 0700); err != nil {
|
if err := os.MkdirAll(path, 0700); err != nil {
|
||||||
@ -70,9 +142,20 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
eventsContext: c,
|
eventsContext: c,
|
||||||
eventsCancel: cancel,
|
eventsCancel: cancel,
|
||||||
monitor: ic.Monitor,
|
monitor: ic.Monitor,
|
||||||
|
tasks: newTaskList(),
|
||||||
}
|
}
|
||||||
// set the events output for a monitor if it generates events
|
// set the events output for a monitor if it generates events
|
||||||
ic.Monitor.Events(r.events)
|
ic.Monitor.Events(r.events)
|
||||||
|
tasks, err := r.loadAllTasks(ic.Context)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, t := range tasks {
|
||||||
|
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// load all tasks from disk
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,6 +169,7 @@ type Runtime struct {
|
|||||||
eventsContext context.Context
|
eventsContext context.Context
|
||||||
eventsCancel func()
|
eventsCancel func()
|
||||||
monitor plugin.TaskMonitor
|
monitor plugin.TaskMonitor
|
||||||
|
tasks *taskList
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
|
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
|
||||||
@ -134,6 +218,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c := newTask(id, namespace, opts.Spec, s)
|
c := newTask(id, namespace, opts.Spec, s)
|
||||||
|
if err := r.tasks.add(ctx, c); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
// after the task is created, add it to the monitor
|
// after the task is created, add it to the monitor
|
||||||
if err = r.monitor.Monitor(c); err != nil {
|
if err = r.monitor.Monitor(c); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -160,6 +247,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
lc.shim.Exit(ctx, &shim.ExitRequest{})
|
lc.shim.Exit(ctx, &shim.ExitRequest{})
|
||||||
|
r.tasks.delete(ctx, lc)
|
||||||
return &plugin.Exit{
|
return &plugin.Exit{
|
||||||
Status: rsp.ExitStatus,
|
Status: rsp.ExitStatus,
|
||||||
Timestamp: rsp.ExitedAt,
|
Timestamp: rsp.ExitedAt,
|
||||||
@ -167,11 +255,27 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
|
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
|
||||||
dir, err := ioutil.ReadDir(r.root)
|
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var o []plugin.Task
|
var o []plugin.Task
|
||||||
|
tasks, ok := r.tasks.tasks[namespace]
|
||||||
|
if !ok {
|
||||||
|
return o, nil
|
||||||
|
}
|
||||||
|
for _, t := range tasks {
|
||||||
|
o = append(o, t)
|
||||||
|
}
|
||||||
|
return o, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Runtime) loadAllTasks(ctx context.Context) ([]*Task, error) {
|
||||||
|
dir, err := ioutil.ReadDir(r.root)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var o []*Task
|
||||||
for _, fi := range dir {
|
for _, fi := range dir {
|
||||||
if !fi.IsDir() {
|
if !fi.IsDir() {
|
||||||
continue
|
continue
|
||||||
@ -186,19 +290,15 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
|
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
return r.tasks.get(ctx, id)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return r.loadTask(ctx, filepath.Join(r.root, namespace, id))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, error) {
|
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
|
||||||
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
|
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var o []plugin.Task
|
var o []*Task
|
||||||
for _, fi := range dir {
|
for _, fi := range dir {
|
||||||
if !fi.IsDir() {
|
if !fi.IsDir() {
|
||||||
continue
|
continue
|
||||||
@ -206,7 +306,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, erro
|
|||||||
id := fi.Name()
|
id := fi.Name()
|
||||||
// TODO: optimize this if it is call frequently to list all containers
|
// TODO: optimize this if it is call frequently to list all containers
|
||||||
// i.e. dont' reconnect to the the shim's ever time
|
// i.e. dont' reconnect to the the shim's ever time
|
||||||
c, err := r.loadTask(ctx, filepath.Join(r.root, ns, id))
|
c, err := r.loadTask(ns, filepath.Join(r.root, ns, id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
|
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
|
||||||
// if we fail to load the container, connect to the shim, make sure if the shim has
|
// if we fail to load the container, connect to the shim, make sure if the shim has
|
||||||
@ -291,11 +391,7 @@ func (r *Runtime) deleteBundle(namespace, id string) error {
|
|||||||
return os.RemoveAll(filepath.Join(r.root, namespace, id))
|
return os.RemoveAll(filepath.Join(r.root, namespace, id))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runtime) loadTask(ctx context.Context, path string) (*Task, error) {
|
func (r *Runtime) loadTask(namespace, path string) (*Task, error) {
|
||||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
id := filepath.Base(path)
|
id := filepath.Base(path)
|
||||||
s, err := loadShim(path, namespace, r.remote)
|
s, err := loadShim(path, namespace, r.remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -47,7 +47,7 @@ type cgroupsMonitor struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getID(t plugin.Task) string {
|
func getID(t plugin.Task) string {
|
||||||
return fmt.Sprintf("%s-%s", t.Info().ID, t.Info().Namespace)
|
return fmt.Sprintf("%s-%s", t.Info().Namespace, t.Info().ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *cgroupsMonitor) Monitor(c plugin.Task) error {
|
func (m *cgroupsMonitor) Monitor(c plugin.Task) error {
|
||||||
|
@ -159,7 +159,7 @@ func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
|
|||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
c, ok := r.containers[id]
|
c, ok := r.containers[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("container %s does not exit", id)
|
return nil, fmt.Errorf("container %s does not exit", id)
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user