Defer layer download until unpack

Moves the content fetching into the unpack process
and defers the download until the snapshot needs it
and is ready to apply it. As soon as a layer that
requires fetching is reached, all remaining layers
are fetched.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2019-11-27 11:07:15 -08:00
parent 3e5402ce02
commit 30d92eff1c
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
2 changed files with 179 additions and 140 deletions

View File

@ -70,11 +70,6 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
} }
unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks) unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks)
defer func() { defer func() {
if retErr != nil {
// Forcibly stop the unpacker if there is
// an error.
eg.Cancel()
}
if err := eg.Wait(); err != nil { if err := eg.Wait(); err != nil {
if retErr == nil { if retErr == nil {
retErr = errors.Wrap(err, "unpack") retErr = errors.Wrap(err, "unpack")

View File

@ -18,34 +18,35 @@ package containerd
import ( import (
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/rootfs" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/snapshots"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
) )
type layerState struct {
layer rootfs.Layer
downloaded bool
unpacked bool
}
type unpacker struct { type unpacker struct {
updateCh chan ocispec.Descriptor updateCh chan ocispec.Descriptor
snapshotter string snapshotter string
config UnpackConfig config UnpackConfig
c *Client c *Client
limiter *semaphore.Weighted
} }
func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) { func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) {
@ -67,7 +68,7 @@ func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacke
}, nil }, nil
} }
func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers []ocispec.Descriptor) error { func (u *unpacker) unpack(ctx context.Context, h images.Handler, config ocispec.Descriptor, layers []ocispec.Descriptor) error {
p, err := content.ReadBlob(ctx, u.c.ContentStore(), config) p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
if err != nil { if err != nil {
return err return err
@ -87,85 +88,131 @@ func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers
a = u.c.DiffService() a = u.c.DiffService()
cs = u.c.ContentStore() cs = u.c.ContentStore()
states []layerState chain []digest.Digest
chain []digest.Digest
fetchOffset int
fetchC []chan struct{}
fetchErr chan error
) )
// If there is an early return, ensure any ongoing
// fetches get their context cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
EachLayer:
for i, desc := range layers { for i, desc := range layers {
states = append(states, layerState{ parent := identity.ChainID(chain)
layer: rootfs.Layer{ chain = append(chain, diffIDs[i])
Blob: desc,
Diff: ocispec.Descriptor{ chainID := identity.ChainID(chain).String()
MediaType: ocispec.MediaTypeImageLayer, if _, err := sn.Stat(ctx, chainID); err == nil {
Digest: diffIDs[i], // no need to handle
}, continue
}, } else if !errdefs.IsNotFound(err) {
return errors.Wrapf(err, "failed to stat snapshot %s", chainID)
}
labelOpt := snapshots.WithLabels(map[string]string{
"containerd.io/snapshot.ref": chainID,
}) })
}
for { var (
var layer ocispec.Descriptor key string
select { mounts []mount.Mount
case layer = <-u.updateCh: )
case <-ctx.Done():
return ctx.Err() for try := 1; try <= 3; try++ {
} // Prepare snapshot with from parent, label as root
log.G(ctx).WithField("desc", layer).Debug("layer downloaded") key = fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
for i := range states { mounts, err = sn.Prepare(ctx, key, parent.String(), labelOpt)
if states[i].layer.Blob.Digest != layer.Digest { if err != nil {
continue if errdefs.IsAlreadyExists(err) {
} if _, err := sn.Stat(ctx, chainID); err != nil {
// Different layers may have the same digest. When that if !errdefs.IsNotFound(err) {
// happens, we should continue marking the next layer return errors.Wrapf(err, "failed to stat snapshot %s", chainID)
// as downloaded. }
if states[i].downloaded { // Try again, this should be rare, log it
continue log.G(ctx).WithField("key", key).WithField("chainid", chainID).Debug("extraction snapshot already exists, chain id not found")
} } else {
states[i].downloaded = true // no need to handle, snapshot now found with chain id
break continue EachLayer
} }
for i := range states { } else {
if !states[i].downloaded { return errors.Wrapf(err, "failed to prepare extraction snapshot %q", key)
}
} else {
break break
} }
if states[i].unpacked { }
continue if err != nil {
return errors.Wrap(err, "unable to prepare extraction snapshot")
}
// Abort the snapshot if commit does not happen
abort := func() {
if err := sn.Remove(ctx, key); err != nil {
log.G(ctx).WithError(err).Errorf("failed to cleanup %q", key)
}
}
if fetchErr == nil {
fetchErr = make(chan error, 1)
fetchOffset = i
fetchC = make([]chan struct{}, len(layers)-fetchOffset)
for i := range fetchC {
fetchC[i] = make(chan struct{})
} }
log.G(ctx).WithFields(logrus.Fields{ go func() {
"desc": states[i].layer.Blob, err := u.fetch(ctx, h, layers[i:], fetchC)
"diff": states[i].layer.Diff, if err != nil {
}).Debug("unpack layer") fetchErr <- err
}
close(fetchErr)
}()
}
unpacked, err := rootfs.ApplyLayerWithOpts(ctx, states[i].layer, chain, sn, a, select {
u.config.SnapshotOpts, u.config.ApplyOpts) case <-ctx.Done():
return ctx.Err()
case err := <-fetchErr:
if err != nil { if err != nil {
return err return err
} }
case <-fetchC[i-fetchOffset]:
}
if unpacked { diff, err := a.Apply(ctx, desc, mounts)
// Set the uncompressed label after the uncompressed if err != nil {
// digest has been verified through apply. abort()
cinfo := content.Info{ return errors.Wrapf(err, "failed to extract layer %s", diffIDs[i])
Digest: states[i].layer.Blob.Digest, }
Labels: map[string]string{ if diff.Digest != diffIDs[i] {
"containerd.io/uncompressed": states[i].layer.Diff.Digest.String(), abort()
}, return errors.Errorf("wrong diff id calculated on extraction %q", diffIDs[i])
} }
if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
return err if err = sn.Commit(ctx, chainID, key, labelOpt); err != nil {
} abort()
if errdefs.IsAlreadyExists(err) {
continue
} }
return errors.Wrapf(err, "failed to commit snapshot %s", key)
}
chain = append(chain, states[i].layer.Diff.Digest) // Set the uncompressed label after the uncompressed
states[i].unpacked = true // digest has been verified through apply.
log.G(ctx).WithFields(logrus.Fields{ cinfo := content.Info{
"desc": states[i].layer.Blob, Digest: desc.Digest,
"diff": states[i].layer.Diff, Labels: map[string]string{
}).Debug("layer unpacked") "containerd.io/uncompressed": diff.Digest.String(),
},
} }
// Check whether all layers are unpacked. if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
if states[len(states)-1].unpacked { return err
break
} }
} }
chainID := identity.ChainID(chain).String() chainID := identity.ChainID(chain).String()
@ -183,40 +230,45 @@ func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers
"config": config.Digest, "config": config.Digest,
"chainID": chainID, "chainID": chainID,
}).Debug("image unpacked") }).Debug("image unpacked")
return nil return nil
} }
type errGroup struct { func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
*errgroup.Group eg, ctx2 := errgroup.WithContext(ctx)
cancel context.CancelFunc for i, desc := range layers {
desc := desc
i := i
if u.limiter != nil {
if err := u.limiter.Acquire(ctx, 1); err != nil {
return err
}
}
eg.Go(func() error {
_, err := h.Handle(ctx2, desc)
if u.limiter != nil {
u.limiter.Release(1)
}
if err != nil && errors.Cause(err) != images.ErrSkipDesc {
return err
}
close(done[i])
return nil
})
}
return eg.Wait()
} }
func newErrGroup(ctx context.Context) (*errGroup, context.Context) { func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) {
ctx, cancel := context.WithCancel(ctx) eg, uctx := errgroup.WithContext(uctx)
eg, ctx := errgroup.WithContext(ctx)
return &errGroup{
Group: eg,
cancel: cancel,
}, ctx
}
func (e *errGroup) Cancel() {
e.cancel()
}
func (e *errGroup) Wait() error {
err := e.Group.Wait()
e.cancel()
return err
}
func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errGroup) {
eg, uctx := newErrGroup(uctx)
return func(f images.Handler) images.Handler { return func(f images.Handler) images.Handler {
var ( var (
lock sync.Mutex lock sync.Mutex
layers []ocispec.Descriptor layers = map[digest.Digest][]ocispec.Descriptor{}
schema1 bool
) )
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := f.Handle(ctx, desc) children, err := f.Handle(ctx, desc)
@ -224,56 +276,48 @@ func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(im
return children, err return children, err
} }
// `Pull` only supports one platform, so there is only switch desc.MediaType {
// one manifest to handle, and manifest list can be case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
// safely skipped. var nonLayers []ocispec.Descriptor
// TODO: support multi-platform unpack. var manifestLayers []ocispec.Descriptor
switch mt := desc.MediaType; {
case mt == images.MediaTypeDockerSchema1Manifest: // Split layers from non-layers, layers will be handled after
lock.Lock() // the config
schema1 = true
lock.Unlock()
case mt == images.MediaTypeDockerSchema2Manifest || mt == ocispec.MediaTypeImageManifest:
lock.Lock()
for _, child := range children { for _, child := range children {
if child.MediaType == images.MediaTypeDockerSchema2Config || if images.IsLayerType(child.MediaType) {
child.MediaType == ocispec.MediaTypeImageConfig { manifestLayers = append(manifestLayers, child)
continue } else {
nonLayers = append(nonLayers, child)
} }
layers = append(layers, child) }
lock.Lock()
for _, nl := range nonLayers {
layers[nl.Digest] = manifestLayers
} }
lock.Unlock() lock.Unlock()
case mt == images.MediaTypeDockerSchema2Config || mt == ocispec.MediaTypeImageConfig:
children = nonLayers
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
lock.Lock() lock.Lock()
l := append([]ocispec.Descriptor{}, layers...) l := layers[desc.Digest]
lock.Unlock() lock.Unlock()
if len(l) > 0 { if len(l) > 0 {
atomic.AddInt32(unpacks, 1) atomic.AddInt32(unpacks, 1)
eg.Go(func() error { eg.Go(func() error {
return u.unpack(uctx, desc, l) return u.unpack(uctx, f, desc, l)
}) })
} }
case images.IsLayerType(mt):
lock.Lock()
update := !schema1
lock.Unlock()
if update {
select {
case <-uctx.Done():
// Do not send update if unpacker is not running.
default:
select {
case u.updateCh <- desc:
case <-uctx.Done():
// Do not send update if unpacker is not running.
}
}
// Checking ctx.Done() prevents the case that unpacker
// exits unexpectedly, but update continues to be generated,
// and eventually fills up updateCh and blocks forever.
}
} }
return children, nil return children, nil
}) })
}, eg }, eg
} }
// uniquePart returns a short, mostly-unique token used to build
// one-off extraction snapshot keys: the current time's nanosecond
// component joined with 3 random bytes in URL-safe base64.
func uniquePart() string {
	var rnd [3]byte
	// math/rand's Read cannot fail; a short read would only
	// reduce uniqueness, which is acceptable here.
	rand.Read(rnd[:])
	suffix := base64.URLEncoding.EncodeToString(rnd[:])
	return fmt.Sprintf("%d-%s", time.Now().Nanosecond(), suffix)
}