diff --git a/linux/runtime.go b/linux/runtime.go index 4e45cbbe9..4b4267152 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -5,12 +5,14 @@ package linux import ( "bytes" "context" + "errors" "fmt" "io" "io/ioutil" "os" "path/filepath" "strings" + "sync" "time" "github.com/containerd/containerd/api/services/shim" @@ -25,6 +27,11 @@ import ( "golang.org/x/sys/unix" ) +var ( + ErrTaskNotExists = errors.New("task does not exist") + ErrTaskAlreadyExists = errors.New("task already exists") +) + const ( runtimeName = "linux" configFilename = "config.json" @@ -54,6 +61,71 @@ type Config struct { NoShim bool `toml:"no_shim,omitempty"` } +func newTaskList() *taskList { + return &taskList{ + tasks: make(map[string]map[string]*Task), + } +} + +type taskList struct { + mu sync.Mutex + tasks map[string]map[string]*Task +} + +func (l *taskList) get(ctx context.Context, id string) (*Task, error) { + l.mu.Lock() + defer l.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + tasks, ok := l.tasks[namespace] + if !ok { + return nil, ErrTaskNotExists + } + t, ok := tasks[id] + if !ok { + return nil, ErrTaskNotExists + } + return t, nil +} + +func (l *taskList) add(ctx context.Context, t *Task) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + return l.addWithNamespace(namespace, t) +} + +func (l *taskList) addWithNamespace(namespace string, t *Task) error { + l.mu.Lock() + defer l.mu.Unlock() + + id := t.containerID + if _, ok := l.tasks[namespace]; !ok { + l.tasks[namespace] = make(map[string]*Task) + } + if _, ok := l.tasks[namespace][id]; ok { + return ErrTaskAlreadyExists + } + l.tasks[namespace][id] = t + return nil +} + +func (l *taskList) delete(ctx context.Context, t *Task) { + l.mu.Lock() + defer l.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return + } + tasks, ok := l.tasks[namespace] + if ok { + delete(tasks, t.containerID) + } +} + func New(ic *plugin.InitContext) (interface{}, error) { path := filepath.Join(ic.State, runtimeName) if err := os.MkdirAll(path, 0700); err != nil { @@ -70,9 +142,20 @@ func New(ic *plugin.InitContext) (interface{}, error) { eventsContext: c, eventsCancel: cancel, monitor: ic.Monitor, + tasks: newTaskList(), } // set the events output for a monitor if it generates events ic.Monitor.Events(r.events) + tasks, err := r.loadAllTasks(ic.Context) + if err != nil { + return nil, err + } + for _, t := range tasks { + if err := r.tasks.addWithNamespace(t.namespace, t); err != nil { + return nil, err + } + } + // load all tasks from disk return r, nil } @@ -86,6 +169,7 @@ type Runtime struct { eventsContext context.Context eventsCancel func() monitor plugin.TaskMonitor + tasks *taskList } func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) { @@ -134,6 +218,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) return nil, err } c := newTask(id, namespace, opts.Spec, s) + if err := r.tasks.add(ctx, c); err != nil { + return nil, err + } // after the task is created, add it to the monitor if err = r.monitor.Monitor(c); err != nil { return nil, err @@ -160,6 +247,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro return nil, err } lc.shim.Exit(ctx, &shim.ExitRequest{}) + r.tasks.delete(ctx, lc) return &plugin.Exit{ Status: rsp.ExitStatus, Timestamp: rsp.ExitedAt, @@ -167,11 +255,27 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro } func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { - dir, err := ioutil.ReadDir(r.root) + namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } var o []plugin.Task + tasks, ok := r.tasks.tasks[namespace] + if !ok { + return o, nil + } + for _, t := range tasks { + o = append(o, t) + } + return o, nil +} + +func (r *Runtime) loadAllTasks(ctx context.Context) ([]*Task, error) { + dir, err := ioutil.ReadDir(r.root) + if err != nil { + return nil, err + } + var o []*Task for _, fi := range dir { if !fi.IsDir() { continue @@ -186,19 +290,15 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { } func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - return r.loadTask(ctx, filepath.Join(r.root, namespace, id)) + return r.tasks.get(ctx, id) } -func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, error) { +func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { dir, err := ioutil.ReadDir(filepath.Join(r.root, ns)) if err != nil { return nil, err } - var o []plugin.Task + var o []*Task for _, fi := range dir { if !fi.IsDir() { continue @@ -206,7 +306,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, erro id := fi.Name() // TODO: optimize this if it is call frequently to list all containers // i.e. dont' reconnect to the the shim's ever time - c, err := r.loadTask(ctx, filepath.Join(r.root, ns, id)) + c, err := r.loadTask(ns, filepath.Join(r.root, ns, id)) if err != nil { log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id) // if we fail to load the container, connect to the shim, make sure if the shim has @@ -291,11 +391,7 @@ func (r *Runtime) deleteBundle(namespace, id string) error { return os.RemoveAll(filepath.Join(r.root, namespace, id)) } -func (r *Runtime) loadTask(ctx context.Context, path string) (*Task, error) { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } +func (r *Runtime) loadTask(namespace, path string) (*Task, error) { id := filepath.Base(path) s, err := loadShim(path, namespace, r.remote) if err != nil { diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 6752a0f24..6d1750205 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -47,7 +47,7 @@ type cgroupsMonitor struct { } func getID(t plugin.Task) string { - return fmt.Sprintf("%s-%s", t.Info().ID, t.Info().Namespace) + return fmt.Sprintf("%s-%s", t.Info().Namespace, t.Info().ID) } func (m *cgroupsMonitor) Monitor(c plugin.Task) error { diff --git a/windows/runtime.go b/windows/runtime.go index 4b148ad5f..3c912f992 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -159,7 +159,7 @@ func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { defer r.Unlock() c, ok := r.containers[id] if !ok { - return fmt.Errorf("container %s does not exit", id) + return nil, fmt.Errorf("container %s does not exit", id) } return c, nil }