diff --git a/client.go b/client.go index 8ea7d7972..aa626abcb 100644 --- a/client.go +++ b/client.go @@ -294,6 +294,7 @@ type RemoteContext struct { PlatformMatcher platforms.MatchComparer // Unpack is done after an image is pulled to extract into a snapshotter. + // It is done simultaneously for schema 2 images when they are pulled. // If an image is not unpacked on pull, it can be unpacked any time // afterwards. Unpacking is required to run an image. Unpack bool diff --git a/pull.go b/pull.go index 3a91daba4..ef0d147ba 100644 --- a/pull.go +++ b/pull.go @@ -32,7 +32,7 @@ import ( // Pull downloads the provided content into containerd's content store // and returns a platform specific image object -func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) { +func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) { pullCtx := defaultRemoteContext() for _, o := range opts { if err := o(c, pullCtx); err != nil { @@ -61,6 +61,30 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image } defer done(ctx) + var unpacks int32 + if pullCtx.Unpack { + // unpacker only supports schema 2 image, for schema 1 this is noop. + u, err := c.newUnpacker(ctx, pullCtx) + if err != nil { + return nil, errors.Wrap(err, "create unpacker") + } + unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks) + defer func() { + if err := eg.Wait(); err != nil { + if retErr == nil { + retErr = errors.Wrap(err, "unpack") + } + } + }() + wrapper := pullCtx.HandlerWrapper + pullCtx.HandlerWrapper = func(h images.Handler) images.Handler { + if wrapper == nil { + return unpackWrapper(h) + } + return wrapper(unpackWrapper(h)) + } + } + img, err := c.fetch(ctx, pullCtx, ref, 1) if err != nil { return nil, err @@ -69,8 +93,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) if pullCtx.Unpack { - if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil { - return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) + if unpacks == 0 { + // Try to unpack is none is done previously. + // This is at least required for schema 1 image. + if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil { + return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) + } } } diff --git a/unpacker.go b/unpacker.go new file mode 100644 index 000000000..89c395cbb --- /dev/null +++ b/unpacker.go @@ -0,0 +1,238 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/rootfs" + "github.com/opencontainers/go-digest" + "github.com/opencontainers/image-spec/identity" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" +) + +type layerState struct { + layer rootfs.Layer + downloaded bool + unpacked bool +} + +type unpacker struct { + updateCh chan ocispec.Descriptor + snapshotter string + config UnpackConfig + c *Client +} + +func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) { + snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter) + if err != nil { + return nil, err + } + var config UnpackConfig + for _, o := range rCtx.UnpackOpts { + if err := o(ctx, &config); err != nil { + return nil, err + } + } + return &unpacker{ + updateCh: make(chan ocispec.Descriptor, 128), + snapshotter: snapshotter, + config: config, + c: c, + }, nil +} + +func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers []ocispec.Descriptor) error { + p, err := content.ReadBlob(ctx, u.c.ContentStore(), config) + if err != nil { + return err + } + + var i ocispec.Image + if err := json.Unmarshal(p, &i); err != nil { + return errors.Wrap(err, "unmarshal image config") + } + diffIDs := i.RootFS.DiffIDs + if len(layers) != len(diffIDs) { + return errors.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs)) + } + + var ( + sn = u.c.SnapshotService(u.snapshotter) + a = u.c.DiffService() + cs = u.c.ContentStore() + + states []layerState + chain []digest.Digest + ) + for i, desc := range layers { + states = append(states, layerState{ + layer: rootfs.Layer{ + Blob: desc, + Diff: ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageLayer, + Digest: diffIDs[i], + }, + }, + }) + } + for { + var layer ocispec.Descriptor + select { + case layer = <-u.updateCh: + case <-ctx.Done(): + return ctx.Err() + } + log.G(ctx).WithField("desc", layer).Debug("layer downloaded") + for i := range states { + if states[i].layer.Blob.Digest != layer.Digest { + continue + } + states[i].downloaded = true + break + } + for i := range states { + if !states[i].downloaded { + break + } + if states[i].unpacked { + continue + } + + log.G(ctx).WithFields(logrus.Fields{ + "desc": states[i].layer.Blob, + "diff": states[i].layer.Diff, + }).Debug("unpack layer") + + unpacked, err := rootfs.ApplyLayerWithOpts(ctx, states[i].layer, chain, sn, a, + u.config.SnapshotOpts, u.config.ApplyOpts) + if err != nil { + return err + } + + if unpacked { + // Set the uncompressed label after the uncompressed + // digest has been verified through apply. + cinfo := content.Info{ + Digest: states[i].layer.Blob.Digest, + Labels: map[string]string{ + "containerd.io/uncompressed": states[i].layer.Diff.Digest.String(), + }, + } + if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil { + return err + } + } + + chain = append(chain, states[i].layer.Diff.Digest) + states[i].unpacked = true + log.G(ctx).WithFields(logrus.Fields{ + "desc": states[i].layer.Blob, + "diff": states[i].layer.Diff, + }).Debug("layer unpacked") + } + // Check whether all layers are unpacked. + if states[len(states)-1].unpacked { + break + } + } + + chainID := identity.ChainID(chain).String() + cinfo := content.Info{ + Digest: config.Digest, + Labels: map[string]string{ + fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID, + }, + } + _, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter)) + if err != nil { + return err + } + log.G(ctx).WithFields(logrus.Fields{ + "config": config.Digest, + "chainID": chainID, + }).Debug("image unpacked") + return nil +} + +func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) { + eg, uctx := errgroup.WithContext(uctx) + return func(f images.Handler) images.Handler { + var ( + lock sync.Mutex + layers []ocispec.Descriptor + schema1 bool + ) + return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + children, err := f.Handle(ctx, desc) + if err != nil { + return children, err + } + + // `Pull` only supports one platform, so there is only + // one manifest to handle, and manifest list can be + // safely skipped. + // TODO: support multi-platform unpack. + switch desc.MediaType { + case images.MediaTypeDockerSchema1Manifest: + lock.Lock() + schema1 = true + lock.Unlock() + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + lock.Lock() + for _, child := range children { + if child.MediaType == images.MediaTypeDockerSchema2Config || + child.MediaType == ocispec.MediaTypeImageConfig { + continue + } + layers = append(layers, child) + } + lock.Unlock() + case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: + lock.Lock() + l := append([]ocispec.Descriptor{}, layers...) + lock.Unlock() + if len(l) > 0 { + atomic.AddInt32(unpacks, 1) + eg.Go(func() error { + return u.unpack(uctx, desc, l) + }) + } + case images.MediaTypeDockerSchema2LayerGzip, images.MediaTypeDockerSchema2Layer, + ocispec.MediaTypeImageLayerGzip, ocispec.MediaTypeImageLayer: + lock.Lock() + update := !schema1 + lock.Unlock() + if update { + u.updateCh <- desc + } + } + return children, nil + }) + }, eg +}