Update import logic

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-09-15 22:42:34 -07:00
parent 66dc4d1069
commit 11c1c8e6f4
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
7 changed files with 256 additions and 14 deletions

View File

@ -41,6 +41,9 @@ func AddRefPrefix(image string) func(string) string {
// a full reference. // a full reference.
func refTranslator(image string, checkPrefix bool) func(string) string { func refTranslator(image string, checkPrefix bool) func(string) string {
return func(ref string) string { return func(ref string) string {
if image == "" {
return ""
}
// Check if ref is full reference // Check if ref is full reference
if strings.ContainsAny(ref, "/:@") { if strings.ContainsAny(ref, "/:@") {
// If not prefixed, don't include image // If not prefixed, don't include image

View File

@ -45,7 +45,7 @@ func TestTransferEcho(t *testing.T) {
func newImportExportEcho(ctx context.Context, client *containerd.Client, expected []byte) func(*testing.T) { func newImportExportEcho(ctx context.Context, client *containerd.Client, expected []byte) func(*testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
testBuf := newWaitBuffer() testBuf := newWaitBuffer()
err := client.Transfer(ctx, archive.NewImageImportStream(bytes.NewReader(expected)), archive.NewImageExportStream(testBuf)) err := client.Transfer(ctx, archive.NewImageImportStream(bytes.NewReader(expected), "application/octet-stream"), archive.NewImageExportStream(testBuf))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -21,28 +21,51 @@ import (
"io" "io"
transferapi "github.com/containerd/containerd/api/types/transfer" transferapi "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images/archive"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/streaming"
tstreaming "github.com/containerd/containerd/pkg/transfer/streaming" tstreaming "github.com/containerd/containerd/pkg/transfer/streaming"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
type ImportOpt func(*ImageImportStream)
func WithForceCompression(s *ImageImportStream) {
s.forceCompress = true
}
// NewImageImportStream returns a image importer via tar stream // NewImageImportStream returns a image importer via tar stream
// TODO: Add import options func NewImageImportStream(stream io.Reader, mediaType string, opts ...ImportOpt) *ImageImportStream {
func NewImageImportStream(stream io.Reader) *ImageImportStream { s := &ImageImportStream{
return &ImageImportStream{ stream: stream,
stream: stream, mediaType: mediaType,
} }
for _, opt := range opts {
opt(s)
}
return s
} }
type ImageImportStream struct { type ImageImportStream struct {
stream io.Reader stream io.Reader
mediaType string
forceCompress bool
} }
func (iis *ImageImportStream) ImportStream(context.Context) (io.Reader, error) { func (iis *ImageImportStream) ImportStream(context.Context) (io.Reader, error) {
return iis.stream, nil return iis.stream, nil
} }
func (iis *ImageImportStream) Import(ctx context.Context, store content.Store) (ocispec.Descriptor, error) {
var opts []archive.ImportOpt
if iis.forceCompress {
opts = append(opts, archive.WithImportCompression())
}
return archive.ImportIndex(ctx, store, iis.stream, opts...)
}
func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) { func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) {
sid := tstreaming.GenerateID("import") sid := tstreaming.GenerateID("import")
stream, err := sm.Create(ctx, sid) stream, err := sm.Create(ctx, sid)
@ -52,7 +75,9 @@ func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.Strea
tstreaming.SendStream(ctx, iis.stream, stream) tstreaming.SendStream(ctx, iis.stream, stream)
s := &transferapi.ImageImportStream{ s := &transferapi.ImageImportStream{
Stream: sid, Stream: sid,
MediaType: iis.mediaType,
ForceCompress: iis.forceCompress,
} }
return typeurl.MarshalAny(s) return typeurl.MarshalAny(s)
@ -71,6 +96,8 @@ func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.Str
} }
iis.stream = tstreaming.ReceiveStream(ctx, stream) iis.stream = tstreaming.ReceiveStream(ctx, stream)
iis.mediaType = s.MediaType
iis.forceCompress = s.ForceCompress
return nil return nil
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/archive"
"github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer/plugins" "github.com/containerd/containerd/pkg/transfer/plugins"
"github.com/containerd/containerd/pkg/unpack" "github.com/containerd/containerd/pkg/unpack"
@ -47,6 +48,12 @@ type Store struct {
labelMap func(ocispec.Descriptor) []string labelMap func(ocispec.Descriptor) []string
manifestLimit int manifestLimit int
//import image options
namePrefix string
checkPrefix bool
digestRefs bool
alwaysDigest bool
unpacks []UnpackConfiguration unpacks []UnpackConfiguration
} }
@ -86,6 +93,25 @@ func WithAllMetadata(s *Store) {
s.allMetadata = true s.allMetadata = true
} }
// WithNamePrefix sets the name prefix for imported images, if
// check is enabled, then only images with the prefix are stored.
func WithNamePrefix(prefix string, check bool) StoreOpt {
return func(s *Store) {
s.namePrefix = prefix
s.checkPrefix = check
}
}
// WithDigestRefs sets digest refs for imported images, if
// always is enabled, then digest refs are added even if a
// non-digest image name is added for the same image.
func WithDigestRefs(always bool) StoreOpt {
return func(s *Store) {
s.digestRefs = true
s.alwaysDigest = always
}
}
// WithUnpack specifies a platform to unpack for and an optional snapshotter to use // WithUnpack specifies a platform to unpack for and an optional snapshotter to use
func WithUnpack(p ocispec.Platform, snapshotter string) StoreOpt { func WithUnpack(p ocispec.Platform, snapshotter string) StoreOpt {
return func(s *Store) { return func(s *Store) {
@ -144,6 +170,35 @@ func (is *Store) Store(ctx context.Context, desc ocispec.Descriptor, store image
Labels: is.imageLabels, Labels: is.imageLabels,
} }
// Handle imported image names
if refType, ok := desc.Annotations["io.containerd.import.ref-type"]; ok {
var nameT func(string) string
if is.checkPrefix {
nameT = archive.FilterRefPrefix(is.namePrefix)
} else {
nameT = archive.AddRefPrefix(is.namePrefix)
}
name := imageName(desc.Annotations, nameT)
switch refType {
case "name":
if name == "" {
return images.Image{}, fmt.Errorf("no image name: %w", errdefs.ErrNotFound)
}
img.Name = name
case "digest":
if !is.digestRefs || (!is.alwaysDigest && name != "") {
return images.Image{}, fmt.Errorf("no digest refs: %w", errdefs.ErrNotFound)
}
img.Name = fmt.Sprintf("%s@%s", is.namePrefix, desc.Digest)
default:
return images.Image{}, fmt.Errorf("ref type not supported: %w", errdefs.ErrInvalidArgument)
}
delete(desc.Annotations, "io.containerd.import.ref-type")
} else if img.Name == "" {
// No valid image combination found
return images.Image{}, fmt.Errorf("no image name found: %w", errdefs.ErrNotFound)
}
for { for {
if created, err := store.Create(ctx, img); err != nil { if created, err := store.Create(ctx, img); err != nil {
if !errdefs.IsAlreadyExists(err) { if !errdefs.IsAlreadyExists(err) {
@ -189,6 +244,10 @@ func (is *Store) MarshalAny(context.Context, streaming.StreamCreator) (typeurl.A
ManifestLimit: uint32(is.manifestLimit), ManifestLimit: uint32(is.manifestLimit),
AllMetadata: is.allMetadata, AllMetadata: is.allMetadata,
Platforms: platformsToProto(is.platforms), Platforms: platformsToProto(is.platforms),
Prefix: is.namePrefix,
CheckPrefix: is.checkPrefix,
DigestRefs: is.digestRefs,
AlwaysDigest: is.alwaysDigest,
Unpacks: unpackToProto(is.unpacks), Unpacks: unpackToProto(is.unpacks),
} }
return typeurl.MarshalAny(s) return typeurl.MarshalAny(s)
@ -205,6 +264,10 @@ func (is *Store) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, a
is.manifestLimit = int(s.ManifestLimit) is.manifestLimit = int(s.ManifestLimit)
is.allMetadata = s.AllMetadata is.allMetadata = s.AllMetadata
is.platforms = platformFromProto(s.Platforms) is.platforms = platformFromProto(s.Platforms)
is.namePrefix = s.Prefix
is.checkPrefix = s.CheckPrefix
is.digestRefs = s.DigestRefs
is.alwaysDigest = s.AlwaysDigest
is.unpacks = unpackFromProto(s.Unpacks) is.unpacks = unpackFromProto(s.Unpacks)
return nil return nil
@ -262,3 +325,17 @@ func unpackFromProto(auc []*transfertypes.UnpackConfiguration) []UnpackConfigura
} }
return uc return uc
} }
func imageName(annotations map[string]string, ociCleanup func(string) string) string {
name := annotations[images.AnnotationImageName]
if name != "" {
return name
}
name = annotations[ocispec.AnnotationRefName]
if name != "" {
if ociCleanup != nil {
name = ociCleanup(name)
}
}
return name
}

View File

@ -0,0 +1,127 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"encoding/json"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/transfer"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func (ts *localTransferService) importStream(ctx context.Context, i transfer.ImageImporter, is transfer.ImageStorer, tops *transfer.Config) error {
ctx, done, err := ts.withLease(ctx)
if err != nil {
return err
}
defer done(ctx)
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "Importing",
})
}
index, err := i.Import(ctx, ts.content)
if err != nil {
return err
}
var descriptors []ocispec.Descriptor
// If save index, add index
descriptors = append(descriptors, index)
var handler images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// Only save images at top level
if desc.Digest != index.Digest {
return images.Children(ctx, ts.content, desc)
}
p, err := content.ReadBlob(ctx, ts.content, desc)
if err != nil {
return nil, err
}
var idx ocispec.Index
if err := json.Unmarshal(p, &idx); err != nil {
return nil, err
}
for _, m := range idx.Manifests {
m1 := m
m1.Annotations = mergeMap(m.Annotations, map[string]string{"io.containerd.import.ref-type": "name"})
descriptors = append(descriptors, m1)
// If add digest references, add twice
m2 := m
m2.Annotations = mergeMap(m.Annotations, map[string]string{"io.containerd.import.ref-type": "digest"})
descriptors = append(descriptors, m2)
}
return idx.Manifests, nil
}
if f, ok := is.(transfer.ImageFilterer); ok {
handler = f.ImageFilter(handler, ts.content)
}
if err := images.WalkNotEmpty(ctx, handler, index); err != nil {
return err
}
for _, desc := range descriptors {
img, err := is.Store(ctx, desc, ts.images)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
//Digest: img.Target.Digest.String(),
})
}
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "Completed import",
})
}
return nil
}
func mergeMap(m1, m2 map[string]string) map[string]string {
merged := make(map[string]string, len(m1)+len(m2))
for k, v := range m1 {
merged[k] = v
}
for k, v := range m2 {
merged[k] = v
}
return merged
}

