From 745398b2e93909e735844dad0797ffc9138c26af Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 8 Jun 2017 16:34:52 -0700 Subject: [PATCH 1/3] Remove tasks map from service Signed-off-by: Michael Crosby --- linux/runtime.go | 19 ++++++--- linux/task.go | 5 ++- plugin/container.go | 1 + plugin/runtime.go | 2 + services/execution/service.go | 76 +++++++++++++---------------------- windows/runtime.go | 12 +++++- 6 files changed, 58 insertions(+), 57 deletions(-) diff --git a/linux/runtime.go b/linux/runtime.go index 423dd2806..4e45cbbe9 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -133,7 +133,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) os.RemoveAll(path) return nil, err } - c := newTask(id, opts.Spec, s) + c := newTask(id, namespace, opts.Spec, s) // after the task is created, add it to the monitor if err = r.monitor.Monitor(c); err != nil { return nil, err @@ -176,7 +176,7 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { if !fi.IsDir() { continue } - tasks, err := r.loadContainers(ctx, fi.Name()) + tasks, err := r.loadTasks(ctx, fi.Name()) if err != nil { return nil, err } @@ -185,7 +185,15 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { return o, nil } -func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task, error) { +func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + return r.loadTask(ctx, filepath.Join(r.root, namespace, id)) +} + +func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, error) { dir, err := ioutil.ReadDir(filepath.Join(r.root, ns)) if err != nil { return nil, err @@ -198,7 +206,7 @@ func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task, id := fi.Name() // TODO: optimize this if it is call frequently to list all containers // i.e. dont' reconnect to the the shim's ever time - c, err := r.loadContainer(ctx, filepath.Join(r.root, ns, id)) + c, err := r.loadTask(ctx, filepath.Join(r.root, ns, id)) if err != nil { log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id) // if we fail to load the container, connect to the shim, make sure if the shim has @@ -283,7 +291,7 @@ func (r *Runtime) deleteBundle(namespace, id string) error { return os.RemoveAll(filepath.Join(r.root, namespace, id)) } -func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error) { +func (r *Runtime) loadTask(ctx context.Context, path string) (*Task, error) { namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -304,6 +312,7 @@ func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error) containerID: id, shim: s, spec: data, + namespace: namespace, }, nil } diff --git a/linux/task.go b/linux/task.go index 1a04033a4..d8d341902 100644 --- a/linux/task.go +++ b/linux/task.go @@ -16,13 +16,15 @@ type Task struct { containerID string spec []byte shim shim.ShimClient + namespace string } -func newTask(id string, spec []byte, shim shim.ShimClient) *Task { +func newTask(id, namespace string, spec []byte, shim shim.ShimClient) *Task { return &Task{ containerID: id, shim: shim, spec: spec, + namespace: namespace, } } @@ -32,6 +34,7 @@ func (c *Task) Info() plugin.TaskInfo { ContainerID: c.containerID, Runtime: runtimeName, Spec: c.spec, + Namespace: c.namespace, } } diff --git a/plugin/container.go b/plugin/container.go index 5c013623c..0c4301f72 100644 --- a/plugin/container.go +++ b/plugin/container.go @@ -7,6 +7,7 @@ type TaskInfo struct { ContainerID string Runtime string Spec []byte + Namespace string } type Task interface { diff --git a/plugin/runtime.go b/plugin/runtime.go index 5babc2cd7..2fba9faef 100644 --- a/plugin/runtime.go +++ b/plugin/runtime.go @@ -34,6 +34,8 @@ type Exit struct { type Runtime interface { // Create creates a container with the provided id and options Create(ctx context.Context, id string, opts CreateOpts) (Task, error) + // Get returns a container + Get(context.Context, string) (Task, error) // Containers returns all the current containers for the runtime Tasks(context.Context) ([]Task, error) // Delete removes the container in the runtime diff --git a/services/execution/service.go b/services/execution/service.go index 65923846d..93f9e1254 100644 --- a/services/execution/service.go +++ b/services/execution/service.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sync" "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/execution" @@ -48,7 +47,6 @@ func New(ic *plugin.InitContext) (interface{}, error) { } return &Service{ runtimes: ic.Runtimes, - tasks: make(map[string]plugin.Task), db: ic.Meta, collector: c, store: ic.Content, @@ -56,10 +54,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { } type Service struct { - mu sync.Mutex - runtimes map[string]plugin.Runtime - tasks map[string]plugin.Task db *bolt.DB collector *collector store content.Store @@ -67,16 +62,6 @@ type Service struct { func (s *Service) Register(server *grpc.Server) error { api.RegisterTasksServer(server, s) - // load all tasks - for _, r := range s.runtimes { - tasks, err := r.Tasks(context.Background()) - if err != nil { - return err - } - for _, c := range tasks { - s.tasks[c.Info().ContainerID] = c - } - } return nil } @@ -142,18 +127,10 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create if err != nil { return nil, err } - s.mu.Lock() - if _, ok := s.tasks[r.ContainerID]; ok { - s.mu.Unlock() - return nil, grpc.Errorf(codes.AlreadyExists, "task %v already exists", r.ContainerID) - } c, err := runtime.Create(ctx, r.ContainerID, opts) if err != nil { - s.mu.Unlock() return nil, err } - s.tasks[r.ContainerID] = c - s.mu.Unlock() state, err := c.State(ctx) if err != nil { log.G(ctx).Error(err) @@ -165,7 +142,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create } func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -176,7 +153,7 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto } func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -188,9 +165,6 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Delete if err != nil { return nil, err } - - delete(s.tasks, r.ContainerID) - return &api.DeleteResponse{ ExitStatus: exit.Status, ExitedAt: exit.Timestamp, @@ -233,7 +207,7 @@ func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error) } func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) { - task, err := s.getTask(r.ContainerID) + task, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -248,20 +222,24 @@ func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoRespon func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) { resp := &api.ListResponse{} - s.mu.Lock() - defer s.mu.Unlock() - for _, cd := range s.tasks { - c, err := taskFromContainerd(ctx, cd) + for _, r := range s.runtimes { + tasks, err := r.Tasks(ctx) if err != nil { return nil, err } - resp.Tasks = append(resp.Tasks, c) + for _, t := range tasks { + tt, err := taskFromContainerd(ctx, t) + if err != nil { + return nil, err + } + resp.Tasks = append(resp.Tasks, tt) + } } return resp, nil } func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -273,7 +251,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_proto } func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -285,7 +263,7 @@ func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_pro } func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -306,7 +284,7 @@ func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobu } func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -338,7 +316,7 @@ func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) er } func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -364,7 +342,7 @@ func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecRespon } func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -378,7 +356,7 @@ func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf. } func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -389,7 +367,7 @@ func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*go } func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) { - c, err := s.getTask(r.ContainerID) + c, err := s.getTask(ctx, r.ContainerID) if err != nil { return nil, err } @@ -454,14 +432,14 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io. }, nil } -func (s *Service) getTask(id string) (plugin.Task, error) { - s.mu.Lock() - c, ok := s.tasks[id] - s.mu.Unlock() - if !ok { - return nil, grpc.Errorf(codes.NotFound, "task %v not found", id) +func (s *Service) getTask(ctx context.Context, id string) (plugin.Task, error) { + for _, r := range s.runtimes { + t, err := r.Get(ctx, id) + if err == nil { + return t, nil + } } - return c, nil + return nil, grpc.Errorf(codes.NotFound, "task %v not found", id) } func (s *Service) getRuntime(name string) (plugin.Runtime, error) { diff --git a/windows/runtime.go b/windows/runtime.go index e28d00e3e..4b148ad5f 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -34,7 +34,6 @@ func init() { } func New(ic *plugin.InitContext) (interface{}, error) { - rootDir := filepath.Join(ic.Root, runtimeName) if err := os.MkdirAll(rootDir, 0755); err != nil { return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir) @@ -152,10 +151,19 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { list = append(list, c) } } - return list, nil } +func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { + r.Lock() + defer r.Unlock() + c, ok := r.containers[id] + if !ok { + return fmt.Errorf("container %s does not exit", id) + } + return c, nil +} + func (r *Runtime) Events(ctx context.Context) <-chan *plugin.Event { return r.events } From a40f307e887ce24916a0bafec0a5ed551cbd002e Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 8 Jun 2017 16:46:13 -0700 Subject: [PATCH 2/3] Namespace cgroups monitor ids Signed-off-by: Michael Crosby --- metrics/cgroups/cgroups.go | 9 +++++++-- spec_unix.go | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 1fb3ea373..6752a0f24 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -3,6 +3,7 @@ package cgroups import ( + "fmt" "time" "github.com/containerd/cgroups" @@ -45,8 +46,12 @@ type cgroupsMonitor struct { events chan<- *plugin.Event } +func getID(t plugin.Task) string { + return fmt.Sprintf("%s-%s", t.Info().ID, t.Info().Namespace) +} + func (m *cgroupsMonitor) Monitor(c plugin.Task) error { - id := c.Info().ID + id := getID(c) state, err := c.State(m.context) if err != nil { return err @@ -62,7 +67,7 @@ func (m *cgroupsMonitor) Monitor(c plugin.Task) error { } func (m *cgroupsMonitor) Stop(c plugin.Task) error { - m.collector.Remove(c.Info().ID) + m.collector.Remove(getID(c)) return nil } diff --git a/spec_unix.go b/spec_unix.go index f0b071068..c74d88e5f 100644 --- a/spec_unix.go +++ b/spec_unix.go @@ -155,6 +155,7 @@ func createDefaultSpec() (*specs.Spec, error) { }, }, Linux: &specs.Linux{ + // TODO (@crosbymichael) make sure we don't have have two containers in the same cgroup Resources: &specs.LinuxResources{ Devices: []specs.LinuxDeviceCgroup{ { From 588c11852b03e577fab19abd342efeba6cb29067 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 8 Jun 2017 17:24:33 -0700 Subject: [PATCH 3/3] Move task list to runtimes Signed-off-by: Michael Crosby --- linux/runtime.go | 124 ++++++++++++++++++++++++++++++++----- metrics/cgroups/cgroups.go | 2 +- windows/runtime.go | 2 +- 3 files changed, 112 insertions(+), 16 deletions(-) diff --git a/linux/runtime.go b/linux/runtime.go index 4e45cbbe9..4b4267152 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -5,12 +5,14 @@ package linux import ( "bytes" "context" + "errors" "fmt" "io" "io/ioutil" "os" "path/filepath" "strings" + "sync" "time" "github.com/containerd/containerd/api/services/shim" @@ -25,6 +27,11 @@ import ( "golang.org/x/sys/unix" ) +var ( + ErrTaskNotExists = errors.New("task does not exist") + ErrTaskAlreadyExists = errors.New("task already exists") +) + const ( runtimeName = "linux" configFilename = "config.json" @@ -54,6 +61,71 @@ type Config struct { NoShim bool `toml:"no_shim,omitempty"` } +func newTaskList() *taskList { + return &taskList{ + tasks: make(map[string]map[string]*Task), + } +} + +type taskList struct { + mu sync.Mutex + tasks map[string]map[string]*Task +} + +func (l *taskList) get(ctx context.Context, id string) (*Task, error) { + l.mu.Lock() + defer l.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + tasks, ok := l.tasks[namespace] + if !ok { + return nil, ErrTaskNotExists + } + t, ok := tasks[id] + if !ok { + return nil, ErrTaskNotExists + } + return t, nil +} + +func (l *taskList) add(ctx context.Context, t *Task) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + return l.addWithNamespace(namespace, t) +} + +func (l *taskList) addWithNamespace(namespace string, t *Task) error { + l.mu.Lock() + defer l.mu.Unlock() + + id := t.containerID + if _, ok := l.tasks[namespace]; !ok { + l.tasks[namespace] = make(map[string]*Task) + } + if _, ok := l.tasks[namespace][id]; ok { + return ErrTaskAlreadyExists + } + l.tasks[namespace][id] = t + return nil +} + +func (l *taskList) delete(ctx context.Context, t *Task) { + l.mu.Lock() + defer l.mu.Unlock() + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return + } + tasks, ok := l.tasks[namespace] + if ok { + delete(tasks, t.containerID) + } +} + func New(ic *plugin.InitContext) (interface{}, error) { path := filepath.Join(ic.State, runtimeName) if err := os.MkdirAll(path, 0700); err != nil { @@ -70,9 +142,20 @@ func New(ic *plugin.InitContext) (interface{}, error) { eventsContext: c, eventsCancel: cancel, monitor: ic.Monitor, + tasks: newTaskList(), } // set the events output for a monitor if it generates events ic.Monitor.Events(r.events) + tasks, err := r.loadAllTasks(ic.Context) + if err != nil { + return nil, err + } + for _, t := range tasks { + if err := r.tasks.addWithNamespace(t.namespace, t); err != nil { + return nil, err + } + } + // load all tasks from disk return r, nil } @@ -86,6 +169,7 @@ type Runtime struct { eventsContext context.Context eventsCancel func() monitor plugin.TaskMonitor + tasks *taskList } func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) { @@ -134,6 +218,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) return nil, err } c := newTask(id, namespace, opts.Spec, s) + if err := r.tasks.add(ctx, c); err != nil { + return nil, err + } // after the task is created, add it to the monitor if err = r.monitor.Monitor(c); err != nil { return nil, err @@ -160,6 +247,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro return nil, err } lc.shim.Exit(ctx, &shim.ExitRequest{}) + r.tasks.delete(ctx, lc) return &plugin.Exit{ Status: rsp.ExitStatus, Timestamp: rsp.ExitedAt, @@ -167,11 +255,27 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro } func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { - dir, err := ioutil.ReadDir(r.root) + namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } var o []plugin.Task + tasks, ok := r.tasks.tasks[namespace] + if !ok { + return o, nil + } + for _, t := range tasks { + o = append(o, t) + } + return o, nil +} + +func (r *Runtime) loadAllTasks(ctx context.Context) ([]*Task, error) { + dir, err := ioutil.ReadDir(r.root) + if err != nil { + return nil, err + } + var o []*Task for _, fi := range dir { if !fi.IsDir() { continue @@ -186,19 +290,15 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { } func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - return r.loadTask(ctx, filepath.Join(r.root, namespace, id)) + return r.tasks.get(ctx, id) } -func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, error) { +func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { dir, err := ioutil.ReadDir(filepath.Join(r.root, ns)) if err != nil { return nil, err } - var o []plugin.Task + var o []*Task for _, fi := range dir { if !fi.IsDir() { continue @@ -206,7 +306,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]plugin.Task, erro id := fi.Name() // TODO: optimize this if it is call frequently to list all containers // i.e. dont' reconnect to the the shim's ever time - c, err := r.loadTask(ctx, filepath.Join(r.root, ns, id)) + c, err := r.loadTask(ns, filepath.Join(r.root, ns, id)) if err != nil { log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id) // if we fail to load the container, connect to the shim, make sure if the shim has @@ -291,11 +391,7 @@ func (r *Runtime) deleteBundle(namespace, id string) error { return os.RemoveAll(filepath.Join(r.root, namespace, id)) } -func (r *Runtime) loadTask(ctx context.Context, path string) (*Task, error) { - namespace, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } +func (r *Runtime) loadTask(namespace, path string) (*Task, error) { id := filepath.Base(path) s, err := loadShim(path, namespace, r.remote) if err != nil { diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 6752a0f24..6d1750205 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -47,7 +47,7 @@ type cgroupsMonitor struct { } func getID(t plugin.Task) string { - return fmt.Sprintf("%s-%s", t.Info().ID, t.Info().Namespace) + return fmt.Sprintf("%s-%s", t.Info().Namespace, t.Info().ID) } func (m *cgroupsMonitor) Monitor(c plugin.Task) error { diff --git a/windows/runtime.go b/windows/runtime.go index 4b148ad5f..3c912f992 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -159,7 +159,7 @@ func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { defer r.Unlock() c, ok := r.containers[id] if !ok { - return fmt.Errorf("container %s does not exit", id) + return nil, fmt.Errorf("container %s does not exit", id) } return c, nil }