Add service plugin and support in process integration.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-03-12 08:27:35 +00:00
parent ba93435337
commit 1128b3d664
26 changed files with 2254 additions and 1297 deletions

166
client.go
View File

@ -31,6 +31,7 @@ import (
eventsapi "github.com/containerd/containerd/api/services/events/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
@ -39,6 +40,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/dialer"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
@ -75,54 +77,73 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
return nil, err
}
}
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithTimeout(60 * time.Second),
grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(dialer.Dialer),
c := &Client{
runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
}
if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions
if copts.services != nil {
c.services = *copts.services
}
if copts.defaultns != "" {
unary, stream := newNSInterceptors(copts.defaultns)
gopts = append(gopts,
grpc.WithUnaryInterceptor(unary),
grpc.WithStreamInterceptor(stream),
)
}
connector := func() (*grpc.ClientConn, error) {
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
if address != "" {
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithTimeout(60 * time.Second),
grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(dialer.Dialer),
}
return conn, nil
if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions
}
if copts.defaultns != "" {
unary, stream := newNSInterceptors(copts.defaultns)
gopts = append(gopts,
grpc.WithUnaryInterceptor(unary),
grpc.WithStreamInterceptor(stream),
)
}
connector := func() (*grpc.ClientConn, error) {
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
return conn, nil
}
conn, err := connector()
if err != nil {
return nil, err
}
c.conn, c.connector = conn, connector
}
conn, err := connector()
if err != nil {
return nil, err
if copts.services == nil && c.conn == nil {
return nil, errors.New("no grpc connection or services is available")
}
return &Client{
conn: conn,
connector: connector,
runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
}, nil
return c, nil
}
// NewWithConn returns a new containerd client that is connected to the containerd
// instance provided by the connection
func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
return &Client{
var copts clientOpts
for _, o := range opts {
if err := o(&copts); err != nil {
return nil, err
}
}
c := &Client{
conn: conn,
runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
}, nil
}
if copts.services != nil {
c.services = *copts.services
}
return c, nil
}
// Client is the client to interact with containerd and its various services
// using a uniform interface
type Client struct {
services
conn *grpc.ClientConn
runtime string
connector func() (*grpc.ClientConn, error)
@ -149,6 +170,9 @@ func (c *Client) Reconnect() error {
// connection. A timeout can be set in the context to ensure it returns
// early.
func (c *Client) IsServing(ctx context.Context) (bool, error) {
if c.conn == nil {
return false, errors.New("no grpc connection available")
}
r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(false))
if err != nil {
return false, err
@ -385,43 +409,8 @@ func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, er
//
// The subscriber can stop receiving events by canceling the provided context.
// The errs channel will be closed and return a nil error.
func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *eventsapi.Envelope, errs <-chan error) {
var (
evq = make(chan *eventsapi.Envelope)
errq = make(chan error, 1)
)
errs = errq
ch = evq
session, err := c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: filters,
})
if err != nil {
errq <- err
close(errq)
return
}
go func() {
defer close(errq)
for {
ev, err := session.Recv()
if err != nil {
errq <- err
return
}
select {
case evq <- ev:
case <-ctx.Done():
return
}
}
}()
return ch, errs
func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
return c.EventService().Subscribe(ctx, filters...)
}
// Close closes the clients connection to containerd
@ -431,36 +420,57 @@ func (c *Client) Close() error {
// NamespaceService returns the underlying Namespaces Store
func (c *Client) NamespaceService() namespaces.Store {
if c.namespaceStore != nil {
return c.namespaceStore
}
return NewNamespaceStoreFromClient(namespacesapi.NewNamespacesClient(c.conn))
}
// ContainerService returns the underlying container Store
func (c *Client) ContainerService() containers.Store {
if c.containerStore != nil {
return c.containerStore
}
return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
}
// ContentStore returns the underlying content Store
func (c *Client) ContentStore() content.Store {
if c.contentStore != nil {
return c.contentStore
}
return NewContentStoreFromClient(contentapi.NewContentClient(c.conn))
}
// SnapshotService returns the underlying snapshotter for the provided snapshotter name
func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter {
if c.snapshotters != nil {
return c.snapshotters[snapshotterName]
}
return NewSnapshotterFromClient(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
}
// TaskService returns the underlying TasksClient
func (c *Client) TaskService() tasks.TasksClient {
if c.taskService != nil {
return c.taskService
}
return tasks.NewTasksClient(c.conn)
}
// ImageService returns the underlying image Store
func (c *Client) ImageService() images.Store {
if c.imageStore != nil {
return c.imageStore
}
return NewImageStoreFromClient(imagesapi.NewImagesClient(c.conn))
}
// DiffService returns the underlying Differ
func (c *Client) DiffService() DiffService {
if c.diffService != nil {
return c.diffService
}
return NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
}
@ -469,14 +479,25 @@ func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient {
return introspectionapi.NewIntrospectionClient(c.conn)
}
// LeasesService returns the underlying Leases Client
func (c *Client) LeasesService() leasesapi.LeasesClient {
if c.leasesService != nil {
return c.leasesService
}
return leasesapi.NewLeasesClient(c.conn)
}
// HealthService returns the underlying GRPC HealthClient
func (c *Client) HealthService() grpc_health_v1.HealthClient {
return grpc_health_v1.NewHealthClient(c.conn)
}
// EventService returns the underlying EventsClient
func (c *Client) EventService() eventsapi.EventsClient {
return eventsapi.NewEventsClient(c.conn)
// EventService returns the underlying event service
func (c *Client) EventService() EventService {
if c.eventService != nil {
return c.eventService
}
return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn))
}
// VersionService returns the underlying VersionClient
@ -494,6 +515,9 @@ type Version struct {
// Version returns the version of containerd that the client is connected to
func (c *Client) Version(ctx context.Context) (Version, error) {
if c.conn == nil {
return Version{}, errors.New("no grpc connection available")
}
response, err := c.VersionService().Version(ctx, &ptypes.Empty{})
if err != nil {
return Version{}, err

View File

@ -24,6 +24,7 @@ import (
type clientOpts struct {
defaultns string
services *services
dialOptions []grpc.DialOption
}
@ -49,6 +50,17 @@ func WithDialOpts(opts []grpc.DialOption) ClientOpt {
}
}
// WithServices sets services used by the client.
func WithServices(opts ...ServicesOpt) ClientOpt {
return func(c *clientOpts) error {
c.services = &services{}
for _, o := range opts {
o(c.services)
}
return nil
}
}
// RemoteOpt allows the caller to set distribution options for a remote
type RemoteOpt func(*Client, *RemoteContext) error

View File

@ -20,8 +20,8 @@ import (
"encoding/json"
"fmt"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
"github.com/urfave/cli"
@ -41,15 +41,16 @@ var Command = cli.Command{
}
defer cancel()
eventsClient := client.EventService()
events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: context.Args(),
})
if err != nil {
return err
}
eventsCh, errCh := eventsClient.Subscribe(ctx, context.Args()...)
for {
e, err := events.Recv()
if err != nil {
var e *events.Envelope
select {
case evt, closed := <-eventsCh:
if closed {
return nil
}
e = evt
case err := <-errCh:
return err
}

View File

@ -22,6 +22,7 @@ import (
diffapi "github.com/containerd/containerd/api/services/diff/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
@ -51,7 +52,7 @@ func (r *diffRemote) Apply(ctx context.Context, diff ocispec.Descriptor, mounts
}
resp, err := r.client.Apply(ctx, req)
if err != nil {
return ocispec.Descriptor{}, err
return ocispec.Descriptor{}, errdefs.FromGRPC(err)
}
return toDescriptor(resp.Applied), nil
}
@ -72,7 +73,7 @@ func (r *diffRemote) Compare(ctx context.Context, a, b []mount.Mount, opts ...di
}
resp, err := r.client.Diff(ctx, req)
if err != nil {
return ocispec.Descriptor{}, err
return ocispec.Descriptor{}, errdefs.FromGRPC(err)
}
return toDescriptor(resp.Diff), nil
}

119
events.go Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"context"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
)
// EventService handles the publish, forward and subscribe of events.
type EventService interface {
events.Publisher
events.Forwarder
events.Subscriber
}
// NewEventServiceFromClient returns a new event service which communicates
// over a GRPC connection.
func NewEventServiceFromClient(client eventsapi.EventsClient) EventService {
return &eventRemote{
client: client,
}
}
type eventRemote struct {
client eventsapi.EventsClient
}
func (e *eventRemote) Publish(ctx context.Context, topic string, event events.Event) error {
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
req := &eventsapi.PublishRequest{
Topic: topic,
Event: any,
}
if _, err := e.client.Publish(ctx, req); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (e *eventRemote) Forward(ctx context.Context, envelope *events.Envelope) error {
req := &eventsapi.ForwardRequest{
Envelope: &eventsapi.Envelope{
Timestamp: envelope.Timestamp,
Namespace: envelope.Namespace,
Topic: envelope.Topic,
Event: envelope.Event,
},
}
if _, err := e.client.Forward(ctx, req); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (e *eventRemote) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evq = make(chan *events.Envelope)
errq = make(chan error, 1)
)
errs = errq
ch = evq
session, err := e.client.Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: filters,
})
if err != nil {
errq <- err
close(errq)
return
}
go func() {
defer close(errq)
for {
ev, err := session.Recv()
if err != nil {
errq <- err
return
}
select {
case evq <- &events.Envelope{
Timestamp: ev.Timestamp,
Namespace: ev.Namespace,
Topic: ev.Topic,
Event: ev.Event,
}:
case <-ctx.Done():
return
}
}
}()
return ch, errs
}

View File

@ -36,7 +36,7 @@ type Lease struct {
// CreateLease creates a new lease
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
lapi := c.LeasesService()
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
if err != nil {
return Lease{}, err
@ -50,7 +50,7 @@ func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
// ListLeases lists active leases
func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
lapi := c.LeasesService()
resp, err := lapi.List(ctx, &leasesapi.ListRequest{})
if err != nil {
return nil, err
@ -100,7 +100,7 @@ func (l Lease) CreatedAt() time.Time {
// Delete deletes the lease, removing the reference to all resources created
// during the lease.
func (l Lease) Delete(ctx context.Context) error {
lapi := leasesapi.NewLeasesClient(l.client.conn)
lapi := l.client.LeasesService()
_, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.id,
})

View File

@ -195,6 +195,15 @@ func (m *DB) Snapshotter(name string) snapshots.Snapshotter {
return sn
}
// Snapshotters returns all available snapshotters.
func (m *DB) Snapshotters() map[string]snapshots.Snapshotter {
ss := make(map[string]snapshots.Snapshotter, len(m.ss))
for n, sn := range m.ss {
ss[n] = sn
}
return ss
}
// View runs a readonly transaction on the metadata store.
func (m *DB) View(fn func(*bolt.Tx) error) error {
return m.db.View(fn)

View File

@ -58,6 +58,8 @@ const (
AllPlugins Type = "*"
// RuntimePlugin implements a runtime
RuntimePlugin Type = "io.containerd.runtime.v1"
// ServicePlugin implements a internal service
ServicePlugin Type = "io.containerd.service.v1"
// GRPCPlugin implements a grpc service
GRPCPlugin Type = "io.containerd.grpc.v1"
// SnapshotPlugin implements a snapshotter

112
services.go Normal file
View File

@ -0,0 +1,112 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
containersapi "github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
"github.com/containerd/containerd/api/services/leases/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
)
type services struct {
contentStore content.Store
imageStore images.Store
containerStore containers.Store
namespaceStore namespaces.Store
snapshotters map[string]snapshots.Snapshotter
taskService tasks.TasksClient
diffService DiffService
eventService EventService
leasesService leases.LeasesClient
}
// ServicesOpt allows callers to set options on the services
type ServicesOpt func(c *services)
// WithContentStore sets the content store.
func WithContentStore(contentStore content.Store) ServicesOpt {
return func(s *services) {
s.contentStore = contentStore
}
}
// WithImageService sets the image service.
func WithImageService(imageService imagesapi.ImagesClient) ServicesOpt {
return func(s *services) {
s.imageStore = NewImageStoreFromClient(imageService)
}
}
// WithSnapshotters sets the snapshotters.
func WithSnapshotters(snapshotters map[string]snapshots.Snapshotter) ServicesOpt {
return func(s *services) {
s.snapshotters = make(map[string]snapshots.Snapshotter)
for n, sn := range snapshotters {
s.snapshotters[n] = sn
}
}
}
// WithContainerService sets the container service.
func WithContainerService(containerService containersapi.ContainersClient) ServicesOpt {
return func(s *services) {
s.containerStore = NewRemoteContainerStore(containerService)
}
}
// WithTaskService sets the task service.
func WithTaskService(taskService tasks.TasksClient) ServicesOpt {
return func(s *services) {
s.taskService = taskService
}
}
// WithDiffService sets the diff service.
func WithDiffService(diffService diff.DiffClient) ServicesOpt {
return func(s *services) {
s.diffService = NewDiffServiceFromClient(diffService)
}
}
// WithEventService sets the event service.
func WithEventService(eventService EventService) ServicesOpt {
return func(s *services) {
s.eventService = eventService
}
}
// WithNamespaceService sets the namespace service.
func WithNamespaceService(namespaceService namespacesapi.NamespacesClient) ServicesOpt {
return func(s *services) {
s.namespaceStore = NewNamespaceStoreFromClient(namespaceService)
}
}
// WithLeasesService sets the lease service.
func WithLeasesService(leasesService leases.LeasesClient) ServicesOpt {
return func(s *services) {
s.leasesService = leasesService
}
}

View File

@ -0,0 +1,189 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containers
import (
"github.com/boltdb/bolt"
eventstypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.ContainersService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return &local{
db: m.(*metadata.DB),
publisher: ic.Events,
}, nil
},
})
}
type local struct {
db *metadata.DB
publisher events.Publisher
}
var _ api.ContainersClient = &local{}
func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) {
var resp api.GetContainerResponse
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
container, err := store.Get(ctx, req.ID)
if err != nil {
return err
}
containerpb := containerToProto(&container)
resp.Container = containerpb
return nil
}))
}
func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) {
var resp api.ListContainersResponse
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
containers, err := store.List(ctx, req.Filters...)
if err != nil {
return err
}
resp.Containers = containersToProto(containers)
return nil
}))
}
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
container := containerFromProto(&req.Container)
created, err := store.Create(ctx, container)
if err != nil {
return err
}
resp.Container = containerToProto(&created)
return nil
}); err != nil {
return &resp, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Runtime: &eventstypes.ContainerCreate_Runtime{
Name: resp.Container.Runtime.Name,
Options: resp.Container.Runtime.Options,
},
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ ...grpc.CallOption) (*api.UpdateContainerResponse, error) {
if req.Container.ID == "" {
return nil, status.Errorf(codes.InvalidArgument, "Container.ID required")
}
var (
resp api.UpdateContainerResponse
container = containerFromProto(&req.Container)
)
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
var fieldpaths []string
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
fieldpaths = append(fieldpaths, path)
}
}
updated, err := store.Update(ctx, container, fieldpaths...)
if err != nil {
return err
}
resp.Container = containerToProto(&updated)
return nil
}); err != nil {
return &resp, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/containers/update", &eventstypes.ContainerUpdate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Labels: resp.Container.Labels,
SnapshotKey: resp.Container.SnapshotKey,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
return store.Delete(ctx, req.ID)
}); err != nil {
return &ptypes.Empty{}, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/containers/delete", &eventstypes.ContainerDelete{
ID: req.ID,
}); err != nil {
return &ptypes.Empty{}, err
}
return &ptypes.Empty{}, nil
}
func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) }
}
func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return l.db.View(l.withStore(ctx, fn))
}
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return l.db.Update(l.withStore(ctx, fn))
}

