For remote snapshotters, a client often needs to pass extra information (for instance, the registry URL to query remote layers from, credentials, etc.). This commit slightly extends WithPullSnapshotter to pass extra labels to a snapshotter. Signed-off-by: Maksym Pavlenko <makpav@amazon.com>
		
			
				
	
	
		
			256 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			256 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
 | 
						|
 | 
						|
package containerd
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
 | 
						|
	"github.com/containerd/containerd/errdefs"
 | 
						|
	"github.com/containerd/containerd/images"
 | 
						|
	"github.com/containerd/containerd/platforms"
 | 
						|
	"github.com/containerd/containerd/remotes"
 | 
						|
	"github.com/containerd/containerd/remotes/docker"
 | 
						|
	"github.com/containerd/containerd/remotes/docker/schema1"
 | 
						|
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	"golang.org/x/sync/errgroup"
 | 
						|
	"golang.org/x/sync/semaphore"
 | 
						|
)
 | 
						|
 | 
						|
// Pull downloads the provided content into containerd's content store
 | 
						|
// and returns a platform specific image object
 | 
						|
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
 | 
						|
	pullCtx := defaultRemoteContext()
 | 
						|
	for _, o := range opts {
 | 
						|
		if err := o(c, pullCtx); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if pullCtx.PlatformMatcher == nil {
 | 
						|
		if len(pullCtx.Platforms) > 1 {
 | 
						|
			return nil, errors.New("cannot pull multiplatform image locally, try Fetch")
 | 
						|
		} else if len(pullCtx.Platforms) == 0 {
 | 
						|
			pullCtx.PlatformMatcher = c.platform
 | 
						|
		} else {
 | 
						|
			p, err := platforms.Parse(pullCtx.Platforms[0])
 | 
						|
			if err != nil {
 | 
						|
				return nil, errors.Wrapf(err, "invalid platform %s", pullCtx.Platforms[0])
 | 
						|
			}
 | 
						|
 | 
						|
			pullCtx.PlatformMatcher = platforms.Only(p)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ctx, done, err := c.WithLease(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	defer done(ctx)
 | 
						|
 | 
						|
	var unpacks int32
 | 
						|
	var unpackEg *errgroup.Group
 | 
						|
	var unpackWrapper func(f images.Handler) images.Handler
 | 
						|
 | 
						|
	if pullCtx.Unpack {
 | 
						|
		// unpacker only supports schema 2 image, for schema 1 this is noop.
 | 
						|
		u, err := c.newUnpacker(ctx, pullCtx)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Wrap(err, "create unpacker")
 | 
						|
		}
 | 
						|
		unpackWrapper, unpackEg = u.handlerWrapper(ctx, pullCtx, &unpacks)
 | 
						|
		defer func() {
 | 
						|
			if err := unpackEg.Wait(); err != nil {
 | 
						|
				if retErr == nil {
 | 
						|
					retErr = errors.Wrap(err, "unpack")
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		wrapper := pullCtx.HandlerWrapper
 | 
						|
		pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
 | 
						|
			if wrapper == nil {
 | 
						|
				return unpackWrapper(h)
 | 
						|
			}
 | 
						|
			return unpackWrapper(wrapper(h))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	img, err := c.fetch(ctx, pullCtx, ref, 1)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// NOTE(fuweid): unpacker defers blobs download. before create image
 | 
						|
	// record in ImageService, should wait for unpacking(including blobs
 | 
						|
	// download).
 | 
						|
	if pullCtx.Unpack {
 | 
						|
		if unpackEg != nil {
 | 
						|
			if err := unpackEg.Wait(); err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	img, err = c.createNewImage(ctx, img)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
 | 
						|
 | 
						|
	if pullCtx.Unpack {
 | 
						|
		if unpacks == 0 {
 | 
						|
			// Try to unpack is none is done previously.
 | 
						|
			// This is at least required for schema 1 image.
 | 
						|
			if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
 | 
						|
				return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return i, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
 | 
						|
	store := c.ContentStore()
 | 
						|
	name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
 | 
						|
	if err != nil {
 | 
						|
		return images.Image{}, errors.Wrapf(err, "failed to resolve reference %q", ref)
 | 
						|
	}
 | 
						|
 | 
						|
	fetcher, err := rCtx.Resolver.Fetcher(ctx, name)
 | 
						|
	if err != nil {
 | 
						|
		return images.Image{}, errors.Wrapf(err, "failed to get fetcher for %q", name)
 | 
						|
	}
 | 
						|
 | 
						|
	var (
 | 
						|
		handler images.Handler
 | 
						|
 | 
						|
		isConvertible bool
 | 
						|
		converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
 | 
						|
		limiter       *semaphore.Weighted
 | 
						|
	)
 | 
						|
 | 
						|
	if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
 | 
						|
		schema1Converter := schema1.NewConverter(store, fetcher)
 | 
						|
 | 
						|
		handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...)
 | 
						|
 | 
						|
		isConvertible = true
 | 
						|
 | 
						|
		converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) {
 | 
						|
			return schema1Converter.Convert(ctx)
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		// Get all the children for a descriptor
 | 
						|
		childrenHandler := images.ChildrenHandler(store)
 | 
						|
		// Set any children labels for that content
 | 
						|
		childrenHandler = images.SetChildrenLabels(store, childrenHandler)
 | 
						|
		if rCtx.AllMetadata {
 | 
						|
			// Filter manifests by platforms but allow to handle manifest
 | 
						|
			// and configuration for not-target platforms
 | 
						|
			childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher)
 | 
						|
		} else {
 | 
						|
			// Filter children by platforms if specified.
 | 
						|
			childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
 | 
						|
		}
 | 
						|
		// Sort and limit manifests if a finite number is needed
 | 
						|
		if limit > 0 {
 | 
						|
			childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
 | 
						|
		}
 | 
						|
 | 
						|
		// set isConvertible to true if there is application/octet-stream media type
 | 
						|
		convertibleHandler := images.HandlerFunc(
 | 
						|
			func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
 | 
						|
				if desc.MediaType == docker.LegacyConfigMediaType {
 | 
						|
					isConvertible = true
 | 
						|
				}
 | 
						|
 | 
						|
				return []ocispec.Descriptor{}, nil
 | 
						|
			},
 | 
						|
		)
 | 
						|
 | 
						|
		appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref)
 | 
						|
		if err != nil {
 | 
						|
			return images.Image{}, err
 | 
						|
		}
 | 
						|
 | 
						|
		handlers := append(rCtx.BaseHandlers,
 | 
						|
			remotes.FetchHandler(store, fetcher),
 | 
						|
			convertibleHandler,
 | 
						|
			childrenHandler,
 | 
						|
			appendDistSrcLabelHandler,
 | 
						|
		)
 | 
						|
 | 
						|
		handler = images.Handlers(handlers...)
 | 
						|
 | 
						|
		converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
 | 
						|
			return docker.ConvertManifest(ctx, store, desc)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if rCtx.HandlerWrapper != nil {
 | 
						|
		handler = rCtx.HandlerWrapper(handler)
 | 
						|
	}
 | 
						|
 | 
						|
	if rCtx.MaxConcurrentDownloads > 0 {
 | 
						|
		limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
 | 
						|
	}
 | 
						|
 | 
						|
	if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
 | 
						|
		return images.Image{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	if isConvertible {
 | 
						|
		if desc, err = converterFunc(ctx, desc); err != nil {
 | 
						|
			return images.Image{}, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return images.Image{
 | 
						|
		Name:   name,
 | 
						|
		Target: desc,
 | 
						|
		Labels: rCtx.Labels,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *Client) createNewImage(ctx context.Context, img images.Image) (images.Image, error) {
 | 
						|
	is := c.ImageService()
 | 
						|
	for {
 | 
						|
		if created, err := is.Create(ctx, img); err != nil {
 | 
						|
			if !errdefs.IsAlreadyExists(err) {
 | 
						|
				return images.Image{}, err
 | 
						|
			}
 | 
						|
 | 
						|
			updated, err := is.Update(ctx, img)
 | 
						|
			if err != nil {
 | 
						|
				// if image was removed, try create again
 | 
						|
				if errdefs.IsNotFound(err) {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				return images.Image{}, err
 | 
						|
			}
 | 
						|
 | 
						|
			img = updated
 | 
						|
		} else {
 | 
						|
			img = created
 | 
						|
		}
 | 
						|
 | 
						|
		return img, nil
 | 
						|
	}
 | 
						|
}
 |