Move runtime interfaces to runtime package

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-06-28 10:10:59 -07:00
parent bd25543f5a
commit 7c8acca29a
11 changed files with 72 additions and 69 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
google_protobuf "github.com/golang/protobuf/ptypes/empty" google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -60,7 +61,7 @@ func init() {
}) })
} }
var _ = (plugin.Runtime)(&Runtime{}) var _ = (runtime.Runtime)(&Runtime{})
type Config struct { type Config struct {
// Shim is a path or name of binary implementing the Shim GRPC API // Shim is a path or name of binary implementing the Shim GRPC API
@ -90,10 +91,10 @@ func New(ic *plugin.InitContext) (interface{}, error) {
remote: !cfg.NoShim, remote: !cfg.NoShim,
shim: cfg.Shim, shim: cfg.Shim,
runtime: cfg.Runtime, runtime: cfg.Runtime,
events: make(chan *plugin.Event, 2048), events: make(chan *runtime.Event, 2048),
eventsContext: c, eventsContext: c,
eventsCancel: cancel, eventsCancel: cancel,
monitor: monitor.(plugin.TaskMonitor), monitor: monitor.(runtime.TaskMonitor),
tasks: newTaskList(), tasks: newTaskList(),
emitter: events.GetPoster(ic.Context), emitter: events.GetPoster(ic.Context),
db: m.(*bolt.DB), db: m.(*bolt.DB),
@ -121,10 +122,10 @@ type Runtime struct {
runtime string runtime string
remote bool remote bool
events chan *plugin.Event events chan *runtime.Event
eventsContext context.Context eventsContext context.Context
eventsCancel func() eventsCancel func()
monitor plugin.TaskMonitor monitor runtime.TaskMonitor
tasks *taskList tasks *taskList
emitter events.Poster emitter events.Poster
db *bolt.DB db *bolt.DB
@ -134,7 +135,7 @@ func (r *Runtime) ID() string {
return pluginID return pluginID
} }
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (_ plugin.Task, err error) { func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
namespace, err := namespaces.NamespaceRequired(ctx) namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -216,7 +217,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
return t, nil return t, nil
} }
func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, error) { func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, error) {
namespace, err := namespaces.NamespaceRequired(ctx) namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -249,19 +250,19 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
return &plugin.Exit{ return &runtime.Exit{
Status: rsp.ExitStatus, Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt, Timestamp: rsp.ExitedAt,
Pid: rsp.Pid, Pid: rsp.Pid,
}, bundle.Delete() }, bundle.Delete()
} }
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx) namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var o []plugin.Task var o []runtime.Task
tasks, ok := r.tasks.tasks[namespace] tasks, ok := r.tasks.tasks[namespace]
if !ok { if !ok {
return o, nil return o, nil
@ -293,7 +294,7 @@ func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
return o, nil return o, nil
} }
func (r *Runtime) Get(ctx context.Context, id string) (plugin.Task, error) { func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
return r.tasks.get(ctx, id) return r.tasks.get(ctx, id)
} }
@ -355,25 +356,25 @@ func (r *Runtime) forward(ctx context.Context, events shim.Shim_StreamClient) {
return return
} }
topic := "" topic := ""
var et plugin.EventType var et runtime.EventType
switch e.Type { switch e.Type {
case shim.Event_CREATE: case shim.Event_CREATE:
topic = "task-create" topic = "task-create"
et = plugin.CreateEvent et = runtime.CreateEvent
case shim.Event_START: case shim.Event_START:
topic = "task-start" topic = "task-start"
et = plugin.StartEvent et = runtime.StartEvent
case shim.Event_EXEC_ADDED: case shim.Event_EXEC_ADDED:
topic = "task-execadded" topic = "task-execadded"
et = plugin.ExecAddEvent et = runtime.ExecAddEvent
case shim.Event_OOM: case shim.Event_OOM:
topic = "task-oom" topic = "task-oom"
et = plugin.OOMEvent et = runtime.OOMEvent
case shim.Event_EXIT: case shim.Event_EXIT:
topic = "task-exit" topic = "task-exit"
et = plugin.ExitEvent et = runtime.ExitEvent
} }
r.events <- &plugin.Event{ r.events <- &runtime.Event{
Timestamp: time.Now(), Timestamp: time.Now(),
Runtime: r.ID(), Runtime: r.ID(),
Type: et, Type: et,

View File

@ -20,7 +20,7 @@ import (
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
@ -369,7 +369,7 @@ func checkKillError(err error) error {
return nil return nil
} }
if strings.Contains(err.Error(), "os: process already finished") || err == unix.ESRCH { if strings.Contains(err.Error(), "os: process already finished") || err == unix.ESRCH {
return plugin.ErrProcessExited return runtime.ErrProcessExited
} }
return err return err
} }