View File

@ -17,19 +17,13 @@
package containers
import (
"github.com/boltdb/bolt"
eventstypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func init() {
@ -37,27 +31,31 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "containers",
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
return NewService(m.(*metadata.DB), ic.Events), nil
p, ok := plugins[services.ContainersService]
if !ok {
return nil, errors.New("containers service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
return &service{local: i.(api.ContainersClient)}, nil
},
})
}
type service struct {
db *metadata.DB
publisher events.Publisher
local api.ContainersClient
}
// NewService returns the container GRPC server
func NewService(db *metadata.DB, publisher events.Publisher) api.ContainersServer {
return &service{db: db, publisher: publisher}
}
var _ api.ContainersServer = &service{}
func (s *service) Register(server *grpc.Server) error {
api.RegisterContainersServer(server, s)
@ -65,129 +63,21 @@ func (s *service) Register(server *grpc.Server) error {
}
func (s *service) Get(ctx context.Context, req *api.GetContainerRequest) (*api.GetContainerResponse, error) {
var resp api.GetContainerResponse
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
container, err := store.Get(ctx, req.ID)
if err != nil {
return err
}
containerpb := containerToProto(&container)
resp.Container = containerpb
return nil
}))
return s.local.Get(ctx, req)
}
func (s *service) List(ctx context.Context, req *api.ListContainersRequest) (*api.ListContainersResponse, error) {
var resp api.ListContainersResponse
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
containers, err := store.List(ctx, req.Filters...)
if err != nil {
return err
}
resp.Containers = containersToProto(containers)
return nil
}))
return s.local.List(ctx, req)
}
func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
container := containerFromProto(&req.Container)
created, err := store.Create(ctx, container)
if err != nil {
return err
}
resp.Container = containerToProto(&created)
return nil
}); err != nil {
return &resp, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Runtime: &eventstypes.ContainerCreate_Runtime{
Name: resp.Container.Runtime.Name,
Options: resp.Container.Runtime.Options,
},
}); err != nil {
return &resp, err
}
return &resp, nil
return s.local.Create(ctx, req)
}
func (s *service) Update(ctx context.Context, req *api.UpdateContainerRequest) (*api.UpdateContainerResponse, error) {
if req.Container.ID == "" {
return nil, status.Errorf(codes.InvalidArgument, "Container.ID required")
}
var (
resp api.UpdateContainerResponse
container = containerFromProto(&req.Container)
)
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
var fieldpaths []string
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
fieldpaths = append(fieldpaths, path)
}
}
updated, err := store.Update(ctx, container, fieldpaths...)
if err != nil {
return err
}
resp.Container = containerToProto(&updated)
return nil
}); err != nil {
return &resp, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/containers/update", &eventstypes.ContainerUpdate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Labels: resp.Container.Labels,
SnapshotKey: resp.Container.SnapshotKey,
}); err != nil {
return &resp, err
}
return &resp, nil
return s.local.Update(ctx, req)
}
func (s *service) Delete(ctx context.Context, req *api.DeleteContainerRequest) (*ptypes.Empty, error) {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
return store.Delete(ctx, req.ID)
}); err != nil {
return &ptypes.Empty{}, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/containers/delete", &eventstypes.ContainerDelete{
ID: req.ID,
}); err != nil {
return &ptypes.Empty{}, err
}
return &ptypes.Empty{}, nil
}
func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) }
}
func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return s.db.View(s.withStore(ctx, fn))
}
func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
return s.local.Delete(ctx, req)
}

