containerd/client.go
Stephen J Day 677257f032
service/snapshotter: move default to client
In order to enforce strict handling of snapshotter values on the
container object, the defaults have been moved to the client side. This
ensures that we correctly qualify the snapshotter under use when from
the container at the time it was created, rather than possibly losing
the metadata on a change of default.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-08-18 14:43:12 -07:00

541 lines
15 KiB
Go

package containerd
import (
"context"
"fmt"
"io"
"net/http"
"runtime"
"strconv"
"sync"
"time"
containersapi "github.com/containerd/containerd/api/services/containers/v1"
contentapi "github.com/containerd/containerd/api/services/content/v1"
diffapi "github.com/containerd/containerd/api/services/diff/v1"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
versionservice "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
contentservice "github.com/containerd/containerd/services/content"
"github.com/containerd/containerd/services/diff"
diffservice "github.com/containerd/containerd/services/diff"
imagesservice "github.com/containerd/containerd/services/images"
snapshotservice "github.com/containerd/containerd/services/snapshot"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/typeurl"
pempty "github.com/golang/protobuf/ptypes/empty"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
func init() {
// register TypeUrls for commonly marshaled external types
major := strconv.Itoa(specs.VersionMajor)
typeurl.Register(&specs.Spec{}, "opencontainers/runtime-spec", major, "Spec")
typeurl.Register(&specs.Process{}, "opencontainers/runtime-spec", major, "Process")
typeurl.Register(&specs.LinuxResources{}, "opencontainers/runtime-spec", major, "LinuxResources")
typeurl.Register(&specs.WindowsResources{}, "opencontainers/runtime-spec", major, "WindowsResources")
}
// New returns a new containerd client that is connected to the containerd
// instance provided by address
func New(address string, opts ...ClientOpt) (*Client, error) {
var copts clientOpts
for _, o := range opts {
if err := o(&copts); err != nil {
return nil, err
}
}
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithTimeout(60 * time.Second),
grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(Dialer),
}
if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions
}
if copts.defaultns != "" {
unary, stream := newNSInterceptors(copts.defaultns)
gopts = append(gopts,
grpc.WithUnaryInterceptor(unary),
grpc.WithStreamInterceptor(stream),
)
}
conn, err := grpc.Dial(DialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
return NewWithConn(conn, opts...)
}
// NewWithConn returns a new containerd client that is connected to the containerd
// instance provided by the connection
func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
return &Client{
conn: conn,
runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
}, nil
}
// Client is the client to interact with containerd and its various services
// using a uniform interface
type Client struct {
conn *grpc.ClientConn
defaultns string
runtime string
}
// IsServing returns true if the client can successfully connect to the
// containerd daemon and the healthcheck service returns the SERVING
// response.
// This call will block if a transient error is encountered during
// connection. A timeout can be set in the context to ensure it returns
// early.
func (c *Client) IsServing(ctx context.Context) (bool, error) {
r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(false))
if err != nil {
return false, err
}
return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil
}
// Containers returns all containers created in containerd
func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container, error) {
r, err := c.ContainerService().List(ctx, filters...)
if err != nil {
return nil, err
}
var out []Container
for _, container := range r {
out = append(out, containerFromRecord(c, container))
}
return out, nil
}
// NewContainer will create a new container in container with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
container := containers.Container{
ID: id,
Runtime: containers.RuntimeInfo{
Name: c.runtime,
},
}
for _, o := range opts {
if err := o(ctx, c, &container); err != nil {
return nil, err
}
}
r, err := c.ContainerService().Create(ctx, container)
if err != nil {
return nil, err
}
return containerFromRecord(c, r), nil
}
// LoadContainer loads an existing container from metadata
func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
r, err := c.ContainerService().Get(ctx, id)
if err != nil {
return nil, err
}
return containerFromRecord(c, r), nil
}
// RemoteContext is used to configure object resolutions and transfers with
// remote content stores and image providers.
type RemoteContext struct {
// Resolver is used to resolve names to objects, fetchers, and pushers.
// If no resolver is provided, defaults to Docker registry resolver.
Resolver remotes.Resolver
// Unpack is done after an image is pulled to extract into a snapshotter.
// If an image is not unpacked on pull, it can be unpacked any time
// afterwards. Unpacking is required to run an image.
Unpack bool
// Snapshotter used for unpacking
Snapshotter string
// BaseHandlers are a set of handlers which get are called on dispatch.
// These handlers always get called before any operation specific
// handlers.
BaseHandlers []images.Handler
// ConvertSchema1 is whether to convert Docker registry schema 1
// manifests. If this option is false then any image which resolves
// to schema 1 will return an error since schema 1 is not supported.
ConvertSchema1 bool
}
func defaultRemoteContext() *RemoteContext {
return &RemoteContext{
Resolver: docker.NewResolver(docker.ResolverOptions{
Client: http.DefaultClient,
}),
Snapshotter: DefaultSnapshotter,
}
}
// Pull downloads the provided content into containerd's content store
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) {
pullCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pullCtx); err != nil {
return nil, err
}
}
store := c.ContentStore()
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
if err != nil {
return nil, err
}
fetcher, err := pullCtx.Resolver.Fetcher(ctx, name)
if err != nil {
return nil, err
}
var (
schema1Converter *schema1.Converter
handler images.Handler
)
if desc.MediaType == images.MediaTypeDockerSchema1Manifest && pullCtx.ConvertSchema1 {
schema1Converter = schema1.NewConverter(store, fetcher)
handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...)
} else {
handler = images.Handlers(append(pullCtx.BaseHandlers,
remotes.FetchHandler(store, fetcher),
images.ChildrenHandler(store))...,
)
}
if err := images.Dispatch(ctx, handler, desc); err != nil {
return nil, err
}
if schema1Converter != nil {
desc, err = schema1Converter.Convert(ctx)
if err != nil {
return nil, err
}
}
imgrec := images.Image{
Name: name,
Target: desc,
}
is := c.ImageService()
if updated, err := is.Update(ctx, imgrec, "target"); err != nil {
if !errdefs.IsNotFound(err) {
return nil, err
}
created, err := is.Create(ctx, imgrec)
if err != nil {
return nil, err
}
imgrec = created
} else {
imgrec = updated
}
img := &image{
client: c,
i: imgrec,
}
if pullCtx.Unpack {
if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil {
return nil, err
}
}
return img, nil
}
// Push uploads the provided content to a remote resource
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error {
pushCtx := defaultRemoteContext()
for _, o := range opts {
if err := o(c, pushCtx); err != nil {
return err
}
}
pusher, err := pushCtx.Resolver.Pusher(ctx, ref)
if err != nil {
return err
}
var m sync.Mutex
manifestStack := []ocispec.Descriptor{}
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
m.Lock()
manifestStack = append(manifestStack, desc)
m.Unlock()
return nil, images.StopHandler
default:
return nil, nil
}
})
cs := c.ContentStore()
pushHandler := remotes.PushHandler(cs, pusher)
handlers := append(pushCtx.BaseHandlers,
images.ChildrenHandler(cs),
filterHandler,
pushHandler,
)
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
return err
}
// Iterate in reverse order as seen, parent always uploaded after child
for i := len(manifestStack) - 1; i >= 0; i-- {
_, err := pushHandler(ctx, manifestStack[i])
if err != nil {
return err
}
}
return nil
}
// GetImage returns an existing image
func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
i, err := c.ImageService().Get(ctx, ref)
if err != nil {
return nil, err
}
return &image{
client: c,
i: i,
}, nil
}
// ListImages returns all existing images
func (c *Client) ListImages(ctx context.Context) ([]Image, error) {
imgs, err := c.ImageService().List(ctx)
if err != nil {
return nil, err
}
images := make([]Image, len(imgs))
for i, img := range imgs {
images[i] = &image{
client: c,
i: img,
}
}
return images, nil
}
// Close closes the clients connection to containerd
func (c *Client) Close() error {
return c.conn.Close()
}
func (c *Client) NamespaceService() namespacesapi.NamespacesClient {
return namespacesapi.NewNamespacesClient(c.conn)
}
func (c *Client) ContainerService() containers.Store {
return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
}
func (c *Client) ContentStore() content.Store {
return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn))
}
func (c *Client) SnapshotService(snapshotterName string) snapshot.Snapshotter {
return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotsClient(c.conn), snapshotterName)
}
func (c *Client) TaskService() tasks.TasksClient {
return tasks.NewTasksClient(c.conn)
}
func (c *Client) ImageService() images.Store {
return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn))
}
func (c *Client) DiffService() diff.DiffService {
return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
}
func (c *Client) HealthService() grpc_health_v1.HealthClient {
return grpc_health_v1.NewHealthClient(c.conn)
}
func (c *Client) EventService() eventsapi.EventsClient {
return eventsapi.NewEventsClient(c.conn)
}
func (c *Client) VersionService() versionservice.VersionClient {
return versionservice.NewVersionClient(c.conn)
}
// Version of containerd
type Version struct {
// Version number
Version string
// Revision from git that was built
Revision string
}
// Version returns the version of containerd that the client is connected to
func (c *Client) Version(ctx context.Context) (Version, error) {
response, err := c.VersionService().Version(ctx, &pempty.Empty{})
if err != nil {
return Version{}, err
}
return Version{
Version: response.Version,
Revision: response.Revision,
}, nil
}
type imageFormat string
const (
ociImageFormat imageFormat = "oci"
)
type importOpts struct {
format imageFormat
refObject string
}
// ImportOpt allows the caller to specify import specific options
type ImportOpt func(c *importOpts) error
// WithOCIImportFormat sets the import format for an OCI image format
func WithOCIImportFormat() ImportOpt {
return func(c *importOpts) error {
if c.format != "" {
return errors.New("format already set")
}
c.format = ociImageFormat
return nil
}
}
// WithRefObject specifies the ref object to import.
// If refObject is empty, it is copied from the ref argument of Import().
func WithRefObject(refObject string) ImportOpt {
return func(c *importOpts) error {
c.refObject = refObject
return nil
}
}
func resolveImportOpt(ref string, opts ...ImportOpt) (importOpts, error) {
var iopts importOpts
for _, o := range opts {
if err := o(&iopts); err != nil {
return iopts, err
}
}
// use OCI as the default format
if iopts.format == "" {
iopts.format = ociImageFormat
}
// if refObject is not explicitly specified, use the one specified in ref
if iopts.refObject == "" {
refSpec, err := reference.Parse(ref)
if err != nil {
return iopts, err
}
iopts.refObject = refSpec.Object
}
return iopts, nil
}
// Import imports an image from a Tar stream using reader.
// OCI format is assumed by default.
//
// Note that unreferenced blobs are imported to the content store as well.
func (c *Client) Import(ctx context.Context, ref string, reader io.Reader, opts ...ImportOpt) (Image, error) {
iopts, err := resolveImportOpt(ref, opts...)
if err != nil {
return nil, err
}
switch iopts.format {
case ociImageFormat:
return c.importFromOCITar(ctx, ref, reader, iopts)
default:
return nil, errors.Errorf("unsupported format: %s", iopts.format)
}
}
type exportOpts struct {
format imageFormat
}
// ExportOpt allows callers to set export options
type ExportOpt func(c *exportOpts) error
// WithOCIExportFormat sets the OCI image format as the export target
func WithOCIExportFormat() ExportOpt {
return func(c *exportOpts) error {
if c.format != "" {
return errors.New("format already set")
}
c.format = ociImageFormat
return nil
}
}
// TODO: add WithMediaTypeTranslation that transforms media types according to the format.
// e.g. application/vnd.docker.image.rootfs.diff.tar.gzip
// -> application/vnd.oci.image.layer.v1.tar+gzip
// Export exports an image to a Tar stream.
// OCI format is used by default.
// It is up to caller to put "org.opencontainers.image.ref.name" annotation to desc.
func (c *Client) Export(ctx context.Context, desc ocispec.Descriptor, opts ...ExportOpt) (io.ReadCloser, error) {
var eopts exportOpts
for _, o := range opts {
if err := o(&eopts); err != nil {
return nil, err
}
}
// use OCI as the default format
if eopts.format == "" {
eopts.format = ociImageFormat
}
pr, pw := io.Pipe()
switch eopts.format {
case ociImageFormat:
go func() {
pw.CloseWithError(c.exportToOCITar(ctx, desc, pw, eopts))
}()
default:
return nil, errors.Errorf("unsupported format: %s", eopts.format)
}
return pr, nil
}