api/services: define the container metadata service

Working from feedback on the existing implementation, we have now
introduced a central metadata object to represent the lifecycle and pin
the resources required to implement what people today know as
containers. This includes the runtime specification and the root
filesystem snapshots. We also allow arbitrary labeling of the container.
Such provisions will bring the containerd definition of container closer
to what is expected by users.

The objects that encompass today's ContainerService, centered around the
runtime, will be known as tasks. These tasks take on the existing
lifecycle behavior of containerd's containers, which means that they are
deleted when they exit. Largely, there are no other changes except for
naming.

The `Container` object will operate purely as a metadata object. No
runtime state will be held on `Container`. It only informs the execution
service on what is required for creating tasks and the resources in use
by that container. The resources referenced by that container will be
deleted when the container is deleted, if not in use. In this sense,
users can create, list, label and delete containers in a similar way as
they do with docker today, without the complexity of runtime locks that
plagues current implementations.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2017-05-15 17:44:50 -07:00
parent 8f3b89c79d
commit 539742881d
47 changed files with 4067 additions and 1115 deletions

View File

@@ -0,0 +1,56 @@
package containers
import (
api "github.com/containerd/containerd/api/services/containers"
"github.com/containerd/containerd/containers"
"github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func containersToProto(containers []containers.Container) []api.Container {
var containerspb []api.Container
for _, image := range containers {
containerspb = append(containerspb, containerToProto(&image))
}
return containerspb
}
func containerToProto(container *containers.Container) api.Container {
return api.Container{
ID: container.ID,
Labels: container.Labels,
Image: container.Image,
Runtime: container.Runtime,
Spec: &types.Any{
TypeUrl: specs.Version,
Value: container.Spec,
},
RootFS: container.RootFS,
}
}
func containerFromProto(containerpb *api.Container) containers.Container {
return containers.Container{
ID: containerpb.ID,
Labels: containerpb.Labels,
Image: containerpb.Image,
Runtime: containerpb.Runtime,
Spec: containerpb.Spec.Value,
RootFS: containerpb.RootFS,
}
}
func mapGRPCError(err error, id string) error {
switch {
case containers.IsNotFound(err):
return grpc.Errorf(codes.NotFound, "container %v not found", id)
case containers.IsExists(err):
return grpc.Errorf(codes.AlreadyExists, "container %v already exists", id)
}
return err
}

View File

@@ -0,0 +1,148 @@
package containers
import (
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/containers"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func init() {
plugin.Register("containers-grpc", &plugin.Registration{
Type: plugin.GRPCPlugin,
Init: func(ic *plugin.InitContext) (interface{}, error) {
return NewService(ic.Meta), nil
},
})
}
type Service struct {
db *bolt.DB
}
func NewService(db *bolt.DB) api.ContainersServer {
return &Service{db: db}
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterContainersServer(server, s)
return nil
}
func (s *Service) Get(ctx context.Context, req *api.GetContainerRequest) (*api.GetContainerResponse, error) {
var resp api.GetContainerResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
container, err := store.Get(ctx, req.ID)
if err != nil {
return mapGRPCError(err, req.ID)
}
containerpb := containerToProto(&container)
resp.Container = containerpb
return nil
})
}
func (s *Service) List(ctx context.Context, req *api.ListContainersRequest) (*api.ListContainersResponse, error) {
var resp api.ListContainersResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
containers, err := store.List(ctx, req.Filter)
if err != nil {
return mapGRPCError(err, "")
}
resp.Containers = containersToProto(containers)
return nil
})
}
func (s *Service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse
return &resp, s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
container := containerFromProto(&req.Container)
created, err := store.Create(ctx, container)
if err != nil {
return mapGRPCError(err, req.Container.ID)
}
resp.Container = containerToProto(&created)
return nil
})
}
func (s *Service) Update(ctx context.Context, req *api.UpdateContainerRequest) (*api.UpdateContainerResponse, error) {
var resp api.UpdateContainerResponse
return &resp, s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
container := containerFromProto(&req.Container)
current, err := store.Get(ctx, container.ID)
if err != nil {
return mapGRPCError(err, container.ID)
}
if current.ID != container.ID {
return grpc.Errorf(codes.InvalidArgument, "container ids must match: %v != %v", current.ID, container.ID)
}
// apply the field mask. If you update this code, you better follow the
// field mask rules in field_mask.proto. If you don't know what this
// is, do not update this code.
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
switch path {
case "labels":
current.Labels = container.Labels
case "image":
current.Image = container.Image
case "runtime":
// TODO(stevvooe): Should this actually be allowed?
current.Runtime = container.Runtime
case "spec":
current.Spec = container.Spec
case "rootfs":
current.RootFS = container.RootFS
default:
return grpc.Errorf(codes.InvalidArgument, "cannot update %q field", path)
}
}
} else {
// no field mask present, just replace everything
current = container
}
created, err := store.Update(ctx, container)
if err != nil {
return mapGRPCError(err, req.Container.ID)
}
resp.Container = containerToProto(&created)
return nil
})
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) (*empty.Empty, error) {
return &empty.Empty{}, s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
return mapGRPCError(store.Delete(ctx, req.ID), req.ID)
})
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, containers.NewStore(tx)) }
}
func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return s.db.View(s.withStore(ctx, fn))
}
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}