View File

@ -20,14 +20,12 @@ import (
"io"
"sync"
eventstypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/content/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@ -39,8 +37,7 @@ import (
)
type service struct {
store content.Store
publisher events.Publisher
store content.Store
}
var bufPool = sync.Pool{
@ -57,26 +54,29 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "content",
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
s, err := NewService(m.(*metadata.DB).ContentStore(), ic.Events)
return s, err
p, ok := plugins[services.ContentService]
if !ok {
return nil, errors.New("content store service not found")
}
cs, err := p.Instance()
if err != nil {
return nil, err
}
return newService(cs.(content.Store)), nil
},
})
}
// NewService returns the content GRPC server
func NewService(cs content.Store, publisher events.Publisher) (api.ContentServer, error) {
return &service{
store: cs,
publisher: publisher,
}, nil
// newService returns the content GRPC server
func newService(cs content.Store) api.ContentServer {
return &service{store: cs}
}
func (s *service) Register(server *grpc.Server) error {
@ -166,12 +166,6 @@ func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*p
return nil, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/content/delete", &eventstypes.ContentDelete{
Digest: req.Digest,
}); err != nil {
return nil, err
}
return &ptypes.Empty{}, nil
}

71
services/content/store.go Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package content
import (
"context"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
digest "github.com/opencontainers/go-digest"
)
// store wraps content.Store with proper event published.
type store struct {
content.Store
publisher events.Publisher
}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.ContentService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events)
return s, err
},
})
}
func newContentStore(cs content.Store, publisher events.Publisher) (content.Store, error) {
return &store{
Store: cs,
publisher: publisher,
}, nil
}
func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
if err := s.Store.Delete(ctx, dgst); err != nil {
return err
}
// TODO: Consider whether we should return error here.
return s.publisher.Publish(ctx, "/content/delete", &eventstypes.ContentDelete{
Digest: dgst,
})
}

178
services/diff/local.go Normal file
View File

@ -0,0 +1,178 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
diffapi "github.com/containerd/containerd/api/services/diff/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type config struct {
// Order is the order of preference in which to try diff algorithms, the
// first differ which is supported is used.
// Note when multiple differs may be supported, this order will be
// respected for which is choosen. Each differ should return the same
// correct output, allowing any ordering to be used to prefer
// more optimimal implementations.
Order []string `toml:"default"`
}
type differ interface {
diff.Comparer
diff.Applier
}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.DiffService,
Requires: []plugin.Type{
plugin.DiffPlugin,
},
Config: defaultDifferConfig,
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
differs, err := ic.GetByType(plugin.DiffPlugin)
if err != nil {
return nil, err
}
orderedNames := ic.Config.(*config).Order
ordered := make([]differ, len(orderedNames))
for i, n := range orderedNames {
differp, ok := differs[n]
if !ok {
return nil, errors.Errorf("needed differ not loaded: %s", n)
}
d, err := differp.Instance()
if err != nil {
return nil, errors.Wrapf(err, "could not load required differ due plugin init error: %s", n)
}
ordered[i], ok = d.(differ)
if !ok {
return nil, errors.Errorf("differ does not implement Comparer and Applier interface: %s", n)
}
}
return &local{
differs: ordered,
}, nil
},
})
}
type local struct {
differs []differ
}
var _ diffapi.DiffClient = &local{}
func (l *local) Apply(ctx context.Context, er *diffapi.ApplyRequest, _ ...grpc.CallOption) (*diffapi.ApplyResponse, error) {
var (
ocidesc ocispec.Descriptor
err error
desc = toDescriptor(er.Diff)
mounts = toMounts(er.Mounts)
)
for _, differ := range l.differs {
ocidesc, err = differ.Apply(ctx, desc, mounts)
if !errdefs.IsNotImplemented(err) {
break
}
}
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &diffapi.ApplyResponse{
Applied: fromDescriptor(ocidesc),
}, nil
}
func (l *local) Diff(ctx context.Context, dr *diffapi.DiffRequest, _ ...grpc.CallOption) (*diffapi.DiffResponse, error) {
var (
ocidesc ocispec.Descriptor
err error
aMounts = toMounts(dr.Left)
bMounts = toMounts(dr.Right)
)
var opts []diff.Opt
if dr.MediaType != "" {
opts = append(opts, diff.WithMediaType(dr.MediaType))
}
if dr.Ref != "" {
opts = append(opts, diff.WithReference(dr.Ref))
}
if dr.Labels != nil {
opts = append(opts, diff.WithLabels(dr.Labels))
}
for _, d := range l.differs {
ocidesc, err = d.Compare(ctx, aMounts, bMounts, opts...)
if !errdefs.IsNotImplemented(err) {
break
}
}
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &diffapi.DiffResponse{
Diff: fromDescriptor(ocidesc),
}, nil
}
func toMounts(apim []*types.Mount) []mount.Mount {
mounts := make([]mount.Mount, len(apim))
for i, m := range apim {
mounts[i] = mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
}
}
return mounts
}
func toDescriptor(d *types.Descriptor) ocispec.Descriptor {
return ocispec.Descriptor{
MediaType: d.MediaType,
Digest: d.Digest,
Size: d.Size_,
}
}
func fromDescriptor(d ocispec.Descriptor) *types.Descriptor {
return &types.Descriptor{
MediaType: d.MediaType,
Digest: d.Digest,
Size_: d.Size,
}
}

View File

@ -18,163 +18,53 @@ package diff
import (
diffapi "github.com/containerd/containerd/api/services/diff/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/services"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type config struct {
// Order is the order of preference in which to try diff algorithms, the
// first differ which is supported is used.
// Note when multiple differs may be supported, this order will be
// respected for which is choosen. Each differ should return the same
// correct output, allowing any ordering to be used to prefer
// more optimimal implementations.
Order []string `toml:"default"`
}
type differ interface {
diff.Comparer
diff.Applier
}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "diff",
Requires: []plugin.Type{
plugin.DiffPlugin,
plugin.ServicePlugin,
},
Config: defaultDifferConfig,
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
differs, err := ic.GetByType(plugin.DiffPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
orderedNames := ic.Config.(*config).Order
ordered := make([]differ, len(orderedNames))
for i, n := range orderedNames {
differp, ok := differs[n]
if !ok {
return nil, errors.Errorf("needed differ not loaded: %s", n)
}
d, err := differp.Instance()
if err != nil {
return nil, errors.Wrapf(err, "could not load required differ due plugin init error: %s", n)
}
ordered[i], ok = d.(differ)
if !ok {
return nil, errors.Errorf("differ does not implement Comparer and Applier interface: %s", n)
}
p, ok := plugins[services.DiffService]
if !ok {
return nil, errors.New("diff service not found")
}
return &service{
differs: ordered,
}, nil
i, err := p.Instance()
if err != nil {
return nil, err
}
return &service{local: i.(diffapi.DiffClient)}, nil
},
})
}
type service struct {
differs []differ
local diffapi.DiffClient
}
var _ diffapi.DiffServer = &service{}
func (s *service) Register(gs *grpc.Server) error {
diffapi.RegisterDiffServer(gs, s)
return nil
}
func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi.ApplyResponse, error) {
var (
ocidesc ocispec.Descriptor
err error
desc = toDescriptor(er.Diff)
mounts = toMounts(er.Mounts)
)
for _, differ := range s.differs {
ocidesc, err = differ.Apply(ctx, desc, mounts)
if !errdefs.IsNotImplemented(err) {
break
}
}
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &diffapi.ApplyResponse{
Applied: fromDescriptor(ocidesc),
}, nil
return s.local.Apply(ctx, er)
}
func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.DiffResponse, error) {
var (
ocidesc ocispec.Descriptor
err error
aMounts = toMounts(dr.Left)
bMounts = toMounts(dr.Right)
)
var opts []diff.Opt
if dr.MediaType != "" {
opts = append(opts, diff.WithMediaType(dr.MediaType))
}
if dr.Ref != "" {
opts = append(opts, diff.WithReference(dr.Ref))
}
if dr.Labels != nil {
opts = append(opts, diff.WithLabels(dr.Labels))
}
for _, d := range s.differs {
ocidesc, err = d.Compare(ctx, aMounts, bMounts, opts...)
if !errdefs.IsNotImplemented(err) {
break
}
}
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &diffapi.DiffResponse{
Diff: fromDescriptor(ocidesc),
}, nil
}
func toMounts(apim []*types.Mount) []mount.Mount {
mounts := make([]mount.Mount, len(apim))
for i, m := range apim {
mounts[i] = mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
}
}
return mounts
}
func toDescriptor(d *types.Descriptor) ocispec.Descriptor {
return ocispec.Descriptor{
MediaType: d.MediaType,
Digest: d.Digest,
Size: d.Size_,
}
}
func fromDescriptor(d ocispec.Descriptor) *types.Descriptor {
return &types.Descriptor{
MediaType: d.MediaType,
Digest: d.Digest,
Size_: d.Size,
}
return s.local.Diff(ctx, dr)
}

