
Though containerd gives ChainID to backend snapshotters during unpack for searching snapshots to be skipped downloading the contents, ChainID isn't enough for some snapshotters which require additional information of layers. Some examples are remote snapshotters which is based on stargz filesystem (requires image-related information to query the contents to docker registry) and those which is based on CernVM-FS (requires manifest digest, etc. for providing squashed rootfs). This commit solves this issue by enabling a handler to inject additional information of layers to snapshotters during unpack. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
234 lines
6.4 KiB
Go
234 lines
6.4 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package containerd
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/platforms"
|
|
"github.com/containerd/containerd/remotes"
|
|
"github.com/containerd/containerd/remotes/docker"
|
|
"github.com/containerd/containerd/remotes/docker/schema1"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
// Pull downloads the provided content into containerd's content store
|
|
// and returns a platform specific image object
|
|
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
|
|
pullCtx := defaultRemoteContext()
|
|
for _, o := range opts {
|
|
if err := o(c, pullCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if pullCtx.PlatformMatcher == nil {
|
|
if len(pullCtx.Platforms) > 1 {
|
|
return nil, errors.New("cannot pull multiplatform image locally, try Fetch")
|
|
} else if len(pullCtx.Platforms) == 0 {
|
|
pullCtx.PlatformMatcher = c.platform
|
|
} else {
|
|
p, err := platforms.Parse(pullCtx.Platforms[0])
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "invalid platform %s", pullCtx.Platforms[0])
|
|
}
|
|
|
|
pullCtx.PlatformMatcher = platforms.Only(p)
|
|
}
|
|
}
|
|
|
|
ctx, done, err := c.WithLease(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer done(ctx)
|
|
|
|
var unpacks int32
|
|
if pullCtx.Unpack {
|
|
// unpacker only supports schema 2 image, for schema 1 this is noop.
|
|
u, err := c.newUnpacker(ctx, pullCtx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "create unpacker")
|
|
}
|
|
unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks)
|
|
defer func() {
|
|
if err := eg.Wait(); err != nil {
|
|
if retErr == nil {
|
|
retErr = errors.Wrap(err, "unpack")
|
|
}
|
|
}
|
|
}()
|
|
wrapper := pullCtx.HandlerWrapper
|
|
pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
|
|
if wrapper == nil {
|
|
return unpackWrapper(h)
|
|
}
|
|
return unpackWrapper(wrapper(h))
|
|
}
|
|
}
|
|
|
|
img, err := c.fetch(ctx, pullCtx, ref, 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
|
|
|
|
if pullCtx.Unpack {
|
|
if unpacks == 0 {
|
|
// Try to unpack is none is done previously.
|
|
// This is at least required for schema 1 image.
|
|
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
|
|
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
|
|
}
|
|
}
|
|
}
|
|
|
|
return i, nil
|
|
}
|
|
|
|
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
|
|
store := c.ContentStore()
|
|
name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
|
|
if err != nil {
|
|
return images.Image{}, errors.Wrapf(err, "failed to resolve reference %q", ref)
|
|
}
|
|
|
|
fetcher, err := rCtx.Resolver.Fetcher(ctx, name)
|
|
if err != nil {
|
|
return images.Image{}, errors.Wrapf(err, "failed to get fetcher for %q", name)
|
|
}
|
|
|
|
var (
|
|
handler images.Handler
|
|
|
|
isConvertible bool
|
|
converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
|
|
limiter *semaphore.Weighted
|
|
)
|
|
|
|
if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
|
|
schema1Converter := schema1.NewConverter(store, fetcher)
|
|
|
|
handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...)
|
|
|
|
isConvertible = true
|
|
|
|
converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) {
|
|
return schema1Converter.Convert(ctx)
|
|
}
|
|
} else {
|
|
// Get all the children for a descriptor
|
|
childrenHandler := images.ChildrenHandler(store)
|
|
// Set any children labels for that content
|
|
childrenHandler = images.SetChildrenLabels(store, childrenHandler)
|
|
if rCtx.AllMetadata {
|
|
// Filter manifests by platforms but allow to handle manifest
|
|
// and configuration for not-target platforms
|
|
childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher)
|
|
} else {
|
|
// Filter children by platforms if specified.
|
|
childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
|
|
}
|
|
// Sort and limit manifests if a finite number is needed
|
|
if limit > 0 {
|
|
childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
|
|
}
|
|
|
|
// set isConvertible to true if there is application/octet-stream media type
|
|
convertibleHandler := images.HandlerFunc(
|
|
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
if desc.MediaType == docker.LegacyConfigMediaType {
|
|
isConvertible = true
|
|
}
|
|
|
|
return []ocispec.Descriptor{}, nil
|
|
},
|
|
)
|
|
|
|
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref)
|
|
if err != nil {
|
|
return images.Image{}, err
|
|
}
|
|
|
|
handlers := append(rCtx.BaseHandlers,
|
|
remotes.FetchHandler(store, fetcher),
|
|
convertibleHandler,
|
|
childrenHandler,
|
|
appendDistSrcLabelHandler,
|
|
)
|
|
|
|
handler = images.Handlers(handlers...)
|
|
|
|
converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
|
|
return docker.ConvertManifest(ctx, store, desc)
|
|
}
|
|
}
|
|
|
|
if rCtx.HandlerWrapper != nil {
|
|
handler = rCtx.HandlerWrapper(handler)
|
|
}
|
|
|
|
if rCtx.MaxConcurrentDownloads > 0 {
|
|
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
|
|
}
|
|
|
|
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
|
|
return images.Image{}, err
|
|
}
|
|
|
|
if isConvertible {
|
|
if desc, err = converterFunc(ctx, desc); err != nil {
|
|
return images.Image{}, err
|
|
}
|
|
}
|
|
|
|
img := images.Image{
|
|
Name: name,
|
|
Target: desc,
|
|
Labels: rCtx.Labels,
|
|
}
|
|
|
|
is := c.ImageService()
|
|
for {
|
|
if created, err := is.Create(ctx, img); err != nil {
|
|
if !errdefs.IsAlreadyExists(err) {
|
|
return images.Image{}, err
|
|
}
|
|
|
|
updated, err := is.Update(ctx, img)
|
|
if err != nil {
|
|
// if image was removed, try create again
|
|
if errdefs.IsNotFound(err) {
|
|
continue
|
|
}
|
|
return images.Image{}, err
|
|
}
|
|
|
|
img = updated
|
|
} else {
|
|
img = created
|
|
}
|
|
|
|
return img, nil
|
|
}
|
|
}
|