diff --git a/images/archive/reference.go b/images/archive/reference.go index ba19b111f..8a030fbfa 100644 --- a/images/archive/reference.go +++ b/images/archive/reference.go @@ -41,6 +41,9 @@ func AddRefPrefix(image string) func(string) string { // a full reference. func refTranslator(image string, checkPrefix bool) func(string) string { return func(ref string) string { + if image == "" { + return "" + } // Check if ref is full reference if strings.ContainsAny(ref, "/:@") { // If not prefixed, don't include image diff --git a/integration/client/transfer_test.go b/integration/client/transfer_test.go index 74d19eeaa..fa3d9f822 100644 --- a/integration/client/transfer_test.go +++ b/integration/client/transfer_test.go @@ -45,7 +45,7 @@ func TestTransferEcho(t *testing.T) { func newImportExportEcho(ctx context.Context, client *containerd.Client, expected []byte) func(*testing.T) { return func(t *testing.T) { testBuf := newWaitBuffer() - err := client.Transfer(ctx, archive.NewImageImportStream(bytes.NewReader(expected)), archive.NewImageExportStream(testBuf)) + err := client.Transfer(ctx, archive.NewImageImportStream(bytes.NewReader(expected), "application/octet-stream"), archive.NewImageExportStream(testBuf)) if err != nil { t.Fatal(err) } diff --git a/pkg/transfer/archive/importer.go b/pkg/transfer/archive/importer.go index a82b0f923..4c37ec7cf 100644 --- a/pkg/transfer/archive/importer.go +++ b/pkg/transfer/archive/importer.go @@ -21,28 +21,51 @@ import ( "io" transferapi "github.com/containerd/containerd/api/types/transfer" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images/archive" "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/streaming" tstreaming "github.com/containerd/containerd/pkg/transfer/streaming" "github.com/containerd/typeurl" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) +type ImportOpt func(*ImageImportStream) + +func WithForceCompression(s *ImageImportStream) { + s.forceCompress = true +} + // NewImageImportStream returns a image importer via tar stream -// TODO: Add import options -func NewImageImportStream(stream io.Reader) *ImageImportStream { - return &ImageImportStream{ - stream: stream, +func NewImageImportStream(stream io.Reader, mediaType string, opts ...ImportOpt) *ImageImportStream { + s := &ImageImportStream{ + stream: stream, + mediaType: mediaType, } + for _, opt := range opts { + opt(s) + } + return s } type ImageImportStream struct { - stream io.Reader + stream io.Reader + mediaType string + forceCompress bool } func (iis *ImageImportStream) ImportStream(context.Context) (io.Reader, error) { return iis.stream, nil } +func (iis *ImageImportStream) Import(ctx context.Context, store content.Store) (ocispec.Descriptor, error) { + var opts []archive.ImportOpt + if iis.forceCompress { + opts = append(opts, archive.WithImportCompression()) + } + return archive.ImportIndex(ctx, store, iis.stream, opts...) +} + func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) { sid := tstreaming.GenerateID("import") stream, err := sm.Create(ctx, sid) @@ -52,7 +75,9 @@ func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.Strea tstreaming.SendStream(ctx, iis.stream, stream) s := &transferapi.ImageImportStream{ - Stream: sid, + Stream: sid, + MediaType: iis.mediaType, + ForceCompress: iis.forceCompress, } return typeurl.MarshalAny(s) @@ -71,6 +96,8 @@ func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.Str } iis.stream = tstreaming.ReceiveStream(ctx, stream) + iis.mediaType = s.MediaType + iis.forceCompress = s.ForceCompress return nil } diff --git a/pkg/transfer/image/imagestore.go b/pkg/transfer/image/imagestore.go index 015e38e0d..953803b67 100644 --- a/pkg/transfer/image/imagestore.go +++ b/pkg/transfer/image/imagestore.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" + "github.com/containerd/containerd/images/archive" "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/transfer/plugins" "github.com/containerd/containerd/pkg/unpack" @@ -47,6 +48,12 @@ type Store struct { labelMap func(ocispec.Descriptor) []string manifestLimit int + //import image options + namePrefix string + checkPrefix bool + digestRefs bool + alwaysDigest bool + unpacks []UnpackConfiguration } @@ -86,6 +93,25 @@ func WithAllMetadata(s *Store) { s.allMetadata = true } +// WithNamePrefix sets the name prefix for imported images, if +// check is enabled, then only images with the prefix are stored. +func WithNamePrefix(prefix string, check bool) StoreOpt { + return func(s *Store) { + s.namePrefix = prefix + s.checkPrefix = check + } +} + +// WithDigestRefs sets digest refs for imported images, if +// always is enabled, then digest refs are added even if a +// non-digest image name is added for the same image. +func WithDigestRefs(always bool) StoreOpt { + return func(s *Store) { + s.digestRefs = true + s.alwaysDigest = always + } +} + // WithUnpack specifies a platform to unpack for and an optional snapshotter to use func WithUnpack(p ocispec.Platform, snapshotter string) StoreOpt { return func(s *Store) { @@ -144,6 +170,35 @@ func (is *Store) Store(ctx context.Context, desc ocispec.Descriptor, store image Labels: is.imageLabels, } + // Handle imported image names + if refType, ok := desc.Annotations["io.containerd.import.ref-type"]; ok { + var nameT func(string) string + if is.checkPrefix { + nameT = archive.FilterRefPrefix(is.namePrefix) + } else { + nameT = archive.AddRefPrefix(is.namePrefix) + } + name := imageName(desc.Annotations, nameT) + switch refType { + case "name": + if name == "" { + return images.Image{}, fmt.Errorf("no image name: %w", errdefs.ErrNotFound) + } + img.Name = name + case "digest": + if !is.digestRefs || (!is.alwaysDigest && name != "") { + return images.Image{}, fmt.Errorf("no digest refs: %w", errdefs.ErrNotFound) + } + img.Name = fmt.Sprintf("%s@%s", is.namePrefix, desc.Digest) + default: + return images.Image{}, fmt.Errorf("ref type not supported: %w", errdefs.ErrInvalidArgument) + } + delete(desc.Annotations, "io.containerd.import.ref-type") + } else if img.Name == "" { + // No valid image combination found + return images.Image{}, fmt.Errorf("no image name found: %w", errdefs.ErrNotFound) + } + for { if created, err := store.Create(ctx, img); err != nil { if !errdefs.IsAlreadyExists(err) { @@ -189,6 +244,10 @@ func (is *Store) MarshalAny(context.Context, streaming.StreamCreator) (typeurl.A ManifestLimit: uint32(is.manifestLimit), AllMetadata: is.allMetadata, Platforms: platformsToProto(is.platforms), + Prefix: is.namePrefix, + CheckPrefix: is.checkPrefix, + DigestRefs: is.digestRefs, + AlwaysDigest: is.alwaysDigest, Unpacks: unpackToProto(is.unpacks), } return typeurl.MarshalAny(s) @@ -205,6 +264,10 @@ func (is *Store) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, a is.manifestLimit = int(s.ManifestLimit) is.allMetadata = s.AllMetadata is.platforms = platformFromProto(s.Platforms) + is.namePrefix = s.Prefix + is.checkPrefix = s.CheckPrefix + is.digestRefs = s.DigestRefs + is.alwaysDigest = s.AlwaysDigest is.unpacks = unpackFromProto(s.Unpacks) return nil @@ -262,3 +325,17 @@ func unpackFromProto(auc []*transfertypes.UnpackConfiguration) []UnpackConfigura } return uc } + +func imageName(annotations map[string]string, ociCleanup func(string) string) string { + name := annotations[images.AnnotationImageName] + if name != "" { + return name + } + name = annotations[ocispec.AnnotationRefName] + if name != "" { + if ociCleanup != nil { + name = ociCleanup(name) + } + } + return name +} diff --git a/pkg/transfer/local/import.go b/pkg/transfer/local/import.go new file mode 100644 index 000000000..5518aea0a --- /dev/null +++ b/pkg/transfer/local/import.go @@ -0,0 +1,127 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package local + +import ( + "context" + "encoding/json" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/pkg/transfer" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func (ts *localTransferService) importStream(ctx context.Context, i transfer.ImageImporter, is transfer.ImageStorer, tops *transfer.Config) error { + ctx, done, err := ts.withLease(ctx) + if err != nil { + return err + } + defer done(ctx) + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: "Importing", + }) + } + + index, err := i.Import(ctx, ts.content) + if err != nil { + return err + } + + var descriptors []ocispec.Descriptor + + // If save index, add index + descriptors = append(descriptors, index) + + var handler images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + // Only save images at top level + if desc.Digest != index.Digest { + return images.Children(ctx, ts.content, desc) + } + + p, err := content.ReadBlob(ctx, ts.content, desc) + if err != nil { + return nil, err + } + + var idx ocispec.Index + if err := json.Unmarshal(p, &idx); err != nil { + return nil, err + } + + for _, m := range idx.Manifests { + m1 := m + m1.Annotations = mergeMap(m.Annotations, map[string]string{"io.containerd.import.ref-type": "name"}) + descriptors = append(descriptors, m1) + + // If add digest references, add twice + m2 := m + m2.Annotations = mergeMap(m.Annotations, map[string]string{"io.containerd.import.ref-type": "digest"}) + descriptors = append(descriptors, m2) + } + + return idx.Manifests, nil + } + + if f, ok := is.(transfer.ImageFilterer); ok { + handler = f.ImageFilter(handler, ts.content) + } + + if err := images.WalkNotEmpty(ctx, handler, index); err != nil { + return err + } + + for _, desc := range descriptors { + img, err := is.Store(ctx, desc, ts.images) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return err + } + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: "saved", + Name: img.Name, + //Digest: img.Target.Digest.String(), + }) + } + } + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: "Completed import", + }) + } + + return nil +} + +func mergeMap(m1, m2 map[string]string) map[string]string { + merged := make(map[string]string, len(m1)+len(m2)) + for k, v := range m1 { + merged[k] = v + } + for k, v := range m2 { + merged[k] = v + } + return merged +} diff --git a/pkg/transfer/local/transfer.go b/pkg/transfer/local/transfer.go index 1d3693dbc..05cd29615 100644 --- a/pkg/transfer/local/transfer.go +++ b/pkg/transfer/local/transfer.go @@ -75,13 +75,12 @@ func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, d case transfer.ImagePusher: return ts.push(ctx, s, d, topts) } - case transfer.ImageImportStreamer: + case transfer.ImageImporter: switch d := dest.(type) { case transfer.ImageExportStreamer: return ts.echo(ctx, s, d, topts) - - // Image import - // case transfer.ImageStorer + case transfer.ImageStorer: + return ts.importStream(ctx, s, d, topts) } } return fmt.Errorf("unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) @@ -100,8 +99,12 @@ func name(t interface{}) string { // echo is mostly used for testing, it implements an import->export which is // a no-op which only roundtrips the bytes. -func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImportStreamer, e transfer.ImageExportStreamer, tops *transfer.Config) error { - r, err := i.ImportStream(ctx) +func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImporter, e transfer.ImageExportStreamer, tops *transfer.Config) error { + iis, ok := i.(transfer.ImageImportStreamer) + if !ok { + return fmt.Errorf("echo requires access to raw stream: %w", errdefs.ErrNotImplemented) + } + r, _, err := iis.ImportStream(ctx) if err != nil { return err } diff --git a/pkg/transfer/transfer.go b/pkg/transfer/transfer.go index 204bc927f..971f7f145 100644 --- a/pkg/transfer/transfer.go +++ b/pkg/transfer/transfer.go @@ -68,11 +68,16 @@ type ImageGetter interface { Get(context.Context, images.Store) (images.Image, error) } +// ImageImporter imports an image into a content store +type ImageImporter interface { + Import(context.Context, content.Store) (ocispec.Descriptor, error) +} + // ImageImportStreamer returns an import streamer based on OCI or // Docker image tar archives. The stream should be a raw tar stream // and without compression. type ImageImportStreamer interface { - ImportStream(context.Context) (io.Reader, error) + ImportStream(context.Context) (io.Reader, string, error) } type ImageExportStreamer interface {