183
services/images/local.go Normal file
View File

@ -0,0 +1,183 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
gocontext "context"
eventstypes "github.com/containerd/containerd/api/events"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.ImagesService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.GCPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
g, err := ic.Get(plugin.GCPlugin)
if err != nil {
return nil, err
}
return &local{
store: metadata.NewImageStore(m.(*metadata.DB)),
publisher: ic.Events,
gc: g.(gcScheduler),
}, nil
},
})
}
type gcScheduler interface {
ScheduleAndWait(gocontext.Context) (gc.Stats, error)
}
type local struct {
store images.Store
gc gcScheduler
publisher events.Publisher
}
var _ imagesapi.ImagesClient = &local{}
func (l *local) Get(ctx context.Context, req *imagesapi.GetImageRequest, _ ...grpc.CallOption) (*imagesapi.GetImageResponse, error) {
image, err := l.store.Get(ctx, req.Name)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
imagepb := imageToProto(&image)
return &imagesapi.GetImageResponse{
Image: &imagepb,
}, nil
}
func (l *local) List(ctx context.Context, req *imagesapi.ListImagesRequest, _ ...grpc.CallOption) (*imagesapi.ListImagesResponse, error) {
images, err := l.store.List(ctx, req.Filters...)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &imagesapi.ListImagesResponse{
Images: imagesToProto(images),
}, nil
}
func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _ ...grpc.CallOption) (*imagesapi.CreateImageResponse, error) {
log.G(ctx).WithField("name", req.Image.Name).WithField("target", req.Image.Target.Digest).Debugf("create image")
if req.Image.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Image.Name required")
}
var (
image = imageFromProto(&req.Image)
resp imagesapi.CreateImageResponse
)
created, err := l.store.Create(ctx, image)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
resp.Image = imageToProto(&created)
if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
return &resp, nil
}
func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _ ...grpc.CallOption) (*imagesapi.UpdateImageResponse, error) {
if req.Image.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Image.Name required")
}
var (
image = imageFromProto(&req.Image)
resp imagesapi.UpdateImageResponse
fieldpaths []string
)
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
fieldpaths = append(fieldpaths, path)
}
}
updated, err := l.store.Update(ctx, image, fieldpaths...)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
resp.Image = imageToProto(&updated)
if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
return &resp, nil
}
func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
log.G(ctx).WithField("name", req.Name).Debugf("delete image")
if err := l.store.Delete(ctx, req.Name); err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: req.Name,
}); err != nil {
return nil, err
}
if req.Sync {
if _, err := l.gc.ScheduleAndWait(ctx); err != nil {
return nil, err
}
}
return &ptypes.Empty{}, nil
}

View File

@ -17,22 +17,13 @@
package images
import (
gocontext "context"
eventstypes "github.com/containerd/containerd/api/events"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func init() {
@ -40,42 +31,31 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "images",
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.GCPlugin,
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
g, err := ic.Get(plugin.GCPlugin)
p, ok := plugins[services.ImagesService]
if !ok {
return nil, errors.New("images service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
return NewService(metadata.NewImageStore(m.(*metadata.DB)), ic.Events, g.(gcScheduler)), nil
return &service{local: i.(imagesapi.ImagesClient)}, nil
},
})
}
type gcScheduler interface {
ScheduleAndWait(gocontext.Context) (gc.Stats, error)
}
type service struct {
store images.Store
gc gcScheduler
publisher events.Publisher
local imagesapi.ImagesClient
}
// NewService returns the GRPC image server
func NewService(is images.Store, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer {
return &service{
store: is,
gc: gc,
publisher: publisher,
}
}
var _ imagesapi.ImagesServer = &service{}
func (s *service) Register(server *grpc.Server) error {
imagesapi.RegisterImagesServer(server, s)
@ -83,108 +63,21 @@ func (s *service) Register(server *grpc.Server) error {
}
func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) {
image, err := s.store.Get(ctx, req.Name)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
imagepb := imageToProto(&image)
return &imagesapi.GetImageResponse{
Image: &imagepb,
}, nil
return s.local.Get(ctx, req)
}
func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) {
images, err := s.store.List(ctx, req.Filters...)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &imagesapi.ListImagesResponse{
Images: imagesToProto(images),
}, nil
return s.local.List(ctx, req)
}
func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) {
log.G(ctx).WithField("name", req.Image.Name).WithField("target", req.Image.Target.Digest).Debugf("create image")
if req.Image.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Image.Name required")
}
var (
image = imageFromProto(&req.Image)
resp imagesapi.CreateImageResponse
)
created, err := s.store.Create(ctx, image)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
resp.Image = imageToProto(&created)
if err := s.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
return &resp, nil
return s.local.Create(ctx, req)
}
func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) (*imagesapi.UpdateImageResponse, error) {
if req.Image.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Image.Name required")
}
var (
image = imageFromProto(&req.Image)
resp imagesapi.UpdateImageResponse
fieldpaths []string
)
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
fieldpaths = append(fieldpaths, path)
}
}
updated, err := s.store.Update(ctx, image, fieldpaths...)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
resp.Image = imageToProto(&updated)
if err := s.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
return &resp, nil
return s.local.Update(ctx, req)
}
func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*ptypes.Empty, error) {
log.G(ctx).WithField("name", req.Name).Debugf("delete image")
if err := s.store.Delete(ctx, req.Name); err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: req.Name,
}); err != nil {
return nil, err
}
if req.Sync {
if _, err := s.gc.ScheduleAndWait(ctx); err != nil {
return nil, err
}
}
return &ptypes.Empty{}, nil
return s.local.Delete(ctx, req)
}

120
services/leases/local.go Normal file
View File

@ -0,0 +1,120 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leases
import (
"crypto/rand"
"encoding/base64"
"fmt"
"time"
"google.golang.org/grpc"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"golang.org/x/net/context"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.LeasesService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return &local{db: m.(*metadata.DB)}, nil
},
})
}
type local struct {
db *metadata.DB
}
func (l *local) Create(ctx context.Context, r *api.CreateRequest, _ ...grpc.CallOption) (*api.CreateResponse, error) {
lid := r.ID
if lid == "" {
lid = generateLeaseID()
}
var trans metadata.Lease
if err := l.db.Update(func(tx *bolt.Tx) error {
var err error
trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels)
return err
}); err != nil {
return nil, err
}
return &api.CreateResponse{
Lease: txToGRPC(trans),
}, nil
}
func (l *local) Delete(ctx context.Context, r *api.DeleteRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if err := l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, r.ID)
}); err != nil {
return nil, err
}
return &ptypes.Empty{}, nil
}
func (l *local) List(ctx context.Context, r *api.ListRequest, _ ...grpc.CallOption) (*api.ListResponse, error) {
var leases []metadata.Lease
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...)
return err
}); err != nil {
return nil, err
}
apileases := make([]*api.Lease, len(leases))
for i := range leases {
apileases[i] = txToGRPC(leases[i])
}
return &api.ListResponse{
Leases: apileases,
}, nil
}
func txToGRPC(tx metadata.Lease) *api.Lease {
return &api.Lease{
ID: tx.ID,
Labels: tx.Labels,
CreatedAt: tx.CreatedAt,
// TODO: Snapshots
// TODO: Content
}
}
func generateLeaseID() string {
t := time.Now()
var b [3]byte
// Ignore read failures, just decreases uniqueness
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}

