containerd/services/execution/service.go
Derek McGowan e1ed4a2ea4
Add json storage for container storage
This is just a temporary storage solution to get
containers running on the new code.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
2017-05-23 14:22:20 -07:00

500 lines
11 KiB
Go

package execution
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/descriptor"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
protobuf "github.com/gogo/protobuf/types"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
specs "github.com/opencontainers/image-spec/specs-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
var (
_ = (api.TasksServer)(&Service{})
empty = &google_protobuf.Empty{}
)
func init() {
plugin.Register("tasks-grpc", &plugin.Registration{
Type: plugin.GRPCPlugin,
Init: New,
})
}
func New(ic *plugin.InitContext) (interface{}, error) {
c, err := newCollector(ic.Context, ic.Runtimes)
if err != nil {
return nil, err
}
return &Service{
runtimes: ic.Runtimes,
tasks: make(map[string]plugin.Task),
db: ic.Meta,
collector: c,
store: ic.Content,
}, nil
}
type Service struct {
mu sync.Mutex
runtimes map[string]plugin.Runtime
tasks map[string]plugin.Task
db *bolt.DB
collector *collector
store content.Store
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterTasksServer(server, s)
// load all tasks
for _, r := range s.runtimes {
tasks, err := r.Tasks(context.Background())
if err != nil {
return err
}
for _, c := range tasks {
s.tasks[c.Info().ContainerID] = c
}
}
return nil
}
func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
var (
checkpointPath string
err error
)
if r.Checkpoint != nil {
checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint")
if err != nil {
return nil, err
}
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
}
reader, err := s.store.Reader(ctx, r.Checkpoint.Digest)
if err != nil {
return nil, err
}
_, err = archive.Apply(ctx, checkpointPath, reader)
reader.Close()
if err != nil {
return nil, err
}
}
var container containers.Container
if err := s.db.View(func(tx *bolt.Tx) error {
store := containers.NewStore(tx)
var err error
container, err = store.Get(ctx, r.ContainerID)
return err
}); err != nil {
switch {
case containers.IsNotFound(err):
return nil, grpc.Errorf(codes.NotFound, "container %v not found", r.ContainerID)
case containers.IsExists(err):
return nil, grpc.Errorf(codes.AlreadyExists, "container %v already exists", r.ContainerID)
}
return nil, err
}
opts := plugin.CreateOpts{
Spec: container.Spec,
IO: plugin.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: checkpointPath,
}
for _, m := range r.Rootfs {
opts.Rootfs = append(opts.Rootfs, mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
runtime, err := s.getRuntime(container.Runtime)
if err != nil {
return nil, err
}
s.mu.Lock()
if _, ok := s.tasks[r.ContainerID]; ok {
s.mu.Unlock()
return nil, grpc.Errorf(codes.AlreadyExists, "task %v already exists", r.ContainerID)
}
c, err := runtime.Create(ctx, r.ContainerID, opts)
if err != nil {
s.mu.Unlock()
return nil, err
}
s.tasks[r.ContainerID] = c
s.mu.Unlock()
state, err := c.State(ctx)
if err != nil {
s.mu.Lock()
delete(s.tasks, r.ContainerID)
runtime.Delete(ctx, c)
s.mu.Unlock()
return nil, err
}
return &api.CreateResponse{
ContainerID: r.ContainerID,
Pid: state.Pid(),
}, nil
}
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
if err := c.Start(ctx); err != nil {
return nil, err
}
return empty, nil
}
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
runtime, err := s.getRuntime(c.Info().Runtime)
if err != nil {
return nil, err
}
exit, err := runtime.Delete(ctx, c)
if err != nil {
return nil, err
}
delete(s.tasks, r.ContainerID)
return &api.DeleteResponse{
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
}, nil
}
func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error) {
state, err := c.State(ctx)
if err != nil {
return nil, err
}
var status task.Status
switch state.Status() {
case plugin.CreatedStatus:
status = task.StatusCreated
case plugin.RunningStatus:
status = task.StatusRunning
case plugin.StoppedStatus:
status = task.StatusStopped
case plugin.PausedStatus:
status = task.StatusPaused
default:
log.G(ctx).WithField("status", state.Status()).Warn("unknown status")
}
return &task.Task{
ID: c.Info().ID,
Pid: state.Pid(),
Status: status,
Spec: &protobuf.Any{
TypeUrl: specs.Version,
Value: c.Info().Spec,
},
}, nil
}
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) {
task, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
t, err := taskFromContainerd(ctx, task)
if err != nil {
return nil, err
}
return &api.InfoResponse{
Task: t,
}, nil
}
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
resp := &api.ListResponse{}
s.mu.Lock()
defer s.mu.Unlock()
for _, cd := range s.tasks {
c, err := taskFromContainerd(ctx, cd)
if err != nil {
return nil, err
}
resp.Tasks = append(resp.Tasks, c)
}
return resp, nil
}
func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
err = c.Pause(ctx)
if err != nil {
return nil, err
}
return empty, nil
}
func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
err = c.Resume(ctx)
if err != nil {
return nil, err
}
return empty, nil
}
func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
switch v := r.PidOrAll.(type) {
case *api.KillRequest_All:
if err := c.Kill(ctx, r.Signal, 0, true); err != nil {
return nil, err
}
case *api.KillRequest_Pid:
if err := c.Kill(ctx, r.Signal, v.Pid, false); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("invalid option specified; expected pid or all")
}
return empty, nil
}
func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
pids, err := c.Processes(ctx)
if err != nil {
return nil, err
}
ps := []*task.Process{}
for _, pid := range pids {
ps = append(ps, &task.Process{
Pid: pid,
})
}
resp := &api.ProcessesResponse{
Processes: ps,
}
return resp, nil
}
func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) error {
w := &grpcEventWriter{
server: server,
}
return s.collector.forward(w)
}
func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
process, err := c.Exec(ctx, plugin.ExecOpts{
Spec: r.Spec.Value,
IO: plugin.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
})
if err != nil {
return nil, err
}
state, err := process.State(ctx)
if err != nil {
return nil, err
}
return &api.ExecResponse{
Pid: state.Pid(),
}, nil
}
func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
if err := c.Pty(ctx, r.Pid, plugin.ConsoleSize{
Width: r.Width,
Height: r.Height,
}); err != nil {
return nil, err
}
return empty, nil
}
func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
if err := c.CloseStdin(ctx, r.Pid); err != nil {
return nil, err
}
return empty, nil
}
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) {
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
image, err := ioutil.TempDir("", "ctd-checkpoint")
if err != nil {
return nil, err
}
defer os.RemoveAll(image)
if err := c.Checkpoint(ctx, plugin.CheckpointOpts{
Exit: r.Exit,
AllowTCP: r.AllowTcp,
AllowTerminal: r.AllowTerminal,
AllowUnixSockets: r.AllowUnixSockets,
FileLocks: r.FileLocks,
// ParentImage: r.ParentImage,
EmptyNamespaces: r.EmptyNamespaces,
Path: image,
}); err != nil {
return nil, err
}
// write checkpoint to the content store
tar := archive.Diff(ctx, "", image)
cp, err := s.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar)
// close tar first after write
if err := tar.Close(); err != nil {
return nil, err
}
if err != nil {
return nil, err
}
// write the config to the content store
spec := bytes.NewReader(c.Info().Spec)
specD, err := s.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec)
if err != nil {
return nil, err
}
return &api.CheckpointResponse{
Descriptors: []*descriptor.Descriptor{
cp,
specD,
},
}, nil
}
func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*descriptor.Descriptor, error) {
writer, err := s.store.Writer(ctx, ref, 0, "")
if err != nil {
return nil, err
}
defer writer.Close()
size, err := io.Copy(writer, r)
if err != nil {
return nil, err
}
if err := writer.Commit(0, ""); err != nil {
return nil, err
}
return &descriptor.Descriptor{
MediaType: mediaType,
Digest: writer.Digest(),
Size_: size,
}, nil
}
func (s *Service) getTask(id string) (plugin.Task, error) {
s.mu.Lock()
c, ok := s.tasks[id]
s.mu.Unlock()
if !ok {
return nil, grpc.Errorf(codes.NotFound, "task %v not found", id)
}
return c, nil
}
func (s *Service) getRuntime(name string) (plugin.Runtime, error) {
runtime, ok := s.runtimes[name]
if !ok {
return nil, plugin.ErrUnknownRuntime
}
return runtime, nil
}
type grpcEventWriter struct {
server api.Tasks_EventsServer
}
func (g *grpcEventWriter) Write(e *plugin.Event) error {
var t task.Event_EventType
switch e.Type {
case plugin.ExitEvent:
t = task.Event_EXIT
case plugin.ExecAddEvent:
t = task.Event_EXEC_ADDED
case plugin.PausedEvent:
t = task.Event_PAUSED
case plugin.CreateEvent:
t = task.Event_CREATE
case plugin.StartEvent:
t = task.Event_START
case plugin.OOMEvent:
t = task.Event_OOM
}
return g.server.Send(&task.Event{
Type: t,
ID: e.ID,
Pid: e.Pid,
ExitStatus: e.ExitStatus,
})
}