diff --git a/cmd/containerd/builtins/builtins.go b/cmd/containerd/builtins/builtins.go index 81e508603..bb64c1904 100644 --- a/cmd/containerd/builtins/builtins.go +++ b/cmd/containerd/builtins/builtins.go @@ -25,6 +25,7 @@ import ( _ "github.com/containerd/containerd/metadata/plugin" _ "github.com/containerd/containerd/pkg/nri/plugin" _ "github.com/containerd/containerd/plugins/streaming" + _ "github.com/containerd/containerd/plugins/transfer" _ "github.com/containerd/containerd/runtime/restart/monitor" _ "github.com/containerd/containerd/runtime/v2" _ "github.com/containerd/containerd/services/containers" @@ -41,5 +42,6 @@ import ( _ "github.com/containerd/containerd/services/snapshots" _ "github.com/containerd/containerd/services/streaming" _ "github.com/containerd/containerd/services/tasks" + _ "github.com/containerd/containerd/services/transfer" _ "github.com/containerd/containerd/services/version" ) diff --git a/pkg/transfer/image/local.go b/pkg/transfer/image/local.go new file mode 100644 index 000000000..699b5565c --- /dev/null +++ b/pkg/transfer/image/local.go @@ -0,0 +1,249 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package transfer + +import ( + "context" + "fmt" + + "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/pkg/unpack" + "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes" + "github.com/containerd/typeurl" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// TODO: Should a factory be exposed here as a service?? +func NewOCIRegistryFromProto(p *transfer.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry { + //transfer.OCIRegistry + // Create resolver + // Convert auth stream to credential manager + return &OCIRegistry{ + reference: p.Reference, + resolver: resolver, + streams: sm, + } +} + +func NewOCIRegistry(ref string, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry { + // With options, stream, + // With streams? + return &OCIRegistry{ + reference: ref, + resolver: resolver, + streams: sm, + } +} + +// OCI +type OCIRegistry struct { + reference string + + resolver remotes.Resolver + streams streaming.StreamManager + + // This could be an interface which returns resolver? + // Resolver could also be a plug-able interface, to call out to a program to fetch? +} + +func (r *OCIRegistry) String() string { + return fmt.Sprintf("OCI Registry (%s)", r.reference) +} + +func (r *OCIRegistry) Image() string { + return r.reference +} + +func (r *OCIRegistry) Resolver() remotes.Resolver { + return r.resolver +} + +func (r *OCIRegistry) ToProto() typeurl.Any { + // Might need more context to convert to proto + // Need access to a stream manager + // Service provider + return nil +} + +type ImageStore struct { + // TODO: Put these configurations in object which can convert to/from any + // Embed generated type + imageName string + imageLabels map[string]string + platforms platforms.MatchComparer + allMetadata bool + labelMap func(ocispec.Descriptor) []string + manifestLimit int + + images images.Store + content content.Store + + // TODO: Convert these to unpack platforms + unpacks []unpack.Platform +} + +func NewImageStore(image string) *ImageStore { + return &ImageStore{ + imageName: image, + } +} + +func (is *ImageStore) String() string { + return fmt.Sprintf("Local Image Store (%s)", is.imageName) +} + +func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc { + h = images.SetChildrenMappedLabels(is.content, h, is.labelMap) + if is.allMetadata { + // Filter manifests by platforms but allow to handle manifest + // and configuration for not-target platforms + h = remotes.FilterManifestByPlatformHandler(h, is.platforms) + } else { + // Filter children by platforms if specified. + h = images.FilterPlatforms(h, is.platforms) + } + + // Sort and limit manifests if a finite number is needed + if is.manifestLimit > 0 { + h = images.LimitManifests(h, is.platforms, is.manifestLimit) + } + return h +} + +func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) { + img := images.Image{ + Name: is.imageName, + Target: desc, + Labels: is.imageLabels, + } + + for { + if created, err := is.images.Create(ctx, img); err != nil { + if !errdefs.IsAlreadyExists(err) { + return images.Image{}, err + } + + updated, err := is.images.Update(ctx, img) + if err != nil { + // if image was removed, try create again + if errdefs.IsNotFound(err) { + continue + } + return images.Image{}, err + } + + img = updated + } else { + img = created + } + + return img, nil + } +} + +func (is *ImageStore) UnpackPlatforms() []unpack.Platform { + return is.unpacks +} + +/* +type RemoteContext struct { + // Resolver is used to resolve names to objects, fetchers, and pushers. + // If no resolver is provided, defaults to Docker registry resolver. + Resolver remotes.Resolver + + // PlatformMatcher is used to match the platforms for an image + // operation and define the preference when a single match is required + // from multiple platforms. + PlatformMatcher platforms.MatchComparer + + // Unpack is done after an image is pulled to extract into a snapshotter. + // It is done simultaneously for schema 2 images when they are pulled. + // If an image is not unpacked on pull, it can be unpacked any time + // afterwards. Unpacking is required to run an image. + Unpack bool + + // UnpackOpts handles options to the unpack call. + UnpackOpts []UnpackOpt + + // Snapshotter used for unpacking + Snapshotter string + + // SnapshotterOpts are additional options to be passed to a snapshotter during pull + SnapshotterOpts []snapshots.Opt + + // Labels to be applied to the created image + Labels map[string]string + + // BaseHandlers are a set of handlers which get are called on dispatch. + // These handlers always get called before any operation specific + // handlers. + BaseHandlers []images.Handler + + // HandlerWrapper wraps the handler which gets sent to dispatch. + // Unlike BaseHandlers, this can run before and after the built + // in handlers, allowing operations to run on the descriptor + // after it has completed transferring. + HandlerWrapper func(images.Handler) images.Handler + + // Platforms defines which platforms to handle when doing the image operation. + // Platforms is ignored when a PlatformMatcher is set, otherwise the + // platforms will be used to create a PlatformMatcher with no ordering + // preference. + Platforms []string + + // MaxConcurrentDownloads is the max concurrent content downloads for each pull. + MaxConcurrentDownloads int + + // MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push. + MaxConcurrentUploadedLayers int + + // AllMetadata downloads all manifests and known-configuration files + AllMetadata bool + + // ChildLabelMap sets the labels used to reference child objects in the content + // store. By default, all GC reference labels will be set for all fetched content. + ChildLabelMap func(ocispec.Descriptor) []string +} +*/ +/* +// What should streamhandler look like? +type StreamHandler interface { + Authorize() error + Progress(key string, int64) +} + +// Distribution options +// Stream handler +// Progress rate +// Unpack options +// Remote options +// Cases: +// Registry -> Content/ImageStore (pull) +// Registry -> Registry +// Content/ImageStore -> Registry (push) +// Content/ImageStore -> Content/ImageStore (tag) +// Common fetch/push interface for registry, content/imagestore, OCI index +// Always starts with string for source and destination, on client side, does not need to resolve +// Higher level implementation just takes strings and options +// Lower level implementation takes pusher/fetcher? + +*/ diff --git a/pkg/transfer/local/transfer.go b/pkg/transfer/local/transfer.go new file mode 100644 index 000000000..2a8fd9e14 --- /dev/null +++ b/pkg/transfer/local/transfer.go @@ -0,0 +1,233 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package local + +import ( + "context" + "fmt" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/unpack" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/typeurl" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/semaphore" +) + +type localTransferService struct { + leases leases.Manager + content content.Store + + // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) + limiter *semaphore.Weighted + + // TODO: Duplication suppressor + // Metadata service (Or snapshotters, image, content) + // Diff + + // Configuration + // - Max downloads + // - Max uploads + + // Supported platforms + // - Platform -> snapshotter defaults? +} + +func NewTransferService(lm leases.Manager, cs content.Store) transfer.Transferer { + return &localTransferService{ + leases: lm, + content: cs, + } +} + +func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error { + topts := &transfer.TransferOpts{} + for _, opt := range opts { + opt(topts) + } + + // Convert Any to real source/destination + + // Figure out matrix of whether source destination combination is supported + switch s := src.(type) { + case transfer.ImageResolver: + switch d := dest.(type) { + case transfer.ImageStorer: + return ts.pull(ctx, s, d, topts) + } + } + return fmt.Errorf("Unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) +} + +func name(t interface{}) string { + switch s := t.(type) { + case fmt.Stringer: + return s.String() + case typeurl.Any: + return s.GetTypeUrl() + default: + return fmt.Sprintf("%T", t) + } +} + +func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error { + // TODO: Attach lease if doesn't have one + + // From source, need + // - resolver + // - image name + + // From destination, need + // - Image labels + // - Unpack information + // - Platform to Snapshotter + // - Child label map + // - All metdata? + + name, desc, err := ir.Resolve(ctx) + if err != nil { + return fmt.Errorf("failed to resolve image: %w", err) + } + if desc.MediaType == images.MediaTypeDockerSchema1Manifest { + // Explicitly call out schema 1 as deprecated and not supported + return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument) + } + + fetcher, err := ir.Fetcher(ctx, name) + if err != nil { + return fmt.Errorf("failed to get fetcher for %q: %w", name, err) + } + + var ( + handler images.Handler + + unpacker *unpack.Unpacker + + // has a config media type bug (distribution#1622) + hasMediaTypeBug1622 bool + + store = ts.content + ) + //func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc { + //func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) { + + // Get all the children for a descriptor + childrenHandler := images.ChildrenHandler(store) + + if f, ok := is.(transfer.ImageFilterer); ok { + childrenHandler = f.ImageFilter(childrenHandler) + } + + // Sort and limit manifests if a finite number is needed + //if limit > 0 { + // childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) + //} + + checkNeedsFix := images.HandlerFunc( + func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + // set to true if there is application/octet-stream media type + if desc.MediaType == docker.LegacyConfigMediaType { + hasMediaTypeBug1622 = true + } + + return []ocispec.Descriptor{}, nil + }, + ) + + appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name) + if err != nil { + return err + } + + // TODO: Support set of base handlers from configuration or image store + // Progress handlers? + handlers := []images.Handler{ + remotes.FetchHandler(store, fetcher), + checkNeedsFix, + childrenHandler, + appendDistSrcLabelHandler, + } + + handler = images.Handlers(handlers...) + + // TODO: Should available platforms be a configuration of the service? + // First find suitable platforms to unpack into + //if unpacker, ok := is. + if iu, ok := is.(transfer.ImageUnpacker); ok { + unpacks := iu.UnpackPlatforms() + if len(unpacks) > 0 { + uopts := []unpack.UnpackerOpt{} + for _, u := range unpacks { + uopts = append(uopts, unpack.WithUnpackPlatform(u)) + } + if ts.limiter != nil { + uopts = append(uopts, unpack.WithLimiter(ts.limiter)) + } + //if uconfig.DuplicationSuppressor != nil { + // uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) + //} + unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...) + if err != nil { + return fmt.Errorf("unable to initialize unpacker: %w", err) + } + defer func() { + // TODO: This needs to be tigher scoped... + if _, err := unpacker.Wait(); err != nil { + //if retErr == nil { + // retErr = fmt.Errorf("unpack: %w", err) + //} + } + }() + handler = unpacker.Unpack(handler) + } + } + + if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil { + // TODO: Cancel unpack and wait? + return err + } + + // NOTE(fuweid): unpacker defers blobs download. before create image + // record in ImageService, should wait for unpacking(including blobs + // download). + if unpacker != nil { + if _, err = unpacker.Wait(); err != nil { + return err + } + // TODO: Check results to make sure unpack was successful + } + + if hasMediaTypeBug1622 { + if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil { + return err + } + } + + _, err = is.Store(ctx, desc) + if err != nil { + return err + } + + // TODO: Send status update for created image + + return nil +} diff --git a/pkg/transfer/transfer.go b/pkg/transfer/transfer.go new file mode 100644 index 000000000..aa5fcdda2 --- /dev/null +++ b/pkg/transfer/transfer.go @@ -0,0 +1,89 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package transfer + +import ( + "context" + "io" + + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/pkg/unpack" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +type Transferer interface { + Transfer(context.Context, interface{}, interface{}, ...Opt) error +} + +type ImageResolver interface { + Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) + + Fetcher(ctx context.Context, ref string) (Fetcher, error) +} + +type Fetcher interface { + Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) +} + +// ImageFilterer is used to filter out child objects of an image +type ImageFilterer interface { + ImageFilter(images.HandlerFunc) images.HandlerFunc +} + +type ImageStorer interface { + Store(context.Context, ocispec.Descriptor) (images.Image, error) +} + +type ImageUnpacker interface { + // TODO: Or unpack options? + UnpackPlatforms() []unpack.Platform +} + +type TransferOpts struct { +} + +type Opt func(*TransferOpts) + +func WithProgress() Opt { + return nil +} + +type Progress struct { + Event string + Name string + Digest string + Progress int64 + Total int64 +} + +/* +// Distribution options +// Stream handler +// Progress rate +// Unpack options +// Remote options +// Cases: +// Registry -> Content/ImageStore (pull) +// Registry -> Registry +// Content/ImageStore -> Registry (push) +// Content/ImageStore -> Content/ImageStore (tag) +// Common fetch/push interface for registry, content/imagestore, OCI index +// Always starts with string for source and destination, on client side, does not need to resolve +// Higher level implementation just takes strings and options +// Lower level implementation takes pusher/fetcher? + +*/ diff --git a/plugin/plugin.go b/plugin/plugin.go index 1883d97d9..80dd2a263 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -84,6 +84,8 @@ const ( TracingProcessorPlugin Type = "io.containerd.tracing.processor.v1" // NRIApiPlugin implements the NRI adaptation interface for containerd. NRIApiPlugin Type = "io.containerd.nri.v1" + // TransferPlugin implements a transfer service + TransferPlugin Type = "io.containerd.transfer.v1" ) const ( diff --git a/plugins/transfer/plugin.go b/plugins/transfer/plugin.go new file mode 100644 index 000000000..1b9002dcb --- /dev/null +++ b/plugins/transfer/plugin.go @@ -0,0 +1,330 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package transfer + +import ( + "context" + "fmt" + + ttypes "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/unpack" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/typeurl" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/semaphore" +) + +func init() { + + plugin.Register(&plugin.Registration{ + Type: plugin.TransferPlugin, + ID: "image", + Requires: []plugin.Type{ + plugin.LeasePlugin, + plugin.MetadataPlugin, + }, + Config: &transferConfig{}, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + ms := m.(*metadata.DB) + l, err := ic.Get(plugin.LeasePlugin) + if err != nil { + return nil, err + } + + // Map to url instance handler (typeurl.Any) interface{} + + return &localTransferService{ + leases: l.(leases.Manager), + content: ms.ContentStore(), + conversions: map[string]func(typeurl.Any) (interface{}, error){}, + // // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) + // limiter *semaphore.Weighted + }, nil + }, + }) + +} + +type transferConfig struct { + // Max concurrent downloads + // Snapshotter platforms +} + +// TODO: Move this to a local package with constructor arguments...? +type localTransferService struct { + leases leases.Manager + content content.Store + + // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) + limiter *semaphore.Weighted + + conversions map[string]func(typeurl.Any) (interface{}, error) + + // TODO: Duplication suppressor + // Metadata service (Or snapshotters, image, content) + // Diff + + // Configuration + // - Max downloads + // - Max uploads + + // Supported platforms + // - Platform -> snapshotter defaults? + + // Type Resolver, support registration... For Any Type URL -> Constructor +} + +// populatedConversions is used to map the typeurls to instance converstion functions, +// since the typeurls are derived from instances rather than types, they are +// calculated at runtime and populate the converstion map. +// Static mapping or offloading conversion to another plugin is probably a more +// ideal long term solution. +func (ts *localTransferService) populateConversions() error { + for _, c := range []struct { + instance interface{} + conversion func(typeurl.Any) (interface{}, error) + }{ + {ttypes.ImageStoreDestination{}, ts.convertImageStoreDestination}, + {ttypes.OCIRegistry{}, ts.convertOCIRegistry}, + } { + u, err := typeurl.TypeURL(c.instance) + if err != nil { + return fmt.Errorf("unable to get type %T: %w", c.instance, err) + } + if _, ok := ts.conversions[u]; ok { + return fmt.Errorf("duplicate typeurl mapping: %s for %T", u, c.instance) + } + } + + return nil +} + +func (ts *localTransferService) convertImageStoreDestination(a typeurl.Any) (interface{}, error) { + var dest ttypes.ImageStoreDestination + if err := typeurl.UnmarshalTo(a, &dest); err != nil { + return nil, err + } + return nil, nil +} + +func (ts *localTransferService) convertOCIRegistry(a typeurl.Any) (interface{}, error) { + var dest ttypes.OCIRegistry + if err := typeurl.UnmarshalTo(a, &dest); err != nil { + return nil, err + } + // TODO: Create credential callback + + return nil, nil +} + +func (ts *localTransferService) resolveType(a typeurl.Any) (interface{}, error) { + c, ok := ts.conversions[a.GetTypeUrl()] + if !ok { + return nil, fmt.Errorf("type %q not supported: %w", a.GetTypeUrl(), errdefs.ErrNotImplemented) + } + return c(a) +} + +func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error { + topts := &transfer.TransferOpts{} + for _, opt := range opts { + opt(topts) + } + + if a, ok := src.(typeurl.Any); ok { + r, err := ts.resolveType(a) + if err != nil { + return err + } + src = r + } + + if a, ok := dest.(typeurl.Any); ok { + r, err := ts.resolveType(a) + if err != nil { + return err + } + dest = r + } + + // Figure out matrix of whether source destination combination is supported + switch s := src.(type) { + case transfer.ImageResolver: + switch d := dest.(type) { + case transfer.ImageStorer: + return ts.pull(ctx, s, d, topts) + } + } + return fmt.Errorf("Unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) +} + +func name(t interface{}) string { + switch s := t.(type) { + case fmt.Stringer: + return s.String() + case typeurl.Any: + return s.GetTypeUrl() + default: + return fmt.Sprintf("%T", t) + } +} + +func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error { + // TODO: Attach lease if doesn't have one + + // From source, need + // - resolver + // - image name + + // From destination, need + // - Image labels + // - Unpack information + // - Platform to Snapshotter + // - Child label map + // - All metdata? + + name, desc, err := ir.Resolve(ctx) + if err != nil { + return fmt.Errorf("failed to resolve image: %w", err) + } + if desc.MediaType == images.MediaTypeDockerSchema1Manifest { + // Explicitly call out schema 1 as deprecated and not supported + return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument) + } + + fetcher, err := ir.Fetcher(ctx, name) + if err != nil { + return fmt.Errorf("failed to get fetcher for %q: %w", name, err) + } + + var ( + handler images.Handler + + unpacker *unpack.Unpacker + + // has a config media type bug (distribution#1622) + hasMediaTypeBug1622 bool + + store = ts.content + ) + + // Get all the children for a descriptor + childrenHandler := images.ChildrenHandler(store) + + if hw, ok := is.(transfer.ImageFilterer); ok { + childrenHandler = hw.ImageFilter(childrenHandler) + } + // TODO: Move these to image store + //// TODO: This could easily be handled by having an ImageHandlerWrapper() + //// Set any children labels for that content + + checkNeedsFix := images.HandlerFunc( + func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + // set to true if there is application/octet-stream media type + if desc.MediaType == docker.LegacyConfigMediaType { + hasMediaTypeBug1622 = true + } + + return []ocispec.Descriptor{}, nil + }, + ) + + appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name) + if err != nil { + return err + } + + // TODO: Support set of base handlers from configuration + // Progress handlers? + handlers := []images.Handler{ + remotes.FetchHandler(store, fetcher), + checkNeedsFix, + childrenHandler, + appendDistSrcLabelHandler, + } + + handler = images.Handlers(handlers...) + + // TODO: Should available platforms be a configuration of the service? + // First find suitable platforms to unpack into + if u, ok := is.(transfer.ImageUnpacker); ok { + uopts := []unpack.UnpackerOpt{} + for _, u := range u.UnpackPlatforms() { + uopts = append(uopts, unpack.WithUnpackPlatform(u)) + } + if ts.limiter != nil { + uopts = append(uopts, unpack.WithLimiter(ts.limiter)) + } + //if uconfig.DuplicationSuppressor != nil { + // uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) + //} + unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...) + if err != nil { + return fmt.Errorf("unable to initialize unpacker: %w", err) + } + defer func() { + // TODO: This needs to be tigher scoped... + if _, err := unpacker.Wait(); err != nil { + //if retErr == nil { + // retErr = fmt.Errorf("unpack: %w", err) + //} + } + }() + handler = unpacker.Unpack(handler) + } + + if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil { + // TODO: Cancel unpack and wait? + return err + } + + // NOTE(fuweid): unpacker defers blobs download. before create image + // record in ImageService, should wait for unpacking(including blobs + // download). + if unpacker != nil { + if _, err = unpacker.Wait(); err != nil { + return err + } + // TODO: Check results to make sure unpack was successful + } + + if hasMediaTypeBug1622 { + if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil { + return err + } + } + + _, err = is.Store(ctx, desc) + if err != nil { + return err + } + // TODO: Send status update + + return nil +} diff --git a/services/transfer/service.go b/services/transfer/service.go new file mode 100644 index 000000000..fd020b0d0 --- /dev/null +++ b/services/transfer/service.go @@ -0,0 +1,84 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package transfer + +import ( + "context" + + transferapi "github.com/containerd/containerd/api/services/transfer/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/plugin" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.GRPCPlugin, + ID: "transfer", + Requires: []plugin.Type{ + plugin.TransferPlugin, + }, + InitFn: newService, + }) +} + +type service struct { + transferers []transfer.Transferer + transferapi.UnimplementedTransferServer +} + +func newService(ic *plugin.InitContext) (interface{}, error) { + plugins, err := ic.GetByType(plugin.TransferPlugin) + if err != nil { + return nil, err + } + // TODO: how to determine order? + t := make([]transfer.Transferer, 0, len(plugins)) + for _, p := range plugins { + i, err := p.Instance() + if err != nil { + return nil, err + } + t = append(t, i.(transfer.Transferer)) + } + return &service{ + transferers: t, + }, nil +} + +func (s *service) Register(gs *grpc.Server) error { + transferapi.RegisterTransferServer(gs, s) + return nil +} + +func (s *service) Transfer(ctx context.Context, req *transferapi.TransferRequest) (*emptypb.Empty, error) { + // TODO: Optionally proxy + + // TODO: Convert options + for _, t := range s.transferers { + if err := t.Transfer(ctx, req.Source, req.Destination); err == nil { + return nil, nil + } else if !errdefs.IsNotImplemented(err) { + return nil, errdefs.ToGRPC(err) + } + } + return nil, status.Errorf(codes.Unimplemented, "method Transfer not implemented for %s to %s", req.Source.GetTypeUrl(), req.Destination.GetTypeUrl()) +}