From 944a9ade37de5914aae5069f19687298a908fb0f Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 2 Feb 2018 16:44:46 -0800 Subject: [PATCH] Update fetch handling Fix issue where manifest content must always be fetched even if it is already fully downloaded or shared locally. Simplify children label setting and platform filtering. Prevent getting a fetcher when content shared locally. Signed-off-by: Derek McGowan --- client.go | 11 +++++- images/handlers.go | 75 ++++++++++++++++++++++++++++++++++- images/image.go | 22 ++--------- images/oci/exporter.go | 2 +- remotes/handlers.go | 89 ++++++++---------------------------------- 5 files changed, 103 insertions(+), 96 deletions(-) diff --git a/client.go b/client.go index b2caa37a1..3ae04154b 100644 --- a/client.go +++ b/client.go @@ -234,10 +234,17 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image schema1Converter = schema1.NewConverter(store, fetcher) handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...) } else { + // Get all the children for a descriptor + childrenHandler := images.ChildrenHandler(store) + // Set any children labels for that content + childrenHandler = images.SetChildrenLabels(store, childrenHandler) + // Filter the childen by the platform + childrenHandler = images.FilterPlatform(platforms.Default(), childrenHandler) + handler = images.Handlers(append(pullCtx.BaseHandlers, remotes.FetchHandler(store, fetcher), - images.ChildrenHandler(store, platforms.Default()))..., - ) + childrenHandler, + )...) } if err := images.Dispatch(ctx, handler, desc); err != nil { diff --git a/images/handlers.go b/images/handlers.go index 63acdb722..e8ca3f347 100644 --- a/images/handlers.go +++ b/images/handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/platforms" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -128,8 +129,78 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) // // One can also replace this with another implementation to allow descending of // arbitrary types. -func ChildrenHandler(provider content.Provider, platform string) HandlerFunc { +func ChildrenHandler(provider content.Provider) HandlerFunc { return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - return Children(ctx, provider, desc, platform) + return Children(ctx, provider, desc) + } +} + +// SetChildrenLabels is a handler wrapper which sets labels for the content on +// the children returned by the handler and passes through the children. +// Must follow a handler that returns the children to be labeled. +func SetChildrenLabels(manager content.Manager, f HandlerFunc) HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + children, err := f(ctx, desc) + if err != nil { + return children, err + } + + if len(children) > 0 { + info := content.Info{ + Digest: desc.Digest, + Labels: map[string]string{}, + } + fields := []string{} + for i, ch := range children { + info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String() + fields = append(fields, fmt.Sprintf("labels.containerd.io/gc.ref.content.%d", i)) + } + + _, err := manager.Update(ctx, info, fields...) + if err != nil { + return nil, err + } + } + + return children, err + } +} + +// FilterPlatform is a handler wrapper which limits the descriptors returned +// by a handler to a single platform. +func FilterPlatform(platform string, f HandlerFunc) HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + children, err := f(ctx, desc) + if err != nil { + return children, err + } + + var descs []ocispec.Descriptor + if platform != "" && isMultiPlatform(desc.MediaType) { + matcher, err := platforms.Parse(platform) + if err != nil { + return nil, err + } + + for _, d := range children { + if d.Platform == nil || matcher.Match(*d.Platform) { + descs = append(descs, d) + } + } + } else { + descs = children + } + + return descs, nil + } + +} + +func isMultiPlatform(mediaType string) bool { + switch mediaType { + case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + return true + default: + return false } } diff --git a/images/image.go b/images/image.go index 5fea0dcfc..7e808ec5e 100644 --- a/images/image.go +++ b/images/image.go @@ -102,7 +102,7 @@ func (image *Image) Size(ctx context.Context, provider content.Provider, platfor } size += desc.Size return nil, nil - }), ChildrenHandler(provider, platform)), image.Target) + }), FilterPlatform(platform, ChildrenHandler(provider))), image.Target) } // Manifest resolves a manifest from the image for the given platform. @@ -238,7 +238,7 @@ func Platforms(ctx context.Context, provider content.Provider, image ocispec.Des platforms.Normalize(ocispec.Platform{OS: image.OS, Architecture: image.Architecture})) } return nil, nil - }), ChildrenHandler(provider, "")), image) + }), ChildrenHandler(provider)), image) } // Check returns nil if the all components of an image are available in the @@ -285,7 +285,7 @@ func Check(ctx context.Context, provider content.Provider, image ocispec.Descrip } // Children returns the immediate children of content described by the descriptor. -func Children(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform string) ([]ocispec.Descriptor, error) { +func Children(ctx context.Context, provider content.Provider, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { var descs []ocispec.Descriptor switch desc.MediaType { case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: @@ -314,21 +314,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr return nil, err } - if platform != "" { - matcher, err := platforms.Parse(platform) - if err != nil { - return nil, err - } - - for _, d := range index.Manifests { - if d.Platform == nil || matcher.Match(*d.Platform) { - descs = append(descs, d) - } - } - } else { - descs = append(descs, index.Manifests...) - } - + descs = append(descs, index.Manifests...) case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip, MediaTypeDockerSchema2LayerForeign, MediaTypeDockerSchema2LayerForeignGzip, MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig, diff --git a/images/oci/exporter.go b/images/oci/exporter.go index 9a559aa6e..b1d832763 100644 --- a/images/oci/exporter.go +++ b/images/oci/exporter.go @@ -42,7 +42,7 @@ func (oe *V1Exporter) Export(ctx context.Context, store content.Store, desc ocis } handlers := images.Handlers( - images.ChildrenHandler(store, platforms.Default()), + images.FilterPlatform(platforms.Default(), images.ChildrenHandler(store)), images.HandlerFunc(exportHandler), ) diff --git a/remotes/handlers.go b/remotes/handlers.go index a37af65e0..5c4f49d8a 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -2,7 +2,6 @@ package remotes import ( "context" - "encoding/json" "fmt" "io" "math/rand" @@ -102,83 +101,27 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc break } + ws, err := cw.Status() + if err != nil { + return err + } + + if ws.Offset == desc.Size { + // If writer is already complete, commit and return + err := cw.Commit(ctx, desc.Size, desc.Digest) + if err != nil && !errdefs.IsAlreadyExists(err) { + return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) + } + return nil + } + rc, err := fetcher.Fetch(ctx, desc) if err != nil { return err } defer rc.Close() - r, opts := commitOpts(desc, rc) - return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...) -} - -// commitOpts gets the appropriate content options to alter -// the content info on commit based on media type. -func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) { - var childrenF func(r io.Reader) ([]ocispec.Descriptor, error) - - // TODO(AkihiroSuda): use images/oci.GetChildrenDescriptors? - switch desc.MediaType { - case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: - childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) { - var ( - manifest ocispec.Manifest - decoder = json.NewDecoder(r) - ) - if err := decoder.Decode(&manifest); err != nil { - return nil, err - } - - return append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...), nil - } - case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: - childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) { - var ( - index ocispec.Index - decoder = json.NewDecoder(r) - ) - if err := decoder.Decode(&index); err != nil { - return nil, err - } - - return index.Manifests, nil - } - default: - return r, nil - } - - pr, pw := io.Pipe() - - var children []ocispec.Descriptor - errC := make(chan error) - - go func() { - defer close(errC) - ch, err := childrenF(pr) - if err != nil { - errC <- err - } - children = ch - }() - - opt := func(info *content.Info) error { - err := <-errC - if err != nil { - return errors.Wrap(err, "unable to get commit labels") - } - - if len(children) > 0 { - if info.Labels == nil { - info.Labels = map[string]string{} - } - for i, ch := range children { - info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String() - } - } - return nil - } - - return io.TeeReader(r, pw), []content.Opt{opt} + return content.Copy(ctx, cw, rc, desc.Size, desc.Digest) } // PushHandler returns a handler that will push all content from the provider @@ -243,7 +186,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr pushHandler := PushHandler(pusher, provider) handlers := append(baseHandlers, - images.ChildrenHandler(provider, platforms.Default()), + images.FilterPlatform(platforms.Default(), images.ChildrenHandler(provider)), filterHandler, pushHandler, )