View File

@ -75,13 +75,12 @@ func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, d
case transfer.ImagePusher: case transfer.ImagePusher:
return ts.push(ctx, s, d, topts) return ts.push(ctx, s, d, topts)
} }
case transfer.ImageImportStreamer: case transfer.ImageImporter:
switch d := dest.(type) { switch d := dest.(type) {
case transfer.ImageExportStreamer: case transfer.ImageExportStreamer:
return ts.echo(ctx, s, d, topts) return ts.echo(ctx, s, d, topts)
case transfer.ImageStorer:
// Image import return ts.importStream(ctx, s, d, topts)
// case transfer.ImageStorer
} }
} }
return fmt.Errorf("unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) return fmt.Errorf("unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented)
@ -100,8 +99,12 @@ func name(t interface{}) string {
// echo is mostly used for testing, it implements an import->export which is // echo is mostly used for testing, it implements an import->export which is
// a no-op which only roundtrips the bytes. // a no-op which only roundtrips the bytes.
func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImportStreamer, e transfer.ImageExportStreamer, tops *transfer.Config) error { func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImporter, e transfer.ImageExportStreamer, tops *transfer.Config) error {
r, err := i.ImportStream(ctx) iis, ok := i.(transfer.ImageImportStreamer)
if !ok {
return fmt.Errorf("echo requires access to raw stream: %w", errdefs.ErrNotImplemented)
}
r, _, err := iis.ImportStream(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -68,11 +68,16 @@ type ImageGetter interface {
Get(context.Context, images.Store) (images.Image, error) Get(context.Context, images.Store) (images.Image, error)
} }
// ImageImporter imports an image into a content store
type ImageImporter interface {
Import(context.Context, content.Store) (ocispec.Descriptor, error)
}
// ImageImportStreamer returns an import streamer based on OCI or // ImageImportStreamer returns an import streamer based on OCI or
// Docker image tar archives. The stream should be a raw tar stream // Docker image tar archives. The stream should be a raw tar stream
// and without compression. // and without compression.
type ImageImportStreamer interface { type ImageImportStreamer interface {
ImportStream(context.Context) (io.Reader, error) ImportStream(context.Context) (io.Reader, string, error)
} }
type ImageExportStreamer interface { type ImageExportStreamer interface {