View File

@ -17,18 +17,13 @@
package leases
import (
"crypto/rand"
"encoding/base64"
"fmt"
"time"
"google.golang.org/grpc"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -37,27 +32,28 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "leases",
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
return NewService(m.(*metadata.DB)), nil
p, ok := plugins[services.LeasesService]
if !ok {
return nil, errors.New("leases service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
return &service{local: i.(api.LeasesClient)}, nil
},
})
}
type service struct {
db *metadata.DB
}
// NewService returns the GRPC metadata server
func NewService(db *metadata.DB) api.LeasesServer {
return &service{
db: db,
}
local api.LeasesClient
}
func (s *service) Register(server *grpc.Server) error {
@ -66,66 +62,13 @@ func (s *service) Register(server *grpc.Server) error {
}
func (s *service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
lid := r.ID
if lid == "" {
lid = generateLeaseID()
}
var trans metadata.Lease
if err := s.db.Update(func(tx *bolt.Tx) error {
var err error
trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels)
return err
}); err != nil {
return nil, err
}
return &api.CreateResponse{
Lease: txToGRPC(trans),
}, nil
return s.local.Create(ctx, r)
}
func (s *service) Delete(ctx context.Context, r *api.DeleteRequest) (*ptypes.Empty, error) {
if err := s.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, r.ID)
}); err != nil {
return nil, err
}
return &ptypes.Empty{}, nil
return s.local.Delete(ctx, r)
}
func (s *service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
var leases []metadata.Lease
if err := s.db.View(func(tx *bolt.Tx) error {
var err error
leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...)
return err
}); err != nil {
return nil, err
}
apileases := make([]*api.Lease, len(leases))
for i := range leases {
apileases[i] = txToGRPC(leases[i])
}
return &api.ListResponse{
Leases: apileases,
}, nil
}
func txToGRPC(tx metadata.Lease) *api.Lease {
return &api.Lease{
ID: tx.ID,
Labels: tx.Labels,
CreatedAt: tx.CreatedAt,
// TODO: Snapshots
// TODO: Content
}
}
func generateLeaseID() string {
t := time.Now()
var b [3]byte
// Ignore read failures, just decreases uniqueness
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
return s.local.List(ctx, r)
}

View File

@ -0,0 +1,223 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespaces
import (
"strings"
"github.com/boltdb/bolt"
eventstypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.NamespacesService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return &local{
db: m.(*metadata.DB),
publisher: ic.Events,
}, nil
},
})
}
// Provide local namespaces service instead of local namespace store,
// because namespace store interface doesn't provide enough functionality
// for namespaces service.
type local struct {
db *metadata.DB
publisher events.Publisher
}
var _ api.NamespacesClient = &local{}
func (l *local) Get(ctx context.Context, req *api.GetNamespaceRequest, _ ...grpc.CallOption) (*api.GetNamespaceResponse, error) {
var resp api.GetNamespaceResponse
return &resp, l.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
labels, err := store.Labels(ctx, req.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
resp.Namespace = api.Namespace{
Name: req.Name,
Labels: labels,
}
return nil
})
}
func (l *local) List(ctx context.Context, req *api.ListNamespacesRequest, _ ...grpc.CallOption) (*api.ListNamespacesResponse, error) {
var resp api.ListNamespacesResponse
return &resp, l.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
namespaces, err := store.List(ctx)
if err != nil {
return err
}
for _, namespace := range namespaces {
labels, err := store.Labels(ctx, namespace)
if err != nil {
// In general, this should be unlikely, since we are holding a
// transaction to service this request.
return errdefs.ToGRPC(err)
}
resp.Namespaces = append(resp.Namespaces, api.Namespace{
Name: namespace,
Labels: labels,
})
}
return nil
})
}
func (l *local) Create(ctx context.Context, req *api.CreateNamespaceRequest, _ ...grpc.CallOption) (*api.CreateNamespaceResponse, error) {
var resp api.CreateNamespaceResponse
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil {
return errdefs.ToGRPC(err)
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
resp.Namespace = req.Namespace
return nil
}); err != nil {
return &resp, err
}
if err := l.publisher.Publish(ctx, "/namespaces/create", &eventstypes.NamespaceCreate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (l *local) Update(ctx context.Context, req *api.UpdateNamespaceRequest, _ ...grpc.CallOption) (*api.UpdateNamespaceResponse, error) {
var resp api.UpdateNamespaceResponse
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
switch {
case strings.HasPrefix(path, "labels."):
key := strings.TrimPrefix(path, "labels.")
if err := store.SetLabel(ctx, req.Namespace.Name, key, req.Namespace.Labels[key]); err != nil {
return err
}
default:
return status.Errorf(codes.InvalidArgument, "cannot update %q field", path)
}
}
} else {
// clear out the existing labels and then set them to the incoming request.
// get current set of labels
labels, err := store.Labels(ctx, req.Namespace.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
for k := range labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, ""); err != nil {
return err
}
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
}
return nil
}); err != nil {
return &resp, err
}
if err := l.publisher.Publish(ctx, "/namespaces/update", &eventstypes.NamespaceUpdate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (l *local) Delete(ctx context.Context, req *api.DeleteNamespaceRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
}); err != nil {
return &ptypes.Empty{}, err
}
// set the namespace in the context before publishing the event
ctx = namespaces.WithNamespace(ctx, req.Name)
if err := l.publisher.Publish(ctx, "/namespaces/delete", &eventstypes.NamespaceDelete{
Name: req.Name,
}); err != nil {
return &ptypes.Empty{}, err
}
return &ptypes.Empty{}, nil
}
func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewNamespaceStore(tx)) }
}
func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return l.db.View(l.withStore(ctx, fn))
}
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return l.db.Update(l.withStore(ctx, fn))
}

View File

@ -17,21 +17,13 @@
package namespaces
import (
"strings"
"github.com/boltdb/bolt"
eventstypes "github.com/containerd/containerd/api/events"
api "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func init() {
@ -39,191 +31,53 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "namespaces",
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
return NewService(m.(*metadata.DB), ic.Events), nil
p, ok := plugins[services.NamespacesService]
if !ok {
return nil, errors.New("namespaces service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
return &service{local: i.(api.NamespacesClient)}, nil
},
})
}
type service struct {
db *metadata.DB
publisher events.Publisher
local api.NamespacesClient
}
var _ api.NamespacesServer = &service{}
// NewService returns the GRPC namespaces server
func NewService(db *metadata.DB, publisher events.Publisher) api.NamespacesServer {
return &service{
db: db,
publisher: publisher,
}
}
func (s *service) Register(server *grpc.Server) error {
api.RegisterNamespacesServer(server, s)
return nil
}
func (s *service) Get(ctx context.Context, req *api.GetNamespaceRequest) (*api.GetNamespaceResponse, error) {
var resp api.GetNamespaceResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
labels, err := store.Labels(ctx, req.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
resp.Namespace = api.Namespace{
Name: req.Name,
Labels: labels,
}
return nil
})
return s.local.Get(ctx, req)
}
func (s *service) List(ctx context.Context, req *api.ListNamespacesRequest) (*api.ListNamespacesResponse, error) {
var resp api.ListNamespacesResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
namespaces, err := store.List(ctx)
if err != nil {
return err
}
for _, namespace := range namespaces {
labels, err := store.Labels(ctx, namespace)
if err != nil {
// In general, this should be unlikely, since we are holding a
// transaction to service this request.
return errdefs.ToGRPC(err)
}
resp.Namespaces = append(resp.Namespaces, api.Namespace{
Name: namespace,
Labels: labels,
})
}
return nil
})
return s.local.List(ctx, req)
}
func (s *service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (*api.CreateNamespaceResponse, error) {
var resp api.CreateNamespaceResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil {
return errdefs.ToGRPC(err)
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
resp.Namespace = req.Namespace
return nil
}); err != nil {
return &resp, err
}
if err := s.publisher.Publish(ctx, "/namespaces/create", &eventstypes.NamespaceCreate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
return s.local.Create(ctx, req)
}
func (s *service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (*api.UpdateNamespaceResponse, error) {
var resp api.UpdateNamespaceResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
switch {
case strings.HasPrefix(path, "labels."):
key := strings.TrimPrefix(path, "labels.")
if err := store.SetLabel(ctx, req.Namespace.Name, key, req.Namespace.Labels[key]); err != nil {
return err
}
default:
return status.Errorf(codes.InvalidArgument, "cannot update %q field", path)
}
}
} else {
// clear out the existing labels and then set them to the incoming request.
// get current set of labels
labels, err := store.Labels(ctx, req.Namespace.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
for k := range labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, ""); err != nil {
return err
}
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
}
return nil
}); err != nil {
return &resp, err
}
if err := s.publisher.Publish(ctx, "/namespaces/update", &eventstypes.NamespaceUpdate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
return s.local.Update(ctx, req)
}
func (s *service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) (*ptypes.Empty, error) {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
}); err != nil {
return &ptypes.Empty{}, err
}
// set the namespace in the context before publishing the event
ctx = namespaces.WithNamespace(ctx, req.Name)
if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventstypes.NamespaceDelete{
Name: req.Name,
}); err != nil {
return &ptypes.Empty{}, err
}
return &ptypes.Empty{}, nil
}
func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewNamespaceStore(tx)) }
}
func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.View(s.withStore(ctx, fn))
}
func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
return s.local.Delete(ctx, req)
}

