diff --git a/client.go b/client.go index 2ac256dd9..1ec0eb549 100644 --- a/client.go +++ b/client.go @@ -31,6 +31,7 @@ import ( eventsapi "github.com/containerd/containerd/api/services/events/v1" imagesapi "github.com/containerd/containerd/api/services/images/v1" introspectionapi "github.com/containerd/containerd/api/services/introspection/v1" + leasesapi "github.com/containerd/containerd/api/services/leases/v1" namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1" snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1" "github.com/containerd/containerd/api/services/tasks/v1" @@ -39,6 +40,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/dialer" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/images" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/platforms" @@ -75,54 +77,73 @@ func New(address string, opts ...ClientOpt) (*Client, error) { return nil, err } } - gopts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.WithTimeout(60 * time.Second), - grpc.FailOnNonTempDialError(true), - grpc.WithBackoffMaxDelay(3 * time.Second), - grpc.WithDialer(dialer.Dialer), + c := &Client{ + runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS), } - if len(copts.dialOptions) > 0 { - gopts = copts.dialOptions + if copts.services != nil { + c.services = *copts.services } - if copts.defaultns != "" { - unary, stream := newNSInterceptors(copts.defaultns) - gopts = append(gopts, - grpc.WithUnaryInterceptor(unary), - grpc.WithStreamInterceptor(stream), - ) - } - connector := func() (*grpc.ClientConn, error) { - conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial %q", address) + if address != "" { + gopts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithTimeout(60 * time.Second), + grpc.FailOnNonTempDialError(true), + grpc.WithBackoffMaxDelay(3 * time.Second), + grpc.WithDialer(dialer.Dialer), } - return conn, nil + if len(copts.dialOptions) > 0 { + gopts = copts.dialOptions + } + if copts.defaultns != "" { + unary, stream := newNSInterceptors(copts.defaultns) + gopts = append(gopts, + grpc.WithUnaryInterceptor(unary), + grpc.WithStreamInterceptor(stream), + ) + } + connector := func() (*grpc.ClientConn, error) { + conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + return conn, nil + } + conn, err := connector() + if err != nil { + return nil, err + } + c.conn, c.connector = conn, connector } - conn, err := connector() - if err != nil { - return nil, err + if copts.services == nil && c.conn == nil { + return nil, errors.New("no grpc connection or services is available") } - return &Client{ - conn: conn, - connector: connector, - runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS), - }, nil + return c, nil } // NewWithConn returns a new containerd client that is connected to the containerd // instance provided by the connection func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) { - return &Client{ + var copts clientOpts + for _, o := range opts { + if err := o(&copts); err != nil { + return nil, err + } + } + c := &Client{ conn: conn, runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS), - }, nil + } + if copts.services != nil { + c.services = *copts.services + } + return c, nil } // Client is the client to interact with containerd and its various services // using a uniform interface type Client struct { + services conn *grpc.ClientConn runtime string connector func() (*grpc.ClientConn, error) @@ -149,6 +170,9 @@ func (c *Client) Reconnect() error { // connection. A timeout can be set in the context to ensure it returns // early. func (c *Client) IsServing(ctx context.Context) (bool, error) { + if c.conn == nil { + return false, errors.New("no grpc connection available") + } r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(false)) if err != nil { return false, err @@ -385,43 +409,8 @@ func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, er // // The subscriber can stop receiving events by canceling the provided context. // The errs channel will be closed and return a nil error. -func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *eventsapi.Envelope, errs <-chan error) { - var ( - evq = make(chan *eventsapi.Envelope) - errq = make(chan error, 1) - ) - - errs = errq - ch = evq - - session, err := c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ - Filters: filters, - }) - if err != nil { - errq <- err - close(errq) - return - } - - go func() { - defer close(errq) - - for { - ev, err := session.Recv() - if err != nil { - errq <- err - return - } - - select { - case evq <- ev: - case <-ctx.Done(): - return - } - } - }() - - return ch, errs +func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { + return c.EventService().Subscribe(ctx, filters...) } // Close closes the clients connection to containerd @@ -431,36 +420,57 @@ func (c *Client) Close() error { // NamespaceService returns the underlying Namespaces Store func (c *Client) NamespaceService() namespaces.Store { + if c.namespaceStore != nil { + return c.namespaceStore + } return NewNamespaceStoreFromClient(namespacesapi.NewNamespacesClient(c.conn)) } // ContainerService returns the underlying container Store func (c *Client) ContainerService() containers.Store { + if c.containerStore != nil { + return c.containerStore + } return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn)) } // ContentStore returns the underlying content Store func (c *Client) ContentStore() content.Store { + if c.contentStore != nil { + return c.contentStore + } return NewContentStoreFromClient(contentapi.NewContentClient(c.conn)) } // SnapshotService returns the underlying snapshotter for the provided snapshotter name func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter { + if c.snapshotters != nil { + return c.snapshotters[snapshotterName] + } return NewSnapshotterFromClient(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName) } // TaskService returns the underlying TasksClient func (c *Client) TaskService() tasks.TasksClient { + if c.taskService != nil { + return c.taskService + } return tasks.NewTasksClient(c.conn) } // ImageService returns the underlying image Store func (c *Client) ImageService() images.Store { + if c.imageStore != nil { + return c.imageStore + } return NewImageStoreFromClient(imagesapi.NewImagesClient(c.conn)) } // DiffService returns the underlying Differ func (c *Client) DiffService() DiffService { + if c.diffService != nil { + return c.diffService + } return NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn)) } @@ -469,14 +479,25 @@ func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient { return introspectionapi.NewIntrospectionClient(c.conn) } +// LeasesService returns the underlying Leases Client +func (c *Client) LeasesService() leasesapi.LeasesClient { + if c.leasesService != nil { + return c.leasesService + } + return leasesapi.NewLeasesClient(c.conn) +} + // HealthService returns the underlying GRPC HealthClient func (c *Client) HealthService() grpc_health_v1.HealthClient { return grpc_health_v1.NewHealthClient(c.conn) } -// EventService returns the underlying EventsClient -func (c *Client) EventService() eventsapi.EventsClient { - return eventsapi.NewEventsClient(c.conn) +// EventService returns the underlying event service +func (c *Client) EventService() EventService { + if c.eventService != nil { + return c.eventService + } + return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn)) } // VersionService returns the underlying VersionClient @@ -494,6 +515,9 @@ type Version struct { // Version returns the version of containerd that the client is connected to func (c *Client) Version(ctx context.Context) (Version, error) { + if c.conn == nil { + return Version{}, errors.New("no grpc connection available") + } response, err := c.VersionService().Version(ctx, &ptypes.Empty{}) if err != nil { return Version{}, err diff --git a/client_opts.go b/client_opts.go index dfa02ee7b..1859c4865 100644 --- a/client_opts.go +++ b/client_opts.go @@ -24,6 +24,7 @@ import ( type clientOpts struct { defaultns string + services *services dialOptions []grpc.DialOption } @@ -49,6 +50,17 @@ func WithDialOpts(opts []grpc.DialOption) ClientOpt { } } +// WithServices sets services used by the client. +func WithServices(opts ...ServicesOpt) ClientOpt { + return func(c *clientOpts) error { + c.services = &services{} + for _, o := range opts { + o(c.services) + } + return nil + } +} + // RemoteOpt allows the caller to set distribution options for a remote type RemoteOpt func(*Client, *RemoteContext) error diff --git a/cmd/ctr/commands/events/events.go b/cmd/ctr/commands/events/events.go index ee81ced7c..083ee041f 100644 --- a/cmd/ctr/commands/events/events.go +++ b/cmd/ctr/commands/events/events.go @@ -20,8 +20,8 @@ import ( "encoding/json" "fmt" - eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/cmd/ctr/commands" + "github.com/containerd/containerd/events" "github.com/containerd/typeurl" "github.com/urfave/cli" @@ -41,15 +41,16 @@ var Command = cli.Command{ } defer cancel() eventsClient := client.EventService() - events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{ - Filters: context.Args(), - }) - if err != nil { - return err - } + eventsCh, errCh := eventsClient.Subscribe(ctx, context.Args()...) for { - e, err := events.Recv() - if err != nil { + var e *events.Envelope + select { + case evt, closed := <-eventsCh: + if closed { + return nil + } + e = evt + case err := <-errCh: return err } diff --git a/diff.go b/diff.go index af95e03e7..8d1219e35 100644 --- a/diff.go +++ b/diff.go @@ -22,6 +22,7 @@ import ( diffapi "github.com/containerd/containerd/api/services/diff/v1" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/diff" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/mount" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -51,7 +52,7 @@ func (r *diffRemote) Apply(ctx context.Context, diff ocispec.Descriptor, mounts } resp, err := r.client.Apply(ctx, req) if err != nil { - return ocispec.Descriptor{}, err + return ocispec.Descriptor{}, errdefs.FromGRPC(err) } return toDescriptor(resp.Applied), nil } @@ -72,7 +73,7 @@ func (r *diffRemote) Compare(ctx context.Context, a, b []mount.Mount, opts ...di } resp, err := r.client.Diff(ctx, req) if err != nil { - return ocispec.Descriptor{}, err + return ocispec.Descriptor{}, errdefs.FromGRPC(err) } return toDescriptor(resp.Diff), nil } diff --git a/events.go b/events.go new file mode 100644 index 000000000..92e9cd510 --- /dev/null +++ b/events.go @@ -0,0 +1,119 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "context" + + eventsapi "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/typeurl" +) + +// EventService handles the publish, forward and subscribe of events. +type EventService interface { + events.Publisher + events.Forwarder + events.Subscriber +} + +// NewEventServiceFromClient returns a new event service which communicates +// over a GRPC connection. +func NewEventServiceFromClient(client eventsapi.EventsClient) EventService { + return &eventRemote{ + client: client, + } +} + +type eventRemote struct { + client eventsapi.EventsClient +} + +func (e *eventRemote) Publish(ctx context.Context, topic string, event events.Event) error { + any, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + req := &eventsapi.PublishRequest{ + Topic: topic, + Event: any, + } + if _, err := e.client.Publish(ctx, req); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} + +func (e *eventRemote) Forward(ctx context.Context, envelope *events.Envelope) error { + req := &eventsapi.ForwardRequest{ + Envelope: &eventsapi.Envelope{ + Timestamp: envelope.Timestamp, + Namespace: envelope.Namespace, + Topic: envelope.Topic, + Event: envelope.Event, + }, + } + if _, err := e.client.Forward(ctx, req); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} + +func (e *eventRemote) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { + var ( + evq = make(chan *events.Envelope) + errq = make(chan error, 1) + ) + + errs = errq + ch = evq + + session, err := e.client.Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: filters, + }) + if err != nil { + errq <- err + close(errq) + return + } + + go func() { + defer close(errq) + + for { + ev, err := session.Recv() + if err != nil { + errq <- err + return + } + + select { + case evq <- &events.Envelope{ + Timestamp: ev.Timestamp, + Namespace: ev.Namespace, + Topic: ev.Topic, + Event: ev.Event, + }: + case <-ctx.Done(): + return + } + } + }() + + return ch, errs +} diff --git a/lease.go b/lease.go index 6187c1df7..5fc7833f8 100644 --- a/lease.go +++ b/lease.go @@ -36,7 +36,7 @@ type Lease struct { // CreateLease creates a new lease func (c *Client) CreateLease(ctx context.Context) (Lease, error) { - lapi := leasesapi.NewLeasesClient(c.conn) + lapi := c.LeasesService() resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{}) if err != nil { return Lease{}, err @@ -50,7 +50,7 @@ func (c *Client) CreateLease(ctx context.Context) (Lease, error) { // ListLeases lists active leases func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) { - lapi := leasesapi.NewLeasesClient(c.conn) + lapi := c.LeasesService() resp, err := lapi.List(ctx, &leasesapi.ListRequest{}) if err != nil { return nil, err @@ -100,7 +100,7 @@ func (l Lease) CreatedAt() time.Time { // Delete deletes the lease, removing the reference to all resources created // during the lease. func (l Lease) Delete(ctx context.Context) error { - lapi := leasesapi.NewLeasesClient(l.client.conn) + lapi := l.client.LeasesService() _, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{ ID: l.id, }) diff --git a/metadata/db.go b/metadata/db.go index f08e1d46c..7296d8caa 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -195,6 +195,15 @@ func (m *DB) Snapshotter(name string) snapshots.Snapshotter { return sn } +// Snapshotters returns all available snapshotters. +func (m *DB) Snapshotters() map[string]snapshots.Snapshotter { + ss := make(map[string]snapshots.Snapshotter, len(m.ss)) + for n, sn := range m.ss { + ss[n] = sn + } + return ss +} + // View runs a readonly transaction on the metadata store. func (m *DB) View(fn func(*bolt.Tx) error) error { return m.db.View(fn) diff --git a/plugin/plugin.go b/plugin/plugin.go index 470429a0c..584547aff 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -58,6 +58,8 @@ const ( AllPlugins Type = "*" // RuntimePlugin implements a runtime RuntimePlugin Type = "io.containerd.runtime.v1" + // ServicePlugin implements a internal service + ServicePlugin Type = "io.containerd.service.v1" // GRPCPlugin implements a grpc service GRPCPlugin Type = "io.containerd.grpc.v1" // SnapshotPlugin implements a snapshotter diff --git a/services.go b/services.go new file mode 100644 index 000000000..daa7b897e --- /dev/null +++ b/services.go @@ -0,0 +1,112 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + containersapi "github.com/containerd/containerd/api/services/containers/v1" + "github.com/containerd/containerd/api/services/diff/v1" + imagesapi "github.com/containerd/containerd/api/services/images/v1" + "github.com/containerd/containerd/api/services/leases/v1" + namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshots" +) + +type services struct { + contentStore content.Store + imageStore images.Store + containerStore containers.Store + namespaceStore namespaces.Store + snapshotters map[string]snapshots.Snapshotter + taskService tasks.TasksClient + diffService DiffService + eventService EventService + leasesService leases.LeasesClient +} + +// ServicesOpt allows callers to set options on the services +type ServicesOpt func(c *services) + +// WithContentStore sets the content store. +func WithContentStore(contentStore content.Store) ServicesOpt { + return func(s *services) { + s.contentStore = contentStore + } +} + +// WithImageService sets the image service. +func WithImageService(imageService imagesapi.ImagesClient) ServicesOpt { + return func(s *services) { + s.imageStore = NewImageStoreFromClient(imageService) + } +} + +// WithSnapshotters sets the snapshotters. +func WithSnapshotters(snapshotters map[string]snapshots.Snapshotter) ServicesOpt { + return func(s *services) { + s.snapshotters = make(map[string]snapshots.Snapshotter) + for n, sn := range snapshotters { + s.snapshotters[n] = sn + } + } +} + +// WithContainerService sets the container service. +func WithContainerService(containerService containersapi.ContainersClient) ServicesOpt { + return func(s *services) { + s.containerStore = NewRemoteContainerStore(containerService) + } +} + +// WithTaskService sets the task service. +func WithTaskService(taskService tasks.TasksClient) ServicesOpt { + return func(s *services) { + s.taskService = taskService + } +} + +// WithDiffService sets the diff service. +func WithDiffService(diffService diff.DiffClient) ServicesOpt { + return func(s *services) { + s.diffService = NewDiffServiceFromClient(diffService) + } +} + +// WithEventService sets the event service. +func WithEventService(eventService EventService) ServicesOpt { + return func(s *services) { + s.eventService = eventService + } +} + +// WithNamespaceService sets the namespace service. +func WithNamespaceService(namespaceService namespacesapi.NamespacesClient) ServicesOpt { + return func(s *services) { + s.namespaceStore = NewNamespaceStoreFromClient(namespaceService) + } +} + +// WithLeasesService sets the lease service. +func WithLeasesService(leasesService leases.LeasesClient) ServicesOpt { + return func(s *services) { + s.leasesService = leasesService + } +} diff --git a/services/containers/local.go b/services/containers/local.go new file mode 100644 index 000000000..1e59da6c3 --- /dev/null +++ b/services/containers/local.go @@ -0,0 +1,189 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containers + +import ( + "github.com/boltdb/bolt" + eventstypes "github.com/containerd/containerd/api/events" + api "github.com/containerd/containerd/api/services/containers/v1" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + ptypes "github.com/gogo/protobuf/types" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.ContainersService, + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + return &local{ + db: m.(*metadata.DB), + publisher: ic.Events, + }, nil + }, + }) +} + +type local struct { + db *metadata.DB + publisher events.Publisher +} + +var _ api.ContainersClient = &local{} + +func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) { + var resp api.GetContainerResponse + + return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { + container, err := store.Get(ctx, req.ID) + if err != nil { + return err + } + containerpb := containerToProto(&container) + resp.Container = containerpb + + return nil + })) +} + +func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) { + var resp api.ListContainersResponse + + return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { + containers, err := store.List(ctx, req.Filters...) + if err != nil { + return err + } + + resp.Containers = containersToProto(containers) + return nil + })) +} + +func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) { + var resp api.CreateContainerResponse + + if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { + container := containerFromProto(&req.Container) + + created, err := store.Create(ctx, container) + if err != nil { + return err + } + + resp.Container = containerToProto(&created) + + return nil + }); err != nil { + return &resp, errdefs.ToGRPC(err) + } + if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{ + ID: resp.Container.ID, + Image: resp.Container.Image, + Runtime: &eventstypes.ContainerCreate_Runtime{ + Name: resp.Container.Runtime.Name, + Options: resp.Container.Runtime.Options, + }, + }); err != nil { + return &resp, err + } + + return &resp, nil +} + +func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ ...grpc.CallOption) (*api.UpdateContainerResponse, error) { + if req.Container.ID == "" { + return nil, status.Errorf(codes.InvalidArgument, "Container.ID required") + } + var ( + resp api.UpdateContainerResponse + container = containerFromProto(&req.Container) + ) + + if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { + var fieldpaths []string + if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { + for _, path := range req.UpdateMask.Paths { + fieldpaths = append(fieldpaths, path) + } + } + + updated, err := store.Update(ctx, container, fieldpaths...) + if err != nil { + return err + } + + resp.Container = containerToProto(&updated) + return nil + }); err != nil { + return &resp, errdefs.ToGRPC(err) + } + + if err := l.publisher.Publish(ctx, "/containers/update", &eventstypes.ContainerUpdate{ + ID: resp.Container.ID, + Image: resp.Container.Image, + Labels: resp.Container.Labels, + SnapshotKey: resp.Container.SnapshotKey, + }); err != nil { + return &resp, err + } + + return &resp, nil +} + +func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { + return store.Delete(ctx, req.ID) + }); err != nil { + return &ptypes.Empty{}, errdefs.ToGRPC(err) + } + + if err := l.publisher.Publish(ctx, "/containers/delete", &eventstypes.ContainerDelete{ + ID: req.ID, + }); err != nil { + return &ptypes.Empty{}, err + } + + return &ptypes.Empty{}, nil +} + +func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error { + return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) } +} + +func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { + return l.db.View(l.withStore(ctx, fn)) +} + +func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { + return l.db.Update(l.withStore(ctx, fn)) +} diff --git a/services/containers/service.go b/services/containers/service.go index 162cae630..f25c0b0ff 100644 --- a/services/containers/service.go +++ b/services/containers/service.go @@ -17,19 +17,13 @@ package containers import ( - "github.com/boltdb/bolt" - eventstypes "github.com/containerd/containerd/api/events" api "github.com/containerd/containerd/api/services/containers/v1" - "github.com/containerd/containerd/containers" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" - "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func init() { @@ -37,27 +31,31 @@ func init() { Type: plugin.GRPCPlugin, ID: "containers", Requires: []plugin.Type{ - plugin.MetadataPlugin, + plugin.ServicePlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - m, err := ic.Get(plugin.MetadataPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - return NewService(m.(*metadata.DB), ic.Events), nil + p, ok := plugins[services.ContainersService] + if !ok { + return nil, errors.New("containers service not found") + } + i, err := p.Instance() + if err != nil { + return nil, err + } + return &service{local: i.(api.ContainersClient)}, nil }, }) } type service struct { - db *metadata.DB - publisher events.Publisher + local api.ContainersClient } -// NewService returns the container GRPC server -func NewService(db *metadata.DB, publisher events.Publisher) api.ContainersServer { - return &service{db: db, publisher: publisher} -} +var _ api.ContainersServer = &service{} func (s *service) Register(server *grpc.Server) error { api.RegisterContainersServer(server, s) @@ -65,129 +63,21 @@ func (s *service) Register(server *grpc.Server) error { } func (s *service) Get(ctx context.Context, req *api.GetContainerRequest) (*api.GetContainerResponse, error) { - var resp api.GetContainerResponse - - return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { - container, err := store.Get(ctx, req.ID) - if err != nil { - return err - } - containerpb := containerToProto(&container) - resp.Container = containerpb - - return nil - })) + return s.local.Get(ctx, req) } func (s *service) List(ctx context.Context, req *api.ListContainersRequest) (*api.ListContainersResponse, error) { - var resp api.ListContainersResponse - - return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { - containers, err := store.List(ctx, req.Filters...) - if err != nil { - return err - } - - resp.Containers = containersToProto(containers) - return nil - })) + return s.local.List(ctx, req) } func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { - var resp api.CreateContainerResponse - - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { - container := containerFromProto(&req.Container) - - created, err := store.Create(ctx, container) - if err != nil { - return err - } - - resp.Container = containerToProto(&created) - - return nil - }); err != nil { - return &resp, errdefs.ToGRPC(err) - } - if err := s.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{ - ID: resp.Container.ID, - Image: resp.Container.Image, - Runtime: &eventstypes.ContainerCreate_Runtime{ - Name: resp.Container.Runtime.Name, - Options: resp.Container.Runtime.Options, - }, - }); err != nil { - return &resp, err - } - - return &resp, nil + return s.local.Create(ctx, req) } func (s *service) Update(ctx context.Context, req *api.UpdateContainerRequest) (*api.UpdateContainerResponse, error) { - if req.Container.ID == "" { - return nil, status.Errorf(codes.InvalidArgument, "Container.ID required") - } - var ( - resp api.UpdateContainerResponse - container = containerFromProto(&req.Container) - ) - - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { - var fieldpaths []string - if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { - for _, path := range req.UpdateMask.Paths { - fieldpaths = append(fieldpaths, path) - } - } - - updated, err := store.Update(ctx, container, fieldpaths...) - if err != nil { - return err - } - - resp.Container = containerToProto(&updated) - return nil - }); err != nil { - return &resp, errdefs.ToGRPC(err) - } - - if err := s.publisher.Publish(ctx, "/containers/update", &eventstypes.ContainerUpdate{ - ID: resp.Container.ID, - Image: resp.Container.Image, - Labels: resp.Container.Labels, - SnapshotKey: resp.Container.SnapshotKey, - }); err != nil { - return &resp, err - } - - return &resp, nil + return s.local.Update(ctx, req) } func (s *service) Delete(ctx context.Context, req *api.DeleteContainerRequest) (*ptypes.Empty, error) { - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { - return store.Delete(ctx, req.ID) - }); err != nil { - return &ptypes.Empty{}, errdefs.ToGRPC(err) - } - - if err := s.publisher.Publish(ctx, "/containers/delete", &eventstypes.ContainerDelete{ - ID: req.ID, - }); err != nil { - return &ptypes.Empty{}, err - } - - return &ptypes.Empty{}, nil -} - -func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error { - return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) } -} - -func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { - return s.db.View(s.withStore(ctx, fn)) -} - -func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { - return s.db.Update(s.withStore(ctx, fn)) + return s.local.Delete(ctx, req) } diff --git a/services/content/service.go b/services/content/service.go index d24fb6eba..a2c87a4d5 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -20,14 +20,12 @@ import ( "io" "sync" - eventstypes "github.com/containerd/containerd/api/events" api "github.com/containerd/containerd/api/services/content/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -39,8 +37,7 @@ import ( ) type service struct { - store content.Store - publisher events.Publisher + store content.Store } var bufPool = sync.Pool{ @@ -57,26 +54,29 @@ func init() { Type: plugin.GRPCPlugin, ID: "content", Requires: []plugin.Type{ - plugin.MetadataPlugin, + plugin.ServicePlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - m, err := ic.Get(plugin.MetadataPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - - s, err := NewService(m.(*metadata.DB).ContentStore(), ic.Events) - return s, err + p, ok := plugins[services.ContentService] + if !ok { + return nil, errors.New("content store service not found") + } + cs, err := p.Instance() + if err != nil { + return nil, err + } + return newService(cs.(content.Store)), nil }, }) } -// NewService returns the content GRPC server -func NewService(cs content.Store, publisher events.Publisher) (api.ContentServer, error) { - return &service{ - store: cs, - publisher: publisher, - }, nil +// newService returns the content GRPC server +func newService(cs content.Store) api.ContentServer { + return &service{store: cs} } func (s *service) Register(server *grpc.Server) error { @@ -166,12 +166,6 @@ func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*p return nil, errdefs.ToGRPC(err) } - if err := s.publisher.Publish(ctx, "/content/delete", &eventstypes.ContentDelete{ - Digest: req.Digest, - }); err != nil { - return nil, err - } - return &ptypes.Empty{}, nil } diff --git a/services/content/store.go b/services/content/store.go new file mode 100644 index 000000000..3de91d37c --- /dev/null +++ b/services/content/store.go @@ -0,0 +1,71 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package content + +import ( + "context" + + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + digest "github.com/opencontainers/go-digest" +) + +// store wraps content.Store with proper event published. +type store struct { + content.Store + publisher events.Publisher +} + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.ContentService, + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + + s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events) + return s, err + }, + }) +} + +func newContentStore(cs content.Store, publisher events.Publisher) (content.Store, error) { + return &store{ + Store: cs, + publisher: publisher, + }, nil +} + +func (s *store) Delete(ctx context.Context, dgst digest.Digest) error { + if err := s.Store.Delete(ctx, dgst); err != nil { + return err + } + // TODO: Consider whether we should return error here. + return s.publisher.Publish(ctx, "/content/delete", &eventstypes.ContentDelete{ + Digest: dgst, + }) +} diff --git a/services/diff/local.go b/services/diff/local.go new file mode 100644 index 000000000..0335b1409 --- /dev/null +++ b/services/diff/local.go @@ -0,0 +1,178 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diff + +import ( + diffapi "github.com/containerd/containerd/api/services/diff/v1" + "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/diff" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type config struct { + // Order is the order of preference in which to try diff algorithms, the + // first differ which is supported is used. + // Note when multiple differs may be supported, this order will be + // respected for which is choosen. Each differ should return the same + // correct output, allowing any ordering to be used to prefer + // more optimimal implementations. + Order []string `toml:"default"` +} + +type differ interface { + diff.Comparer + diff.Applier +} + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.DiffService, + Requires: []plugin.Type{ + plugin.DiffPlugin, + }, + Config: defaultDifferConfig, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + differs, err := ic.GetByType(plugin.DiffPlugin) + if err != nil { + return nil, err + } + + orderedNames := ic.Config.(*config).Order + ordered := make([]differ, len(orderedNames)) + for i, n := range orderedNames { + differp, ok := differs[n] + if !ok { + return nil, errors.Errorf("needed differ not loaded: %s", n) + } + d, err := differp.Instance() + if err != nil { + return nil, errors.Wrapf(err, "could not load required differ due plugin init error: %s", n) + } + + ordered[i], ok = d.(differ) + if !ok { + return nil, errors.Errorf("differ does not implement Comparer and Applier interface: %s", n) + } + } + + return &local{ + differs: ordered, + }, nil + }, + }) +} + +type local struct { + differs []differ +} + +var _ diffapi.DiffClient = &local{} + +func (l *local) Apply(ctx context.Context, er *diffapi.ApplyRequest, _ ...grpc.CallOption) (*diffapi.ApplyResponse, error) { + var ( + ocidesc ocispec.Descriptor + err error + desc = toDescriptor(er.Diff) + mounts = toMounts(er.Mounts) + ) + + for _, differ := range l.differs { + ocidesc, err = differ.Apply(ctx, desc, mounts) + if !errdefs.IsNotImplemented(err) { + break + } + } + + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &diffapi.ApplyResponse{ + Applied: fromDescriptor(ocidesc), + }, nil + +} + +func (l *local) Diff(ctx context.Context, dr *diffapi.DiffRequest, _ ...grpc.CallOption) (*diffapi.DiffResponse, error) { + var ( + ocidesc ocispec.Descriptor + err error + aMounts = toMounts(dr.Left) + bMounts = toMounts(dr.Right) + ) + + var opts []diff.Opt + if dr.MediaType != "" { + opts = append(opts, diff.WithMediaType(dr.MediaType)) + } + if dr.Ref != "" { + opts = append(opts, diff.WithReference(dr.Ref)) + } + if dr.Labels != nil { + opts = append(opts, diff.WithLabels(dr.Labels)) + } + + for _, d := range l.differs { + ocidesc, err = d.Compare(ctx, aMounts, bMounts, opts...) + if !errdefs.IsNotImplemented(err) { + break + } + } + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &diffapi.DiffResponse{ + Diff: fromDescriptor(ocidesc), + }, nil +} + +func toMounts(apim []*types.Mount) []mount.Mount { + mounts := make([]mount.Mount, len(apim)) + for i, m := range apim { + mounts[i] = mount.Mount{ + Type: m.Type, + Source: m.Source, + Options: m.Options, + } + } + return mounts +} + +func toDescriptor(d *types.Descriptor) ocispec.Descriptor { + return ocispec.Descriptor{ + MediaType: d.MediaType, + Digest: d.Digest, + Size: d.Size_, + } +} + +func fromDescriptor(d ocispec.Descriptor) *types.Descriptor { + return &types.Descriptor{ + MediaType: d.MediaType, + Digest: d.Digest, + Size_: d.Size, + } +} diff --git a/services/diff/service.go b/services/diff/service.go index 7053dace1..e12c4a062 100644 --- a/services/diff/service.go +++ b/services/diff/service.go @@ -18,163 +18,53 @@ package diff import ( diffapi "github.com/containerd/containerd/api/services/diff/v1" - "github.com/containerd/containerd/api/types" - "github.com/containerd/containerd/diff" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/containerd/containerd/services" "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" ) -type config struct { - // Order is the order of preference in which to try diff algorithms, the - // first differ which is supported is used. - // Note when multiple differs may be supported, this order will be - // respected for which is choosen. Each differ should return the same - // correct output, allowing any ordering to be used to prefer - // more optimimal implementations. - Order []string `toml:"default"` -} - -type differ interface { - diff.Comparer - diff.Applier -} - func init() { plugin.Register(&plugin.Registration{ Type: plugin.GRPCPlugin, ID: "diff", Requires: []plugin.Type{ - plugin.DiffPlugin, + plugin.ServicePlugin, }, - Config: defaultDifferConfig, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - differs, err := ic.GetByType(plugin.DiffPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - - orderedNames := ic.Config.(*config).Order - ordered := make([]differ, len(orderedNames)) - for i, n := range orderedNames { - differp, ok := differs[n] - if !ok { - return nil, errors.Errorf("needed differ not loaded: %s", n) - } - d, err := differp.Instance() - if err != nil { - return nil, errors.Wrapf(err, "could not load required differ due plugin init error: %s", n) - } - - ordered[i], ok = d.(differ) - if !ok { - return nil, errors.Errorf("differ does not implement Comparer and Applier interface: %s", n) - } + p, ok := plugins[services.DiffService] + if !ok { + return nil, errors.New("diff service not found") } - - return &service{ - differs: ordered, - }, nil + i, err := p.Instance() + if err != nil { + return nil, err + } + return &service{local: i.(diffapi.DiffClient)}, nil }, }) } type service struct { - differs []differ + local diffapi.DiffClient } +var _ diffapi.DiffServer = &service{} + func (s *service) Register(gs *grpc.Server) error { diffapi.RegisterDiffServer(gs, s) return nil } func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi.ApplyResponse, error) { - var ( - ocidesc ocispec.Descriptor - err error - desc = toDescriptor(er.Diff) - mounts = toMounts(er.Mounts) - ) - - for _, differ := range s.differs { - ocidesc, err = differ.Apply(ctx, desc, mounts) - if !errdefs.IsNotImplemented(err) { - break - } - } - - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &diffapi.ApplyResponse{ - Applied: fromDescriptor(ocidesc), - }, nil - + return s.local.Apply(ctx, er) } func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.DiffResponse, error) { - var ( - ocidesc ocispec.Descriptor - err error - aMounts = toMounts(dr.Left) - bMounts = toMounts(dr.Right) - ) - - var opts []diff.Opt - if dr.MediaType != "" { - opts = append(opts, diff.WithMediaType(dr.MediaType)) - } - if dr.Ref != "" { - opts = append(opts, diff.WithReference(dr.Ref)) - } - if dr.Labels != nil { - opts = append(opts, diff.WithLabels(dr.Labels)) - } - - for _, d := range s.differs { - ocidesc, err = d.Compare(ctx, aMounts, bMounts, opts...) - if !errdefs.IsNotImplemented(err) { - break - } - } - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &diffapi.DiffResponse{ - Diff: fromDescriptor(ocidesc), - }, nil -} - -func toMounts(apim []*types.Mount) []mount.Mount { - mounts := make([]mount.Mount, len(apim)) - for i, m := range apim { - mounts[i] = mount.Mount{ - Type: m.Type, - Source: m.Source, - Options: m.Options, - } - } - return mounts -} - -func toDescriptor(d *types.Descriptor) ocispec.Descriptor { - return ocispec.Descriptor{ - MediaType: d.MediaType, - Digest: d.Digest, - Size: d.Size_, - } -} - -func fromDescriptor(d ocispec.Descriptor) *types.Descriptor { - return &types.Descriptor{ - MediaType: d.MediaType, - Digest: d.Digest, - Size_: d.Size, - } + return s.local.Diff(ctx, dr) } diff --git a/services/images/local.go b/services/images/local.go new file mode 100644 index 000000000..1cca1a42a --- /dev/null +++ b/services/images/local.go @@ -0,0 +1,183 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package images + +import ( + gocontext "context" + + eventstypes "github.com/containerd/containerd/api/events" + imagesapi "github.com/containerd/containerd/api/services/images/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + ptypes "github.com/gogo/protobuf/types" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.ImagesService, + Requires: []plugin.Type{ + plugin.MetadataPlugin, + plugin.GCPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + g, err := ic.Get(plugin.GCPlugin) + if err != nil { + return nil, err + } + + return &local{ + store: metadata.NewImageStore(m.(*metadata.DB)), + publisher: ic.Events, + gc: g.(gcScheduler), + }, nil + }, + }) +} + +type gcScheduler interface { + ScheduleAndWait(gocontext.Context) (gc.Stats, error) +} + +type local struct { + store images.Store + gc gcScheduler + publisher events.Publisher +} + +var _ imagesapi.ImagesClient = &local{} + +func (l *local) Get(ctx context.Context, req *imagesapi.GetImageRequest, _ ...grpc.CallOption) (*imagesapi.GetImageResponse, error) { + image, err := l.store.Get(ctx, req.Name) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + imagepb := imageToProto(&image) + return &imagesapi.GetImageResponse{ + Image: &imagepb, + }, nil +} + +func (l *local) List(ctx context.Context, req *imagesapi.ListImagesRequest, _ ...grpc.CallOption) (*imagesapi.ListImagesResponse, error) { + images, err := l.store.List(ctx, req.Filters...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &imagesapi.ListImagesResponse{ + Images: imagesToProto(images), + }, nil +} + +func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _ ...grpc.CallOption) (*imagesapi.CreateImageResponse, error) { + log.G(ctx).WithField("name", req.Image.Name).WithField("target", req.Image.Target.Digest).Debugf("create image") + if req.Image.Name == "" { + return nil, status.Errorf(codes.InvalidArgument, "Image.Name required") + } + + var ( + image = imageFromProto(&req.Image) + resp imagesapi.CreateImageResponse + ) + created, err := l.store.Create(ctx, image) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + resp.Image = imageToProto(&created) + + if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ + Name: resp.Image.Name, + Labels: resp.Image.Labels, + }); err != nil { + return nil, err + } + + return &resp, nil + +} + +func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _ ...grpc.CallOption) (*imagesapi.UpdateImageResponse, error) { + if req.Image.Name == "" { + return nil, status.Errorf(codes.InvalidArgument, "Image.Name required") + } + + var ( + image = imageFromProto(&req.Image) + resp imagesapi.UpdateImageResponse + fieldpaths []string + ) + + if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { + for _, path := range req.UpdateMask.Paths { + fieldpaths = append(fieldpaths, path) + } + } + + updated, err := l.store.Update(ctx, image, fieldpaths...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + resp.Image = imageToProto(&updated) + + if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ + Name: resp.Image.Name, + Labels: resp.Image.Labels, + }); err != nil { + return nil, err + } + + return &resp, nil +} + +func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + log.G(ctx).WithField("name", req.Name).Debugf("delete image") + + if err := l.store.Delete(ctx, req.Name); err != nil { + return nil, errdefs.ToGRPC(err) + } + + if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ + Name: req.Name, + }); err != nil { + return nil, err + } + + if req.Sync { + if _, err := l.gc.ScheduleAndWait(ctx); err != nil { + return nil, err + } + } + + return &ptypes.Empty{}, nil +} diff --git a/services/images/service.go b/services/images/service.go index d17c684ae..a025c66c9 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -17,22 +17,13 @@ package images import ( - gocontext "context" - - eventstypes "github.com/containerd/containerd/api/events" imagesapi "github.com/containerd/containerd/api/services/images/v1" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" - "github.com/containerd/containerd/gc" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func init() { @@ -40,42 +31,31 @@ func init() { Type: plugin.GRPCPlugin, ID: "images", Requires: []plugin.Type{ - plugin.MetadataPlugin, - plugin.GCPlugin, + plugin.ServicePlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - m, err := ic.Get(plugin.MetadataPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - g, err := ic.Get(plugin.GCPlugin) + p, ok := plugins[services.ImagesService] + if !ok { + return nil, errors.New("images service not found") + } + i, err := p.Instance() if err != nil { return nil, err } - - return NewService(metadata.NewImageStore(m.(*metadata.DB)), ic.Events, g.(gcScheduler)), nil + return &service{local: i.(imagesapi.ImagesClient)}, nil }, }) } -type gcScheduler interface { - ScheduleAndWait(gocontext.Context) (gc.Stats, error) -} - type service struct { - store images.Store - gc gcScheduler - publisher events.Publisher + local imagesapi.ImagesClient } -// NewService returns the GRPC image server -func NewService(is images.Store, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer { - return &service{ - store: is, - gc: gc, - publisher: publisher, - } -} +var _ imagesapi.ImagesServer = &service{} func (s *service) Register(server *grpc.Server) error { imagesapi.RegisterImagesServer(server, s) @@ -83,108 +63,21 @@ func (s *service) Register(server *grpc.Server) error { } func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) { - image, err := s.store.Get(ctx, req.Name) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - imagepb := imageToProto(&image) - return &imagesapi.GetImageResponse{ - Image: &imagepb, - }, nil + return s.local.Get(ctx, req) } func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) { - images, err := s.store.List(ctx, req.Filters...) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &imagesapi.ListImagesResponse{ - Images: imagesToProto(images), - }, nil + return s.local.List(ctx, req) } func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) { - log.G(ctx).WithField("name", req.Image.Name).WithField("target", req.Image.Target.Digest).Debugf("create image") - if req.Image.Name == "" { - return nil, status.Errorf(codes.InvalidArgument, "Image.Name required") - } - - var ( - image = imageFromProto(&req.Image) - resp imagesapi.CreateImageResponse - ) - created, err := s.store.Create(ctx, image) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - resp.Image = imageToProto(&created) - - if err := s.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ - Name: resp.Image.Name, - Labels: resp.Image.Labels, - }); err != nil { - return nil, err - } - - return &resp, nil - + return s.local.Create(ctx, req) } func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) (*imagesapi.UpdateImageResponse, error) { - if req.Image.Name == "" { - return nil, status.Errorf(codes.InvalidArgument, "Image.Name required") - } - - var ( - image = imageFromProto(&req.Image) - resp imagesapi.UpdateImageResponse - fieldpaths []string - ) - - if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { - for _, path := range req.UpdateMask.Paths { - fieldpaths = append(fieldpaths, path) - } - } - - updated, err := s.store.Update(ctx, image, fieldpaths...) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - resp.Image = imageToProto(&updated) - - if err := s.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ - Name: resp.Image.Name, - Labels: resp.Image.Labels, - }); err != nil { - return nil, err - } - - return &resp, nil + return s.local.Update(ctx, req) } func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*ptypes.Empty, error) { - log.G(ctx).WithField("name", req.Name).Debugf("delete image") - - if err := s.store.Delete(ctx, req.Name); err != nil { - return nil, errdefs.ToGRPC(err) - } - - if err := s.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ - Name: req.Name, - }); err != nil { - return nil, err - } - - if req.Sync { - if _, err := s.gc.ScheduleAndWait(ctx); err != nil { - return nil, err - } - } - - return &ptypes.Empty{}, nil + return s.local.Delete(ctx, req) } diff --git a/services/leases/local.go b/services/leases/local.go new file mode 100644 index 000000000..d3e0c2f2c --- /dev/null +++ b/services/leases/local.go @@ -0,0 +1,120 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package leases + +import ( + "crypto/rand" + "encoding/base64" + "fmt" + "time" + + "google.golang.org/grpc" + + "github.com/boltdb/bolt" + api "github.com/containerd/containerd/api/services/leases/v1" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + ptypes "github.com/gogo/protobuf/types" + "golang.org/x/net/context" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.LeasesService, + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + return &local{db: m.(*metadata.DB)}, nil + }, + }) +} + +type local struct { + db *metadata.DB +} + +func (l *local) Create(ctx context.Context, r *api.CreateRequest, _ ...grpc.CallOption) (*api.CreateResponse, error) { + lid := r.ID + if lid == "" { + lid = generateLeaseID() + } + var trans metadata.Lease + if err := l.db.Update(func(tx *bolt.Tx) error { + var err error + trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels) + return err + }); err != nil { + return nil, err + } + return &api.CreateResponse{ + Lease: txToGRPC(trans), + }, nil +} + +func (l *local) Delete(ctx context.Context, r *api.DeleteRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + if err := l.db.Update(func(tx *bolt.Tx) error { + return metadata.NewLeaseManager(tx).Delete(ctx, r.ID) + }); err != nil { + return nil, err + } + return &ptypes.Empty{}, nil +} + +func (l *local) List(ctx context.Context, r *api.ListRequest, _ ...grpc.CallOption) (*api.ListResponse, error) { + var leases []metadata.Lease + if err := l.db.View(func(tx *bolt.Tx) error { + var err error + leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...) + return err + }); err != nil { + return nil, err + } + + apileases := make([]*api.Lease, len(leases)) + for i := range leases { + apileases[i] = txToGRPC(leases[i]) + } + + return &api.ListResponse{ + Leases: apileases, + }, nil +} + +func txToGRPC(tx metadata.Lease) *api.Lease { + return &api.Lease{ + ID: tx.ID, + Labels: tx.Labels, + CreatedAt: tx.CreatedAt, + // TODO: Snapshots + // TODO: Content + } +} + +func generateLeaseID() string { + t := time.Now() + var b [3]byte + // Ignore read failures, just decreases uniqueness + rand.Read(b[:]) + return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:])) +} diff --git a/services/leases/service.go b/services/leases/service.go index 100052f3a..e9cccf921 100644 --- a/services/leases/service.go +++ b/services/leases/service.go @@ -17,18 +17,13 @@ package leases import ( - "crypto/rand" - "encoding/base64" - "fmt" - "time" - "google.golang.org/grpc" - "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/leases/v1" - "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -37,27 +32,28 @@ func init() { Type: plugin.GRPCPlugin, ID: "leases", Requires: []plugin.Type{ - plugin.MetadataPlugin, + plugin.ServicePlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - m, err := ic.Get(plugin.MetadataPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - return NewService(m.(*metadata.DB)), nil + p, ok := plugins[services.LeasesService] + if !ok { + return nil, errors.New("leases service not found") + } + i, err := p.Instance() + if err != nil { + return nil, err + } + return &service{local: i.(api.LeasesClient)}, nil }, }) } type service struct { - db *metadata.DB -} - -// NewService returns the GRPC metadata server -func NewService(db *metadata.DB) api.LeasesServer { - return &service{ - db: db, - } + local api.LeasesClient } func (s *service) Register(server *grpc.Server) error { @@ -66,66 +62,13 @@ func (s *service) Register(server *grpc.Server) error { } func (s *service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) { - lid := r.ID - if lid == "" { - lid = generateLeaseID() - } - var trans metadata.Lease - if err := s.db.Update(func(tx *bolt.Tx) error { - var err error - trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels) - return err - }); err != nil { - return nil, err - } - return &api.CreateResponse{ - Lease: txToGRPC(trans), - }, nil + return s.local.Create(ctx, r) } func (s *service) Delete(ctx context.Context, r *api.DeleteRequest) (*ptypes.Empty, error) { - if err := s.db.Update(func(tx *bolt.Tx) error { - return metadata.NewLeaseManager(tx).Delete(ctx, r.ID) - }); err != nil { - return nil, err - } - return &ptypes.Empty{}, nil + return s.local.Delete(ctx, r) } func (s *service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) { - var leases []metadata.Lease - if err := s.db.View(func(tx *bolt.Tx) error { - var err error - leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...) - return err - }); err != nil { - return nil, err - } - - apileases := make([]*api.Lease, len(leases)) - for i := range leases { - apileases[i] = txToGRPC(leases[i]) - } - - return &api.ListResponse{ - Leases: apileases, - }, nil -} - -func txToGRPC(tx metadata.Lease) *api.Lease { - return &api.Lease{ - ID: tx.ID, - Labels: tx.Labels, - CreatedAt: tx.CreatedAt, - // TODO: Snapshots - // TODO: Content - } -} - -func generateLeaseID() string { - t := time.Now() - var b [3]byte - // Ignore read failures, just decreases uniqueness - rand.Read(b[:]) - return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:])) + return s.local.List(ctx, r) } diff --git a/services/namespaces/local.go b/services/namespaces/local.go new file mode 100644 index 000000000..dfa86740b --- /dev/null +++ b/services/namespaces/local.go @@ -0,0 +1,223 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package namespaces + +import ( + "strings" + + "github.com/boltdb/bolt" + eventstypes "github.com/containerd/containerd/api/events" + api "github.com/containerd/containerd/api/services/namespaces/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + ptypes "github.com/gogo/protobuf/types" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.NamespacesService, + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + return &local{ + db: m.(*metadata.DB), + publisher: ic.Events, + }, nil + }, + }) +} + +// Provide local namespaces service instead of local namespace store, +// because namespace store interface doesn't provide enough functionality +// for namespaces service. +type local struct { + db *metadata.DB + publisher events.Publisher +} + +var _ api.NamespacesClient = &local{} + +func (l *local) Get(ctx context.Context, req *api.GetNamespaceRequest, _ ...grpc.CallOption) (*api.GetNamespaceResponse, error) { + var resp api.GetNamespaceResponse + + return &resp, l.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error { + labels, err := store.Labels(ctx, req.Name) + if err != nil { + return errdefs.ToGRPC(err) + } + + resp.Namespace = api.Namespace{ + Name: req.Name, + Labels: labels, + } + + return nil + }) +} + +func (l *local) List(ctx context.Context, req *api.ListNamespacesRequest, _ ...grpc.CallOption) (*api.ListNamespacesResponse, error) { + var resp api.ListNamespacesResponse + + return &resp, l.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error { + namespaces, err := store.List(ctx) + if err != nil { + return err + } + + for _, namespace := range namespaces { + labels, err := store.Labels(ctx, namespace) + if err != nil { + // In general, this should be unlikely, since we are holding a + // transaction to service this request. + return errdefs.ToGRPC(err) + } + + resp.Namespaces = append(resp.Namespaces, api.Namespace{ + Name: namespace, + Labels: labels, + }) + } + + return nil + }) +} + +func (l *local) Create(ctx context.Context, req *api.CreateNamespaceRequest, _ ...grpc.CallOption) (*api.CreateNamespaceResponse, error) { + var resp api.CreateNamespaceResponse + + if err := l.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error { + if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil { + return errdefs.ToGRPC(err) + } + + for k, v := range req.Namespace.Labels { + if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil { + return err + } + } + + resp.Namespace = req.Namespace + return nil + }); err != nil { + return &resp, err + } + + if err := l.publisher.Publish(ctx, "/namespaces/create", &eventstypes.NamespaceCreate{ + Name: req.Namespace.Name, + Labels: req.Namespace.Labels, + }); err != nil { + return &resp, err + } + + return &resp, nil + +} + +func (l *local) Update(ctx context.Context, req *api.UpdateNamespaceRequest, _ ...grpc.CallOption) (*api.UpdateNamespaceResponse, error) { + var resp api.UpdateNamespaceResponse + if err := l.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error { + if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { + for _, path := range req.UpdateMask.Paths { + switch { + case strings.HasPrefix(path, "labels."): + key := strings.TrimPrefix(path, "labels.") + if err := store.SetLabel(ctx, req.Namespace.Name, key, req.Namespace.Labels[key]); err != nil { + return err + } + default: + return status.Errorf(codes.InvalidArgument, "cannot update %q field", path) + } + } + } else { + // clear out the existing labels and then set them to the incoming request. + // get current set of labels + labels, err := store.Labels(ctx, req.Namespace.Name) + if err != nil { + return errdefs.ToGRPC(err) + } + + for k := range labels { + if err := store.SetLabel(ctx, req.Namespace.Name, k, ""); err != nil { + return err + } + } + + for k, v := range req.Namespace.Labels { + if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil { + return err + } + + } + } + + return nil + }); err != nil { + return &resp, err + } + + if err := l.publisher.Publish(ctx, "/namespaces/update", &eventstypes.NamespaceUpdate{ + Name: req.Namespace.Name, + Labels: req.Namespace.Labels, + }); err != nil { + return &resp, err + } + + return &resp, nil +} + +func (l *local) Delete(ctx context.Context, req *api.DeleteNamespaceRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + if err := l.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error { + return errdefs.ToGRPC(store.Delete(ctx, req.Name)) + }); err != nil { + return &ptypes.Empty{}, err + } + // set the namespace in the context before publishing the event + ctx = namespaces.WithNamespace(ctx, req.Name) + if err := l.publisher.Publish(ctx, "/namespaces/delete", &eventstypes.NamespaceDelete{ + Name: req.Name, + }); err != nil { + return &ptypes.Empty{}, err + } + + return &ptypes.Empty{}, nil +} + +func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error { + return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewNamespaceStore(tx)) } +} + +func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error { + return l.db.View(l.withStore(ctx, fn)) +} + +func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error { + return l.db.Update(l.withStore(ctx, fn)) +} diff --git a/services/namespaces/service.go b/services/namespaces/service.go index 2c5694012..965590688 100644 --- a/services/namespaces/service.go +++ b/services/namespaces/service.go @@ -17,21 +17,13 @@ package namespaces import ( - "strings" - - "github.com/boltdb/bolt" - eventstypes "github.com/containerd/containerd/api/events" api "github.com/containerd/containerd/api/services/namespaces/v1" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" - "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func init() { @@ -39,191 +31,53 @@ func init() { Type: plugin.GRPCPlugin, ID: "namespaces", Requires: []plugin.Type{ - plugin.MetadataPlugin, + plugin.ServicePlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - m, err := ic.Get(plugin.MetadataPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - return NewService(m.(*metadata.DB), ic.Events), nil + p, ok := plugins[services.NamespacesService] + if !ok { + return nil, errors.New("namespaces service not found") + } + i, err := p.Instance() + if err != nil { + return nil, err + } + return &service{local: i.(api.NamespacesClient)}, nil }, }) } type service struct { - db *metadata.DB - publisher events.Publisher + local api.NamespacesClient } var _ api.NamespacesServer = &service{} -// NewService returns the GRPC namespaces server -func NewService(db *metadata.DB, publisher events.Publisher) api.NamespacesServer { - return &service{ - db: db, - publisher: publisher, - } -} - func (s *service) Register(server *grpc.Server) error { api.RegisterNamespacesServer(server, s) return nil } func (s *service) Get(ctx context.Context, req *api.GetNamespaceRequest) (*api.GetNamespaceResponse, error) { - var resp api.GetNamespaceResponse - - return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error { - labels, err := store.Labels(ctx, req.Name) - if err != nil { - return errdefs.ToGRPC(err) - } - - resp.Namespace = api.Namespace{ - Name: req.Name, - Labels: labels, - } - - return nil - }) + return s.local.Get(ctx, req) } func (s *service) List(ctx context.Context, req *api.ListNamespacesRequest) (*api.ListNamespacesResponse, error) { - var resp api.ListNamespacesResponse - - return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error { - namespaces, err := store.List(ctx) - if err != nil { - return err - } - - for _, namespace := range namespaces { - labels, err := store.Labels(ctx, namespace) - if err != nil { - // In general, this should be unlikely, since we are holding a - // transaction to service this request. - return errdefs.ToGRPC(err) - } - - resp.Namespaces = append(resp.Namespaces, api.Namespace{ - Name: namespace, - Labels: labels, - }) - } - - return nil - }) + return s.local.List(ctx, req) } func (s *service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (*api.CreateNamespaceResponse, error) { - var resp api.CreateNamespaceResponse - - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error { - if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil { - return errdefs.ToGRPC(err) - } - - for k, v := range req.Namespace.Labels { - if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil { - return err - } - } - - resp.Namespace = req.Namespace - return nil - }); err != nil { - return &resp, err - } - - if err := s.publisher.Publish(ctx, "/namespaces/create", &eventstypes.NamespaceCreate{ - Name: req.Namespace.Name, - Labels: req.Namespace.Labels, - }); err != nil { - return &resp, err - } - - return &resp, nil - + return s.local.Create(ctx, req) } func (s *service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (*api.UpdateNamespaceResponse, error) { - var resp api.UpdateNamespaceResponse - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error { - if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { - for _, path := range req.UpdateMask.Paths { - switch { - case strings.HasPrefix(path, "labels."): - key := strings.TrimPrefix(path, "labels.") - if err := store.SetLabel(ctx, req.Namespace.Name, key, req.Namespace.Labels[key]); err != nil { - return err - } - default: - return status.Errorf(codes.InvalidArgument, "cannot update %q field", path) - } - } - } else { - // clear out the existing labels and then set them to the incoming request. - // get current set of labels - labels, err := store.Labels(ctx, req.Namespace.Name) - if err != nil { - return errdefs.ToGRPC(err) - } - - for k := range labels { - if err := store.SetLabel(ctx, req.Namespace.Name, k, ""); err != nil { - return err - } - } - - for k, v := range req.Namespace.Labels { - if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil { - return err - } - - } - } - - return nil - }); err != nil { - return &resp, err - } - - if err := s.publisher.Publish(ctx, "/namespaces/update", &eventstypes.NamespaceUpdate{ - Name: req.Namespace.Name, - Labels: req.Namespace.Labels, - }); err != nil { - return &resp, err - } - - return &resp, nil + return s.local.Update(ctx, req) } func (s *service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) (*ptypes.Empty, error) { - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error { - return errdefs.ToGRPC(store.Delete(ctx, req.Name)) - }); err != nil { - return &ptypes.Empty{}, err - } - // set the namespace in the context before publishing the event - ctx = namespaces.WithNamespace(ctx, req.Name) - if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventstypes.NamespaceDelete{ - Name: req.Name, - }); err != nil { - return &ptypes.Empty{}, err - } - - return &ptypes.Empty{}, nil -} - -func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error { - return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewNamespaceStore(tx)) } -} - -func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error { - return s.db.View(s.withStore(ctx, fn)) -} - -func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error { - return s.db.Update(s.withStore(ctx, fn)) + return s.local.Delete(ctx, req) } diff --git a/services/services.go b/services/services.go new file mode 100644 index 000000000..efc920093 --- /dev/null +++ b/services/services.go @@ -0,0 +1,36 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package services + +const ( + // ContentService is id of content service. + ContentService = "content-service" + // SnapshotsService is id of snapshots service. + SnapshotsService = "snapshots-service" + // ImagesService is id of images service. + ImagesService = "images-service" + // ContainersService is id of containers service. + ContainersService = "containers-service" + // TasksService is id of tasks service. + TasksService = "tasks-service" + // NamespacesService is id of namespaces service. + NamespacesService = "namespaces-service" + // LeasesService is id of leases service. + LeasesService = "leases-service" + // DiffService is id of diff service. + DiffService = "diff-service" +) diff --git a/services/snapshots/service.go b/services/snapshots/service.go index 2d997bdc9..1d10fae1e 100644 --- a/services/snapshots/service.go +++ b/services/snapshots/service.go @@ -19,17 +19,16 @@ package snapshots import ( gocontext "context" - eventstypes "github.com/containerd/containerd/api/events" snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" "github.com/containerd/containerd/snapshots" ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -39,7 +38,7 @@ func init() { Type: plugin.GRPCPlugin, ID: "snapshots", Requires: []plugin.Type{ - plugin.MetadataPlugin, + plugin.ServicePlugin, }, InitFn: newService, }) @@ -48,20 +47,24 @@ func init() { var empty = &ptypes.Empty{} type service struct { - db *metadata.DB - publisher events.Publisher + ss map[string]snapshots.Snapshotter } func newService(ic *plugin.InitContext) (interface{}, error) { - md, err := ic.Get(plugin.MetadataPlugin) + plugins, err := ic.GetByType(plugin.ServicePlugin) if err != nil { return nil, err } - - return &service{ - db: md.(*metadata.DB), - publisher: ic.Events, - }, nil + p, ok := plugins[services.SnapshotsService] + if !ok { + return nil, errors.New("snapshots service not found") + } + i, err := p.Instance() + if err != nil { + return nil, err + } + ss := i.(map[string]snapshots.Snapshotter) + return &service{ss: ss}, nil } func (s *service) getSnapshotter(name string) (snapshots.Snapshotter, error) { @@ -69,7 +72,7 @@ func (s *service) getSnapshotter(name string) (snapshots.Snapshotter, error) { return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter argument missing") } - sn := s.db.Snapshotter(name) + sn := s.ss[name] if sn == nil { return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter not loaded: %s", name) } @@ -97,12 +100,6 @@ func (s *service) Prepare(ctx context.Context, pr *snapshotsapi.PrepareSnapshotR return nil, errdefs.ToGRPC(err) } - if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ - Key: pr.Key, - Parent: pr.Parent, - }); err != nil { - return nil, err - } return &snapshotsapi.PrepareSnapshotResponse{ Mounts: fromMounts(mounts), }, nil @@ -158,12 +155,6 @@ func (s *service) Commit(ctx context.Context, cr *snapshotsapi.CommitSnapshotReq return nil, errdefs.ToGRPC(err) } - if err := s.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ - Key: cr.Key, - Name: cr.Name, - }); err != nil { - return nil, err - } return empty, nil } @@ -178,11 +169,6 @@ func (s *service) Remove(ctx context.Context, rr *snapshotsapi.RemoveSnapshotReq return nil, errdefs.ToGRPC(err) } - if err := s.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ - Key: rr.Key, - }); err != nil { - return nil, err - } return empty, nil } diff --git a/services/snapshots/snapshotters.go b/services/snapshots/snapshotters.go new file mode 100644 index 000000000..5da365110 --- /dev/null +++ b/services/snapshots/snapshotters.go @@ -0,0 +1,98 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package snapshots + +import ( + "context" + + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + "github.com/containerd/containerd/snapshots" +) + +// snapshotter wraps snapshots.Snapshotter with proper events published. +type snapshotter struct { + snapshots.Snapshotter + publisher events.Publisher +} + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.SnapshotsService, + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + + db := m.(*metadata.DB) + ss := make(map[string]snapshots.Snapshotter) + for n, sn := range db.Snapshotters() { + ss[n] = newSnapshotter(sn, ic.Events) + } + return ss, nil + }, + }) +} + +func newSnapshotter(sn snapshots.Snapshotter, publisher events.Publisher) snapshots.Snapshotter { + return &snapshotter{ + Snapshotter: sn, + publisher: publisher, + } +} + +func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) { + mounts, err := s.Snapshotter.Prepare(ctx, key, parent, opts...) + if err != nil { + return nil, err + } + if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ + Key: key, + Parent: parent, + }); err != nil { + return nil, err + } + return mounts, nil +} + +func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { + if err := s.Snapshotter.Commit(ctx, name, key, opts...); err != nil { + return err + } + return s.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ + Key: key, + Name: name, + }) +} + +func (s *snapshotter) Remove(ctx context.Context, key string) error { + if err := s.Snapshotter.Remove(ctx, key); err != nil { + return err + } + return s.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ + Key: key, + }) +} diff --git a/services/tasks/local.go b/services/tasks/local.go new file mode 100644 index 000000000..5bdadab62 --- /dev/null +++ b/services/tasks/local.go @@ -0,0 +1,634 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package tasks + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/boltdb/bolt" + api "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/archive" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/filters" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/services" + "github.com/containerd/typeurl" + ptypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + _ = (api.TasksClient)(&local{}) + empty = &ptypes.Empty{} +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.ServicePlugin, + ID: services.TasksService, + Requires: []plugin.Type{ + plugin.RuntimePlugin, + plugin.MetadataPlugin, + }, + InitFn: initFunc, + }) +} + +func initFunc(ic *plugin.InitContext) (interface{}, error) { + rt, err := ic.GetByType(plugin.RuntimePlugin) + if err != nil { + return nil, err + } + + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + cs := m.(*metadata.DB).ContentStore() + runtimes := make(map[string]runtime.Runtime) + for _, rr := range rt { + ri, err := rr.Instance() + if err != nil { + log.G(ic.Context).WithError(err).Warn("could not load runtime instance due to initialization error") + continue + } + r := ri.(runtime.Runtime) + runtimes[r.ID()] = r + } + + if len(runtimes) == 0 { + return nil, errors.New("no runtimes available to create task service") + } + return &local{ + runtimes: runtimes, + db: m.(*metadata.DB), + store: cs, + publisher: ic.Events, + }, nil +} + +type local struct { + runtimes map[string]runtime.Runtime + db *metadata.DB + store content.Store + publisher events.Publisher +} + +func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) { + var ( + checkpointPath string + err error + ) + if r.Checkpoint != nil { + checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint") + if err != nil { + return nil, err + } + if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { + return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) + } + reader, err := l.store.ReaderAt(ctx, r.Checkpoint.Digest) + if err != nil { + return nil, err + } + _, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader)) + reader.Close() + if err != nil { + return nil, err + } + } + + container, err := l.getContainer(ctx, r.ContainerID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + opts := runtime.CreateOpts{ + Spec: container.Spec, + IO: runtime.IO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: checkpointPath, + Options: r.Options, + } + for _, m := range r.Rootfs { + opts.Rootfs = append(opts.Rootfs, mount.Mount{ + Type: m.Type, + Source: m.Source, + Options: m.Options, + }) + } + runtime, err := l.getRuntime(container.Runtime.Name) + if err != nil { + return nil, err + } + c, err := runtime.Create(ctx, r.ContainerID, opts) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + state, err := c.State(ctx) + if err != nil { + log.G(ctx).Error(err) + } + + return &api.CreateTaskResponse{ + ContainerID: r.ContainerID, + Pid: state.Pid, + }, nil +} + +func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + p := runtime.Process(t) + if r.ExecID != "" { + if p, err = t.Process(ctx, r.ExecID); err != nil { + return nil, errdefs.ToGRPC(err) + } + } + if err := p.Start(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + state, err := p.State(ctx) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return &api.StartResponse{ + Pid: state.Pid, + }, nil +} + +func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + runtime, err := l.getRuntime(t.Info().Runtime) + if err != nil { + return nil, err + } + exit, err := runtime.Delete(ctx, t) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return &api.DeleteResponse{ + ExitStatus: exit.Status, + ExitedAt: exit.Timestamp, + Pid: exit.Pid, + }, nil +} + +func (l *local) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + exit, err := t.DeleteProcess(ctx, r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return &api.DeleteResponse{ + ID: r.ExecID, + ExitStatus: exit.Status, + ExitedAt: exit.Timestamp, + Pid: exit.Pid, + }, nil +} + +func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) { + state, err := p.State(ctx) + if err != nil { + return nil, err + } + var status task.Status + switch state.Status { + case runtime.CreatedStatus: + status = task.StatusCreated + case runtime.RunningStatus: + status = task.StatusRunning + case runtime.StoppedStatus: + status = task.StatusStopped + case runtime.PausedStatus: + status = task.StatusPaused + case runtime.PausingStatus: + status = task.StatusPausing + default: + log.G(ctx).WithField("status", state.Status).Warn("unknown status") + } + return &task.Process{ + ID: p.ID(), + Pid: state.Pid, + Status: status, + Stdin: state.Stdin, + Stdout: state.Stdout, + Stderr: state.Stderr, + Terminal: state.Terminal, + ExitStatus: state.ExitStatus, + ExitedAt: state.ExitedAt, + }, nil +} + +func (l *local) Get(ctx context.Context, r *api.GetRequest, _ ...grpc.CallOption) (*api.GetResponse, error) { + task, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + p := runtime.Process(task) + if r.ExecID != "" { + if p, err = task.Process(ctx, r.ExecID); err != nil { + return nil, errdefs.ToGRPC(err) + } + } + t, err := processFromContainerd(ctx, p) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return &api.GetResponse{ + Process: t, + }, nil +} + +func (l *local) List(ctx context.Context, r *api.ListTasksRequest, _ ...grpc.CallOption) (*api.ListTasksResponse, error) { + resp := &api.ListTasksResponse{} + for _, r := range l.runtimes { + tasks, err := r.Tasks(ctx) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + addTasks(ctx, resp, tasks) + } + return resp, nil +} + +func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) { + for _, t := range tasks { + tt, err := processFromContainerd(ctx, t) + if err != nil { + if !errdefs.IsNotFound(err) { // handle race with deletion + log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf") + } + continue + } + r.Tasks = append(r.Tasks, tt) + } +} + +func (l *local) Pause(ctx context.Context, r *api.PauseTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + err = t.Pause(ctx) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +func (l *local) Resume(ctx context.Context, r *api.ResumeTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + err = t.Resume(ctx) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +func (l *local) Kill(ctx context.Context, r *api.KillRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + p := runtime.Process(t) + if r.ExecID != "" { + if p, err = t.Process(ctx, r.ExecID); err != nil { + return nil, errdefs.ToGRPC(err) + } + } + if err := p.Kill(ctx, r.Signal, r.All); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +func (l *local) ListPids(ctx context.Context, r *api.ListPidsRequest, _ ...grpc.CallOption) (*api.ListPidsResponse, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + processList, err := t.Pids(ctx) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, p := range processList { + pInfo := task.ProcessInfo{ + Pid: p.Pid, + } + if p.Info != nil { + a, err := typeurl.MarshalAny(p.Info) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal process %d info", p.Pid) + } + pInfo.Info = a + } + processes = append(processes, &pInfo) + } + return &api.ListPidsResponse{ + Processes: processes, + }, nil +} + +func (l *local) Exec(ctx context.Context, r *api.ExecProcessRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + if r.ExecID == "" { + return nil, status.Errorf(codes.InvalidArgument, "exec id cannot be empty") + } + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + if _, err := t.Exec(ctx, r.ExecID, runtime.ExecOpts{ + Spec: r.Spec, + IO: runtime.IO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + }); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +func (l *local) ResizePty(ctx context.Context, r *api.ResizePtyRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + p := runtime.Process(t) + if r.ExecID != "" { + if p, err = t.Process(ctx, r.ExecID); err != nil { + return nil, errdefs.ToGRPC(err) + } + } + if err := p.ResizePty(ctx, runtime.ConsoleSize{ + Width: r.Width, + Height: r.Height, + }); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +func (l *local) CloseIO(ctx context.Context, r *api.CloseIORequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + p := runtime.Process(t) + if r.ExecID != "" { + if p, err = t.Process(ctx, r.ExecID); err != nil { + return nil, errdefs.ToGRPC(err) + } + } + if r.Stdin { + if err := p.CloseIO(ctx); err != nil { + return nil, err + } + } + return empty, nil +} + +func (l *local) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest, _ ...grpc.CallOption) (*api.CheckpointTaskResponse, error) { + container, err := l.getContainer(ctx, r.ContainerID) + if err != nil { + return nil, err + } + t, err := l.getTaskFromContainer(ctx, container) + if err != nil { + return nil, err + } + image, err := ioutil.TempDir("", "ctd-checkpoint") + if err != nil { + return nil, errdefs.ToGRPC(err) + } + defer os.RemoveAll(image) + if err := t.Checkpoint(ctx, image, r.Options); err != nil { + return nil, errdefs.ToGRPC(err) + } + // write checkpoint to the content store + tar := archive.Diff(ctx, "", image) + cp, err := l.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar) + // close tar first after write + if err := tar.Close(); err != nil { + return nil, err + } + if err != nil { + return nil, err + } + // write the config to the content store + data, err := container.Spec.Marshal() + if err != nil { + return nil, err + } + spec := bytes.NewReader(data) + specD, err := l.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return &api.CheckpointTaskResponse{ + Descriptors: []*types.Descriptor{ + cp, + specD, + }, + }, nil +} + +func (l *local) Update(ctx context.Context, r *api.UpdateTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + if err := t.Update(ctx, r.Resources); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +func (l *local) Metrics(ctx context.Context, r *api.MetricsRequest, _ ...grpc.CallOption) (*api.MetricsResponse, error) { + filter, err := filters.ParseAll(r.Filters...) + if err != nil { + return nil, err + } + var resp api.MetricsResponse + for _, r := range l.runtimes { + tasks, err := r.Tasks(ctx) + if err != nil { + return nil, err + } + getTasksMetrics(ctx, filter, tasks, &resp) + } + return &resp, nil +} + +func (l *local) Wait(ctx context.Context, r *api.WaitRequest, _ ...grpc.CallOption) (*api.WaitResponse, error) { + t, err := l.getTask(ctx, r.ContainerID) + if err != nil { + return nil, err + } + p := runtime.Process(t) + if r.ExecID != "" { + if p, err = t.Process(ctx, r.ExecID); err != nil { + return nil, errdefs.ToGRPC(err) + } + } + exit, err := p.Wait(ctx) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + return &api.WaitResponse{ + ExitStatus: exit.Status, + ExitedAt: exit.Timestamp, + }, nil +} + +func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) { + for _, tk := range tasks { + if !filter.Match(filters.AdapterFunc(func(fieldpath []string) (string, bool) { + t := tk + switch fieldpath[0] { + case "id": + return t.ID(), true + case "namespace": + return t.Info().Namespace, true + case "runtime": + return t.Info().Runtime, true + } + return "", false + })) { + continue + } + + collected := time.Now() + metrics, err := tk.Metrics(ctx) + if err != nil { + if !errdefs.IsNotFound(err) { + log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID()) + } + continue + } + data, err := typeurl.MarshalAny(metrics) + if err != nil { + log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID()) + continue + } + r.Metrics = append(r.Metrics, &types.Metric{ + ID: tk.ID(), + Timestamp: collected, + Data: data, + }) + } +} + +func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { + writer, err := l.store.Writer(ctx, ref, 0, "") + if err != nil { + return nil, err + } + defer writer.Close() + size, err := io.Copy(writer, r) + if err != nil { + return nil, err + } + if err := writer.Commit(ctx, 0, ""); err != nil { + return nil, err + } + return &types.Descriptor{ + MediaType: mediaType, + Digest: writer.Digest(), + Size_: size, + }, nil +} + +func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) { + var container containers.Container + if err := l.db.View(func(tx *bolt.Tx) error { + store := metadata.NewContainerStore(tx) + var err error + container, err = store.Get(ctx, id) + return err + }); err != nil { + return nil, errdefs.ToGRPC(err) + } + return &container, nil +} + +func (l *local) getTask(ctx context.Context, id string) (runtime.Task, error) { + container, err := l.getContainer(ctx, id) + if err != nil { + return nil, err + } + return l.getTaskFromContainer(ctx, container) +} + +func (l *local) getTaskFromContainer(ctx context.Context, container *containers.Container) (runtime.Task, error) { + runtime, err := l.getRuntime(container.Runtime.Name) + if err != nil { + return nil, errdefs.ToGRPCf(err, "runtime for task %s", container.Runtime.Name) + } + t, err := runtime.Get(ctx, container.ID) + if err != nil { + return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID) + } + return t, nil +} + +func (l *local) getRuntime(name string) (runtime.Runtime, error) { + runtime, ok := l.runtimes[name] + if !ok { + return nil, status.Errorf(codes.NotFound, "unknown runtime %q", name) + } + return runtime, nil +} diff --git a/services/tasks/service.go b/services/tasks/service.go index b1329335b..becf1508b 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -17,42 +17,17 @@ package tasks import ( - "bytes" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "time" - - "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/tasks/v1" - "github.com/containerd/containerd/api/types" - "github.com/containerd/containerd/api/types/task" - "github.com/containerd/containerd/archive" - "github.com/containerd/containerd/containers" - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" - "github.com/containerd/containerd/filters" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/runtime" - "github.com/containerd/typeurl" + "github.com/containerd/containerd/services" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var ( - _ = (api.TasksServer)(&service{}) - empty = &ptypes.Empty{} + _ = (api.TasksServer)(&service{}) ) func init() { @@ -60,51 +35,28 @@ func init() { Type: plugin.GRPCPlugin, ID: "tasks", Requires: []plugin.Type{ - plugin.RuntimePlugin, - plugin.MetadataPlugin, + plugin.ServicePlugin, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + plugins, err := ic.GetByType(plugin.ServicePlugin) + if err != nil { + return nil, err + } + p, ok := plugins[services.TasksService] + if !ok { + return nil, errors.New("tasks service not found") + } + i, err := p.Instance() + if err != nil { + return nil, err + } + return &service{local: i.(api.TasksClient)}, nil }, - InitFn: initFunc, }) } -func initFunc(ic *plugin.InitContext) (interface{}, error) { - rt, err := ic.GetByType(plugin.RuntimePlugin) - if err != nil { - return nil, err - } - - m, err := ic.Get(plugin.MetadataPlugin) - if err != nil { - return nil, err - } - cs := m.(*metadata.DB).ContentStore() - runtimes := make(map[string]runtime.Runtime) - for _, rr := range rt { - ri, err := rr.Instance() - if err != nil { - log.G(ic.Context).WithError(err).Warn("could not load runtime instance due to initialization error") - continue - } - r := ri.(runtime.Runtime) - runtimes[r.ID()] = r - } - - if len(runtimes) == 0 { - return nil, errors.New("no runtimes available to create task service") - } - return &service{ - runtimes: runtimes, - db: m.(*metadata.DB), - store: cs, - publisher: ic.Events, - }, nil -} - type service struct { - runtimes map[string]runtime.Runtime - db *metadata.DB - store content.Store - publisher events.Publisher + local api.TasksClient } func (s *service) Register(server *grpc.Server) error { @@ -113,526 +65,69 @@ func (s *service) Register(server *grpc.Server) error { } func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) { - var ( - checkpointPath string - err error - ) - if r.Checkpoint != nil { - checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint") - if err != nil { - return nil, err - } - if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { - return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) - } - reader, err := s.store.ReaderAt(ctx, r.Checkpoint.Digest) - if err != nil { - return nil, err - } - _, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader)) - reader.Close() - if err != nil { - return nil, err - } - } - - container, err := s.getContainer(ctx, r.ContainerID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - opts := runtime.CreateOpts{ - Spec: container.Spec, - IO: runtime.IO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - Checkpoint: checkpointPath, - Options: r.Options, - } - for _, m := range r.Rootfs { - opts.Rootfs = append(opts.Rootfs, mount.Mount{ - Type: m.Type, - Source: m.Source, - Options: m.Options, - }) - } - runtime, err := s.getRuntime(container.Runtime.Name) - if err != nil { - return nil, err - } - c, err := runtime.Create(ctx, r.ContainerID, opts) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - state, err := c.State(ctx) - if err != nil { - log.G(ctx).Error(err) - } - - return &api.CreateTaskResponse{ - ContainerID: r.ContainerID, - Pid: state.Pid, - }, nil + return s.local.Create(ctx, r) } func (s *service) Start(ctx context.Context, r *api.StartRequest) (*api.StartResponse, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - p := runtime.Process(t) - if r.ExecID != "" { - if p, err = t.Process(ctx, r.ExecID); err != nil { - return nil, errdefs.ToGRPC(err) - } - } - if err := p.Start(ctx); err != nil { - return nil, errdefs.ToGRPC(err) - } - state, err := p.State(ctx) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return &api.StartResponse{ - Pid: state.Pid, - }, nil + return s.local.Start(ctx, r) } func (s *service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.DeleteResponse, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - runtime, err := s.getRuntime(t.Info().Runtime) - if err != nil { - return nil, err - } - exit, err := runtime.Delete(ctx, t) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return &api.DeleteResponse{ - ExitStatus: exit.Status, - ExitedAt: exit.Timestamp, - Pid: exit.Pid, - }, nil + return s.local.Delete(ctx, r) } func (s *service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest) (*api.DeleteResponse, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - exit, err := t.DeleteProcess(ctx, r.ExecID) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return &api.DeleteResponse{ - ID: r.ExecID, - ExitStatus: exit.Status, - ExitedAt: exit.Timestamp, - Pid: exit.Pid, - }, nil -} - -func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) { - state, err := p.State(ctx) - if err != nil { - return nil, err - } - var status task.Status - switch state.Status { - case runtime.CreatedStatus: - status = task.StatusCreated - case runtime.RunningStatus: - status = task.StatusRunning - case runtime.StoppedStatus: - status = task.StatusStopped - case runtime.PausedStatus: - status = task.StatusPaused - case runtime.PausingStatus: - status = task.StatusPausing - default: - log.G(ctx).WithField("status", state.Status).Warn("unknown status") - } - return &task.Process{ - ID: p.ID(), - Pid: state.Pid, - Status: status, - Stdin: state.Stdin, - Stdout: state.Stdout, - Stderr: state.Stderr, - Terminal: state.Terminal, - ExitStatus: state.ExitStatus, - ExitedAt: state.ExitedAt, - }, nil + return s.local.DeleteProcess(ctx, r) } func (s *service) Get(ctx context.Context, r *api.GetRequest) (*api.GetResponse, error) { - task, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - p := runtime.Process(task) - if r.ExecID != "" { - if p, err = task.Process(ctx, r.ExecID); err != nil { - return nil, errdefs.ToGRPC(err) - } - } - t, err := processFromContainerd(ctx, p) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return &api.GetResponse{ - Process: t, - }, nil + return s.local.Get(ctx, r) } func (s *service) List(ctx context.Context, r *api.ListTasksRequest) (*api.ListTasksResponse, error) { - resp := &api.ListTasksResponse{} - for _, r := range s.runtimes { - tasks, err := r.Tasks(ctx) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - addTasks(ctx, resp, tasks) - } - return resp, nil -} - -func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) { - for _, t := range tasks { - tt, err := processFromContainerd(ctx, t) - if err != nil { - if !errdefs.IsNotFound(err) { // handle race with deletion - log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf") - } - continue - } - r.Tasks = append(r.Tasks, tt) - } + return s.local.List(ctx, r) } func (s *service) Pause(ctx context.Context, r *api.PauseTaskRequest) (*ptypes.Empty, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - err = t.Pause(ctx) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil + return s.local.Pause(ctx, r) } func (s *service) Resume(ctx context.Context, r *api.ResumeTaskRequest) (*ptypes.Empty, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - err = t.Resume(ctx) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil + return s.local.Resume(ctx, r) } func (s *service) Kill(ctx context.Context, r *api.KillRequest) (*ptypes.Empty, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - p := runtime.Process(t) - if r.ExecID != "" { - if p, err = t.Process(ctx, r.ExecID); err != nil { - return nil, errdefs.ToGRPC(err) - } - } - if err := p.Kill(ctx, r.Signal, r.All); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil + return s.local.Kill(ctx, r) } func (s *service) ListPids(ctx context.Context, r *api.ListPidsRequest) (*api.ListPidsResponse, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - processList, err := t.Pids(ctx) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - var processes []*task.ProcessInfo - for _, p := range processList { - pInfo := task.ProcessInfo{ - Pid: p.Pid, - } - if p.Info != nil { - a, err := typeurl.MarshalAny(p.Info) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal process %d info", p.Pid) - } - pInfo.Info = a - } - processes = append(processes, &pInfo) - } - return &api.ListPidsResponse{ - Processes: processes, - }, nil + return s.local.ListPids(ctx, r) } func (s *service) Exec(ctx context.Context, r *api.ExecProcessRequest) (*ptypes.Empty, error) { - if r.ExecID == "" { - return nil, status.Errorf(codes.InvalidArgument, "exec id cannot be empty") - } - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - if _, err := t.Exec(ctx, r.ExecID, runtime.ExecOpts{ - Spec: r.Spec, - IO: runtime.IO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - }); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil + return s.local.Exec(ctx, r) } func (s *service) ResizePty(ctx context.Context, r *api.ResizePtyRequest) (*ptypes.Empty, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - p := runtime.Process(t) - if r.ExecID != "" { - if p, err = t.Process(ctx, r.ExecID); err != nil { - return nil, errdefs.ToGRPC(err) - } - } - if err := p.ResizePty(ctx, runtime.ConsoleSize{ - Width: r.Width, - Height: r.Height, - }); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil + return s.local.ResizePty(ctx, r) } func (s *service) CloseIO(ctx context.Context, r *api.CloseIORequest) (*ptypes.Empty, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - p := runtime.Process(t) - if r.ExecID != "" { - if p, err = t.Process(ctx, r.ExecID); err != nil { - return nil, errdefs.ToGRPC(err) - } - } - if r.Stdin { - if err := p.CloseIO(ctx); err != nil { - return nil, err - } - } - return empty, nil + return s.local.CloseIO(ctx, r) } func (s *service) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest) (*api.CheckpointTaskResponse, error) { - container, err := s.getContainer(ctx, r.ContainerID) - if err != nil { - return nil, err - } - t, err := s.getTaskFromContainer(ctx, container) - if err != nil { - return nil, err - } - image, err := ioutil.TempDir("", "ctd-checkpoint") - if err != nil { - return nil, errdefs.ToGRPC(err) - } - defer os.RemoveAll(image) - if err := t.Checkpoint(ctx, image, r.Options); err != nil { - return nil, errdefs.ToGRPC(err) - } - // write checkpoint to the content store - tar := archive.Diff(ctx, "", image) - cp, err := s.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar) - // close tar first after write - if err := tar.Close(); err != nil { - return nil, err - } - if err != nil { - return nil, err - } - // write the config to the content store - data, err := container.Spec.Marshal() - if err != nil { - return nil, err - } - spec := bytes.NewReader(data) - specD, err := s.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return &api.CheckpointTaskResponse{ - Descriptors: []*types.Descriptor{ - cp, - specD, - }, - }, nil + return s.local.Checkpoint(ctx, r) } func (s *service) Update(ctx context.Context, r *api.UpdateTaskRequest) (*ptypes.Empty, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - if err := t.Update(ctx, r.Resources); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil + return s.local.Update(ctx, r) } func (s *service) Metrics(ctx context.Context, r *api.MetricsRequest) (*api.MetricsResponse, error) { - filter, err := filters.ParseAll(r.Filters...) - if err != nil { - return nil, err - } - var resp api.MetricsResponse - for _, r := range s.runtimes { - tasks, err := r.Tasks(ctx) - if err != nil { - return nil, err - } - getTasksMetrics(ctx, filter, tasks, &resp) - } - return &resp, nil + return s.local.Metrics(ctx, r) } func (s *service) Wait(ctx context.Context, r *api.WaitRequest) (*api.WaitResponse, error) { - t, err := s.getTask(ctx, r.ContainerID) - if err != nil { - return nil, err - } - p := runtime.Process(t) - if r.ExecID != "" { - if p, err = t.Process(ctx, r.ExecID); err != nil { - return nil, errdefs.ToGRPC(err) - } - } - exit, err := p.Wait(ctx) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - return &api.WaitResponse{ - ExitStatus: exit.Status, - ExitedAt: exit.Timestamp, - }, nil -} - -func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) { - for _, tk := range tasks { - if !filter.Match(filters.AdapterFunc(func(fieldpath []string) (string, bool) { - t := tk - switch fieldpath[0] { - case "id": - return t.ID(), true - case "namespace": - return t.Info().Namespace, true - case "runtime": - return t.Info().Runtime, true - } - return "", false - })) { - continue - } - - collected := time.Now() - metrics, err := tk.Metrics(ctx) - if err != nil { - if !errdefs.IsNotFound(err) { - log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID()) - } - continue - } - data, err := typeurl.MarshalAny(metrics) - if err != nil { - log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID()) - continue - } - r.Metrics = append(r.Metrics, &types.Metric{ - ID: tk.ID(), - Timestamp: collected, - Data: data, - }) - } -} - -func (s *service) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { - writer, err := s.store.Writer(ctx, ref, 0, "") - if err != nil { - return nil, err - } - defer writer.Close() - size, err := io.Copy(writer, r) - if err != nil { - return nil, err - } - if err := writer.Commit(ctx, 0, ""); err != nil { - return nil, err - } - return &types.Descriptor{ - MediaType: mediaType, - Digest: writer.Digest(), - Size_: size, - }, nil -} - -func (s *service) getContainer(ctx context.Context, id string) (*containers.Container, error) { - var container containers.Container - if err := s.db.View(func(tx *bolt.Tx) error { - store := metadata.NewContainerStore(tx) - var err error - container, err = store.Get(ctx, id) - return err - }); err != nil { - return nil, errdefs.ToGRPC(err) - } - return &container, nil -} - -func (s *service) getTask(ctx context.Context, id string) (runtime.Task, error) { - container, err := s.getContainer(ctx, id) - if err != nil { - return nil, err - } - return s.getTaskFromContainer(ctx, container) -} - -func (s *service) getTaskFromContainer(ctx context.Context, container *containers.Container) (runtime.Task, error) { - runtime, err := s.getRuntime(container.Runtime.Name) - if err != nil { - return nil, errdefs.ToGRPCf(err, "runtime for task %s", container.Runtime.Name) - } - t, err := runtime.Get(ctx, container.ID) - if err != nil { - return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID) - } - return t, nil -} - -func (s *service) getRuntime(name string) (runtime.Runtime, error) { - runtime, ok := s.runtimes[name] - if !ok { - return nil, status.Errorf(codes.NotFound, "unknown runtime %q", name) - } - return runtime, nil + return s.local.Wait(ctx, r) }