Move shim protos into linux pkg

This moves the shim's API and protos out of the containerd services
package and into the linux runtime package. The shim is an
implementation detail of the linux runtime and is not a containerd
user-facing API.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
Author: Michael Crosby
Date:   2017-06-22 14:29:49 -07:00
parent 6ad3ba739e
commit 990536f2cc
26 changed files with 1838 additions and 2266 deletions
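
For code that consumed the shim API, the change is an import-path move: the
shim client and its generated protos now resolve under the linux runtime
tree. A minimal sketch of the before/after imports, as they appear in the
diff below:

    // before: shim API under the containerd services tree
    shim "github.com/containerd/containerd/api/services/shim/v1"

    // after: shim client and protos under the linux runtime package
    client "github.com/containerd/containerd/linux/shim"
    shim "github.com/containerd/containerd/linux/shim/v1"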


@@ -3,29 +3,29 @@
package linux
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"github.com/boltdb/bolt"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/shim/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/events"
shimb "github.com/containerd/containerd/linux/shim"
client "github.com/containerd/containerd/linux/shim"
shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
runc "github.com/containerd/go-runc"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
@@ -35,6 +35,7 @@ var (
ErrTaskNotExists = errors.New("task does not exist")
ErrTaskAlreadyExists = errors.New("task already exists")
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
empty = &google_protobuf.Empty{}
)
const (
@@ -50,6 +51,7 @@ func init() {
Init: New,
Requires: []plugin.PluginType{
plugin.TaskMonitorPlugin,
plugin.MetadataPlugin,
},
Config: &Config{
Shim: defaultShim,
@@ -69,71 +71,6 @@ type Config struct {
NoShim bool `toml:"no_shim,omitempty"`
}
func newTaskList() *taskList {
return &taskList{
tasks: make(map[string]map[string]*Task),
}
}
type taskList struct {
mu sync.Mutex
tasks map[string]map[string]*Task
}
func (l *taskList) get(ctx context.Context, id string) (*Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
tasks, ok := l.tasks[namespace]
if !ok {
return nil, ErrTaskNotExists
}
t, ok := tasks[id]
if !ok {
return nil, ErrTaskNotExists
}
return t, nil
}
func (l *taskList) add(ctx context.Context, t *Task) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return l.addWithNamespace(namespace, t)
}
func (l *taskList) addWithNamespace(namespace string, t *Task) error {
l.mu.Lock()
defer l.mu.Unlock()
id := t.containerID
if _, ok := l.tasks[namespace]; !ok {
l.tasks[namespace] = make(map[string]*Task)
}
if _, ok := l.tasks[namespace][id]; ok {
return ErrTaskAlreadyExists
}
l.tasks[namespace][id] = t
return nil
}
func (l *taskList) delete(ctx context.Context, t *Task) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return
}
tasks, ok := l.tasks[namespace]
if ok {
delete(tasks, t.containerID)
}
}
func New(ic *plugin.InitContext) (interface{}, error) {
if err := os.MkdirAll(ic.Root, 0700); err != nil {
return nil, err
@@ -142,6 +79,10 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
cfg := ic.Config.(*Config)
c, cancel := context.WithCancel(ic.Context)
r := &Runtime{
@@ -155,10 +96,11 @@ func New(ic *plugin.InitContext) (interface{}, error) {
monitor: monitor.(plugin.TaskMonitor),
tasks: newTaskList(),
emitter: events.GetPoster(ic.Context),
db: m.(*bolt.DB),
}
// set the events output for a monitor if it generates events
r.monitor.Events(r.events)
tasks, err := r.loadAllTasks(ic.Context)
tasks, err := r.restoreTasks(ic.Context)
if err != nil {
return nil, err
}
@@ -166,6 +108,9 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
return nil, err
}
if err := r.handleEvents(ic.Context, t.shim); err != nil {
return nil, err
}
}
return r, nil
}
@@ -182,39 +127,44 @@ type Runtime struct {
monitor plugin.TaskMonitor
tasks *taskList
emitter events.Poster
db *bolt.DB
}
func (r *Runtime) ID() string {
return pluginID
}
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (_ plugin.Task, err error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
path, err := r.newBundle(namespace, id, opts.Spec)
bundle, err := newBundle(filepath.Join(r.root, namespace), namespace, id, opts.Spec)
if err != nil {
return nil, err
}
s, err := newShim(ctx, r.shim, path, namespace, r.remote)
if err != nil {
os.RemoveAll(path)
return nil, err
}
// Exit the shim on error
defer func() {
if err != nil {
s.Exit(context.Background(), &shim.ExitRequest{})
bundle.Delete()
}
}()
s, err := bundle.NewShim(ctx, r.shim, r.remote)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if kerr := s.KillShim(ctx); kerr != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
}
}()
if err = r.handleEvents(ctx, s); err != nil {
os.RemoveAll(path)
return nil, err
}
sopts := &shim.CreateRequest{
sopts := &shim.CreateTaskRequest{
ID: id,
Bundle: path,
Bundle: bundle.path,
Runtime: r.runtime,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
@@ -230,15 +180,14 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
})
}
if _, err = s.Create(ctx, sopts); err != nil {
os.RemoveAll(path)
return nil, errors.New(grpc.ErrorDesc(err))
}
c := newTask(id, namespace, opts.Spec, s)
if err := r.tasks.add(ctx, c); err != nil {
t := newTask(id, namespace, opts.Spec, s)
if err := r.tasks.add(ctx, t); err != nil {
return nil, err
}
// after the task is created, add it to the monitor
if err = r.monitor.Monitor(c); err != nil {
if err = r.monitor.Monitor(t); err != nil {
return nil, err
}
@@ -252,7 +201,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
}
if err := r.emit(ctx, "/runtime/create", &eventsapi.RuntimeCreate{
ID: id,
Bundle: path,
Bundle: bundle.path,
RootFS: runtimeMounts,
IO: &eventsapi.RuntimeIO{
Stdin: opts.IO.Stdin,
@@ -264,8 +213,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
}); err != nil {
return nil, err
}
return c, nil
return t, nil
}
func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, error) {
@@ -275,22 +223,25 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
}
lc, ok := c.(*Task)
if !ok {
return nil, fmt.Errorf("container cannot be cast as *linux.Container")
return nil, fmt.Errorf("task cannot be cast as *linux.Task")
}
// remove the container from the monitor
if err := r.monitor.Stop(lc); err != nil {
// TODO: log error here
return nil, err
}
rsp, err := lc.shim.Delete(ctx, &shim.DeleteRequest{})
rsp, err := lc.shim.Delete(ctx, empty)
if err != nil {
return nil, errors.New(grpc.ErrorDesc(err))
}
lc.shim.Exit(ctx, &shim.ExitRequest{})
if err := lc.shim.KillShim(ctx); err != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
r.tasks.delete(ctx, lc)
i := c.Info()
if err := r.emit(ctx, "/runtime/delete", &eventsapi.RuntimeDelete{
var (
bundle = loadBundle(filepath.Join(r.root, namespace, lc.containerID), namespace)
i = c.Info()
)
if err := r.emit(ctx, "/runtime/delete", eventsapi.RuntimeDelete{
ID: i.ID,
Runtime: i.Runtime,
ExitStatus: rsp.ExitStatus,
@@ -302,7 +253,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt,
Pid: rsp.Pid,
}, r.deleteBundle(namespace, lc.containerID)
}, bundle.Delete()
}
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
@@ -321,17 +272,19 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
return o, nil
}
func (r *Runtime) loadAllTasks(ctx context.Context) ([]*Task, error) {
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
dir, err := ioutil.ReadDir(r.root)
if err != nil {
return nil, err
}
var o []*Task
for _, fi := range dir {
if !fi.IsDir() {
for _, namespace := range dir {
if !namespace.IsDir() {
continue
}
tasks, err := r.loadTasks(ctx, fi.Name())
name := namespace.Name()
log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace")
tasks, err := r.loadTasks(ctx, name)
if err != nil {
return nil, err
}
@@ -350,28 +303,41 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return nil, err
}
var o []*Task
for _, fi := range dir {
if !fi.IsDir() {
for _, path := range dir {
if !path.IsDir() {
continue
}
id := fi.Name()
// TODO: optimize this if it is called frequently to list all containers,
// i.e. don't reconnect to the shim every time
c, err := r.loadTask(ctx, ns, filepath.Join(r.root, ns, id))
id := path.Name()
bundle := loadBundle(filepath.Join(r.root, ns, id), ns)
s, err := bundle.Connect(ctx, r.remote)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
// if we fail to load the container or connect to its shim, make sure the
// shim has been killed and clean up the resources still held by the container
r.killContainer(ctx, ns, id)
log.G(ctx).WithError(err).Error("connecting to shim")
if err := r.terminate(ctx, bundle, ns, id); err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).Error("failed to terminate task, leaving bundle for debugging")
continue
}
if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("delete bundle")
}
continue
}
o = append(o, c)
spec, err := bundle.Spec()
if err != nil {
log.G(ctx).WithError(err).Error("load task spec")
}
o = append(o, &Task{
containerID: id,
shim: s,
spec: spec,
namespace: ns,
})
}
return o, nil
}
func (r *Runtime) handleEvents(ctx context.Context, s shim.ShimClient) error {
events, err := s.Events(r.eventsContext, &shim.EventsRequest{})
func (r *Runtime) handleEvents(ctx context.Context, s *client.Client) error {
events, err := s.Stream(r.eventsContext, &shim.StreamEventsRequest{})
if err != nil {
return err
}
@@ -379,7 +345,7 @@ func (r *Runtime) handleEvents(ctx context.Context, s shim.ShimClient) error {
return nil
}
func (r *Runtime) forward(ctx context.Context, events shim.Shim_EventsClient) {
func (r *Runtime) forward(ctx context.Context, events shim.Shim_StreamClient) {
for {
e, err := events.Recv()
if err != nil {
@@ -391,19 +357,19 @@ func (r *Runtime) forward(ctx context.Context, events shim.Shim_EventsClient) {
topic := ""
var et plugin.EventType
switch e.Type {
case task.Event_CREATE:
case shim.Event_CREATE:
topic = "task-create"
et = plugin.CreateEvent
case task.Event_START:
case shim.Event_START:
topic = "task-start"
et = plugin.StartEvent
case task.Event_EXEC_ADDED:
case shim.Event_EXEC_ADDED:
topic = "task-execadded"
et = plugin.ExecAddEvent
case task.Event_OOM:
case shim.Event_OOM:
topic = "task-oom"
et = plugin.OOMEvent
case task.Event_EXIT:
case shim.Event_EXIT:
topic = "task-exit"
et = plugin.ExitEvent
}
@@ -418,7 +384,7 @@ func (r *Runtime) forward(ctx context.Context, events shim.Shim_EventsClient) {
}
if err := r.emit(ctx, "/runtime/"+topic, &eventsapi.RuntimeEvent{
ID: e.ID,
Type: e.Type,
Type: eventsapi.RuntimeEvent_EventType(e.Type),
Pid: e.Pid,
ExitStatus: e.ExitStatus,
ExitedAt: e.ExitedAt,
@@ -428,89 +394,51 @@ func (r *Runtime) forward(ctx context.Context, events shim.Shim_EventsClient) {
}
}
func (r *Runtime) newBundle(namespace, id string, spec []byte) (string, error) {
path := filepath.Join(r.root, namespace)
if err := os.MkdirAll(path, 0700); err != nil {
return "", err
}
path = filepath.Join(path, id)
if err := os.Mkdir(path, 0700); err != nil {
return "", err
}
if err := os.Mkdir(filepath.Join(path, "rootfs"), 0700); err != nil {
return "", err
}
f, err := os.Create(filepath.Join(path, configFilename))
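// terminate force-kills a task whose shim is unreachable: it kills all of
// the task's processes, polls runc until the container reports stopped,
// deletes the runtime state, and unmounts the task rootfs.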
func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
ctx = namespaces.WithNamespace(ctx, ns)
rt, err := r.getRuntime(ctx, ns, id)
if err != nil {
return "", err
return err
}
defer f.Close()
_, err = io.Copy(f, bytes.NewReader(spec))
return path, err
}
func (r *Runtime) deleteBundle(namespace, id string) error {
return os.RemoveAll(filepath.Join(r.root, namespace, id))
}
func (r *Runtime) loadTask(ctx context.Context, namespace, path string) (*Task, error) {
id := filepath.Base(path)
s, err := loadShim(path, namespace, r.remote)
if err != nil {
return nil, err
}
if err = r.handleEvents(ctx, s); err != nil {
return nil, err
}
data, err := ioutil.ReadFile(filepath.Join(path, configFilename))
if err != nil {
return nil, err
}
return &Task{
containerID: id,
shim: s,
spec: data,
namespace: namespace,
}, nil
}
// killContainer is used whenever the runtime fails to connect to a shim (it died)
// and needs to clean up the container resources in the underlying runtime (runc, etc...)
func (r *Runtime) killContainer(ctx context.Context, ns, id string) {
log.G(ctx).Debug("terminating container after failed load")
runtime := &runc.Runc{
// TODO: should we use the Command provided for initial container creation?
Command: r.runtime,
LogFormat: runc.JSON,
PdeathSignal: unix.SIGKILL,
Root: filepath.Join(shimb.RuncRoot, ns),
}
if err := runtime.Kill(ctx, id, int(unix.SIGKILL), &runc.KillOpts{
All: true,
}); err != nil {
if err := rt.Kill(ctx, id, int(unix.SIGKILL), &runc.KillOpts{All: true}); err != nil {
log.G(ctx).WithError(err).Warnf("kill all processes for %s", id)
}
// it can take a while for the container to be killed so poll for the container's status
// until it is in a stopped state
status := "running"
for status != "stopped" {
c, err := runtime.State(ctx, id)
c, err := rt.State(ctx, id)
if err != nil {
break
}
status = c.Status
time.Sleep(10 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
}
if err := runtime.Delete(ctx, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete container %s", id)
if err := rt.Delete(ctx, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete runtime state %s", id)
}
// try to unmount the rootfs in case it was not owned by an external mount namespace
unix.Unmount(filepath.Join(r.root, ns, id, "rootfs"), 0)
// remove container bundle
if err := r.deleteBundle(ns, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete container bundle %s", id)
if err := unix.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Warnf("unmount task rootfs %s", id)
}
return nil
}
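// getRuntime loads the container record from the metadata store and builds a
// runc client for it, using the runtime binary the container was created
// with and the shim's runc root for the namespace.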
func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, error) {
var c containers.Container
if err := r.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
c, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, err
}
return &runc.Runc{
Command: c.Runtime.Name,
LogFormat: runc.JSON,
PdeathSignal: unix.SIGKILL,
Root: filepath.Join(client.RuncRoot, ns),
}, nil
}
func (r *Runtime) emit(ctx context.Context, topic string, evt interface{}) error {