36
services/services.go Normal file
View File

@ -0,0 +1,36 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
const (
// ContentService is id of content service.
ContentService = "content-service"
// SnapshotsService is id of snapshots service.
SnapshotsService = "snapshots-service"
// ImagesService is id of images service.
ImagesService = "images-service"
// ContainersService is id of containers service.
ContainersService = "containers-service"
// TasksService is id of tasks service.
TasksService = "tasks-service"
// NamespacesService is id of namespaces service.
NamespacesService = "namespaces-service"
// LeasesService is id of leases service.
LeasesService = "leases-service"
// DiffService is id of diff service.
DiffService = "diff-service"
)

View File

@ -19,17 +19,16 @@ package snapshots
import (
gocontext "context"
eventstypes "github.com/containerd/containerd/api/events"
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@ -39,7 +38,7 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "snapshots",
Requires: []plugin.Type{
plugin.MetadataPlugin,
plugin.ServicePlugin,
},
InitFn: newService,
})
@ -48,20 +47,24 @@ func init() {
var empty = &ptypes.Empty{}
type service struct {
db *metadata.DB
publisher events.Publisher
ss map[string]snapshots.Snapshotter
}
func newService(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.Get(plugin.MetadataPlugin)
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
return &service{
db: md.(*metadata.DB),
publisher: ic.Events,
}, nil
p, ok := plugins[services.SnapshotsService]
if !ok {
return nil, errors.New("snapshots service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
ss := i.(map[string]snapshots.Snapshotter)
return &service{ss: ss}, nil
}
func (s *service) getSnapshotter(name string) (snapshots.Snapshotter, error) {
@ -69,7 +72,7 @@ func (s *service) getSnapshotter(name string) (snapshots.Snapshotter, error) {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter argument missing")
}
sn := s.db.Snapshotter(name)
sn := s.ss[name]
if sn == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter not loaded: %s", name)
}
@ -97,12 +100,6 @@ func (s *service) Prepare(ctx context.Context, pr *snapshotsapi.PrepareSnapshotR
return nil, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
Key: pr.Key,
Parent: pr.Parent,
}); err != nil {
return nil, err
}
return &snapshotsapi.PrepareSnapshotResponse{
Mounts: fromMounts(mounts),
}, nil
@ -158,12 +155,6 @@ func (s *service) Commit(ctx context.Context, cr *snapshotsapi.CommitSnapshotReq
return nil, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: cr.Key,
Name: cr.Name,
}); err != nil {
return nil, err
}
return empty, nil
}
@ -178,11 +169,6 @@ func (s *service) Remove(ctx context.Context, rr *snapshotsapi.RemoveSnapshotReq
return nil, errdefs.ToGRPC(err)
}
if err := s.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
Key: rr.Key,
}); err != nil {
return nil, err
}
return empty, nil
}

View File

@ -0,0 +1,98 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshots
import (
"context"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
)
// snapshotter wraps snapshots.Snapshotter with proper events published.
type snapshotter struct {
snapshots.Snapshotter
publisher events.Publisher
}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.SnapshotsService,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
db := m.(*metadata.DB)
ss := make(map[string]snapshots.Snapshotter)
for n, sn := range db.Snapshotters() {
ss[n] = newSnapshotter(sn, ic.Events)
}
return ss, nil
},
})
}
func newSnapshotter(sn snapshots.Snapshotter, publisher events.Publisher) snapshots.Snapshotter {
return &snapshotter{
Snapshotter: sn,
publisher: publisher,
}
}
func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
mounts, err := s.Snapshotter.Prepare(ctx, key, parent, opts...)
if err != nil {
return nil, err
}
if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
Key: key,
Parent: parent,
}); err != nil {
return nil, err
}
return mounts, nil
}
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
if err := s.Snapshotter.Commit(ctx, name, key, opts...); err != nil {
return err
}
return s.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: key,
Name: name,
})
}
func (s *snapshotter) Remove(ctx context.Context, key string) error {
if err := s.Snapshotter.Remove(ctx, key); err != nil {
return err
}
return s.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
Key: key,
})
}

634
services/tasks/local.go Normal file
View File