View File

@@ -9,10 +9,12 @@ import (
"path/filepath"
"sync"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/container"
"github.com/containerd/containerd/api/types/descriptor"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
@@ -23,15 +25,16 @@ import (
specs "github.com/opencontainers/image-spec/specs-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
var (
_ = (api.ContainerServiceServer)(&Service{})
_ = (api.TasksServer)(&Service{})
empty = &google_protobuf.Empty{}
)
func init() {
plugin.Register("runtime-grpc", &plugin.Registration{
plugin.Register("tasks-grpc", &plugin.Registration{
Type: plugin.GRPCPlugin,
Init: New,
})
@@ -43,32 +46,34 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
return &Service{
runtimes: ic.Runtimes,
containers: make(map[string]plugin.Container),
collector: c,
store: ic.Content,
runtimes: ic.Runtimes,
tasks: make(map[string]plugin.Task),
db: ic.Meta,
collector: c,
store: ic.Content,
}, nil
}
type Service struct {
mu sync.Mutex
runtimes map[string]plugin.Runtime
containers map[string]plugin.Container
collector *collector
store content.Store
runtimes map[string]plugin.Runtime
tasks map[string]plugin.Task
db *bolt.DB
collector *collector
store content.Store
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterContainerServiceServer(server, s)
// load all containers
api.RegisterTasksServer(server, s)
// load all tasks
for _, r := range s.runtimes {
containers, err := r.Containers(context.Background())
tasks, err := r.Tasks(context.Background())
if err != nil {
return err
}
for _, c := range containers {
s.containers[c.Info().ID] = c
for _, c := range tasks {
s.tasks[c.Info().ContainerID] = c
}
}
return nil
@@ -80,7 +85,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
err error
)
if r.Checkpoint != nil {
checkpointPath, err = ioutil.TempDir("", "ctd-checkpoint")
checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint")
if err != nil {
return nil, err
}
@@ -97,8 +102,26 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
return nil, err
}
}
var container containers.Container
if err := s.db.View(func(tx *bolt.Tx) error {
store := containers.NewStore(tx)
var err error
container, err = store.Get(ctx, r.ContainerID)
return err
}); err != nil {
switch {
case containers.IsNotFound(err):
return nil, grpc.Errorf(codes.NotFound, "container %v not found", r.ContainerID)
case containers.IsExists(err):
return nil, grpc.Errorf(codes.AlreadyExists, "container %v already exists", r.ContainerID)
}
return nil, err
}
opts := plugin.CreateOpts{
Spec: r.Spec.Value,
Spec: container.Spec,
IO: plugin.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
@@ -114,38 +137,38 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
Options: m.Options,
})
}
runtime, err := s.getRuntime(r.Runtime)
runtime, err := s.getRuntime(container.Runtime)
if err != nil {
return nil, err
}
s.mu.Lock()
if _, ok := s.containers[r.ID]; ok {
if _, ok := s.tasks[r.ContainerID]; ok {
s.mu.Unlock()
return nil, plugin.ErrContainerExists
}
c, err := runtime.Create(ctx, r.ID, opts)
c, err := runtime.Create(ctx, r.ContainerID, opts)
if err != nil {
s.mu.Unlock()
return nil, err
}
s.containers[r.ID] = c
s.tasks[r.ContainerID] = c
s.mu.Unlock()
state, err := c.State(ctx)
if err != nil {
s.mu.Lock()
delete(s.containers, r.ID)
delete(s.tasks, r.ContainerID)
runtime.Delete(ctx, c)
s.mu.Unlock()
return nil, err
}
return &api.CreateResponse{
ID: r.ID,
Pid: state.Pid(),
ContainerID: r.ContainerID,
Pid: state.Pid(),
}, nil
}
func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -156,7 +179,7 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
}
func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -169,7 +192,7 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Delete
return nil, err
}
delete(s.containers, r.ID)
delete(s.tasks, r.ContainerID)
return &api.DeleteResponse{
ExitStatus: exit.Status,
@@ -177,26 +200,26 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Delete
}, nil
}
func containerFromContainerd(ctx context.Context, c plugin.Container) (*container.Container, error) {
func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error) {
state, err := c.State(ctx)
if err != nil {
return nil, err
}
var status container.Status
var status task.Status
switch state.Status() {
case plugin.CreatedStatus:
status = container.StatusCreated
status = task.StatusCreated
case plugin.RunningStatus:
status = container.StatusRunning
status = task.StatusRunning
case plugin.StoppedStatus:
status = container.StatusStopped
status = task.StatusStopped
case plugin.PausedStatus:
status = container.StatusPaused
status = task.StatusPaused
default:
log.G(ctx).WithField("status", state.Status()).Warn("unknown status")
}
return &container.Container{
return &task.Task{
ID: c.Info().ID,
Pid: state.Pid(),
Status: status,
@@ -207,30 +230,36 @@ func containerFromContainerd(ctx context.Context, c plugin.Container) (*containe
}, nil
}
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*container.Container, error) {
c, err := s.getContainer(r.ID)
func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) {
task, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
return containerFromContainerd(ctx, c)
t, err := taskFromContainerd(ctx, task)
if err != nil {
return nil, err
}
return &api.InfoResponse{
Task: t,
}, nil
}
func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
resp := &api.ListResponse{}
s.mu.Lock()
defer s.mu.Unlock()
for _, cd := range s.containers {
c, err := containerFromContainerd(ctx, cd)
for _, cd := range s.tasks {
c, err := taskFromContainerd(ctx, cd)
if err != nil {
return nil, err
}
resp.Containers = append(resp.Containers, c)
resp.Tasks = append(resp.Tasks, c)
}
return resp, nil
}
func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -242,7 +271,7 @@ func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_proto
}
func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -254,7 +283,7 @@ func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_pro
}
func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -275,7 +304,7 @@ func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobu
}
func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -285,9 +314,9 @@ func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.
return nil, err
}
ps := []*container.Process{}
ps := []*task.Process{}
for _, pid := range pids {
ps = append(ps, &container.Process{
ps = append(ps, &task.Process{
Pid: pid,
})
}
@@ -299,7 +328,7 @@ func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.
return resp, nil
}
func (s *Service) Events(r *api.EventsRequest, server api.ContainerService_EventsServer) error {
func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) error {
w := &grpcEventWriter{
server: server,
}
@@ -307,7 +336,7 @@ func (s *Service) Events(r *api.EventsRequest, server api.ContainerService_Event
}
func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -333,7 +362,7 @@ func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecRespon
}
func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -347,7 +376,7 @@ func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.
}
func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -358,7 +387,7 @@ func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*go
}
func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) {
c, err := s.getContainer(r.ID)
c, err := s.getTask(r.ContainerID)
if err != nil {
return nil, err
}
@@ -423,9 +452,9 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.
}, nil
}
func (s *Service) getContainer(id string) (plugin.Container, error) {
func (s *Service) getTask(id string) (plugin.Task, error) {
s.mu.Lock()
c, ok := s.containers[id]
c, ok := s.tasks[id]
s.mu.Unlock()
if !ok {
return nil, plugin.ErrContainerNotExist
@@ -442,26 +471,26 @@ func (s *Service) getRuntime(name string) (plugin.Runtime, error) {
}
type grpcEventWriter struct {
server api.ContainerService_EventsServer
server api.Tasks_EventsServer
}
func (g *grpcEventWriter) Write(e *plugin.Event) error {
var t container.Event_EventType
var t task.Event_EventType
switch e.Type {
case plugin.ExitEvent:
t = container.Event_EXIT
t = task.Event_EXIT
case plugin.ExecAddEvent:
t = container.Event_EXEC_ADDED
t = task.Event_EXEC_ADDED
case plugin.PausedEvent:
t = container.Event_PAUSED
t = task.Event_PAUSED
case plugin.CreateEvent:
t = container.Event_CREATE
t = task.Event_CREATE
case plugin.StartEvent:
t = container.Event_START
t = task.Event_START
case plugin.OOMEvent:
t = container.Event_OOM
t = task.Event_OOM
}
return g.server.Send(&container.Event{
return g.server.Send(&task.Event{
Type: t,
ID: e.ID,
Pid: e.Pid,

View File

@@ -73,7 +73,7 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteRequest) (*em
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, images.NewImageStore(tx)) }
return func(tx *bolt.Tx) error { return fn(ctx, images.NewStore(tx)) }
}
func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {