diff --git a/pkg/transfer/archive/exporter.go b/pkg/transfer/archive/exporter.go index cf619ae42..064108a95 100644 --- a/pkg/transfer/archive/exporter.go +++ b/pkg/transfer/archive/exporter.go @@ -20,13 +20,20 @@ import ( "context" "io" - transferapi "github.com/containerd/containerd/api/types/transfer" + transfertypes "github.com/containerd/containerd/api/types/transfer" "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/pkg/transfer/plugins" tstreaming "github.com/containerd/containerd/pkg/transfer/streaming" "github.com/containerd/typeurl" ) +func init() { + // TODO: Move this to seperate package? + plugins.Register(&transfertypes.ImageExportStream{}, &ImageExportStream{}) + plugins.Register(&transfertypes.ImageImportStream{}, &ImageImportStream{}) +} + // NewImageImportStream returns a image importer via tar stream // TODO: Add import options func NewImageExportStream(stream io.WriteCloser) *ImageExportStream { @@ -43,11 +50,9 @@ func (iis *ImageExportStream) ExportStream(context.Context) (io.WriteCloser, err return iis.stream, nil } -func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) { - // TODO: Unique stream ID - sid := "randomid" - // TODO: Should this API be create instead of get - stream, err := sm.Get(ctx, sid) +func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) { + sid := tstreaming.GenerateID("export") + stream, err := sm.Create(ctx, sid) if err != nil { return nil, err } @@ -60,28 +65,26 @@ func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.Strea iis.stream.Close() }() - s := &transferapi.ImageExportStream{ + s := &transfertypes.ImageExportStream{ Stream: sid, } return typeurl.MarshalAny(s) } -func (iis *ImageExportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamManager, any typeurl.Any) error { - var s transferapi.ImageExportStream +func (iis *ImageExportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, any typeurl.Any) error { + var s transfertypes.ImageExportStream if err := typeurl.UnmarshalTo(any, &s); err != nil { return err } stream, err := sm.Get(ctx, s.Stream) if err != nil { + log.G(ctx).WithError(err).WithField("stream", s.Stream).Debug("failed to get export stream") return err } - r, w := io.Pipe() - - tstreaming.SendStream(ctx, r, stream) - iis.stream = w + iis.stream = tstreaming.WriteByteStream(ctx, stream) return nil } diff --git a/pkg/transfer/archive/importer.go b/pkg/transfer/archive/importer.go index d1170b485..a82b0f923 100644 --- a/pkg/transfer/archive/importer.go +++ b/pkg/transfer/archive/importer.go @@ -21,6 +21,7 @@ import ( "io" transferapi "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/streaming" tstreaming "github.com/containerd/containerd/pkg/transfer/streaming" "github.com/containerd/typeurl" @@ -42,11 +43,9 @@ func (iis *ImageImportStream) ImportStream(context.Context) (io.Reader, error) { return iis.stream, nil } -func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) { - // TODO: Unique stream ID - sid := "randomid" - // TODO: Should this API be create instead of get - stream, err := sm.Get(ctx, sid) +func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) { + sid := tstreaming.GenerateID("import") + stream, err := sm.Create(ctx, sid) if err != nil { return nil, err } @@ -59,7 +58,7 @@ func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.Strea return typeurl.MarshalAny(s) } -func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamManager, any typeurl.Any) error { +func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, any typeurl.Any) error { var s transferapi.ImageImportStream if err := typeurl.UnmarshalTo(any, &s); err != nil { return err @@ -67,6 +66,7 @@ func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.Str stream, err := sm.Get(ctx, s.Stream) if err != nil { + log.G(ctx).WithError(err).WithField("stream", s.Stream).Debug("failed to get import stream") return err } diff --git a/pkg/transfer/image/imagestore.go b/pkg/transfer/image/imagestore.go new file mode 100644 index 000000000..c090f63f2 --- /dev/null +++ b/pkg/transfer/image/imagestore.go @@ -0,0 +1,140 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package image + +import ( + "context" + "fmt" + + transfertypes "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/pkg/transfer/plugins" + "github.com/containerd/containerd/pkg/unpack" + "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes" + "github.com/containerd/typeurl" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func init() { + // TODO: Move this to seperate package? + plugins.Register(&transfertypes.ImageStore{}, &ImageStore{}) // TODO: Rename ImageStoreDestination +} + +type ImageStore struct { + // TODO: Put these configurations in object which can convert to/from any + // Embed generated type + imageName string + imageLabels map[string]string + platforms platforms.MatchComparer + allMetadata bool + labelMap func(ocispec.Descriptor) []string + manifestLimit int + + // TODO: Convert these to unpack platforms + unpacks []unpack.Platform +} + +func NewImageStore(image string) *ImageStore { + return &ImageStore{ + imageName: image, + } +} + +func (is *ImageStore) String() string { + return fmt.Sprintf("Local Image Store (%s)", is.imageName) +} + +func (is *ImageStore) FilterHandler(h images.HandlerFunc, cs content.Store) images.HandlerFunc { + h = images.SetChildrenMappedLabels(cs, h, is.labelMap) + if is.allMetadata { + // Filter manifests by platforms but allow to handle manifest + // and configuration for not-target platforms + h = remotes.FilterManifestByPlatformHandler(h, is.platforms) + } else { + // Filter children by platforms if specified. + h = images.FilterPlatforms(h, is.platforms) + } + + // Sort and limit manifests if a finite number is needed + if is.manifestLimit > 0 { + h = images.LimitManifests(h, is.platforms, is.manifestLimit) + } + return h +} + +func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor, store images.Store) (images.Image, error) { + img := images.Image{ + Name: is.imageName, + Target: desc, + Labels: is.imageLabels, + } + + for { + if created, err := store.Create(ctx, img); err != nil { + if !errdefs.IsAlreadyExists(err) { + return images.Image{}, err + } + + updated, err := store.Update(ctx, img) + if err != nil { + // if image was removed, try create again + if errdefs.IsNotFound(err) { + continue + } + return images.Image{}, err + } + + img = updated + } else { + img = created + } + + return img, nil + } +} + +func (is *ImageStore) Get(ctx context.Context, store images.Store) (images.Image, error) { + return store.Get(ctx, is.imageName) +} + +func (is *ImageStore) UnpackPlatforms() []unpack.Platform { + return is.unpacks +} + +func (is *ImageStore) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) { + s := &transfertypes.ImageStore{ + Name: is.imageName, + // TODO: Support other fields + } + return typeurl.MarshalAny(s) +} + +func (is *ImageStore) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, a typeurl.Any) error { + var s transfertypes.ImageStore + if err := typeurl.UnmarshalTo(a, &s); err != nil { + return err + } + + is.imageName = s.Name + // TODO: Support other fields + + return nil +} diff --git a/pkg/transfer/image/local.go b/pkg/transfer/image/local.go deleted file mode 100644 index 25f9428fe..000000000 --- a/pkg/transfer/image/local.go +++ /dev/null @@ -1,322 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package transfer - -import ( - "context" - "fmt" - "net/http" - - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" - "github.com/containerd/containerd/pkg/streaming" - "github.com/containerd/containerd/pkg/transfer" - "github.com/containerd/containerd/pkg/unpack" - "github.com/containerd/containerd/platforms" - "github.com/containerd/containerd/remotes" - "github.com/containerd/containerd/remotes/docker" - "github.com/containerd/typeurl" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" -) - -// TODO: Should a factory be exposed here as a service?? -/* -func NewOCIRegistryFromProto(p *transferapi.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry { - //transfer.OCIRegistry - // Create resolver - // Convert auth stream to credential manager - return &OCIRegistry{ - reference: p.Reference, - resolver: resolver, - } -} -*/ - -// Initialize with hosts, authorizer callback, and headers -func NewOCIRegistry(ref string, headers http.Header, creds CredentialHelper) *OCIRegistry { - // Create an authorizer - var ropts []docker.RegistryOpt - if creds != nil { - // TODO: Support bearer - authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) { - c, err := creds.GetCredentials(context.Background(), ref, host) - if err != nil { - return "", "", err - } - - return c.Username, c.Secret, nil - })) - ropts = append(ropts, docker.WithAuthorizer(authorizer)) - } - - // TODO: Apply local configuration, maybe dynamically create resolver when requested - resolver := docker.NewResolver(docker.ResolverOptions{ - Hosts: docker.ConfigureDefaultRegistries(ropts...), - Headers: headers, - }) - return &OCIRegistry{ - reference: ref, - headers: headers, - creds: creds, - resolver: resolver, - } -} - -// From stream -type CredentialHelper interface { - GetCredentials(ctx context.Context, ref, host string) (Credentials, error) -} - -type Credentials struct { - Host string - Username string - Secret string - Bearer string -} - -// OCI -type OCIRegistry struct { - reference string - - headers http.Header - creds CredentialHelper - - resolver remotes.Resolver - - // This could be an interface which returns resolver? - // Resolver could also be a plug-able interface, to call out to a program to fetch? -} - -func (r *OCIRegistry) String() string { - return fmt.Sprintf("OCI Registry (%s)", r.reference) -} - -func (r *OCIRegistry) Image() string { - return r.reference -} - -func (r *OCIRegistry) Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) { - return r.resolver.Resolve(ctx, r.reference) -} - -func (r *OCIRegistry) Fetcher(ctx context.Context, ref string) (transfer.Fetcher, error) { - return r.resolver.Fetcher(ctx, ref) -} - -func (r *OCIRegistry) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) { - if r.creds != nil { - // TODO: Unique stream ID - stream, err := sm.Get(ctx, "") - if err != nil { - return nil, err - } - go func() { - // Check for context cancellation as well - for { - select { - case <-ctx.Done(): - return - default: - } - - _, err := stream.Recv() - if err != nil { - // If not EOF, log error - return - } - // If closed, return - // Call creds helper - // Send response - } - - }() - // link creds to stream - } - - // Create API OCI Registry type - - // Marshal and return - - return nil, nil -} - -type ImageStore struct { - // TODO: Put these configurations in object which can convert to/from any - // Embed generated type - imageName string - imageLabels map[string]string - platforms platforms.MatchComparer - allMetadata bool - labelMap func(ocispec.Descriptor) []string - manifestLimit int - - images images.Store - content content.Store - - // TODO: Convert these to unpack platforms - unpacks []unpack.Platform -} - -func NewImageStore(image string, cs content.Store, is images.Store) *ImageStore { - return &ImageStore{ - imageName: image, - images: is, - content: cs, - } -} - -func (is *ImageStore) String() string { - return fmt.Sprintf("Local Image Store (%s)", is.imageName) -} - -func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc { - h = images.SetChildrenMappedLabels(is.content, h, is.labelMap) - if is.allMetadata { - // Filter manifests by platforms but allow to handle manifest - // and configuration for not-target platforms - h = remotes.FilterManifestByPlatformHandler(h, is.platforms) - } else { - // Filter children by platforms if specified. - h = images.FilterPlatforms(h, is.platforms) - } - - // Sort and limit manifests if a finite number is needed - if is.manifestLimit > 0 { - h = images.LimitManifests(h, is.platforms, is.manifestLimit) - } - return h -} - -func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) { - img := images.Image{ - Name: is.imageName, - Target: desc, - Labels: is.imageLabels, - } - - for { - if created, err := is.images.Create(ctx, img); err != nil { - if !errdefs.IsAlreadyExists(err) { - return images.Image{}, err - } - - updated, err := is.images.Update(ctx, img) - if err != nil { - // if image was removed, try create again - if errdefs.IsNotFound(err) { - continue - } - return images.Image{}, err - } - - img = updated - } else { - img = created - } - - return img, nil - } -} - -func (is *ImageStore) UnpackPlatforms() []unpack.Platform { - return is.unpacks -} - -/* -type RemoteContext struct { - // Resolver is used to resolve names to objects, fetchers, and pushers. - // If no resolver is provided, defaults to Docker registry resolver. - Resolver remotes.Resolver - - // PlatformMatcher is used to match the platforms for an image - // operation and define the preference when a single match is required - // from multiple platforms. - PlatformMatcher platforms.MatchComparer - - // Unpack is done after an image is pulled to extract into a snapshotter. - // It is done simultaneously for schema 2 images when they are pulled. - // If an image is not unpacked on pull, it can be unpacked any time - // afterwards. Unpacking is required to run an image. - Unpack bool - - // UnpackOpts handles options to the unpack call. - UnpackOpts []UnpackOpt - - // Snapshotter used for unpacking - Snapshotter string - - // SnapshotterOpts are additional options to be passed to a snapshotter during pull - SnapshotterOpts []snapshots.Opt - - // Labels to be applied to the created image - Labels map[string]string - - // BaseHandlers are a set of handlers which get are called on dispatch. - // These handlers always get called before any operation specific - // handlers. - BaseHandlers []images.Handler - - // HandlerWrapper wraps the handler which gets sent to dispatch. - // Unlike BaseHandlers, this can run before and after the built - // in handlers, allowing operations to run on the descriptor - // after it has completed transferring. - HandlerWrapper func(images.Handler) images.Handler - - // Platforms defines which platforms to handle when doing the image operation. - // Platforms is ignored when a PlatformMatcher is set, otherwise the - // platforms will be used to create a PlatformMatcher with no ordering - // preference. - Platforms []string - - // MaxConcurrentDownloads is the max concurrent content downloads for each pull. - MaxConcurrentDownloads int - - // MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push. - MaxConcurrentUploadedLayers int - - // AllMetadata downloads all manifests and known-configuration files - AllMetadata bool - - // ChildLabelMap sets the labels used to reference child objects in the content - // store. By default, all GC reference labels will be set for all fetched content. - ChildLabelMap func(ocispec.Descriptor) []string -} -*/ -/* -// What should streamhandler look like? -type StreamHandler interface { - Authorize() error - Progress(key string, int64) -} - -// Distribution options -// Stream handler -// Progress rate -// Unpack options -// Remote options -// Cases: -// Registry -> Content/ImageStore (pull) -// Registry -> Registry -// Content/ImageStore -> Registry (push) -// Content/ImageStore -> Content/ImageStore (tag) -// Common fetch/push interface for registry, content/imagestore, OCI index -// Always starts with string for source and destination, on client side, does not need to resolve -// Higher level implementation just takes strings and options -// Lower level implementation takes pusher/fetcher? - -*/ diff --git a/pkg/transfer/image/registry.go b/pkg/transfer/image/registry.go new file mode 100644 index 000000000..6fb5ffe20 --- /dev/null +++ b/pkg/transfer/image/registry.go @@ -0,0 +1,290 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package image + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "strings" + "sync" + + transfertypes "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/transfer/plugins" + tstreaming "github.com/containerd/containerd/pkg/transfer/streaming" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/typeurl" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func init() { + // TODO: Move this to seperate package? + plugins.Register(&transfertypes.OCIRegistry{}, &OCIRegistry{}) +} + +// Initialize with hosts, authorizer callback, and headers +func NewOCIRegistry(ref string, headers http.Header, creds CredentialHelper) *OCIRegistry { + // Create an authorizer + var ropts []docker.RegistryOpt + if creds != nil { + // TODO: Support bearer + authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) { + c, err := creds.GetCredentials(context.Background(), ref, host) + if err != nil { + return "", "", err + } + + return c.Username, c.Secret, nil + })) + ropts = append(ropts, docker.WithAuthorizer(authorizer)) + } + + // TODO: Apply local configuration, maybe dynamically create resolver when requested + resolver := docker.NewResolver(docker.ResolverOptions{ + Hosts: docker.ConfigureDefaultRegistries(ropts...), + Headers: headers, + }) + return &OCIRegistry{ + reference: ref, + headers: headers, + creds: creds, + resolver: resolver, + } +} + +// From stream +type CredentialHelper interface { + GetCredentials(ctx context.Context, ref, host string) (Credentials, error) +} + +type Credentials struct { + Host string + Username string + Secret string + Header string +} + +// OCI +type OCIRegistry struct { + reference string + + headers http.Header + creds CredentialHelper + + resolver remotes.Resolver + + // This could be an interface which returns resolver? + // Resolver could also be a plug-able interface, to call out to a program to fetch? +} + +func (r *OCIRegistry) String() string { + return fmt.Sprintf("OCI Registry (%s)", r.reference) +} + +func (r *OCIRegistry) Image() string { + return r.reference +} + +func (r *OCIRegistry) Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) { + return r.resolver.Resolve(ctx, r.reference) +} + +func (r *OCIRegistry) Fetcher(ctx context.Context, ref string) (transfer.Fetcher, error) { + return r.resolver.Fetcher(ctx, ref) +} + +func (r *OCIRegistry) Pusher(ctx context.Context, desc ocispec.Descriptor) (transfer.Pusher, error) { + var ref = r.reference + // Annotate ref with digest to push only push tag for single digest + if !strings.Contains(ref, "@") { + ref = ref + "@" + desc.Digest.String() + } + return r.resolver.Pusher(ctx, ref) +} + +func (r *OCIRegistry) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) { + res := &transfertypes.RegistryResolver{} + if r.headers != nil { + res.Headers = map[string]string{} + for k := range r.headers { + res.Headers[k] = r.headers.Get(k) + } + } + if r.creds != nil { + sid := tstreaming.GenerateID("creds") + stream, err := sm.Create(ctx, sid) + if err != nil { + return nil, err + } + go func() { + // Check for context cancellation as well + for { + select { + case <-ctx.Done(): + return + default: + } + + req, err := stream.Recv() + if err != nil { + // If not EOF, log error + return + } + + var s transfertypes.AuthRequest + if err := typeurl.UnmarshalTo(req, &s); err != nil { + log.G(ctx).WithError(err).Error("failed to unmarshal credential request") + continue + } + creds, err := r.creds.GetCredentials(ctx, s.Reference, s.Host) + if err != nil { + log.G(ctx).WithError(err).Error("failed to get credentials") + continue + } + var resp transfertypes.AuthResponse + if creds.Header != "" { + resp.AuthType = transfertypes.AuthType_HEADER + resp.Secret = creds.Header + } else if creds.Username != "" { + resp.AuthType = transfertypes.AuthType_CREDENTIALS + resp.Username = creds.Username + resp.Secret = creds.Secret + } else { + resp.AuthType = transfertypes.AuthType_REFRESH + resp.Secret = creds.Secret + } + + a, err := typeurl.MarshalAny(&resp) + if err != nil { + log.G(ctx).WithError(err).Error("failed to marshal credential response") + continue + } + + if err := stream.Send(a); err != nil { + if !errors.Is(err, io.EOF) { + log.G(ctx).WithError(err).Error("unexpected send failure") + } + return + } + } + + }() + res.AuthStream = sid + } + s := &transfertypes.OCIRegistry{ + Reference: r.reference, + Resolver: res, + } + + return typeurl.MarshalAny(s) +} + +func (r *OCIRegistry) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, a typeurl.Any) error { + var ( + s transfertypes.OCIRegistry + ropts []docker.RegistryOpt + aopts []docker.AuthorizerOpt + ) + if err := typeurl.UnmarshalTo(a, &s); err != nil { + return err + } + + if s.Resolver != nil { + if sid := s.Resolver.AuthStream; sid != "" { + stream, err := sm.Get(ctx, sid) + if err != nil { + log.G(ctx).WithError(err).WithField("stream", sid).Debug("failed to get auth stream") + return err + } + r.creds = &credCallback{ + stream: stream, + } + aopts = append(aopts, docker.WithAuthCreds(func(host string) (string, string, error) { + c, err := r.creds.GetCredentials(context.Background(), s.Reference, host) + if err != nil { + return "", "", err + } + + return c.Username, c.Secret, nil + })) + } + r.headers = http.Header{} + for k, v := range s.Resolver.Headers { + r.headers.Add(k, v) + } + } + authorizer := docker.NewDockerAuthorizer(aopts...) + ropts = append(ropts, docker.WithAuthorizer(authorizer)) + + r.reference = s.Reference + r.resolver = docker.NewResolver(docker.ResolverOptions{ + Hosts: docker.ConfigureDefaultRegistries(ropts...), + Headers: r.headers, + }) + + return nil +} + +type credCallback struct { + sync.Mutex + stream streaming.Stream +} + +func (cc *credCallback) GetCredentials(ctx context.Context, ref, host string) (Credentials, error) { + cc.Lock() + defer cc.Unlock() + + ar := &transfertypes.AuthRequest{ + Host: host, + Reference: ref, + } + any, err := typeurl.MarshalAny(ar) + if err != nil { + return Credentials{}, err + } + if err := cc.stream.Send(any); err != nil { + return Credentials{}, err + } + resp, err := cc.stream.Recv() + if err != nil { + return Credentials{}, err + } + var s transfertypes.AuthResponse + if err := typeurl.UnmarshalTo(resp, &s); err != nil { + return Credentials{}, err + } + creds := Credentials{ + Host: host, + } + switch s.AuthType { + case transfertypes.AuthType_CREDENTIALS: + creds.Username = s.Username + creds.Secret = s.Secret + case transfertypes.AuthType_REFRESH: + creds.Secret = s.Secret + case transfertypes.AuthType_HEADER: + creds.Header = s.Secret + } + + return creds, nil +} diff --git a/pkg/transfer/local/pull.go b/pkg/transfer/local/pull.go new file mode 100644 index 000000000..b6cdc6bce --- /dev/null +++ b/pkg/transfer/local/pull.go @@ -0,0 +1,243 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package local + +import ( + "context" + "fmt" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/unpack" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/sirupsen/logrus" +) + +func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetcher, is transfer.ImageStorer, tops *transfer.TransferOpts) error { + ctx, done, err := ts.withLease(ctx) + if err != nil { + return err + } + defer done(ctx) + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Resolving from %s", ir), + }) + } + + name, desc, err := ir.Resolve(ctx) + if err != nil { + return fmt.Errorf("failed to resolve image: %w", err) + } + if desc.MediaType == images.MediaTypeDockerSchema1Manifest { + // Explicitly call out schema 1 as deprecated and not supported + return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument) + } + + // TODO: Handle already exists + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Pulling from %s", ir), + }) + tops.Progress(transfer.Progress{ + Event: "fetching image content", + Name: name, + //Digest: img.Target.Digest.String(), + }) + } + + fetcher, err := ir.Fetcher(ctx, name) + if err != nil { + return fmt.Errorf("failed to get fetcher for %q: %w", name, err) + } + + var ( + handler images.Handler + + unpacker *unpack.Unpacker + + // has a config media type bug (distribution#1622) + hasMediaTypeBug1622 bool + + store = ts.content + progressTracker *ProgressTracker + ) + + if tops.Progress != nil { + progressTracker = NewProgressTracker(name, store) //Pass in first name as root + go progressTracker.HandleProgress(ctx, tops.Progress) + defer progressTracker.Wait() + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Get all the children for a descriptor + childrenHandler := images.ChildrenHandler(store) + + if f, ok := is.(transfer.ImageFilterer); ok { + childrenHandler = f.ImageFilter(childrenHandler, store) + } + + // Sort and limit manifests if a finite number is needed + //if limit > 0 { + // childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) + //} + + checkNeedsFix := images.HandlerFunc( + func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + // set to true if there is application/octet-stream media type + if desc.MediaType == docker.LegacyConfigMediaType { + hasMediaTypeBug1622 = true + } + + return []ocispec.Descriptor{}, nil + }, + ) + + appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name) + if err != nil { + return err + } + + // TODO: Allow initialization from configuration + baseHandlers := []images.Handler{} + + if tops.Progress != nil { + baseHandlers = append(baseHandlers, images.HandlerFunc( + func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + progressTracker.Add(desc) + + return []ocispec.Descriptor{}, nil + }, + )) + + baseChildrenHandler := childrenHandler + childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) { + children, err = baseChildrenHandler(ctx, desc) + if err != nil { + return + } + progressTracker.AddChildren(desc, children) + return + }) + } + + handler = images.Handlers(append(baseHandlers, + fetchHandler(store, fetcher, progressTracker), + checkNeedsFix, + childrenHandler, // List children to track hierachy + appendDistSrcLabelHandler, + )...) + + // TODO: Should available platforms be a configuration of the service? + // First find suitable platforms to unpack into + //if unpacker, ok := is. + if iu, ok := is.(transfer.ImageUnpacker); ok { + unpacks := iu.UnpackPlatforms() + if len(unpacks) > 0 { + uopts := []unpack.UnpackerOpt{} + for _, u := range unpacks { + uopts = append(uopts, unpack.WithUnpackPlatform(u)) + } + if ts.limiter != nil { + uopts = append(uopts, unpack.WithLimiter(ts.limiter)) + } + //if uconfig.DuplicationSuppressor != nil { + // uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) + //} + unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...) + if err != nil { + return fmt.Errorf("unable to initialize unpacker: %w", err) + } + handler = unpacker.Unpack(handler) + } + } + + if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil { + if unpacker != nil { + // wait for unpacker to cleanup + unpacker.Wait() + } + return err + } + + // NOTE(fuweid): unpacker defers blobs download. before create image + // record in ImageService, should wait for unpacking(including blobs + // download). + if unpacker != nil { + if _, err = unpacker.Wait(); err != nil { + return err + } + // TODO: Check results to make sure unpack was successful + } + + if hasMediaTypeBug1622 { + if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil { + return err + } + } + + img, err := is.Store(ctx, desc, ts.images) + if err != nil { + return err + } + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: "saved", + Name: img.Name, + //Digest: img.Target.Digest.String(), + }) + } + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Completed pull from %s", ir), + }) + } + + return nil +} + +func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + + switch desc.MediaType { + case images.MediaTypeDockerSchema1Manifest: + return nil, fmt.Errorf("%v not supported", desc.MediaType) + default: + err := remotes.Fetch(ctx, ingester, fetcher, desc) + if errdefs.IsAlreadyExists(err) { + pt.MarkExists(desc) + return nil, nil + } + return nil, err + } + } +} diff --git a/pkg/transfer/local/push.go b/pkg/transfer/local/push.go new file mode 100644 index 000000000..f68f40b97 --- /dev/null +++ b/pkg/transfer/local/push.go @@ -0,0 +1,79 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package local + +import ( + "context" + + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes" +) + +func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGetter, p transfer.ImagePusher, tops *transfer.TransferOpts) error { + /* + // TODO: Platform matching + if pushCtx.PlatformMatcher == nil { + if len(pushCtx.Platforms) > 0 { + var ps []ocispec.Platform + for _, platform := range pushCtx.Platforms { + p, err := platforms.Parse(platform) + if err != nil { + return fmt.Errorf("invalid platform %s: %w", platform, err) + } + ps = append(ps, p) + } + pushCtx.PlatformMatcher = platforms.Any(ps...) + } else { + pushCtx.PlatformMatcher = platforms.All + } + } + */ + + matcher := platforms.All + // Filter push + + img, err := ig.Get(ctx, ts.images) + if err != nil { + return err + } + + pusher, err := p.Pusher(ctx, img.Target) + if err != nil { + return err + } + + var wrapper func(images.Handler) images.Handler + + /* + // TODO: Add handlers + if len(pushCtx.BaseHandlers) > 0 { + wrapper = func(h images.Handler) images.Handler { + h = images.Handlers(append(pushCtx.BaseHandlers, h)...) + if pushCtx.HandlerWrapper != nil { + h = pushCtx.HandlerWrapper(h) + } + return h + } + } else if pushCtx.HandlerWrapper != nil { + wrapper = pushCtx.HandlerWrapper + } + */ + + return remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper) +} diff --git a/pkg/transfer/local/transfer.go b/pkg/transfer/local/transfer.go index cf22f662a..3bdbfa560 100644 --- a/pkg/transfer/local/transfer.go +++ b/pkg/transfer/local/transfer.go @@ -20,32 +20,26 @@ import ( "context" "fmt" "io" + "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" - "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/transfer" - "github.com/containerd/containerd/pkg/unpack" - "github.com/containerd/containerd/remotes" - "github.com/containerd/containerd/remotes/docker" "github.com/containerd/typeurl" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" ) type localTransferService struct { leases leases.Manager content content.Store + images images.Store // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) limiter *semaphore.Weighted // TODO: Duplication suppressor - // Metadata service (Or snapshotters, image, content) - // Diff // Configuration // - Max downloads @@ -55,10 +49,11 @@ type localTransferService struct { // - Platform -> snapshotter defaults? } -func NewTransferService(lm leases.Manager, cs content.Store) transfer.Transferer { +func NewTransferService(lm leases.Manager, cs content.Store, is images.Store) transfer.Transferer { return &localTransferService{ leases: lm, content: cs, + images: is, } } @@ -68,15 +63,18 @@ func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, d opt(topts) } - // Convert Any to real source/destination - // Figure out matrix of whether source destination combination is supported switch s := src.(type) { - case transfer.ImageResolver: + case transfer.ImageFetcher: switch d := dest.(type) { case transfer.ImageStorer: return ts.pull(ctx, s, d, topts) } + case transfer.ImageGetter: + switch d := dest.(type) { + case transfer.ImagePusher: + return ts.push(ctx, s, d, topts) + } case transfer.ImageImportStreamer: switch d := dest.(type) { case transfer.ImageExportStreamer: @@ -119,221 +117,33 @@ func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImport } return err } -func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error { - // TODO: Attach lease if doesn't have one - // From source, need - // - resolver - // - image name +// WithLease attaches a lease on the context +func (ts *localTransferService) withLease(ctx context.Context, opts ...leases.Opt) (context.Context, func(context.Context) error, error) { + nop := func(context.Context) error { return nil } - // From destination, need - // - Image labels - // - Unpack information - // - Platform to Snapshotter - // - Child label map - // - All metdata? - if tops.Progress != nil { - tops.Progress(transfer.Progress{ - Event: fmt.Sprintf("Resolving from %s", ir), - }) + _, ok := leases.FromContext(ctx) + if ok { + return ctx, nop, nil } - name, desc, err := ir.Resolve(ctx) - if err != nil { - return fmt.Errorf("failed to resolve image: %w", err) - } - if desc.MediaType == images.MediaTypeDockerSchema1Manifest { - // Explicitly call out schema 1 as deprecated and not supported - return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument) - } + ls := ts.leases - // TODO: Handle already exists - if tops.Progress != nil { - tops.Progress(transfer.Progress{ - Event: fmt.Sprintf("Pulling from %s", ir), - }) - tops.Progress(transfer.Progress{ - Event: "fetching image content", - Name: name, - //Digest: img.Target.Digest.String(), - }) - } - - fetcher, err := ir.Fetcher(ctx, name) - if err != nil { - return fmt.Errorf("failed to get fetcher for %q: %w", name, err) - } - - var ( - handler images.Handler - - unpacker *unpack.Unpacker - - // has a config media type bug (distribution#1622) - hasMediaTypeBug1622 bool - - store = ts.content - progressTracker *ProgressTracker - ) - - if tops.Progress != nil { - progressTracker = NewProgressTracker(name, store) //Pass in first name as root - go progressTracker.HandleProgress(ctx, tops.Progress) - defer progressTracker.Wait() - } - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - //func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc { - //func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) { - - // Get all the children for a descriptor - childrenHandler := images.ChildrenHandler(store) - - if f, ok := is.(transfer.ImageFilterer); ok { - childrenHandler = f.ImageFilter(childrenHandler) - } - - // Sort and limit manifests if a finite number is needed - //if limit > 0 { - // childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) - //} - - checkNeedsFix := images.HandlerFunc( - func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - // set to true if there is application/octet-stream media type - if desc.MediaType == docker.LegacyConfigMediaType { - hasMediaTypeBug1622 = true - } - - return []ocispec.Descriptor{}, nil - }, - ) - - appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name) - if err != nil { - return err - } - - // TODO: Allow initialization from configuration - baseHandlers := []images.Handler{} - - if tops.Progress != nil { - baseHandlers = append(baseHandlers, images.HandlerFunc( - func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - progressTracker.Add(desc) - - return []ocispec.Descriptor{}, nil - }, - )) - - baseChildrenHandler := childrenHandler - childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) { - children, err = baseChildrenHandler(ctx, desc) - if err != nil { - return - } - progressTracker.AddChildren(desc, children) - return - }) - } - - handler = images.Handlers(append(baseHandlers, - fetchHandler(store, fetcher, progressTracker), - checkNeedsFix, - childrenHandler, // List children to track hierachy - appendDistSrcLabelHandler, - )...) - - // TODO: Should available platforms be a configuration of the service? - // First find suitable platforms to unpack into - //if unpacker, ok := is. - if iu, ok := is.(transfer.ImageUnpacker); ok { - unpacks := iu.UnpackPlatforms() - if len(unpacks) > 0 { - uopts := []unpack.UnpackerOpt{} - for _, u := range unpacks { - uopts = append(uopts, unpack.WithUnpackPlatform(u)) - } - if ts.limiter != nil { - uopts = append(uopts, unpack.WithLimiter(ts.limiter)) - } - //if uconfig.DuplicationSuppressor != nil { - // uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) - //} - unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...) - if err != nil { - return fmt.Errorf("unable to initialize unpacker: %w", err) - } - handler = unpacker.Unpack(handler) + if len(opts) == 0 { + // Use default lease configuration if no options provided + opts = []leases.Opt{ + leases.WithRandomID(), + leases.WithExpiration(24 * time.Hour), } } - if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil { - if unpacker != nil { - // wait for unpacker to cleanup - unpacker.Wait() - } - return err - } - - // NOTE(fuweid): unpacker defers blobs download. before create image - // record in ImageService, should wait for unpacking(including blobs - // download). - if unpacker != nil { - if _, err = unpacker.Wait(); err != nil { - return err - } - // TODO: Check results to make sure unpack was successful - } - - if hasMediaTypeBug1622 { - if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil { - return err - } - } - - img, err := is.Store(ctx, desc) + l, err := ls.Create(ctx, opts...) if err != nil { - return err + return ctx, nop, err } - if tops.Progress != nil { - tops.Progress(transfer.Progress{ - Event: "saved", - Name: img.Name, - //Digest: img.Target.Digest.String(), - }) - } - - if tops.Progress != nil { - tops.Progress(transfer.Progress{ - Event: fmt.Sprintf("Completed pull from %s", ir), - }) - } - - return nil -} - -func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc { - return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { - ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ - "digest": desc.Digest, - "mediatype": desc.MediaType, - "size": desc.Size, - })) - - switch desc.MediaType { - case images.MediaTypeDockerSchema1Manifest: - return nil, fmt.Errorf("%v not supported", desc.MediaType) - default: - err := remotes.Fetch(ctx, ingester, fetcher, desc) - if errdefs.IsAlreadyExists(err) { - pt.MarkExists(desc) - return nil, nil - } - return nil, err - } - } + ctx = leases.WithLease(ctx, l.ID) + return ctx, func(ctx context.Context) error { + return ls.Delete(ctx, l) + }, nil } diff --git a/pkg/transfer/proxy/transfer.go b/pkg/transfer/proxy/transfer.go index f6e677aad..3d2cf4bb5 100644 --- a/pkg/transfer/proxy/transfer.go +++ b/pkg/transfer/proxy/transfer.go @@ -18,29 +18,73 @@ package proxy import ( "context" + "io" transferapi "github.com/containerd/containerd/api/services/transfer/v1" + transfertypes "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/transfer" + tstreaming "github.com/containerd/containerd/pkg/transfer/streaming" "github.com/containerd/typeurl" "google.golang.org/protobuf/types/known/anypb" ) type proxyTransferer struct { client transferapi.TransferClient - streamManager streaming.StreamManager + streamCreator streaming.StreamCreator } // NewTransferer returns a new transferr which communicates over a GRPC // connection using the containerd transfer API -func NewTransferer(client transferapi.TransferClient, sm streaming.StreamManager) transfer.Transferer { +func NewTransferer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferer { return &proxyTransferer{ client: client, - streamManager: sm, + streamCreator: sc, } } func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error { + o := &transfer.TransferOpts{} + for _, opt := range opts { + opt(o) + } + apiOpts := &transferapi.TransferOptions{} + if o.Progress != nil { + sid := tstreaming.GenerateID("progress") + stream, err := p.streamCreator.Create(ctx, sid) + if err != nil { + return err + } + apiOpts.ProgressStream = sid + go func() { + for { + a, err := stream.Recv() + if err != nil { + if err != io.EOF { + log.G(ctx).WithError(err).Error("progress stream failed to recv") + } + return + } + i, err := typeurl.UnmarshalAny(a) + if err != nil { + log.G(ctx).WithError(err).Warnf("failed to unmarshal progress object: %v", a.GetTypeUrl()) + } + switch v := i.(type) { + case *transfertypes.Progress: + o.Progress(transfer.Progress{ + Event: v.Event, + Name: v.Name, + Parents: v.Parents, + Progress: v.Progress, + Total: v.Total, + }) + default: + log.G(ctx).Warnf("unhandled progress object %T: %v", i, a.GetTypeUrl()) + } + } + }() + } asrc, err := p.marshalAny(ctx, src) if err != nil { return err @@ -49,7 +93,6 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int if err != nil { return err } - // Resolve opts to req := &transferapi.TransferRequest{ Source: &anypb.Any{ TypeUrl: asrc.GetTypeUrl(), @@ -59,7 +102,7 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int TypeUrl: adst.GetTypeUrl(), Value: adst.GetValue(), }, - // TODO: Options + Options: apiOpts, } _, err = p.client.Transfer(ctx, req) return err @@ -67,11 +110,11 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int func (p *proxyTransferer) marshalAny(ctx context.Context, i interface{}) (typeurl.Any, error) { switch m := i.(type) { case streamMarshaler: - return m.MarshalAny(ctx, p.streamManager) + return m.MarshalAny(ctx, p.streamCreator) } return typeurl.MarshalAny(i) } type streamMarshaler interface { - MarshalAny(context.Context, streaming.StreamManager) (typeurl.Any, error) + MarshalAny(context.Context, streaming.StreamCreator) (typeurl.Any, error) } diff --git a/pkg/transfer/transfer.go b/pkg/transfer/transfer.go index 5cecc30a5..42ab73eef 100644 --- a/pkg/transfer/transfer.go +++ b/pkg/transfer/transfer.go @@ -20,6 +20,7 @@ import ( "context" "io" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/pkg/unpack" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -31,23 +32,40 @@ type Transferer interface { type ImageResolver interface { Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) +} + +type ImageFetcher interface { + ImageResolver Fetcher(ctx context.Context, ref string) (Fetcher, error) } +type ImagePusher interface { + Pusher(context.Context, ocispec.Descriptor) (Pusher, error) +} + type Fetcher interface { - Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) + Fetch(context.Context, ocispec.Descriptor) (io.ReadCloser, error) +} + +type Pusher interface { + Push(context.Context, ocispec.Descriptor) (content.Writer, error) } // ImageFilterer is used to filter out child objects of an image type ImageFilterer interface { - ImageFilter(images.HandlerFunc) images.HandlerFunc + ImageFilter(images.HandlerFunc, content.Store) images.HandlerFunc } // ImageStorer is a type which is capable of storing an image to // for a provided descriptor type ImageStorer interface { - Store(context.Context, ocispec.Descriptor) (images.Image, error) + Store(context.Context, ocispec.Descriptor, images.Store) (images.Image, error) +} + +// ImageGetter is type which returns an image from an image store +type ImageGetter interface { + Get(context.Context, images.Store) (images.Image, error) } // ImageImportStreamer returns an import streamer based on OCI or diff --git a/plugins/transfer/plugin.go b/plugins/transfer/plugin.go index 1b9002dcb..98ea8f594 100644 --- a/plugins/transfer/plugin.go +++ b/plugins/transfer/plugin.go @@ -17,27 +17,16 @@ package transfer import ( - "context" - "fmt" - - ttypes "github.com/containerd/containerd/api/types/transfer" - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/pkg/transfer" - "github.com/containerd/containerd/pkg/unpack" + "github.com/containerd/containerd/pkg/transfer/local" "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/remotes" - "github.com/containerd/containerd/remotes/docker" - "github.com/containerd/typeurl" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "golang.org/x/sync/semaphore" + + _ "github.com/containerd/containerd/pkg/transfer/archive" + _ "github.com/containerd/containerd/pkg/transfer/image" ) func init() { - plugin.Register(&plugin.Registration{ Type: plugin.TransferPlugin, ID: "image", @@ -57,274 +46,12 @@ func init() { return nil, err } - // Map to url instance handler (typeurl.Any) interface{} - - return &localTransferService{ - leases: l.(leases.Manager), - content: ms.ContentStore(), - conversions: map[string]func(typeurl.Any) (interface{}, error){}, - // // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) - // limiter *semaphore.Weighted - }, nil + return local.NewTransferService(l.(leases.Manager), ms.ContentStore(), metadata.NewImageStore(ms)), nil }, }) - } type transferConfig struct { // Max concurrent downloads // Snapshotter platforms } - -// TODO: Move this to a local package with constructor arguments...? -type localTransferService struct { - leases leases.Manager - content content.Store - - // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) - limiter *semaphore.Weighted - - conversions map[string]func(typeurl.Any) (interface{}, error) - - // TODO: Duplication suppressor - // Metadata service (Or snapshotters, image, content) - // Diff - - // Configuration - // - Max downloads - // - Max uploads - - // Supported platforms - // - Platform -> snapshotter defaults? - - // Type Resolver, support registration... For Any Type URL -> Constructor -} - -// populatedConversions is used to map the typeurls to instance converstion functions, -// since the typeurls are derived from instances rather than types, they are -// calculated at runtime and populate the converstion map. -// Static mapping or offloading conversion to another plugin is probably a more -// ideal long term solution. -func (ts *localTransferService) populateConversions() error { - for _, c := range []struct { - instance interface{} - conversion func(typeurl.Any) (interface{}, error) - }{ - {ttypes.ImageStoreDestination{}, ts.convertImageStoreDestination}, - {ttypes.OCIRegistry{}, ts.convertOCIRegistry}, - } { - u, err := typeurl.TypeURL(c.instance) - if err != nil { - return fmt.Errorf("unable to get type %T: %w", c.instance, err) - } - if _, ok := ts.conversions[u]; ok { - return fmt.Errorf("duplicate typeurl mapping: %s for %T", u, c.instance) - } - } - - return nil -} - -func (ts *localTransferService) convertImageStoreDestination(a typeurl.Any) (interface{}, error) { - var dest ttypes.ImageStoreDestination - if err := typeurl.UnmarshalTo(a, &dest); err != nil { - return nil, err - } - return nil, nil -} - -func (ts *localTransferService) convertOCIRegistry(a typeurl.Any) (interface{}, error) { - var dest ttypes.OCIRegistry - if err := typeurl.UnmarshalTo(a, &dest); err != nil { - return nil, err - } - // TODO: Create credential callback - - return nil, nil -} - -func (ts *localTransferService) resolveType(a typeurl.Any) (interface{}, error) { - c, ok := ts.conversions[a.GetTypeUrl()] - if !ok { - return nil, fmt.Errorf("type %q not supported: %w", a.GetTypeUrl(), errdefs.ErrNotImplemented) - } - return c(a) -} - -func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error { - topts := &transfer.TransferOpts{} - for _, opt := range opts { - opt(topts) - } - - if a, ok := src.(typeurl.Any); ok { - r, err := ts.resolveType(a) - if err != nil { - return err - } - src = r - } - - if a, ok := dest.(typeurl.Any); ok { - r, err := ts.resolveType(a) - if err != nil { - return err - } - dest = r - } - - // Figure out matrix of whether source destination combination is supported - switch s := src.(type) { - case transfer.ImageResolver: - switch d := dest.(type) { - case transfer.ImageStorer: - return ts.pull(ctx, s, d, topts) - } - } - return fmt.Errorf("Unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) -} - -func name(t interface{}) string { - switch s := t.(type) { - case fmt.Stringer: - return s.String() - case typeurl.Any: - return s.GetTypeUrl() - default: - return fmt.Sprintf("%T", t) - } -} - -func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error { - // TODO: Attach lease if doesn't have one - - // From source, need - // - resolver - // - image name - - // From destination, need - // - Image labels - // - Unpack information - // - Platform to Snapshotter - // - Child label map - // - All metdata? - - name, desc, err := ir.Resolve(ctx) - if err != nil { - return fmt.Errorf("failed to resolve image: %w", err) - } - if desc.MediaType == images.MediaTypeDockerSchema1Manifest { - // Explicitly call out schema 1 as deprecated and not supported - return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument) - } - - fetcher, err := ir.Fetcher(ctx, name) - if err != nil { - return fmt.Errorf("failed to get fetcher for %q: %w", name, err) - } - - var ( - handler images.Handler - - unpacker *unpack.Unpacker - - // has a config media type bug (distribution#1622) - hasMediaTypeBug1622 bool - - store = ts.content - ) - - // Get all the children for a descriptor - childrenHandler := images.ChildrenHandler(store) - - if hw, ok := is.(transfer.ImageFilterer); ok { - childrenHandler = hw.ImageFilter(childrenHandler) - } - // TODO: Move these to image store - //// TODO: This could easily be handled by having an ImageHandlerWrapper() - //// Set any children labels for that content - - checkNeedsFix := images.HandlerFunc( - func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - // set to true if there is application/octet-stream media type - if desc.MediaType == docker.LegacyConfigMediaType { - hasMediaTypeBug1622 = true - } - - return []ocispec.Descriptor{}, nil - }, - ) - - appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name) - if err != nil { - return err - } - - // TODO: Support set of base handlers from configuration - // Progress handlers? - handlers := []images.Handler{ - remotes.FetchHandler(store, fetcher), - checkNeedsFix, - childrenHandler, - appendDistSrcLabelHandler, - } - - handler = images.Handlers(handlers...) - - // TODO: Should available platforms be a configuration of the service? - // First find suitable platforms to unpack into - if u, ok := is.(transfer.ImageUnpacker); ok { - uopts := []unpack.UnpackerOpt{} - for _, u := range u.UnpackPlatforms() { - uopts = append(uopts, unpack.WithUnpackPlatform(u)) - } - if ts.limiter != nil { - uopts = append(uopts, unpack.WithLimiter(ts.limiter)) - } - //if uconfig.DuplicationSuppressor != nil { - // uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) - //} - unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...) - if err != nil { - return fmt.Errorf("unable to initialize unpacker: %w", err) - } - defer func() { - // TODO: This needs to be tigher scoped... - if _, err := unpacker.Wait(); err != nil { - //if retErr == nil { - // retErr = fmt.Errorf("unpack: %w", err) - //} - } - }() - handler = unpacker.Unpack(handler) - } - - if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil { - // TODO: Cancel unpack and wait? - return err - } - - // NOTE(fuweid): unpacker defers blobs download. before create image - // record in ImageService, should wait for unpacking(including blobs - // download). - if unpacker != nil { - if _, err = unpacker.Wait(); err != nil { - return err - } - // TODO: Check results to make sure unpack was successful - } - - if hasMediaTypeBug1622 { - if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil { - return err - } - } - - _, err = is.Store(ctx, desc) - if err != nil { - return err - } - // TODO: Send status update - - return nil -} diff --git a/services/transfer/service.go b/services/transfer/service.go index fd020b0d0..1e2fa91f9 100644 --- a/services/transfer/service.go +++ b/services/transfer/service.go @@ -20,9 +20,15 @@ import ( "context" transferapi "github.com/containerd/containerd/api/services/transfer/v1" + transferTypes "github.com/containerd/containerd/api/types/transfer" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/transfer/plugins" "github.com/containerd/containerd/plugin" + ptypes "github.com/containerd/containerd/protobuf/types" + "github.com/containerd/typeurl" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -35,13 +41,15 @@ func init() { ID: "transfer", Requires: []plugin.Type{ plugin.TransferPlugin, + plugin.StreamingPlugin, }, InitFn: newService, }) } type service struct { - transferers []transfer.Transferer + transferers []transfer.Transferer + streamManager streaming.StreamManager transferapi.UnimplementedTransferServer } @@ -50,6 +58,7 @@ func newService(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + // TODO: how to determine order? t := make([]transfer.Transferer, 0, len(plugins)) for _, p := range plugins { @@ -59,8 +68,13 @@ func newService(ic *plugin.InitContext) (interface{}, error) { } t = append(t, i.(transfer.Transferer)) } + sp, err := ic.GetByID(plugin.StreamingPlugin, "manager") + if err != nil { + return nil, err + } return &service{ - transferers: t, + transferers: t, + streamManager: sp.(streaming.StreamManager), }, nil } @@ -70,15 +84,75 @@ func (s *service) Register(gs *grpc.Server) error { } func (s *service) Transfer(ctx context.Context, req *transferapi.TransferRequest) (*emptypb.Empty, error) { - // TODO: Optionally proxy + var transferOpts []transfer.Opt + if req.Options != nil { + if req.Options.ProgressStream != "" { + stream, err := s.streamManager.Get(ctx, req.Options.ProgressStream) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + defer stream.Close() + + pf := func(p transfer.Progress) { + any, err := typeurl.MarshalAny(&transferTypes.Progress{ + Event: p.Event, + Name: p.Name, + Parents: p.Parents, + Progress: p.Progress, + Total: p.Total, + }) + if err != nil { + log.G(ctx).WithError(err).Warnf("event could not be marshaled: %v/%v", p.Event, p.Name) + return + } + if err := stream.Send(any); err != nil { + log.G(ctx).WithError(err).Warnf("event not sent: %v/%v", p.Event, p.Name) + return + } + } + + transferOpts = append(transferOpts, transfer.WithProgress(pf)) + } + } + src, err := s.convertAny(ctx, req.Source) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + dst, err := s.convertAny(ctx, req.Destination) + plugins.ResolveType(req.Source) + if err != nil { + return nil, errdefs.ToGRPC(err) + } - // TODO: Convert options for _, t := range s.transferers { - if err := t.Transfer(ctx, req.Source, req.Destination); err == nil { - return nil, nil + if err := t.Transfer(ctx, src, dst, transferOpts...); err == nil { + return &ptypes.Empty{}, nil } else if !errdefs.IsNotImplemented(err) { return nil, errdefs.ToGRPC(err) } } return nil, status.Errorf(codes.Unimplemented, "method Transfer not implemented for %s to %s", req.Source.GetTypeUrl(), req.Destination.GetTypeUrl()) } + +func (s *service) convertAny(ctx context.Context, a typeurl.Any) (interface{}, error) { + obj, err := plugins.ResolveType(a) + if err != nil { + if errdefs.IsNotFound(err) { + return typeurl.UnmarshalAny(a) + } + return nil, err + } + switch v := obj.(type) { + case streamUnmarshaler: + err = v.UnmarshalAny(ctx, s.streamManager, a) + return obj, err + default: + log.G(ctx).Debug("unmarshling to..") + err = typeurl.UnmarshalTo(a, obj) + return obj, err + } +} + +type streamUnmarshaler interface { + UnmarshalAny(context.Context, streaming.StreamGetter, typeurl.Any) error +} diff --git a/transfer.go b/transfer.go index 61fb7e88e..520931d94 100644 --- a/transfer.go +++ b/transfer.go @@ -19,56 +19,68 @@ package containerd import ( "context" + streamingapi "github.com/containerd/containerd/api/services/streaming/v1" transferapi "github.com/containerd/containerd/api/services/transfer/v1" "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/transfer/proxy" + "github.com/containerd/containerd/protobuf" "github.com/containerd/typeurl" - "google.golang.org/protobuf/types/known/anypb" ) -func (c *Client) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error { - // Conver Options - // Convert Source - // Convert Destinations - // Get Stream Manager +func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error { + return proxy.NewTransferer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...) +} - asrc, err := c.toAny(ctx, src) +func (c *Client) streamCreator() streaming.StreamCreator { + return &streamCreator{ + client: streamingapi.NewStreamingClient(c.conn), + } +} + +type streamCreator struct { + client streamingapi.StreamingClient +} + +func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) { + stream, err := sc.client.Stream(ctx) if err != nil { - return err + return nil, err } - adst, err := c.toAny(ctx, dst) - if err != nil { - return err - } - - _, err = transferapi.NewTransferClient(c.conn).Transfer(ctx, &transferapi.TransferRequest{ - Source: &anypb.Any{ - TypeUrl: asrc.GetTypeUrl(), - Value: asrc.GetValue(), - }, - Destination: &anypb.Any{ - TypeUrl: adst.GetTypeUrl(), - Value: adst.GetValue(), - }, + a, err := typeurl.MarshalAny(&streamingapi.StreamInit{ + ID: id, }) - return err -} - -func (c *Client) toAny(ctx context.Context, i interface{}) (a typeurl.Any, err error) { - switch v := i.(type) { - case toAny: - //Get stream manager - a, err = v.ToAny(ctx, nil) - case typeurl.Any: - a = v - default: - a, err = typeurl.MarshalAny(i) + if err != nil { + return nil, err + } + err = stream.Send(protobuf.FromAny(a)) + if err != nil { + return nil, err } - return + // Receive an ack that stream is init and ready + if _, err = stream.Recv(); err != nil { + return nil, err + } + + return &clientStream{ + s: stream, + }, nil } -type toAny interface { - ToAny(context.Context, streaming.StreamManager) (typeurl.Any, error) +type clientStream struct { + s streamingapi.Streaming_StreamClient +} + +func (cs *clientStream) Send(a typeurl.Any) error { + return cs.s.Send(protobuf.FromAny(a)) +} + +func (cs *clientStream) Recv() (typeurl.Any, error) { + return cs.s.Recv() +} + +func (cs *clientStream) Close() error { + return cs.s.CloseSend() }