Update fetch handling

Fix issue where manifest content must always be fetched
even if it is already fully downloaded or shared locally.
Simplify children label setting and platform filtering.
Prevent getting a fetcher when content shared locally.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2018-02-02 16:44:46 -08:00
parent ee6ffdd91e
commit 944a9ade37
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
5 changed files with 103 additions and 96 deletions

View File

@ -234,10 +234,17 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
schema1Converter = schema1.NewConverter(store, fetcher)
handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...)
} else {
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(store)
// Set any children labels for that content
childrenHandler = images.SetChildrenLabels(store, childrenHandler)
// Filter the childen by the platform
childrenHandler = images.FilterPlatform(platforms.Default(), childrenHandler)
handler = images.Handlers(append(pullCtx.BaseHandlers,
remotes.FetchHandler(store, fetcher),
images.ChildrenHandler(store, platforms.Default()))...,
)
childrenHandler,
)...)
}
if err := images.Dispatch(ctx, handler, desc); err != nil {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/platforms"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
@ -128,8 +129,78 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor)
//
// One can also replace this with another implementation to allow descending of
// arbitrary types.
func ChildrenHandler(provider content.Provider, platform string) HandlerFunc {
func ChildrenHandler(provider content.Provider) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
return Children(ctx, provider, desc, platform)
return Children(ctx, provider, desc)
}
}
// SetChildrenLabels is a handler wrapper which sets labels for the content on
// the children returned by the handler and passes through the children.
// Must follow a handler that returns the children to be labeled.
func SetChildrenLabels(manager content.Manager, f HandlerFunc) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := f(ctx, desc)
if err != nil {
return children, err
}
if len(children) > 0 {
info := content.Info{
Digest: desc.Digest,
Labels: map[string]string{},
}
fields := []string{}
for i, ch := range children {
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
fields = append(fields, fmt.Sprintf("labels.containerd.io/gc.ref.content.%d", i))
}
_, err := manager.Update(ctx, info, fields...)
if err != nil {
return nil, err
}
}
return children, err
}
}
// FilterPlatform is a handler wrapper which limits the descriptors returned
// by a handler to a single platform.
func FilterPlatform(platform string, f HandlerFunc) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := f(ctx, desc)
if err != nil {
return children, err
}
var descs []ocispec.Descriptor
if platform != "" && isMultiPlatform(desc.MediaType) {
matcher, err := platforms.Parse(platform)
if err != nil {
return nil, err
}
for _, d := range children {
if d.Platform == nil || matcher.Match(*d.Platform) {
descs = append(descs, d)
}
}
} else {
descs = children
}
return descs, nil
}
}
func isMultiPlatform(mediaType string) bool {
switch mediaType {
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
return true
default:
return false
}
}

View File

@ -102,7 +102,7 @@ func (image *Image) Size(ctx context.Context, provider content.Provider, platfor
}
size += desc.Size
return nil, nil
}), ChildrenHandler(provider, platform)), image.Target)
}), FilterPlatform(platform, ChildrenHandler(provider))), image.Target)
}
// Manifest resolves a manifest from the image for the given platform.
@ -238,7 +238,7 @@ func Platforms(ctx context.Context, provider content.Provider, image ocispec.Des
platforms.Normalize(ocispec.Platform{OS: image.OS, Architecture: image.Architecture}))
}
return nil, nil
}), ChildrenHandler(provider, "")), image)
}), ChildrenHandler(provider)), image)
}
// Check returns nil if the all components of an image are available in the
@ -285,7 +285,7 @@ func Check(ctx context.Context, provider content.Provider, image ocispec.Descrip
}
// Children returns the immediate children of content described by the descriptor.
func Children(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform string) ([]ocispec.Descriptor, error) {
func Children(ctx context.Context, provider content.Provider, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
var descs []ocispec.Descriptor
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
@ -314,21 +314,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
return nil, err
}
if platform != "" {
matcher, err := platforms.Parse(platform)
if err != nil {
return nil, err
}
for _, d := range index.Manifests {
if d.Platform == nil || matcher.Match(*d.Platform) {
descs = append(descs, d)
}
}
} else {
descs = append(descs, index.Manifests...)
}
descs = append(descs, index.Manifests...)
case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip,
MediaTypeDockerSchema2LayerForeign, MediaTypeDockerSchema2LayerForeignGzip,
MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig,

View File

@ -42,7 +42,7 @@ func (oe *V1Exporter) Export(ctx context.Context, store content.Store, desc ocis
}
handlers := images.Handlers(
images.ChildrenHandler(store, platforms.Default()),
images.FilterPlatform(platforms.Default(), images.ChildrenHandler(store)),
images.HandlerFunc(exportHandler),
)

View File

@ -2,7 +2,6 @@ package remotes
import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
@ -102,83 +101,27 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
break
}
ws, err := cw.Status()
if err != nil {
return err
}
if ws.Offset == desc.Size {
// If writer is already complete, commit and return
err := cw.Commit(ctx, desc.Size, desc.Digest)
if err != nil && !errdefs.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
}
return nil
}
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
defer rc.Close()
r, opts := commitOpts(desc, rc)
return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
}
// commitOpts gets the appropriate content options to alter
// the content info on commit based on media type.
func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) {
var childrenF func(r io.Reader) ([]ocispec.Descriptor, error)
// TODO(AkihiroSuda): use images/oci.GetChildrenDescriptors?
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) {
var (
manifest ocispec.Manifest
decoder = json.NewDecoder(r)
)
if err := decoder.Decode(&manifest); err != nil {
return nil, err
}
return append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...), nil
}
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) {
var (
index ocispec.Index
decoder = json.NewDecoder(r)
)
if err := decoder.Decode(&index); err != nil {
return nil, err
}
return index.Manifests, nil
}
default:
return r, nil
}
pr, pw := io.Pipe()
var children []ocispec.Descriptor
errC := make(chan error)
go func() {
defer close(errC)
ch, err := childrenF(pr)
if err != nil {
errC <- err
}
children = ch
}()
opt := func(info *content.Info) error {
err := <-errC
if err != nil {
return errors.Wrap(err, "unable to get commit labels")
}
if len(children) > 0 {
if info.Labels == nil {
info.Labels = map[string]string{}
}
for i, ch := range children {
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
}
}
return nil
}
return io.TeeReader(r, pw), []content.Opt{opt}
return content.Copy(ctx, cw, rc, desc.Size, desc.Digest)
}
// PushHandler returns a handler that will push all content from the provider
@ -243,7 +186,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr
pushHandler := PushHandler(pusher, provider)
handlers := append(baseHandlers,
images.ChildrenHandler(provider, platforms.Default()),
images.FilterPlatform(platforms.Default(), images.ChildrenHandler(provider)),
filterHandler,
pushHandler,
)