Merge pull request #2918 from Random-Liu/parellel-unpack
Add simultaneous unpack support
This commit is contained in:
commit
4a2f61c4f2
@ -294,6 +294,7 @@ type RemoteContext struct {
|
||||
PlatformMatcher platforms.MatchComparer
|
||||
|
||||
// Unpack is done after an image is pulled to extract into a snapshotter.
|
||||
// It is done simultaneously for schema 2 images when they are pulled.
|
||||
// If an image is not unpacked on pull, it can be unpacked any time
|
||||
// afterwards. Unpacking is required to run an image.
|
||||
Unpack bool
|
||||
|
@ -20,6 +20,9 @@ package containerd
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/containerd/containerd/platforms"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -48,3 +51,20 @@ func init() {
|
||||
testImage = "docker.io/library/alpine:latest"
|
||||
}
|
||||
}
|
||||
|
||||
func TestImagePullSchema1WithEmptyLayers(t *testing.T) {
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
schema1TestImageWithEmptyLayers := "gcr.io/google-containers/busybox@sha256:d8d3bc2c183ed2f9f10e7258f84971202325ee6011ba137112e01e30f206de67"
|
||||
_, err = client.Pull(ctx, schema1TestImageWithEmptyLayers, WithPlatform(platforms.DefaultString()), WithSchema1Conversion, WithPullUnpack)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
34
pull.go
34
pull.go
@ -32,7 +32,7 @@ import (
|
||||
|
||||
// Pull downloads the provided content into containerd's content store
|
||||
// and returns a platform specific image object
|
||||
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
|
||||
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
|
||||
pullCtx := defaultRemoteContext()
|
||||
for _, o := range opts {
|
||||
if err := o(c, pullCtx); err != nil {
|
||||
@ -61,6 +61,30 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
|
||||
}
|
||||
defer done(ctx)
|
||||
|
||||
var unpacks int32
|
||||
if pullCtx.Unpack {
|
||||
// unpacker only supports schema 2 image, for schema 1 this is noop.
|
||||
u, err := c.newUnpacker(ctx, pullCtx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create unpacker")
|
||||
}
|
||||
unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks)
|
||||
defer func() {
|
||||
if err := eg.Wait(); err != nil {
|
||||
if retErr == nil {
|
||||
retErr = errors.Wrap(err, "unpack")
|
||||
}
|
||||
}
|
||||
}()
|
||||
wrapper := pullCtx.HandlerWrapper
|
||||
pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
|
||||
if wrapper == nil {
|
||||
return unpackWrapper(h)
|
||||
}
|
||||
return wrapper(unpackWrapper(h))
|
||||
}
|
||||
}
|
||||
|
||||
img, err := c.fetch(ctx, pullCtx, ref, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -69,8 +93,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
|
||||
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
|
||||
|
||||
if pullCtx.Unpack {
|
||||
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
|
||||
if unpacks == 0 {
|
||||
// Try to unpack is none is done previously.
|
||||
// This is at least required for schema 1 image.
|
||||
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
238
unpacker.go
Normal file
238
unpacker.go
Normal file
@ -0,0 +1,238 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type layerState struct {
|
||||
layer rootfs.Layer
|
||||
downloaded bool
|
||||
unpacked bool
|
||||
}
|
||||
|
||||
type unpacker struct {
|
||||
updateCh chan ocispec.Descriptor
|
||||
snapshotter string
|
||||
config UnpackConfig
|
||||
c *Client
|
||||
}
|
||||
|
||||
func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) {
|
||||
snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var config UnpackConfig
|
||||
for _, o := range rCtx.UnpackOpts {
|
||||
if err := o(ctx, &config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &unpacker{
|
||||
updateCh: make(chan ocispec.Descriptor, 128),
|
||||
snapshotter: snapshotter,
|
||||
config: config,
|
||||
c: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers []ocispec.Descriptor) error {
|
||||
p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var i ocispec.Image
|
||||
if err := json.Unmarshal(p, &i); err != nil {
|
||||
return errors.Wrap(err, "unmarshal image config")
|
||||
}
|
||||
diffIDs := i.RootFS.DiffIDs
|
||||
if len(layers) != len(diffIDs) {
|
||||
return errors.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
|
||||
}
|
||||
|
||||
var (
|
||||
sn = u.c.SnapshotService(u.snapshotter)
|
||||
a = u.c.DiffService()
|
||||
cs = u.c.ContentStore()
|
||||
|
||||
states []layerState
|
||||
chain []digest.Digest
|
||||
)
|
||||
for i, desc := range layers {
|
||||
states = append(states, layerState{
|
||||
layer: rootfs.Layer{
|
||||
Blob: desc,
|
||||
Diff: ocispec.Descriptor{
|
||||
MediaType: ocispec.MediaTypeImageLayer,
|
||||
Digest: diffIDs[i],
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
for {
|
||||
var layer ocispec.Descriptor
|
||||
select {
|
||||
case layer = <-u.updateCh:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
log.G(ctx).WithField("desc", layer).Debug("layer downloaded")
|
||||
for i := range states {
|
||||
if states[i].layer.Blob.Digest != layer.Digest {
|
||||
continue
|
||||
}
|
||||
states[i].downloaded = true
|
||||
break
|
||||
}
|
||||
for i := range states {
|
||||
if !states[i].downloaded {
|
||||
break
|
||||
}
|
||||
if states[i].unpacked {
|
||||
continue
|
||||
}
|
||||
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"desc": states[i].layer.Blob,
|
||||
"diff": states[i].layer.Diff,
|
||||
}).Debug("unpack layer")
|
||||
|
||||
unpacked, err := rootfs.ApplyLayerWithOpts(ctx, states[i].layer, chain, sn, a,
|
||||
u.config.SnapshotOpts, u.config.ApplyOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if unpacked {
|
||||
// Set the uncompressed label after the uncompressed
|
||||
// digest has been verified through apply.
|
||||
cinfo := content.Info{
|
||||
Digest: states[i].layer.Blob.Digest,
|
||||
Labels: map[string]string{
|
||||
"containerd.io/uncompressed": states[i].layer.Diff.Digest.String(),
|
||||
},
|
||||
}
|
||||
if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
chain = append(chain, states[i].layer.Diff.Digest)
|
||||
states[i].unpacked = true
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"desc": states[i].layer.Blob,
|
||||
"diff": states[i].layer.Diff,
|
||||
}).Debug("layer unpacked")
|
||||
}
|
||||
// Check whether all layers are unpacked.
|
||||
if states[len(states)-1].unpacked {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
chainID := identity.ChainID(chain).String()
|
||||
cinfo := content.Info{
|
||||
Digest: config.Digest,
|
||||
Labels: map[string]string{
|
||||
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID,
|
||||
},
|
||||
}
|
||||
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"config": config.Digest,
|
||||
"chainID": chainID,
|
||||
}).Debug("image unpacked")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) {
|
||||
eg, uctx := errgroup.WithContext(uctx)
|
||||
return func(f images.Handler) images.Handler {
|
||||
var (
|
||||
lock sync.Mutex
|
||||
layers []ocispec.Descriptor
|
||||
schema1 bool
|
||||
)
|
||||
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
children, err := f.Handle(ctx, desc)
|
||||
if err != nil {
|
||||
return children, err
|
||||
}
|
||||
|
||||
// `Pull` only supports one platform, so there is only
|
||||
// one manifest to handle, and manifest list can be
|
||||
// safely skipped.
|
||||
// TODO: support multi-platform unpack.
|
||||
switch desc.MediaType {
|
||||
case images.MediaTypeDockerSchema1Manifest:
|
||||
lock.Lock()
|
||||
schema1 = true
|
||||
lock.Unlock()
|
||||
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
lock.Lock()
|
||||
for _, child := range children {
|
||||
if child.MediaType == images.MediaTypeDockerSchema2Config ||
|
||||
child.MediaType == ocispec.MediaTypeImageConfig {
|
||||
continue
|
||||
}
|
||||
layers = append(layers, child)
|
||||
}
|
||||
lock.Unlock()
|
||||
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
|
||||
lock.Lock()
|
||||
l := append([]ocispec.Descriptor{}, layers...)
|
||||
lock.Unlock()
|
||||
if len(l) > 0 {
|
||||
atomic.AddInt32(unpacks, 1)
|
||||
eg.Go(func() error {
|
||||
return u.unpack(uctx, desc, l)
|
||||
})
|
||||
}
|
||||
case images.MediaTypeDockerSchema2LayerGzip, images.MediaTypeDockerSchema2Layer,
|
||||
ocispec.MediaTypeImageLayerGzip, ocispec.MediaTypeImageLayer:
|
||||
lock.Lock()
|
||||
update := !schema1
|
||||
lock.Unlock()
|
||||
if update {
|
||||
u.updateCh <- desc
|
||||
}
|
||||
}
|
||||
return children, nil
|
||||
})
|
||||
}, eg
|
||||
}
|
Loading…
Reference in New Issue
Block a user