Merge pull request #980 from crosbymichael/service-tasks

Namespace tasks within runtime
This commit is contained in:
Derek McGowan 2017-06-09 11:02:33 -07:00 committed by GitHub
commit 6c91ee2dac
8 changed files with 168 additions and 65 deletions

View File

@ -5,12 +5,14 @@ package linux
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
"github.com/containerd/containerd/api/services/shim" "github.com/containerd/containerd/api/services/shim"
@ -25,6 +27,11 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
var (
ErrTaskNotExists = errors.New("task does not exist")
ErrTaskAlreadyExists = errors.New("task already exists")
)
const ( const (
runtimeName = "linux" runtimeName = "linux"
configFilename = "config.json" configFilename = "config.json"
@ -54,6 +61,71 @@ type Config struct {
NoShim bool `toml:"no_shim,omitempty"` NoShim bool `toml:"no_shim,omitempty"`
} }
func newTaskList() *taskList {
return &taskList{
tasks: make(map[string]map[string]*Task),
}
}
type taskList struct {
mu sync.Mutex
tasks map[string]map[string]*Task
}
func (l *taskList) get(ctx context.Context, id string) (*Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
tasks, ok := l.tasks[namespace]
if !ok {
return nil, ErrTaskNotExists
}
t, ok := tasks[id]
if !ok {
return nil, ErrTaskNotExists
}
return t, nil
}
func (l *taskList) add(ctx context.Context, t *Task) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return l.addWithNamespace(namespace, t)
}
func (l *taskList) addWithNamespace(namespace string, t *Task) error {
l.mu.Lock()
defer l.mu.Unlock()
id := t.containerID
if _, ok := l.tasks[namespace]; !ok {
l.tasks[namespace] = make(map[string]*Task)
}
if _, ok := l.tasks[namespace][id]; ok {
return ErrTaskAlreadyExists
}
l.tasks[namespace][id] = t
return nil
}
func (l *taskList) delete(ctx context.Context, t *Task) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return
}
tasks, ok := l.tasks[namespace]
if ok {
delete(tasks, t.containerID)
}
}
func New(ic *plugin.InitContext) (interface{}, error) { func New(ic *plugin.InitContext) (interface{}, error) {
path := filepath.Join(ic.State, runtimeName) path := filepath.Join(ic.State, runtimeName)
if err := os.MkdirAll(path, 0700); err != nil { if err := os.MkdirAll(path, 0700); err != nil {
@ -70,9 +142,20 @@ func New(ic *plugin.InitContext) (interface{}, error) {
eventsContext: c, eventsContext: c,
eventsCancel: cancel, eventsCancel: cancel,
monitor: ic.Monitor, monitor: ic.Monitor,
tasks: newTaskList(),
} }
// set the events output for a monitor if it generates events // set the events output for a monitor if it generates events
ic.Monitor.Events(r.events) ic.Monitor.Events(r.events)
tasks, err := r.loadAllTasks(ic.Context)
if err != nil {
return nil, err
}
for _, t := range tasks {
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
return nil, err
}
}
// load all tasks from disk
return r, nil return r, nil
} }
@ -86,6 +169,7 @@ type Runtime struct {
eventsContext context.Context eventsContext context.Context
eventsCancel func() eventsCancel func()
monitor plugin.TaskMonitor monitor plugin.TaskMonitor
tasks *taskList
} }
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) { func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
@ -133,7 +217,10 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
os.RemoveAll(path) os.RemoveAll(path)
return nil, err return nil, err
} }
c := newTask(id, opts.Spec, s) c := newTask(id, namespace, opts.Spec, s)
if err := r.tasks.add(ctx, c); err != nil {
return nil, err
}
// after the task is created, add it to the monitor // after the task is created, add it to the monitor
if err = r.monitor.Monitor(c); err != nil { if err = r.monitor.Monitor(c); err != nil {
return nil, err return nil, err
@ -160,6 +247,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
return nil, err return nil, err
} }
lc.shim.Exit(ctx, &shim.ExitRequest{}) lc.shim.Exit(ctx, &shim.ExitRequest{})
r.tasks.delete(ctx, lc)
return &plugin.Exit{ return &plugin.Exit{
Status: rsp.ExitStatus, Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt, Timestamp: rsp.ExitedAt,
@ -167,16 +255,32 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
} }
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
dir, err := ioutil.ReadDir(r.root) namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var o []plugin.Task var o []plugin.Task
tasks, ok := r.tasks.tasks[namespace]
if !ok {
return o, nil
}
for _, t := range tasks {
o = append(o, t)
}
return o, nil
}
func (r *Runtime) loadAllTasks(ctx context.Context) ([]*Task, error) {
dir, err := ioutil.ReadDir(r.root)
if err != nil {
return nil, err
}
var o []*Task
for _, fi := range dir { for _, fi := range dir {
if !fi.IsDir() { if !fi.IsDir() {
continue continue
} }
tasks, err := r.loadContainers(ctx, fi.Name()) tasks, err := r.loadTasks(ctx, fi.Name())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -185,12 +289,16 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
return o, nil return o, nil
} }
func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task, error) { func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
return r.tasks.get(ctx, id)
}
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns)) dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
if err != nil { if err != nil {
return nil, err return nil, err
} }
var o []plugin.Task var o []*Task
for _, fi := range dir { for _, fi := range dir {
if !fi.IsDir() { if !fi.IsDir() {
continue continue
@ -198,7 +306,7 @@ func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task,
id := fi.Name() id := fi.Name()
// TODO: optimize this if it is call frequently to list all containers // TODO: optimize this if it is call frequently to list all containers
// i.e. dont' reconnect to the the shim's ever time // i.e. dont' reconnect to the the shim's ever time
c, err := r.loadContainer(ctx, filepath.Join(r.root, ns, id)) c, err := r.loadTask(ns, filepath.Join(r.root, ns, id))
if err != nil { if err != nil {
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id) log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
// if we fail to load the container, connect to the shim, make sure if the shim has // if we fail to load the container, connect to the shim, make sure if the shim has
@ -283,11 +391,7 @@ func (r *Runtime) deleteBundle(namespace, id string) error {
return os.RemoveAll(filepath.Join(r.root, namespace, id)) return os.RemoveAll(filepath.Join(r.root, namespace, id))
} }
func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error) { func (r *Runtime) loadTask(namespace, path string) (*Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
id := filepath.Base(path) id := filepath.Base(path)
s, err := loadShim(path, namespace, r.remote) s, err := loadShim(path, namespace, r.remote)
if err != nil { if err != nil {
@ -304,6 +408,7 @@ func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error)
containerID: id, containerID: id,
shim: s, shim: s,
spec: data, spec: data,
namespace: namespace,
}, nil }, nil
} }

View File

@ -16,13 +16,15 @@ type Task struct {
containerID string containerID string
spec []byte spec []byte
shim shim.ShimClient shim shim.ShimClient
namespace string
} }
func newTask(id string, spec []byte, shim shim.ShimClient) *Task { func newTask(id, namespace string, spec []byte, shim shim.ShimClient) *Task {
return &Task{ return &Task{
containerID: id, containerID: id,
shim: shim, shim: shim,
spec: spec, spec: spec,
namespace: namespace,
} }
} }
@ -32,6 +34,7 @@ func (c *Task) Info() plugin.TaskInfo {
ContainerID: c.containerID, ContainerID: c.containerID,
Runtime: runtimeName, Runtime: runtimeName,
Spec: c.spec, Spec: c.spec,
Namespace: c.namespace,
} }
} }

