/* Copyright The containerd Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package containerd import ( "context" "fmt" "io" "net/http" "runtime" "strconv" "sync" "time" containersapi "github.com/containerd/containerd/api/services/containers/v1" contentapi "github.com/containerd/containerd/api/services/content/v1" diffapi "github.com/containerd/containerd/api/services/diff/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1" imagesapi "github.com/containerd/containerd/api/services/images/v1" introspectionapi "github.com/containerd/containerd/api/services/introspection/v1" leasesapi "github.com/containerd/containerd/api/services/leases/v1" namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1" snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1" "github.com/containerd/containerd/api/services/tasks/v1" versionservice "github.com/containerd/containerd/api/services/version/v1" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/content" "github.com/containerd/containerd/defaults" "github.com/containerd/containerd/dialer" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/images" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker/schema1" "github.com/containerd/containerd/snapshots" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) func init() { const prefix = "types.containerd.io" // register TypeUrls for commonly marshaled external types major := strconv.Itoa(specs.VersionMajor) typeurl.Register(&specs.Spec{}, prefix, "opencontainers/runtime-spec", major, "Spec") typeurl.Register(&specs.Process{}, prefix, "opencontainers/runtime-spec", major, "Process") typeurl.Register(&specs.LinuxResources{}, prefix, "opencontainers/runtime-spec", major, "LinuxResources") typeurl.Register(&specs.WindowsResources{}, prefix, "opencontainers/runtime-spec", major, "WindowsResources") } // New returns a new containerd client that is connected to the containerd // instance provided by address func New(address string, opts ...ClientOpt) (*Client, error) { var copts clientOpts for _, o := range opts { if err := o(&copts); err != nil { return nil, err } } c := &Client{ runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS), } if copts.services != nil { c.services = *copts.services } if address != "" { gopts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithInsecure(), grpc.WithTimeout(60 * time.Second), grpc.FailOnNonTempDialError(true), grpc.WithBackoffMaxDelay(3 * time.Second), grpc.WithDialer(dialer.Dialer), // TODO(stevvooe): We may need to allow configuration of this on the client. grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), } if len(copts.dialOptions) > 0 { gopts = copts.dialOptions } if copts.defaultns != "" { unary, stream := newNSInterceptors(copts.defaultns) gopts = append(gopts, grpc.WithUnaryInterceptor(unary), grpc.WithStreamInterceptor(stream), ) } connector := func() (*grpc.ClientConn, error) { conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q", address) } return conn, nil } conn, err := connector() if err != nil { return nil, err } c.conn, c.connector = conn, connector } if copts.services == nil && c.conn == nil { return nil, errors.New("no grpc connection or services is available") } return c, nil } // NewWithConn returns a new containerd client that is connected to the containerd // instance provided by the connection func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) { var copts clientOpts for _, o := range opts { if err := o(&copts); err != nil { return nil, err } } c := &Client{ conn: conn, runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS), } if copts.services != nil { c.services = *copts.services } return c, nil } // Client is the client to interact with containerd and its various services // using a uniform interface type Client struct { services connMu sync.Mutex conn *grpc.ClientConn runtime string connector func() (*grpc.ClientConn, error) } // Reconnect re-establishes the GRPC connection to the containerd daemon func (c *Client) Reconnect() error { if c.connector == nil { return errors.New("unable to reconnect to containerd, no connector available") } c.connMu.Lock() defer c.connMu.Unlock() c.conn.Close() conn, err := c.connector() if err != nil { return err } c.conn = conn return nil } // IsServing returns true if the client can successfully connect to the // containerd daemon and the healthcheck service returns the SERVING // response. // This call will block if a transient error is encountered during // connection. A timeout can be set in the context to ensure it returns // early. func (c *Client) IsServing(ctx context.Context) (bool, error) { c.connMu.Lock() if c.conn == nil { c.connMu.Unlock() return false, errors.New("no grpc connection available") } c.connMu.Unlock() r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(false)) if err != nil { return false, err } return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil } // Containers returns all containers created in containerd func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container, error) { r, err := c.ContainerService().List(ctx, filters...) if err != nil { return nil, err } var out []Container for _, container := range r { out = append(out, containerFromRecord(c, container)) } return out, nil } // NewContainer will create a new container in container with the provided id // the id must be unique within the namespace func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) { ctx, done, err := c.WithLease(ctx) if err != nil { return nil, err } defer done(ctx) container := containers.Container{ ID: id, Runtime: containers.RuntimeInfo{ Name: c.runtime, }, } for _, o := range opts { if err := o(ctx, c, &container); err != nil { return nil, err } } r, err := c.ContainerService().Create(ctx, container) if err != nil { return nil, err } return containerFromRecord(c, r), nil } // LoadContainer loads an existing container from metadata func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) { r, err := c.ContainerService().Get(ctx, id) if err != nil { return nil, err } return containerFromRecord(c, r), nil } // RemoteContext is used to configure object resolutions and transfers with // remote content stores and image providers. type RemoteContext struct { // Resolver is used to resolve names to objects, fetchers, and pushers. // If no resolver is provided, defaults to Docker registry resolver. Resolver remotes.Resolver // Platforms defines which platforms to handle when doing the image operation. // If this field is empty, content for all platforms will be pulled. Platforms []string // Unpack is done after an image is pulled to extract into a snapshotter. // If an image is not unpacked on pull, it can be unpacked any time // afterwards. Unpacking is required to run an image. Unpack bool // Snapshotter used for unpacking Snapshotter string // Labels to be applied to the created image Labels map[string]string // BaseHandlers are a set of handlers which get are called on dispatch. // These handlers always get called before any operation specific // handlers. BaseHandlers []images.Handler // ConvertSchema1 is whether to convert Docker registry schema 1 // manifests. If this option is false then any image which resolves // to schema 1 will return an error since schema 1 is not supported. ConvertSchema1 bool } func defaultRemoteContext() *RemoteContext { return &RemoteContext{ Resolver: docker.NewResolver(docker.ResolverOptions{ Client: http.DefaultClient, }), Snapshotter: DefaultSnapshotter, } } // Pull downloads the provided content into containerd's content store func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) { pullCtx := defaultRemoteContext() for _, o := range opts { if err := o(c, pullCtx); err != nil { return nil, err } } store := c.ContentStore() ctx, done, err := c.WithLease(ctx) if err != nil { return nil, err } defer done(ctx) name, desc, err := pullCtx.Resolver.Resolve(ctx, ref) if err != nil { return nil, errors.Wrapf(err, "failed to resolve reference %q", ref) } fetcher, err := pullCtx.Resolver.Fetcher(ctx, name) if err != nil { return nil, errors.Wrapf(err, "failed to get fetcher for %q", name) } var ( schema1Converter *schema1.Converter handler images.Handler ) if desc.MediaType == images.MediaTypeDockerSchema1Manifest && pullCtx.ConvertSchema1 { schema1Converter = schema1.NewConverter(store, fetcher) handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...) } else { // Get all the children for a descriptor childrenHandler := images.ChildrenHandler(store) // Set any children labels for that content childrenHandler = images.SetChildrenLabels(store, childrenHandler) // Filter childen by platforms childrenHandler = images.FilterPlatforms(childrenHandler, pullCtx.Platforms...) handler = images.Handlers(append(pullCtx.BaseHandlers, remotes.FetchHandler(store, fetcher), childrenHandler, )...) } if err := images.Dispatch(ctx, handler, desc); err != nil { return nil, err } if schema1Converter != nil { desc, err = schema1Converter.Convert(ctx) if err != nil { return nil, err } } imgrec := images.Image{ Name: name, Target: desc, Labels: pullCtx.Labels, } is := c.ImageService() if created, err := is.Create(ctx, imgrec); err != nil { if !errdefs.IsAlreadyExists(err) { return nil, err } updated, err := is.Update(ctx, imgrec) if err != nil { return nil, err } imgrec = updated } else { imgrec = created } img := &image{ client: c, i: imgrec, } if pullCtx.Unpack { if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil { errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) } } return img, nil } // Push uploads the provided content to a remote resource func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error { pushCtx := defaultRemoteContext() for _, o := range opts { if err := o(c, pushCtx); err != nil { return err } } pusher, err := pushCtx.Resolver.Pusher(ctx, ref) if err != nil { return err } return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.Platforms, pushCtx.BaseHandlers...) } // GetImage returns an existing image func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) { i, err := c.ImageService().Get(ctx, ref) if err != nil { return nil, err } return &image{ client: c, i: i, }, nil } // ListImages returns all existing images func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, error) { imgs, err := c.ImageService().List(ctx, filters...) if err != nil { return nil, err } images := make([]Image, len(imgs)) for i, img := range imgs { images[i] = &image{ client: c, i: img, } } return images, nil } // Subscribe to events that match one or more of the provided filters. // // Callers should listen on both the envelope and errs channels. If the errs // channel returns nil or an error, the subscriber should terminate. // // The subscriber can stop receiving events by canceling the provided context. // The errs channel will be closed and return a nil error. func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { return c.EventService().Subscribe(ctx, filters...) } // Close closes the clients connection to containerd func (c *Client) Close() error { c.connMu.Lock() defer c.connMu.Unlock() if c.conn != nil { return c.conn.Close() } return nil } // NamespaceService returns the underlying Namespaces Store func (c *Client) NamespaceService() namespaces.Store { if c.namespaceStore != nil { return c.namespaceStore } return NewNamespaceStoreFromClient(namespacesapi.NewNamespacesClient(c.conn)) } // ContainerService returns the underlying container Store func (c *Client) ContainerService() containers.Store { if c.containerStore != nil { return c.containerStore } return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn)) } // ContentStore returns the underlying content Store func (c *Client) ContentStore() content.Store { if c.contentStore != nil { return c.contentStore } return NewContentStoreFromClient(contentapi.NewContentClient(c.conn)) } // SnapshotService returns the underlying snapshotter for the provided snapshotter name func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter { if c.snapshotters != nil { return c.snapshotters[snapshotterName] } return NewSnapshotterFromClient(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName) } // TaskService returns the underlying TasksClient func (c *Client) TaskService() tasks.TasksClient { if c.taskService != nil { return c.taskService } return tasks.NewTasksClient(c.conn) } // ImageService returns the underlying image Store func (c *Client) ImageService() images.Store { if c.imageStore != nil { return c.imageStore } return NewImageStoreFromClient(imagesapi.NewImagesClient(c.conn)) } // DiffService returns the underlying Differ func (c *Client) DiffService() DiffService { if c.diffService != nil { return c.diffService } return NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn)) } // IntrospectionService returns the underlying Introspection Client func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient { return introspectionapi.NewIntrospectionClient(c.conn) } // LeasesService returns the underlying Leases Client func (c *Client) LeasesService() leasesapi.LeasesClient { if c.leasesService != nil { return c.leasesService } return leasesapi.NewLeasesClient(c.conn) } // HealthService returns the underlying GRPC HealthClient func (c *Client) HealthService() grpc_health_v1.HealthClient { return grpc_health_v1.NewHealthClient(c.conn) } // EventService returns the underlying event service func (c *Client) EventService() EventService { if c.eventService != nil { return c.eventService } return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn)) } // VersionService returns the underlying VersionClient func (c *Client) VersionService() versionservice.VersionClient { return versionservice.NewVersionClient(c.conn) } // Version of containerd type Version struct { // Version number Version string // Revision from git that was built Revision string } // Version returns the version of containerd that the client is connected to func (c *Client) Version(ctx context.Context) (Version, error) { c.connMu.Lock() if c.conn == nil { c.connMu.Unlock() return Version{}, errors.New("no grpc connection available") } c.connMu.Unlock() response, err := c.VersionService().Version(ctx, &ptypes.Empty{}) if err != nil { return Version{}, err } return Version{ Version: response.Version, Revision: response.Revision, }, nil } type importOpts struct { } // ImportOpt allows the caller to specify import specific options type ImportOpt func(c *importOpts) error func resolveImportOpt(opts ...ImportOpt) (importOpts, error) { var iopts importOpts for _, o := range opts { if err := o(&iopts); err != nil { return iopts, err } } return iopts, nil } // Import imports an image from a Tar stream using reader. // Caller needs to specify importer. Future version may use oci.v1 as the default. // Note that unreferrenced blobs may be imported to the content store as well. func (c *Client) Import(ctx context.Context, importer images.Importer, reader io.Reader, opts ...ImportOpt) ([]Image, error) { _, err := resolveImportOpt(opts...) // unused now if err != nil { return nil, err } ctx, done, err := c.WithLease(ctx) if err != nil { return nil, err } defer done(ctx) imgrecs, err := importer.Import(ctx, c.ContentStore(), reader) if err != nil { // is.Update() is not called on error return nil, err } is := c.ImageService() var images []Image for _, imgrec := range imgrecs { if updated, err := is.Update(ctx, imgrec, "target"); err != nil { if !errdefs.IsNotFound(err) { return nil, err } created, err := is.Create(ctx, imgrec) if err != nil { return nil, err } imgrec = created } else { imgrec = updated } images = append(images, &image{ client: c, i: imgrec, }) } return images, nil } type exportOpts struct { } // ExportOpt allows the caller to specify export-specific options type ExportOpt func(c *exportOpts) error func resolveExportOpt(opts ...ExportOpt) (exportOpts, error) { var eopts exportOpts for _, o := range opts { if err := o(&eopts); err != nil { return eopts, err } } return eopts, nil } // Export exports an image to a Tar stream. // OCI format is used by default. // It is up to caller to put "org.opencontainers.image.ref.name" annotation to desc. // TODO(AkihiroSuda): support exporting multiple descriptors at once to a single archive stream. func (c *Client) Export(ctx context.Context, exporter images.Exporter, desc ocispec.Descriptor, opts ...ExportOpt) (io.ReadCloser, error) { _, err := resolveExportOpt(opts...) // unused now if err != nil { return nil, err } pr, pw := io.Pipe() go func() { pw.CloseWithError(exporter.Export(ctx, c.ContentStore(), desc, pw)) }() return pr, nil }