View File

@ -10,7 +10,7 @@ import (
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
client "github.com/containerd/containerd/linux/shim" client "github.com/containerd/containerd/linux/shim"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime"
protobuf "github.com/gogo/protobuf/types" protobuf "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -32,8 +32,8 @@ func newTask(id, namespace string, spec []byte, shim *client.Client) *Task {
} }
} }
func (t *Task) Info() plugin.TaskInfo { func (t *Task) Info() runtime.TaskInfo {
return plugin.TaskInfo{ return runtime.TaskInfo{
ID: t.containerID, ID: t.containerID,
ContainerID: t.containerID, ContainerID: t.containerID,
Runtime: pluginID, Runtime: pluginID,
@ -50,24 +50,24 @@ func (t *Task) Start(ctx context.Context) error {
return err return err
} }
func (t *Task) State(ctx context.Context) (plugin.State, error) { func (t *Task) State(ctx context.Context) (runtime.State, error) {
response, err := t.shim.State(ctx, empty) response, err := t.shim.State(ctx, empty)
if err != nil { if err != nil {
return plugin.State{}, errors.New(grpc.ErrorDesc(err)) return runtime.State{}, errors.New(grpc.ErrorDesc(err))
} }
var status plugin.Status var status runtime.Status
switch response.Status { switch response.Status {
case task.StatusCreated: case task.StatusCreated:
status = plugin.CreatedStatus status = runtime.CreatedStatus
case task.StatusRunning: case task.StatusRunning:
status = plugin.RunningStatus status = runtime.RunningStatus
case task.StatusStopped: case task.StatusStopped:
status = plugin.StoppedStatus status = runtime.StoppedStatus
case task.StatusPaused: case task.StatusPaused:
status = plugin.PausedStatus status = runtime.PausedStatus
// TODO: containerd.DeletedStatus // TODO: containerd.DeletedStatus
} }
return plugin.State{ return runtime.State{
Pid: response.Pid, Pid: response.Pid,
Status: status, Status: status,
Stdin: response.Stdin, Stdin: response.Stdin,
@ -105,7 +105,7 @@ func (t *Task) Kill(ctx context.Context, signal uint32, pid uint32, all bool) er
return err return err
} }
func (t *Task) Exec(ctx context.Context, opts plugin.ExecOpts) (plugin.Process, error) { func (t *Task) Exec(ctx context.Context, opts runtime.ExecOpts) (runtime.Process, error) {
request := &shim.ExecProcessRequest{ request := &shim.ExecProcessRequest{
Stdin: opts.IO.Stdin, Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout, Stdout: opts.IO.Stdout,
@ -144,7 +144,7 @@ func (t *Task) Processes(ctx context.Context) ([]uint32, error) {
return pids, nil return pids, nil
} }
func (t *Task) ResizePty(ctx context.Context, pid uint32, size plugin.ConsoleSize) error { func (t *Task) ResizePty(ctx context.Context, pid uint32, size runtime.ConsoleSize) error {
_, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{ _, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
Pid: pid, Pid: pid,
Width: size.Width, Width: size.Width,
@ -178,14 +178,14 @@ func (t *Task) Checkpoint(ctx context.Context, path string, options map[string]s
return nil return nil
} }
func (t *Task) DeleteProcess(ctx context.Context, pid uint32) (*plugin.Exit, error) { func (t *Task) DeleteProcess(ctx context.Context, pid uint32) (*runtime.Exit, error) {
r, err := t.shim.DeleteProcess(ctx, &shim.DeleteProcessRequest{ r, err := t.shim.DeleteProcess(ctx, &shim.DeleteProcessRequest{
Pid: pid, Pid: pid,
}) })
if err != nil { if err != nil {
return nil, errors.New(grpc.ErrorDesc(err)) return nil, errors.New(grpc.ErrorDesc(err))
} }
return &plugin.Exit{ return &runtime.Exit{
Status: r.ExitStatus, Status: r.ExitStatus,
Timestamp: r.ExitedAt, Timestamp: r.ExitedAt,
Pid: pid, Pid: pid,
@ -218,7 +218,7 @@ func (p *Process) Kill(ctx context.Context, signal uint32, _ bool) error {
return err return err
} }
func (p *Process) State(ctx context.Context) (plugin.State, error) { func (p *Process) State(ctx context.Context) (runtime.State, error) {
// use the container status for the status of the process // use the container status for the status of the process
state, err := p.t.State(ctx) state, err := p.t.State(ctx)
if err != nil { if err != nil {

View File

@ -7,6 +7,7 @@ import (
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
metrics "github.com/docker/go-metrics" metrics "github.com/docker/go-metrics"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -40,10 +41,10 @@ type cgroupsMonitor struct {
collector *Collector collector *Collector
oom *OOMCollector oom *OOMCollector
context context.Context context context.Context
events chan<- *plugin.Event events chan<- *runtime.Event
} }
func (m *cgroupsMonitor) Monitor(c plugin.Task) error { func (m *cgroupsMonitor) Monitor(c runtime.Task) error {
info := c.Info() info := c.Info()
state, err := c.State(m.context) state, err := c.State(m.context)
if err != nil { if err != nil {
@ -59,20 +60,20 @@ func (m *cgroupsMonitor) Monitor(c plugin.Task) error {
return m.oom.Add(info.ID, info.Namespace, cg, m.trigger) return m.oom.Add(info.ID, info.Namespace, cg, m.trigger)
} }
func (m *cgroupsMonitor) Stop(c plugin.Task) error { func (m *cgroupsMonitor) Stop(c runtime.Task) error {
info := c.Info() info := c.Info()
m.collector.Remove(info.ID, info.Namespace) m.collector.Remove(info.ID, info.Namespace)
return nil return nil
} }
func (m *cgroupsMonitor) Events(events chan<- *plugin.Event) { func (m *cgroupsMonitor) Events(events chan<- *runtime.Event) {
m.events = events m.events = events
} }
func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) { func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) {
m.events <- &plugin.Event{ m.events <- &runtime.Event{
Timestamp: time.Now(), Timestamp: time.Now(),
Type: plugin.OOMEvent, Type: runtime.OOMEvent,
ID: id, ID: id,
} }
} }

View File

@ -1,4 +1,4 @@
package plugin package runtime
import "errors" import "errors"

View File

@ -1,4 +1,4 @@
package plugin package runtime
import "time" import "time"

View File

@ -1,4 +1,4 @@
package plugin package runtime
// TaskMonitor provides an interface for monitoring of containers within containerd // TaskMonitor provides an interface for monitoring of containers within containerd
type TaskMonitor interface { type TaskMonitor interface {

View File

@ -1,4 +1,4 @@
package plugin package runtime
import ( import (
"context" "context"

View File

@ -1,4 +1,4 @@
package plugin package runtime
import "context" import "context"

View File

@ -22,6 +22,7 @@ import (
"github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
protobuf "github.com/gogo/protobuf/types" protobuf "github.com/gogo/protobuf/types"
google_protobuf "github.com/golang/protobuf/ptypes/empty" google_protobuf "github.com/golang/protobuf/ptypes/empty"
specs "github.com/opencontainers/image-spec/specs-go" specs "github.com/opencontainers/image-spec/specs-go"
@ -62,9 +63,9 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
runtimes := make(map[string]plugin.Runtime) runtimes := make(map[string]runtime.Runtime)
for _, rr := range rt { for _, rr := range rt {
r := rr.(plugin.Runtime) r := rr.(runtime.Runtime)
runtimes[r.ID()] = r runtimes[r.ID()] = r
} }
e := events.GetPoster(ic.Context) e := events.GetPoster(ic.Context)
@ -77,7 +78,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
} }
type Service struct { type Service struct {
runtimes map[string]plugin.Runtime runtimes map[string]runtime.Runtime
db *bolt.DB db *bolt.DB
store content.Store store content.Store
emitter events.Poster emitter events.Poster
@ -124,9 +125,9 @@ func (s *Service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.Cr
return nil, err return nil, err
} }
opts := plugin.CreateOpts{ opts := runtime.CreateOpts{
Spec: container.Spec, Spec: container.Spec,
IO: plugin.IO{ IO: runtime.IO{
Stdin: r.Stdin, Stdin: r.Stdin,
Stdout: r.Stdout, Stdout: r.Stdout,
Stderr: r.Stderr, Stderr: r.Stderr,
@ -227,7 +228,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest
}, nil }, nil
} }
func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error) { func taskFromContainerd(ctx context.Context, c runtime.Task) (*task.Task, error) {
state, err := c.State(ctx) state, err := c.State(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -235,13 +236,13 @@ func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error)
var status task.Status var status task.Status
switch state.Status { switch state.Status {
case plugin.CreatedStatus: case runtime.CreatedStatus:
status = task.StatusCreated status = task.StatusCreated
case plugin.RunningStatus: case runtime.RunningStatus:
status = task.StatusRunning status = task.StatusRunning
case plugin.StoppedStatus: case runtime.StoppedStatus:
status = task.StatusStopped status = task.StatusStopped
case plugin.PausedStatus: case runtime.PausedStatus:
status = task.StatusPaused status = task.StatusPaused
default: default:
log.G(ctx).WithField("status", state.Status).Warn("unknown status") log.G(ctx).WithField("status", state.Status).Warn("unknown status")
@ -366,9 +367,9 @@ func (s *Service) Exec(ctx context.Context, r *api.ExecProcessRequest) (*api.Exe
if err != nil { if err != nil {
return nil, err return nil, err
} }
process, err := t.Exec(ctx, plugin.ExecOpts{ process, err := t.Exec(ctx, runtime.ExecOpts{
Spec: r.Spec.Value, Spec: r.Spec.Value,
IO: plugin.IO{ IO: runtime.IO{
Stdin: r.Stdin, Stdin: r.Stdin,
Stdout: r.Stdout, Stdout: r.Stdout,
Stderr: r.Stderr, Stderr: r.Stderr,
@ -392,7 +393,7 @@ func (s *Service) ResizePty(ctx context.Context, r *api.ResizePtyRequest) (*goog
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := t.ResizePty(ctx, r.Pid, plugin.ConsoleSize{ if err := t.ResizePty(ctx, r.Pid, runtime.ConsoleSize{
Width: r.Width, Width: r.Width,
Height: r.Height, Height: r.Height,
}); err != nil { }); err != nil {
@ -497,7 +498,7 @@ func (s *Service) getContainer(ctx context.Context, id string) (containers.Conta
return container, nil return container, nil
} }
func (s *Service) getTask(ctx context.Context, id string) (plugin.Task, error) { func (s *Service) getTask(ctx context.Context, id string) (runtime.Task, error) {
container, err := s.getContainer(ctx, id) container, err := s.getContainer(ctx, id)
if err != nil { if err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "task %v not found: %s", id, err.Error()) return nil, grpc.Errorf(codes.InvalidArgument, "task %v not found: %s", id, err.Error())
@ -516,7 +517,7 @@ func (s *Service) getTask(ctx context.Context, id string) (plugin.Task, error) {
return t, nil return t, nil
} }
func (s *Service) getRuntime(name string) (plugin.Runtime, error) { func (s *Service) getRuntime(name string) (runtime.Runtime, error) {
runtime, ok := s.runtimes[name] runtime, ok := s.runtimes[name]
if !ok { if !ok {
return nil, fmt.Errorf("unknown runtime %q", name) return nil, fmt.Errorf("unknown runtime %q", name)

14
task.go
View File

@ -6,7 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"runtime" goruntime "runtime"
"strings" "strings"
"syscall" "syscall"
@ -15,8 +15,8 @@ import (
"github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/rootfs" "github.com/containerd/containerd/rootfs"
"github.com/containerd/containerd/runtime"
"github.com/opencontainers/image-spec/specs-go/v1" "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -115,7 +115,7 @@ func (t *task) Kill(ctx context.Context, s syscall.Signal) error {
}, },
}) })
if err != nil { if err != nil {
if strings.Contains(grpc.ErrorDesc(err), plugin.ErrProcessExited.Error()) { if strings.Contains(grpc.ErrorDesc(err), runtime.ErrProcessExited.Error()) {
return ErrProcessExited return ErrProcessExited
} }
return err return err
@ -320,8 +320,8 @@ func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tas
Size: d.Size_, Size: d.Size_,
Digest: d.Digest, Digest: d.Digest,
Platform: &v1.Platform{ Platform: &v1.Platform{
OS: runtime.GOOS, OS: goruntime.GOOS,
Architecture: runtime.GOARCH, Architecture: goruntime.GOARCH,
}, },
}) })
} }
@ -334,8 +334,8 @@ func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, id str
return err return err
} }
rw.Platform = &v1.Platform{ rw.Platform = &v1.Platform{
OS: runtime.GOOS, OS: goruntime.GOOS,
Architecture: runtime.GOARCH, Architecture: goruntime.GOARCH,
} }
index.Manifests = append(index.Manifests, rw) index.Manifests = append(index.Manifests, rw)
return nil return nil