View File

@ -3,6 +3,7 @@
package cgroups package cgroups
import ( import (
"fmt"
"time" "time"
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
@ -45,8 +46,12 @@ type cgroupsMonitor struct {
events chan<- *plugin.Event events chan<- *plugin.Event
} }
func getID(t plugin.Task) string {
return fmt.Sprintf("%s-%s", t.Info().Namespace, t.Info().ID)
}
func (m *cgroupsMonitor) Monitor(c plugin.Task) error { func (m *cgroupsMonitor) Monitor(c plugin.Task) error {
id := c.Info().ID id := getID(c)
state, err := c.State(m.context) state, err := c.State(m.context)
if err != nil { if err != nil {
return err return err
@ -62,7 +67,7 @@ func (m *cgroupsMonitor) Monitor(c plugin.Task) error {
} }
func (m *cgroupsMonitor) Stop(c plugin.Task) error { func (m *cgroupsMonitor) Stop(c plugin.Task) error {
m.collector.Remove(c.Info().ID) m.collector.Remove(getID(c))
return nil return nil
} }

View File

@ -7,6 +7,7 @@ type TaskInfo struct {
ContainerID string ContainerID string
Runtime string Runtime string
Spec []byte Spec []byte
Namespace string
} }
type Task interface { type Task interface {

View File

@ -34,6 +34,8 @@ type Exit struct {
type Runtime interface { type Runtime interface {
// Create creates a container with the provided id and options // Create creates a container with the provided id and options
Create(ctx context.Context, id string, opts CreateOpts) (Task, error) Create(ctx context.Context, id string, opts CreateOpts) (Task, error)
// Get returns a container
Get(context.Context, string) (Task, error)
// Containers returns all the current containers for the runtime // Containers returns all the current containers for the runtime
Tasks(context.Context) ([]Task, error) Tasks(context.Context) ([]Task, error)
// Delete removes the container in the runtime // Delete removes the container in the runtime

View File

@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/execution" api "github.com/containerd/containerd/api/services/execution"
@ -48,7 +47,6 @@ func New(ic *plugin.InitContext) (interface{}, error) {
} }
return &Service{ return &Service{
runtimes: ic.Runtimes, runtimes: ic.Runtimes,
tasks: make(map[string]plugin.Task),
db: ic.Meta, db: ic.Meta,
collector: c, collector: c,
store: ic.Content, store: ic.Content,
@ -56,10 +54,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
} }
type Service struct { type Service struct {
mu sync.Mutex
runtimes map[string]plugin.Runtime runtimes map[string]plugin.Runtime
tasks map[string]plugin.Task
db *bolt.DB db *bolt.DB
collector *collector collector *collector
store content.Store store content.Store
@ -67,16 +62,6 @@ type Service struct {
func (s *Service) Register(server *grpc.Server) error { func (s *Service) Register(server *grpc.Server) error {
api.RegisterTasksServer(server, s) api.RegisterTasksServer(server, s)
// load all tasks
for _, r := range s.runtimes {
tasks, err := r.Tasks(context.Background())
if err != nil {
return err
}
for _, c := range tasks {
s.tasks[c.Info().ContainerID] = c
}
}
return nil return nil
} }
@ -142,18 +127,10 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.mu.Lock()
if _, ok := s.tasks[r.ContainerID]; ok {
s.mu.Unlock()
return nil, grpc.Errorf(codes.AlreadyExists, "task %v already exists", r.ContainerID)
}
c, err := runtime.Create(ctx, r.ContainerID, opts) c, err := runtime.Create(ctx, r.ContainerID, opts)
if err != nil { if err != nil {
s.mu.Unlock()
return nil, err return nil, err
} }
s.tasks[r.ContainerID] = c
s.mu.Unlock()
state, err := c.State(ctx) state, err := c.State(ctx)
if err != nil { if err != nil {
log.G(ctx).Error(err) log.G(ctx).Error(err)
@ -165,7 +142,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
} }
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) { func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -176,7 +153,7 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
} }
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) { func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -188,9 +165,6 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Delete
if err != nil { if err != nil {
return nil, err return nil, err
} }
delete(s.tasks, r.ContainerID)
return &api.DeleteResponse{ return &api.DeleteResponse{
ExitStatus: exit.Status, ExitStatus: exit.Status,
ExitedAt: exit.Timestamp, ExitedAt: exit.Timestamp,
@ -233,7 +207,7 @@ func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error)
} }
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) { func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) {
task, err := s.getTask(r.ContainerID) task, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -248,20 +222,24 @@ func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoRespon
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) { func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
resp := &api.ListResponse{} resp := &api.ListResponse{}
s.mu.Lock() for _, r := range s.runtimes {
defer s.mu.Unlock() tasks, err := r.Tasks(ctx)
for _, cd := range s.tasks {
c, err := taskFromContainerd(ctx, cd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp.Tasks = append(resp.Tasks, c) for _, t := range tasks {
tt, err := taskFromContainerd(ctx, t)
if err != nil {
return nil, err
}
resp.Tasks = append(resp.Tasks, tt)
}
} }
return resp, nil return resp, nil
} }
func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) { func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -273,7 +251,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_proto
} }
func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) { func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -285,7 +263,7 @@ func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_pro
} }
func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) { func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -306,7 +284,7 @@ func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobu
} }
func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) { func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -338,7 +316,7 @@ func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) er
} }
func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) { func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -364,7 +342,7 @@ func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecRespon
} }
func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) { func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -378,7 +356,7 @@ func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.
} }
func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) { func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -389,7 +367,7 @@ func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*go
} }
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) { func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) {
c, err := s.getTask(r.ContainerID) c, err := s.getTask(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -454,14 +432,14 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.
}, nil }, nil
} }
func (s *Service) getTask(id string) (plugin.Task, error) { func (s *Service) getTask(ctx context.Context, id string) (plugin.Task, error) {
s.mu.Lock() for _, r := range s.runtimes {
c, ok := s.tasks[id] t, err := r.Get(ctx, id)
s.mu.Unlock() if err == nil {
if !ok { return t, nil
return nil, grpc.Errorf(codes.NotFound, "task %v not found", id) }
} }
return c, nil return nil, grpc.Errorf(codes.NotFound, "task %v not found", id)
} }
func (s *Service) getRuntime(name string) (plugin.Runtime, error) { func (s *Service) getRuntime(name string) (plugin.Runtime, error) {

View File

@ -155,6 +155,7 @@ func createDefaultSpec() (*specs.Spec, error) {
}, },
}, },
Linux: &specs.Linux{ Linux: &specs.Linux{
// TODO (@crosbymichael) make sure we don't have have two containers in the same cgroup
Resources: &specs.LinuxResources{ Resources: &specs.LinuxResources{
Devices: []specs.LinuxDeviceCgroup{ Devices: []specs.LinuxDeviceCgroup{
{ {

View File

@ -34,7 +34,6 @@ func init() {
} }
func New(ic *plugin.InitContext) (interface{}, error) { func New(ic *plugin.InitContext) (interface{}, error) {
rootDir := filepath.Join(ic.Root, runtimeName) rootDir := filepath.Join(ic.Root, runtimeName)
if err := os.MkdirAll(rootDir, 0755); err != nil { if err := os.MkdirAll(rootDir, 0755); err != nil {
return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir) return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir)
@ -152,10 +151,19 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
list = append(list, c) list = append(list, c)
} }
} }
return list, nil return list, nil
} }
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
r.Lock()
defer r.Unlock()
c, ok := r.containers[id]
if !ok {
return nil, fmt.Errorf("container %s does not exit", id)
}
return c, nil
}
func (r *Runtime) Events(ctx context.Context) <-chan *plugin.Event { func (r *Runtime) Events(ctx context.Context) <-chan *plugin.Event {
return r.events return r.events
} }