containerd/linux/runtime.go
Stephen J Day c05be46348
events: move types into service package
When using events, it was found to be fairly unwieldy with a number of
extra packages. For the most part, when interacting with the events
service, we want types of the same version of the service. This has been
accomplished by moving all events types into the events package.

In addition, several fixes to the way events are marshaled have been
included. Specifically, we defer to the protobuf type registration
system to assemble events and type urls, with a little bit sheen on top
of add a containerd.io oriented namespace.

This has resulted in much cleaner event consumption and has removed the
reliance on error prone type urls, in favor of concrete types.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-06-22 19:12:25 -07:00

524 lines
13 KiB
Go

// +build linux
package linux
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
"google.golang.org/grpc"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/shim/v1"
"github.com/containerd/containerd/api/types/mount"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/events"
shimb "github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
var (
ErrTaskNotExists = errors.New("task does not exist")
ErrTaskAlreadyExists = errors.New("task already exists")
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
)
const (
configFilename = "config.json"
defaultRuntime = "runc"
defaultShim = "containerd-shim"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePlugin,
ID: "linux",
Init: New,
Requires: []plugin.PluginType{
plugin.TaskMonitorPlugin,
},
Config: &Config{
Shim: defaultShim,
Runtime: defaultRuntime,
},
})
}
var _ = (plugin.Runtime)(&Runtime{})
type Config struct {
// Shim is a path or name of binary implementing the Shim GRPC API
Shim string `toml:"shim,omitempty"`
// Runtime is a path or name of an OCI runtime used by the shim
Runtime string `toml:"runtime,omitempty"`
// NoShim calls runc directly from within the pkg
NoShim bool `toml:"no_shim,omitempty"`
}
func newTaskList() *taskList {
return &taskList{
tasks: make(map[string]map[string]*Task),
}
}
type taskList struct {
mu sync.Mutex
tasks map[string]map[string]*Task
}
func (l *taskList) get(ctx context.Context, id string) (*Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
tasks, ok := l.tasks[namespace]
if !ok {
return nil, ErrTaskNotExists
}
t, ok := tasks[id]
if !ok {
return nil, ErrTaskNotExists
}
return t, nil
}
func (l *taskList) add(ctx context.Context, t *Task) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return l.addWithNamespace(namespace, t)
}
func (l *taskList) addWithNamespace(namespace string, t *Task) error {
l.mu.Lock()
defer l.mu.Unlock()
id := t.containerID
if _, ok := l.tasks[namespace]; !ok {
l.tasks[namespace] = make(map[string]*Task)
}
if _, ok := l.tasks[namespace][id]; ok {
return ErrTaskAlreadyExists
}
l.tasks[namespace][id] = t
return nil
}
func (l *taskList) delete(ctx context.Context, t *Task) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return
}
tasks, ok := l.tasks[namespace]
if ok {
delete(tasks, t.containerID)
}
}
func New(ic *plugin.InitContext) (interface{}, error) {
if err := os.MkdirAll(ic.Root, 0700); err != nil {
return nil, err
}
monitor, err := ic.Get(plugin.TaskMonitorPlugin)
if err != nil {
return nil, err
}
cfg := ic.Config.(*Config)
c, cancel := context.WithCancel(ic.Context)
r := &Runtime{
root: ic.Root,
remote: !cfg.NoShim,
shim: cfg.Shim,
runtime: cfg.Runtime,
events: make(chan *plugin.Event, 2048),
eventsContext: c,
eventsCancel: cancel,
monitor: monitor.(plugin.TaskMonitor),
tasks: newTaskList(),
emitter: events.GetPoster(ic.Context),
}
// set the events output for a monitor if it generates events
r.monitor.Events(r.events)
tasks, err := r.loadAllTasks(ic.Context)
if err != nil {
return nil, err
}
for _, t := range tasks {
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
return nil, err
}
}
return r, nil
}
type Runtime struct {
root string
shim string
runtime string
remote bool
events chan *plugin.Event
eventsContext context.Context
eventsCancel func()
monitor plugin.TaskMonitor
tasks *taskList
emitter events.Poster
}
func (r *Runtime) ID() string {
return pluginID
}
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
path, err := r.newBundle(namespace, id, opts.Spec)
if err != nil {
return nil, err
}
s, err := newShim(ctx, r.shim, path, namespace, r.remote)
if err != nil {
os.RemoveAll(path)
return nil, err
}
// Exit the shim on error
defer func() {
if err != nil {
s.Exit(context.Background(), &shim.ExitRequest{})
}
}()
if err = r.handleEvents(ctx, s); err != nil {
os.RemoveAll(path)
return nil, err
}
sopts := &shim.CreateRequest{
ID: id,
Bundle: path,
Runtime: r.runtime,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Checkpoint: opts.Checkpoint,
}
for _, m := range opts.Rootfs {
sopts.Rootfs = append(sopts.Rootfs, &mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
if _, err = s.Create(ctx, sopts); err != nil {
os.RemoveAll(path)
return nil, errors.New(grpc.ErrorDesc(err))
}
c := newTask(id, namespace, opts.Spec, s)
if err := r.tasks.add(ctx, c); err != nil {
return nil, err
}
// after the task is created, add it to the monitor
if err = r.monitor.Monitor(c); err != nil {
return nil, err
}
var runtimeMounts []*eventsapi.RuntimeMount
for _, m := range opts.Rootfs {
runtimeMounts = append(runtimeMounts, &eventsapi.RuntimeMount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
if err := r.emit(ctx, "/runtime/create", &eventsapi.RuntimeCreate{
ID: id,
Bundle: path,
RootFS: runtimeMounts,
IO: &eventsapi.RuntimeIO{
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
},
Checkpoint: opts.Checkpoint,
}); err != nil {
return nil, err
}
return c, nil
}
func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
lc, ok := c.(*Task)
if !ok {
return nil, fmt.Errorf("container cannot be cast as *linux.Container")
}
// remove the container from the monitor
if err := r.monitor.Stop(lc); err != nil {
// TODO: log error here
return nil, err
}
rsp, err := lc.shim.Delete(ctx, &shim.DeleteRequest{})
if err != nil {
return nil, errors.New(grpc.ErrorDesc(err))
}
lc.shim.Exit(ctx, &shim.ExitRequest{})
r.tasks.delete(ctx, lc)
i := c.Info()
if err := r.emit(ctx, "/runtime/delete", &eventsapi.RuntimeDelete{
ID: i.ID,
Runtime: i.Runtime,
ExitStatus: rsp.ExitStatus,
ExitedAt: rsp.ExitedAt,
}); err != nil {
return nil, err
}
return &plugin.Exit{
Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt,
Pid: rsp.Pid,
}, r.deleteBundle(namespace, lc.containerID)
}
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var o []plugin.Task
tasks, ok := r.tasks.tasks[namespace]
if !ok {
return o, nil
}
for _, t := range tasks {
o = append(o, t)
}
return o, nil
}
func (r *Runtime) loadAllTasks(ctx context.Context) ([]*Task, error) {
dir, err := ioutil.ReadDir(r.root)
if err != nil {
return nil, err
}
var o []*Task
for _, fi := range dir {
if !fi.IsDir() {
continue
}
tasks, err := r.loadTasks(ctx, fi.Name())
if err != nil {
return nil, err
}
o = append(o, tasks...)
}
return o, nil
}
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) {
return r.tasks.get(ctx, id)
}
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
if err != nil {
return nil, err
}
var o []*Task
for _, fi := range dir {
if !fi.IsDir() {
continue
}
id := fi.Name()
// TODO: optimize this if it is call frequently to list all containers
// i.e. dont' reconnect to the the shim's ever time
c, err := r.loadTask(ctx, ns, filepath.Join(r.root, ns, id))
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
// if we fail to load the container, connect to the shim, make sure if the shim has
// been killed and cleanup the resources still being held by the container
r.killContainer(ctx, ns, id)
continue
}
o = append(o, c)
}
return o, nil
}
func (r *Runtime) handleEvents(ctx context.Context, s shim.ShimClient) error {
events, err := s.Events(r.eventsContext, &shim.EventsRequest{})
if err != nil {
return err
}
go r.forward(ctx, events)
return nil
}
func (r *Runtime) forward(ctx context.Context, events shim.Shim_EventsClient) {
for {
e, err := events.Recv()
if err != nil {
if !strings.HasSuffix(err.Error(), "transport is closing") {
log.G(r.eventsContext).WithError(err).Error("get event from shim")
}
return
}
topic := ""
var et plugin.EventType
switch e.Type {
case task.Event_CREATE:
topic = "task-create"
et = plugin.CreateEvent
case task.Event_START:
topic = "task-start"
et = plugin.StartEvent
case task.Event_EXEC_ADDED:
topic = "task-execadded"
et = plugin.ExecAddEvent
case task.Event_OOM:
topic = "task-oom"
et = plugin.OOMEvent
case task.Event_EXIT:
topic = "task-exit"
et = plugin.ExitEvent
}
r.events <- &plugin.Event{
Timestamp: time.Now(),
Runtime: r.ID(),
Type: et,
Pid: e.Pid,
ID: e.ID,
ExitStatus: e.ExitStatus,
ExitedAt: e.ExitedAt,
}
if err := r.emit(ctx, "/runtime/"+topic, &eventsapi.RuntimeEvent{
ID: e.ID,
Type: e.Type,
Pid: e.Pid,
ExitStatus: e.ExitStatus,
ExitedAt: e.ExitedAt,
}); err != nil {
return
}
}
}
func (r *Runtime) newBundle(namespace, id string, spec []byte) (string, error) {
path := filepath.Join(r.root, namespace)
if err := os.MkdirAll(path, 0700); err != nil {
return "", err
}
path = filepath.Join(path, id)
if err := os.Mkdir(path, 0700); err != nil {
return "", err
}
if err := os.Mkdir(filepath.Join(path, "rootfs"), 0700); err != nil {
return "", err
}
f, err := os.Create(filepath.Join(path, configFilename))
if err != nil {
return "", err
}
defer f.Close()
_, err = io.Copy(f, bytes.NewReader(spec))
return path, err
}
func (r *Runtime) deleteBundle(namespace, id string) error {
return os.RemoveAll(filepath.Join(r.root, namespace, id))
}
func (r *Runtime) loadTask(ctx context.Context, namespace, path string) (*Task, error) {
id := filepath.Base(path)
s, err := loadShim(path, namespace, r.remote)
if err != nil {
return nil, err
}
if err = r.handleEvents(ctx, s); err != nil {
return nil, err
}
data, err := ioutil.ReadFile(filepath.Join(path, configFilename))
if err != nil {
return nil, err
}
return &Task{
containerID: id,
shim: s,
spec: data,
namespace: namespace,
}, nil
}
// killContainer is used whenever the runtime fails to connect to a shim (it died)
// and needs to cleanup the container resources in the underlying runtime (runc, etc...)
func (r *Runtime) killContainer(ctx context.Context, ns, id string) {
log.G(ctx).Debug("terminating container after failed load")
runtime := &runc.Runc{
// TODO: should we get Command provided for initial container creation?
Command: r.runtime,
LogFormat: runc.JSON,
PdeathSignal: unix.SIGKILL,
Root: filepath.Join(shimb.RuncRoot, ns),
}
if err := runtime.Kill(ctx, id, int(unix.SIGKILL), &runc.KillOpts{
All: true,
}); err != nil {
log.G(ctx).WithError(err).Warnf("kill all processes for %s", id)
}
// it can take a while for the container to be killed so poll for the container's status
// until it is in a stopped state
status := "running"
for status != "stopped" {
c, err := runtime.State(ctx, id)
if err != nil {
break
}
status = c.Status
time.Sleep(10 * time.Millisecond)
}
if err := runtime.Delete(ctx, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete container %s", id)
}
// try to unmount the rootfs in case it was not owned by an external mount namespace
unix.Unmount(filepath.Join(r.root, ns, id, "rootfs"), 0)
// remove container bundle
if err := r.deleteBundle(ns, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete container bundle %s", id)
}
}
func (r *Runtime) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := r.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}