@ -0,0 +1,634 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tasks
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/services"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
_ = (api.TasksClient)(&local{})
empty = &ptypes.Empty{}
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.TasksService,
Requires: []plugin.Type{
plugin.RuntimePlugin,
plugin.MetadataPlugin,
},
InitFn: initFunc,
})
}
func initFunc(ic *plugin.InitContext) (interface{}, error) {
rt, err := ic.GetByType(plugin.RuntimePlugin)
if err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
cs := m.(*metadata.DB).ContentStore()
runtimes := make(map[string]runtime.Runtime)
for _, rr := range rt {
ri, err := rr.Instance()
if err != nil {
log.G(ic.Context).WithError(err).Warn("could not load runtime instance due to initialization error")
continue
}
r := ri.(runtime.Runtime)
runtimes[r.ID()] = r
}
if len(runtimes) == 0 {
return nil, errors.New("no runtimes available to create task service")
}
return &local{
runtimes: runtimes,
db: m.(*metadata.DB),
store: cs,
publisher: ic.Events,
}, nil
}
type local struct {
runtimes map[string]runtime.Runtime
db *metadata.DB
store content.Store
publisher events.Publisher
}
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
var (
checkpointPath string
err error
)
if r.Checkpoint != nil {
checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint")
if err != nil {
return nil, err
}
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
}
reader, err := l.store.ReaderAt(ctx, r.Checkpoint.Digest)
if err != nil {
return nil, err
}
_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
reader.Close()
if err != nil {
return nil, err
}
}
container, err := l.getContainer(ctx, r.ContainerID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
opts := runtime.CreateOpts{
Spec: container.Spec,
IO: runtime.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: checkpointPath,
Options: r.Options,
}
for _, m := range r.Rootfs {
opts.Rootfs = append(opts.Rootfs, mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
runtime, err := l.getRuntime(container.Runtime.Name)
if err != nil {
return nil, err
}
c, err := runtime.Create(ctx, r.ContainerID, opts)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
state, err := c.State(ctx)
if err != nil {
log.G(ctx).Error(err)
}
return &api.CreateTaskResponse{
ContainerID: r.ContainerID,
Pid: state.Pid,
}, nil
}
func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if err := p.Start(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
state, err := p.State(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.StartResponse{
Pid: state.Pid,
}, nil
}
func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
runtime, err := l.getRuntime(t.Info().Runtime)
if err != nil {
return nil, err
}
exit, err := runtime.Delete(ctx, t)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.DeleteResponse{
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
Pid: exit.Pid,
}, nil
}
func (l *local) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
exit, err := t.DeleteProcess(ctx, r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.DeleteResponse{
ID: r.ExecID,
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
Pid: exit.Pid,
}, nil
}
func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) {
state, err := p.State(ctx)
if err != nil {
return nil, err
}
var status task.Status
switch state.Status {
case runtime.CreatedStatus:
status = task.StatusCreated
case runtime.RunningStatus:
status = task.StatusRunning
case runtime.StoppedStatus:
status = task.StatusStopped
case runtime.PausedStatus:
status = task.StatusPaused
case runtime.PausingStatus:
status = task.StatusPausing
default:
log.G(ctx).WithField("status", state.Status).Warn("unknown status")
}
return &task.Process{
ID: p.ID(),
Pid: state.Pid,
Status: status,
Stdin: state.Stdin,
Stdout: state.Stdout,
Stderr: state.Stderr,
Terminal: state.Terminal,
ExitStatus: state.ExitStatus,
ExitedAt: state.ExitedAt,
}, nil
}
func (l *local) Get(ctx context.Context, r *api.GetRequest, _ ...grpc.CallOption) (*api.GetResponse, error) {
task, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(task)
if r.ExecID != "" {
if p, err = task.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
t, err := processFromContainerd(ctx, p)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.GetResponse{
Process: t,
}, nil
}
func (l *local) List(ctx context.Context, r *api.ListTasksRequest, _ ...grpc.CallOption) (*api.ListTasksResponse, error) {
resp := &api.ListTasksResponse{}
for _, r := range l.runtimes {
tasks, err := r.Tasks(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
addTasks(ctx, resp, tasks)
}
return resp, nil
}
func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) {
for _, t := range tasks {
tt, err := processFromContainerd(ctx, t)
if err != nil {
if !errdefs.IsNotFound(err) { // handle race with deletion
log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf")
}
continue
}
r.Tasks = append(r.Tasks, tt)
}
}
func (l *local) Pause(ctx context.Context, r *api.PauseTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
err = t.Pause(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
func (l *local) Resume(ctx context.Context, r *api.ResumeTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
err = t.Resume(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
func (l *local) Kill(ctx context.Context, r *api.KillRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
func (l *local) ListPids(ctx context.Context, r *api.ListPidsRequest, _ ...grpc.CallOption) (*api.ListPidsResponse, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
processList, err := t.Pids(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, p := range processList {
pInfo := task.ProcessInfo{
Pid: p.Pid,
}
if p.Info != nil {
a, err := typeurl.MarshalAny(p.Info)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal process %d info", p.Pid)
}
pInfo.Info = a
}
processes = append(processes, &pInfo)
}
return &api.ListPidsResponse{
Processes: processes,
}, nil
}
func (l *local) Exec(ctx context.Context, r *api.ExecProcessRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if r.ExecID == "" {
return nil, status.Errorf(codes.InvalidArgument, "exec id cannot be empty")
}
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
if _, err := t.Exec(ctx, r.ExecID, runtime.ExecOpts{
Spec: r.Spec,
IO: runtime.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
func (l *local) ResizePty(ctx context.Context, r *api.ResizePtyRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if err := p.ResizePty(ctx, runtime.ConsoleSize{
Width: r.Width,
Height: r.Height,
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
func (l *local) CloseIO(ctx context.Context, r *api.CloseIORequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if r.Stdin {
if err := p.CloseIO(ctx); err != nil {
return nil, err
}
}
return empty, nil
}
func (l *local) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest, _ ...grpc.CallOption) (*api.CheckpointTaskResponse, error) {
container, err := l.getContainer(ctx, r.ContainerID)
if err != nil {
return nil, err
}
t, err := l.getTaskFromContainer(ctx, container)
if err != nil {
return nil, err
}
image, err := ioutil.TempDir("", "ctd-checkpoint")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
defer os.RemoveAll(image)
if err := t.Checkpoint(ctx, image, r.Options); err != nil {
return nil, errdefs.ToGRPC(err)
}
// write checkpoint to the content store
tar := archive.Diff(ctx, "", image)
cp, err := l.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar)
// close tar first after write
if err := tar.Close(); err != nil {
return nil, err
}
if err != nil {
return nil, err
}
// write the config to the content store
data, err := container.Spec.Marshal()
if err != nil {
return nil, err
}
spec := bytes.NewReader(data)
specD, err := l.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.CheckpointTaskResponse{
Descriptors: []*types.Descriptor{
cp,
specD,
},
}, nil
}
func (l *local) Update(ctx context.Context, r *api.UpdateTaskRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
if err := t.Update(ctx, r.Resources); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
func (l *local) Metrics(ctx context.Context, r *api.MetricsRequest, _ ...grpc.CallOption) (*api.MetricsResponse, error) {
filter, err := filters.ParseAll(r.Filters...)
if err != nil {
return nil, err
}
var resp api.MetricsResponse
for _, r := range l.runtimes {
tasks, err := r.Tasks(ctx)
if err != nil {
return nil, err
}
getTasksMetrics(ctx, filter, tasks, &resp)
}
return &resp, nil
}
func (l *local) Wait(ctx context.Context, r *api.WaitRequest, _ ...grpc.CallOption) (*api.WaitResponse, error) {
t, err := l.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
exit, err := p.Wait(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.WaitResponse{
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
}, nil
}
func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) {
for _, tk := range tasks {
if !filter.Match(filters.AdapterFunc(func(fieldpath []string) (string, bool) {
t := tk
switch fieldpath[0] {
case "id":
return t.ID(), true
case "namespace":
return t.Info().Namespace, true
case "runtime":
return t.Info().Runtime, true
}
return "", false
})) {
continue
}
collected := time.Now()
metrics, err := tk.Metrics(ctx)
if err != nil {
if !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID())
}
continue
}
data, err := typeurl.MarshalAny(metrics)
if err != nil {
log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID())
continue
}
r.Metrics = append(r.Metrics, &types.Metric{
ID: tk.ID(),
Timestamp: collected,
Data: data,
})
}
}
func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := l.store.Writer(ctx, ref, 0, "")
if err != nil {
return nil, err
}
defer writer.Close()
size, err := io.Copy(writer, r)
if err != nil {
return nil, err
}
if err := writer.Commit(ctx, 0, ""); err != nil {
return nil, err
}
return &types.Descriptor{
MediaType: mediaType,
Digest: writer.Digest(),
Size_: size,
}, nil
}
func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) {
var container containers.Container
if err := l.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &container, nil
}
func (l *local) getTask(ctx context.Context, id string) (runtime.Task, error) {
container, err := l.getContainer(ctx, id)
if err != nil {
return nil, err
}
return l.getTaskFromContainer(ctx, container)
}
func (l *local) getTaskFromContainer(ctx context.Context, container *containers.Container) (runtime.Task, error) {
runtime, err := l.getRuntime(container.Runtime.Name)
if err != nil {
return nil, errdefs.ToGRPCf(err, "runtime for task %s", container.Runtime.Name)
}
t, err := runtime.Get(ctx, container.ID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID)
}
return t, nil
}
func (l *local) getRuntime(name string) (runtime.Runtime, error) {
runtime, ok := l.runtimes[name]
if !ok {
return nil, status.Errorf(codes.NotFound, "unknown runtime %q", name)
}
return runtime, nil
}

View File

@ -17,42 +17,17 @@
package tasks
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/typeurl"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
_ = (api.TasksServer)(&service{})
empty = &ptypes.Empty{}
_ = (api.TasksServer)(&service{})
)
func init() {
@ -60,51 +35,28 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "tasks",
Requires: []plugin.Type{
plugin.RuntimePlugin,
plugin.MetadataPlugin,
plugin.ServicePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
p, ok := plugins[services.TasksService]
if !ok {
return nil, errors.New("tasks service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
return &service{local: i.(api.TasksClient)}, nil
},
InitFn: initFunc,
})
}
func initFunc(ic *plugin.InitContext) (interface{}, error) {
rt, err := ic.GetByType(plugin.RuntimePlugin)
if err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
cs := m.(*metadata.DB).ContentStore()
runtimes := make(map[string]runtime.Runtime)
for _, rr := range rt {
ri, err := rr.Instance()
if err != nil {
log.G(ic.Context).WithError(err).Warn("could not load runtime instance due to initialization error")
continue
}
r := ri.(runtime.Runtime)
runtimes[r.ID()] = r
}
if len(runtimes) == 0 {
return nil, errors.New("no runtimes available to create task service")
}
return &service{
runtimes: runtimes,
db: m.(*metadata.DB),
store: cs,
publisher: ic.Events,
}, nil
}
type service struct {
runtimes map[string]runtime.Runtime
db *metadata.DB
store content.Store
publisher events.Publisher
local api.TasksClient
}
func (s *service) Register(server *grpc.Server) error {
@ -113,526 +65,69 @@ func (s *service) Register(server *grpc.Server) error {
}
func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
var (
checkpointPath string
err error
)
if r.Checkpoint != nil {
checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint")
if err != nil {
return nil, err
}
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
}
reader, err := s.store.ReaderAt(ctx, r.Checkpoint.Digest)
if err != nil {
return nil, err
}
_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
reader.Close()
if err != nil {
return nil, err
}
}
container, err := s.getContainer(ctx, r.ContainerID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
opts := runtime.CreateOpts{
Spec: container.Spec,
IO: runtime.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: checkpointPath,
Options: r.Options,
}
for _, m := range r.Rootfs {
opts.Rootfs = append(opts.Rootfs, mount.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
runtime, err := s.getRuntime(container.Runtime.Name)
if err != nil {
return nil, err
}
c, err := runtime.Create(ctx, r.ContainerID, opts)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
state, err := c.State(ctx)
if err != nil {
log.G(ctx).Error(err)
}
return &api.CreateTaskResponse{
ContainerID: r.ContainerID,
Pid: state.Pid,
}, nil
return s.local.Create(ctx, r)
}
func (s *service) Start(ctx context.Context, r *api.StartRequest) (*api.StartResponse, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if err := p.Start(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
state, err := p.State(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.StartResponse{
Pid: state.Pid,
}, nil
return s.local.Start(ctx, r)
}
func (s *service) Delete(ctx context.Context, r *api.DeleteTaskRequest) (*api.DeleteResponse, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
runtime, err := s.getRuntime(t.Info().Runtime)
if err != nil {
return nil, err
}
exit, err := runtime.Delete(ctx, t)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.DeleteResponse{
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
Pid: exit.Pid,
}, nil
return s.local.Delete(ctx, r)
}
func (s *service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest) (*api.DeleteResponse, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
exit, err := t.DeleteProcess(ctx, r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.DeleteResponse{
ID: r.ExecID,
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
Pid: exit.Pid,
}, nil
}
func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) {
state, err := p.State(ctx)
if err != nil {
return nil, err
}
var status task.Status
switch state.Status {
case runtime.CreatedStatus:
status = task.StatusCreated
case runtime.RunningStatus:
status = task.StatusRunning
case runtime.StoppedStatus:
status = task.StatusStopped
case runtime.PausedStatus:
status = task.StatusPaused
case runtime.PausingStatus:
status = task.StatusPausing
default:
log.G(ctx).WithField("status", state.Status).Warn("unknown status")
}
return &task.Process{
ID: p.ID(),
Pid: state.Pid,
Status: status,
Stdin: state.Stdin,
Stdout: state.Stdout,
Stderr: state.Stderr,
Terminal: state.Terminal,
ExitStatus: state.ExitStatus,
ExitedAt: state.ExitedAt,
}, nil
return s.local.DeleteProcess(ctx, r)
}
func (s *service) Get(ctx context.Context, r *api.GetRequest) (*api.GetResponse, error) {
task, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(task)
if r.ExecID != "" {
if p, err = task.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
t, err := processFromContainerd(ctx, p)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.GetResponse{
Process: t,
}, nil
return s.local.Get(ctx, r)
}
func (s *service) List(ctx context.Context, r *api.ListTasksRequest) (*api.ListTasksResponse, error) {
resp := &api.ListTasksResponse{}
for _, r := range s.runtimes {
tasks, err := r.Tasks(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
addTasks(ctx, resp, tasks)
}
return resp, nil
}
func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) {
for _, t := range tasks {
tt, err := processFromContainerd(ctx, t)
if err != nil {
if !errdefs.IsNotFound(err) { // handle race with deletion
log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf")
}
continue
}
r.Tasks = append(r.Tasks, tt)
}
return s.local.List(ctx, r)
}
func (s *service) Pause(ctx context.Context, r *api.PauseTaskRequest) (*ptypes.Empty, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
err = t.Pause(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
return s.local.Pause(ctx, r)
}
func (s *service) Resume(ctx context.Context, r *api.ResumeTaskRequest) (*ptypes.Empty, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
err = t.Resume(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
return s.local.Resume(ctx, r)
}
func (s *service) Kill(ctx context.Context, r *api.KillRequest) (*ptypes.Empty, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
return s.local.Kill(ctx, r)
}
func (s *service) ListPids(ctx context.Context, r *api.ListPidsRequest) (*api.ListPidsResponse, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
processList, err := t.Pids(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, p := range processList {
pInfo := task.ProcessInfo{
Pid: p.Pid,
}
if p.Info != nil {
a, err := typeurl.MarshalAny(p.Info)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal process %d info", p.Pid)
}
pInfo.Info = a
}
processes = append(processes, &pInfo)
}
return &api.ListPidsResponse{
Processes: processes,
}, nil
return s.local.ListPids(ctx, r)
}
func (s *service) Exec(ctx context.Context, r *api.ExecProcessRequest) (*ptypes.Empty, error) {
if r.ExecID == "" {
return nil, status.Errorf(codes.InvalidArgument, "exec id cannot be empty")
}
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
if _, err := t.Exec(ctx, r.ExecID, runtime.ExecOpts{
Spec: r.Spec,
IO: runtime.IO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
return s.local.Exec(ctx, r)
}
func (s *service) ResizePty(ctx context.Context, r *api.ResizePtyRequest) (*ptypes.Empty, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if err := p.ResizePty(ctx, runtime.ConsoleSize{
Width: r.Width,
Height: r.Height,
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
return s.local.ResizePty(ctx, r)
}
func (s *service) CloseIO(ctx context.Context, r *api.CloseIORequest) (*ptypes.Empty, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
if r.Stdin {
if err := p.CloseIO(ctx); err != nil {
return nil, err
}
}
return empty, nil
return s.local.CloseIO(ctx, r)
}
func (s *service) Checkpoint(ctx context.Context, r *api.CheckpointTaskRequest) (*api.CheckpointTaskResponse, error) {
container, err := s.getContainer(ctx, r.ContainerID)
if err != nil {
return nil, err
}
t, err := s.getTaskFromContainer(ctx, container)
if err != nil {
return nil, err
}
image, err := ioutil.TempDir("", "ctd-checkpoint")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
defer os.RemoveAll(image)
if err := t.Checkpoint(ctx, image, r.Options); err != nil {
return nil, errdefs.ToGRPC(err)
}
// write checkpoint to the content store
tar := archive.Diff(ctx, "", image)
cp, err := s.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar)
// close tar first after write
if err := tar.Close(); err != nil {
return nil, err
}
if err != nil {
return nil, err
}
// write the config to the content store
data, err := container.Spec.Marshal()
if err != nil {
return nil, err
}
spec := bytes.NewReader(data)
specD, err := s.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.CheckpointTaskResponse{
Descriptors: []*types.Descriptor{
cp,
specD,
},
}, nil
return s.local.Checkpoint(ctx, r)
}
func (s *service) Update(ctx context.Context, r *api.UpdateTaskRequest) (*ptypes.Empty, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
if err := t.Update(ctx, r.Resources); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
return s.local.Update(ctx, r)
}
func (s *service) Metrics(ctx context.Context, r *api.MetricsRequest) (*api.MetricsResponse, error) {
filter, err := filters.ParseAll(r.Filters...)
if err != nil {
return nil, err
}
var resp api.MetricsResponse
for _, r := range s.runtimes {
tasks, err := r.Tasks(ctx)
if err != nil {
return nil, err
}
getTasksMetrics(ctx, filter, tasks, &resp)
}
return &resp, nil
return s.local.Metrics(ctx, r)
}
func (s *service) Wait(ctx context.Context, r *api.WaitRequest) (*api.WaitResponse, error) {
t, err := s.getTask(ctx, r.ContainerID)
if err != nil {
return nil, err
}
p := runtime.Process(t)
if r.ExecID != "" {
if p, err = t.Process(ctx, r.ExecID); err != nil {
return nil, errdefs.ToGRPC(err)
}
}
exit, err := p.Wait(ctx)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.WaitResponse{
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
}, nil
}
func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime.Task, r *api.MetricsResponse) {
for _, tk := range tasks {
if !filter.Match(filters.AdapterFunc(func(fieldpath []string) (string, bool) {
t := tk
switch fieldpath[0] {
case "id":
return t.ID(), true
case "namespace":
return t.Info().Namespace, true
case "runtime":
return t.Info().Runtime, true
}
return "", false
})) {
continue
}
collected := time.Now()
metrics, err := tk.Metrics(ctx)
if err != nil {
if !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("collecting metrics for %s", tk.ID())
}
continue
}
data, err := typeurl.MarshalAny(metrics)
if err != nil {
log.G(ctx).WithError(err).Errorf("marshal metrics for %s", tk.ID())
continue
}
r.Metrics = append(r.Metrics, &types.Metric{
ID: tk.ID(),
Timestamp: collected,
Data: data,
})
}
}
func (s *service) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := s.store.Writer(ctx, ref, 0, "")
if err != nil {
return nil, err
}
defer writer.Close()
size, err := io.Copy(writer, r)
if err != nil {
return nil, err
}
if err := writer.Commit(ctx, 0, ""); err != nil {
return nil, err
}
return &types.Descriptor{
MediaType: mediaType,
Digest: writer.Digest(),
Size_: size,
}, nil
}
func (s *service) getContainer(ctx context.Context, id string) (*containers.Container, error) {
var container containers.Container
if err := s.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &container, nil
}
func (s *service) getTask(ctx context.Context, id string) (runtime.Task, error) {
container, err := s.getContainer(ctx, id)
if err != nil {
return nil, err
}
return s.getTaskFromContainer(ctx, container)
}
func (s *service) getTaskFromContainer(ctx context.Context, container *containers.Container) (runtime.Task, error) {
runtime, err := s.getRuntime(container.Runtime.Name)
if err != nil {
return nil, errdefs.ToGRPCf(err, "runtime for task %s", container.Runtime.Name)
}
t, err := runtime.Get(ctx, container.ID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID)
}
return t, nil
}
func (s *service) getRuntime(name string) (runtime.Runtime, error) {
runtime, ok := s.runtimes[name]
if !ok {
return nil, status.Errorf(codes.NotFound, "unknown runtime %q", name)
}
return runtime, nil
return s.local.Wait(ctx, r)
}