diff --git a/client.go b/client.go index eefeb4e94..3613c99d3 100644 --- a/client.go +++ b/client.go @@ -42,7 +42,6 @@ import ( "github.com/containerd/containerd/content" contentproxy "github.com/containerd/containerd/content/proxy" "github.com/containerd/containerd/defaults" - "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" @@ -53,7 +52,6 @@ import ( "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" - "github.com/containerd/containerd/remotes/docker/schema1" "github.com/containerd/containerd/snapshots" snproxy "github.com/containerd/containerd/snapshots/proxy" "github.com/containerd/typeurl" @@ -61,7 +59,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" - "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -283,6 +280,12 @@ type RemoteContext struct { // handlers. BaseHandlers []images.Handler + // HandlerWrapper wraps the handler which gets sent to dispatch. + // Unlike BaseHandlers, this can run before and after the built + // in handlers, allowing operations to run on the descriptor + // after it has completed transferring. + HandlerWrapper func(images.Handler) images.Handler + // ConvertSchema1 is whether to convert Docker registry schema 1 // manifests. If this option is false then any image which resolves // to schema 1 will return an error since schema 1 is not supported. @@ -347,161 +350,6 @@ func (c *Client) Fetch(ctx context.Context, ref string, opts ...RemoteOpt) (imag return c.fetch(ctx, fetchCtx, ref, 0) } -// Pull downloads the provided content into containerd's content store -// and returns a platform specific image object -func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) { - pullCtx := defaultRemoteContext() - for _, o := range opts { - if err := o(c, pullCtx); err != nil { - return nil, err - } - } - - if pullCtx.PlatformMatcher == nil { - if len(pullCtx.Platforms) > 1 { - return nil, errors.New("cannot pull multiplatform image locally, try Fetch") - } else if len(pullCtx.Platforms) == 0 { - pullCtx.PlatformMatcher = platforms.Default() - } else { - p, err := platforms.Parse(pullCtx.Platforms[0]) - if err != nil { - return nil, errors.Wrapf(err, "invalid platform %s", pullCtx.Platforms[0]) - } - - pullCtx.PlatformMatcher = platforms.Only(p) - } - } - - ctx, done, err := c.WithLease(ctx) - if err != nil { - return nil, err - } - defer done(ctx) - - img, err := c.fetch(ctx, pullCtx, ref, 1) - if err != nil { - return nil, err - } - - i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) - - if pullCtx.Unpack { - if err := i.Unpack(ctx, pullCtx.Snapshotter); err != nil { - return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) - } - } - - return i, nil -} - -func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) { - store := c.ContentStore() - name, desc, err := rCtx.Resolver.Resolve(ctx, ref) - if err != nil { - return images.Image{}, errors.Wrapf(err, "failed to resolve reference %q", ref) - } - - fetcher, err := rCtx.Resolver.Fetcher(ctx, name) - if err != nil { - return images.Image{}, errors.Wrapf(err, "failed to get fetcher for %q", name) - } - - var ( - handler images.Handler - - isConvertible bool - converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error) - limiter *semaphore.Weighted - ) - - if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 { - schema1Converter := schema1.NewConverter(store, fetcher) - - handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...) - - isConvertible = true - - converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) { - return schema1Converter.Convert(ctx) - } - } else { - // Get all the children for a descriptor - childrenHandler := images.ChildrenHandler(store) - // Set any children labels for that content - childrenHandler = images.SetChildrenLabels(store, childrenHandler) - // Filter children by platforms - childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher) - // Sort and limit manifests if a finite number is needed - if limit > 0 { - childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) - } - - // set isConvertible to true if there is application/octet-stream media type - convertibleHandler := images.HandlerFunc( - func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - if desc.MediaType == docker.LegacyConfigMediaType { - isConvertible = true - } - - return []ocispec.Descriptor{}, nil - }, - ) - - handler = images.Handlers(append(rCtx.BaseHandlers, - remotes.FetchHandler(store, fetcher), - convertibleHandler, - childrenHandler, - )...) - - converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) { - return docker.ConvertManifest(ctx, store, desc) - } - } - - if rCtx.MaxConcurrentDownloads > 0 { - limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) - } - if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { - return images.Image{}, err - } - - if isConvertible { - if desc, err = converterFunc(ctx, desc); err != nil { - return images.Image{}, err - } - } - - img := images.Image{ - Name: name, - Target: desc, - Labels: rCtx.Labels, - } - - is := c.ImageService() - for { - if created, err := is.Create(ctx, img); err != nil { - if !errdefs.IsAlreadyExists(err) { - return images.Image{}, err - } - - updated, err := is.Update(ctx, img) - if err != nil { - // if image was removed, try create again - if errdefs.IsNotFound(err) { - continue - } - return images.Image{}, err - } - - img = updated - } else { - img = created - } - - return img, nil - } -} - // Push uploads the provided content to a remote resource func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error { pushCtx := defaultRemoteContext() @@ -531,7 +379,21 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, return err } - return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, pushCtx.BaseHandlers...) + var wrapper func(images.Handler) images.Handler + + if len(pushCtx.BaseHandlers) > 0 { + wrapper = func(h images.Handler) images.Handler { + h = images.Handlers(append(pushCtx.BaseHandlers, h)...) + if pushCtx.HandlerWrapper != nil { + h = pushCtx.HandlerWrapper(h) + } + return h + } + } else if pushCtx.HandlerWrapper != nil { + wrapper = pushCtx.HandlerWrapper + } + + return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, wrapper) } // GetImage returns an existing image diff --git a/client_opts.go b/client_opts.go index cb3792fb3..669829751 100644 --- a/client_opts.go +++ b/client_opts.go @@ -179,6 +179,14 @@ func WithImageHandler(h images.Handler) RemoteOpt { } } +// WithImageHandlerWrapper wraps the handlers to be called on dispatch. +func WithImageHandlerWrapper(w func(images.Handler) images.Handler) RemoteOpt { + return func(client *Client, c *RemoteContext) error { + c.HandlerWrapper = w + return nil + } +} + // WithMaxConcurrentDownloads sets max concurrent download limit. func WithMaxConcurrentDownloads(max int) RemoteOpt { return func(client *Client, c *RemoteContext) error { diff --git a/pull.go b/pull.go new file mode 100644 index 000000000..f938ccc3b --- /dev/null +++ b/pull.go @@ -0,0 +1,190 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "context" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/remotes/docker/schema1" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "golang.org/x/sync/semaphore" +) + +// Pull downloads the provided content into containerd's content store +// and returns a platform specific image object +func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) { + pullCtx := defaultRemoteContext() + for _, o := range opts { + if err := o(c, pullCtx); err != nil { + return nil, err + } + } + + if pullCtx.PlatformMatcher == nil { + if len(pullCtx.Platforms) > 1 { + return nil, errors.New("cannot pull multiplatform image locally, try Fetch") + } else if len(pullCtx.Platforms) == 0 { + pullCtx.PlatformMatcher = platforms.Default() + } else { + p, err := platforms.Parse(pullCtx.Platforms[0]) + if err != nil { + return nil, errors.Wrapf(err, "invalid platform %s", pullCtx.Platforms[0]) + } + + pullCtx.PlatformMatcher = platforms.Only(p) + } + } + + ctx, done, err := c.WithLease(ctx) + if err != nil { + return nil, err + } + defer done(ctx) + + img, err := c.fetch(ctx, pullCtx, ref, 1) + if err != nil { + return nil, err + } + + i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) + + if pullCtx.Unpack { + if err := i.Unpack(ctx, pullCtx.Snapshotter); err != nil { + return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) + } + } + + return i, nil +} + +func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) { + store := c.ContentStore() + name, desc, err := rCtx.Resolver.Resolve(ctx, ref) + if err != nil { + return images.Image{}, errors.Wrapf(err, "failed to resolve reference %q", ref) + } + + fetcher, err := rCtx.Resolver.Fetcher(ctx, name) + if err != nil { + return images.Image{}, errors.Wrapf(err, "failed to get fetcher for %q", name) + } + + var ( + handler images.Handler + + isConvertible bool + converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error) + limiter *semaphore.Weighted + ) + + if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 { + schema1Converter := schema1.NewConverter(store, fetcher) + + handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...) + + isConvertible = true + + converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) { + return schema1Converter.Convert(ctx) + } + } else { + // Get all the children for a descriptor + childrenHandler := images.ChildrenHandler(store) + // Set any children labels for that content + childrenHandler = images.SetChildrenLabels(store, childrenHandler) + // Filter children by platforms + childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher) + // Sort and limit manifests if a finite number is needed + if limit > 0 { + childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) + } + + // set isConvertible to true if there is application/octet-stream media type + convertibleHandler := images.HandlerFunc( + func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + if desc.MediaType == docker.LegacyConfigMediaType { + isConvertible = true + } + + return []ocispec.Descriptor{}, nil + }, + ) + + handler = images.Handlers(append(rCtx.BaseHandlers, + remotes.FetchHandler(store, fetcher), + convertibleHandler, + childrenHandler, + )...) + + converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) { + return docker.ConvertManifest(ctx, store, desc) + } + } + + if rCtx.HandlerWrapper != nil { + handler = rCtx.HandlerWrapper(handler) + } + + if rCtx.MaxConcurrentDownloads > 0 { + limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) + } + if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { + return images.Image{}, err + } + + if isConvertible { + if desc, err = converterFunc(ctx, desc); err != nil { + return images.Image{}, err + } + } + + img := images.Image{ + Name: name, + Target: desc, + Labels: rCtx.Labels, + } + + is := c.ImageService() + for { + if created, err := is.Create(ctx, img); err != nil { + if !errdefs.IsAlreadyExists(err) { + return images.Image{}, err + } + + updated, err := is.Update(ctx, img) + if err != nil { + // if image was removed, try create again + if errdefs.IsNotFound(err) { + continue + } + return images.Image{}, err + } + + img = updated + } else { + img = created + } + + return img, nil + } +} diff --git a/remotes/handlers.go b/remotes/handlers.go index 86922de99..56d4c5081 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -156,7 +156,7 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc // // Base handlers can be provided which will be called before any push specific // handlers. -func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, provider content.Provider, platform platforms.MatchComparer, baseHandlers ...images.Handler) error { +func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, provider content.Provider, platform platforms.MatchComparer, wrapper func(h images.Handler) images.Handler) error { var m sync.Mutex manifestStack := []ocispec.Descriptor{} @@ -175,13 +175,16 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr pushHandler := PushHandler(pusher, provider) - handlers := append(baseHandlers, + var handler images.Handler = images.Handlers( images.FilterPlatforms(images.ChildrenHandler(provider), platform), filterHandler, pushHandler, ) + if wrapper != nil { + handler = wrapper(handler) + } - if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { + if err := images.Dispatch(ctx, handler, nil, desc); err != nil { return err }