diff --git a/pull.go b/pull.go index b5c50d94f..2520639df 100644 --- a/pull.go +++ b/pull.go @@ -70,11 +70,6 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima } unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks) defer func() { - if retErr != nil { - // Forcibly stop the unpacker if there is - // an error. - eg.Cancel() - } if err := eg.Wait(); err != nil { if retErr == nil { retErr = errors.Wrap(err, "unpack") diff --git a/unpacker.go b/unpacker.go index 6955129c7..7d3a6d3d8 100644 --- a/unpacker.go +++ b/unpacker.go @@ -18,34 +18,35 @@ package containerd import ( "context" + "encoding/base64" "encoding/json" "fmt" + "math/rand" "sync" "sync/atomic" + "time" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/rootfs" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/snapshots" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) -type layerState struct { - layer rootfs.Layer - downloaded bool - unpacked bool -} - type unpacker struct { updateCh chan ocispec.Descriptor snapshotter string config UnpackConfig c *Client + limiter *semaphore.Weighted } func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) { @@ -67,7 +68,7 @@ func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacke }, nil } -func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers []ocispec.Descriptor) error { +func (u *unpacker) unpack(ctx context.Context, h images.Handler, config ocispec.Descriptor, layers []ocispec.Descriptor) error { p, err := content.ReadBlob(ctx, u.c.ContentStore(), config) if err != nil { return err @@ -87,85 +88,131 @@ func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers a = u.c.DiffService() cs = u.c.ContentStore() - states []layerState - chain []digest.Digest + chain []digest.Digest + + fetchOffset int + fetchC []chan struct{} + fetchErr chan error ) + + // If there is an early return, ensure any ongoing + // fetches get their context cancelled + ctx, cancel := context.WithCancel(ctx) + defer cancel() + +EachLayer: for i, desc := range layers { - states = append(states, layerState{ - layer: rootfs.Layer{ - Blob: desc, - Diff: ocispec.Descriptor{ - MediaType: ocispec.MediaTypeImageLayer, - Digest: diffIDs[i], - }, - }, + parent := identity.ChainID(chain) + chain = append(chain, diffIDs[i]) + + chainID := identity.ChainID(chain).String() + if _, err := sn.Stat(ctx, chainID); err == nil { + // no need to handle + continue + } else if !errdefs.IsNotFound(err) { + return errors.Wrapf(err, "failed to stat snapshot %s", chainID) + } + + labelOpt := snapshots.WithLabels(map[string]string{ + "containerd.io/snapshot.ref": chainID, }) - } - for { - var layer ocispec.Descriptor - select { - case layer = <-u.updateCh: - case <-ctx.Done(): - return ctx.Err() - } - log.G(ctx).WithField("desc", layer).Debug("layer downloaded") - for i := range states { - if states[i].layer.Blob.Digest != layer.Digest { - continue - } - // Different layers may have the same digest. When that - // happens, we should continue marking the next layer - // as downloaded. - if states[i].downloaded { - continue - } - states[i].downloaded = true - break - } - for i := range states { - if !states[i].downloaded { + + var ( + key string + mounts []mount.Mount + ) + + for try := 1; try <= 3; try++ { + // Prepare snapshot with from parent, label as root + key = fmt.Sprintf("extract-%s %s", uniquePart(), chainID) + mounts, err = sn.Prepare(ctx, key, parent.String(), labelOpt) + if err != nil { + if errdefs.IsAlreadyExists(err) { + if _, err := sn.Stat(ctx, chainID); err != nil { + if !errdefs.IsNotFound(err) { + return errors.Wrapf(err, "failed to stat snapshot %s", chainID) + } + // Try again, this should be rare, log it + log.G(ctx).WithField("key", key).WithField("chainid", chainID).Debug("extraction snapshot already exists, chain id not found") + } else { + // no need to handle, snapshot now found with chain id + continue EachLayer + } + } else { + return errors.Wrapf(err, "failed to prepare extraction snapshot %q", key) + } + } else { break } - if states[i].unpacked { - continue + } + if err != nil { + return errors.Wrap(err, "unable to prepare extraction snapshot") + } + + // Abort the snapshot if commit does not happen + abort := func() { + if err := sn.Remove(ctx, key); err != nil { + log.G(ctx).WithError(err).Errorf("failed to cleanup %q", key) + } + } + + if fetchErr == nil { + fetchErr = make(chan error, 1) + fetchOffset = i + fetchC = make([]chan struct{}, len(layers)-fetchOffset) + for i := range fetchC { + fetchC[i] = make(chan struct{}) } - log.G(ctx).WithFields(logrus.Fields{ - "desc": states[i].layer.Blob, - "diff": states[i].layer.Diff, - }).Debug("unpack layer") + go func() { + err := u.fetch(ctx, h, layers[i:], fetchC) + if err != nil { + fetchErr <- err + } + close(fetchErr) + }() + } - unpacked, err := rootfs.ApplyLayerWithOpts(ctx, states[i].layer, chain, sn, a, - u.config.SnapshotOpts, u.config.ApplyOpts) + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-fetchErr: if err != nil { return err } + case <-fetchC[i-fetchOffset]: + } - if unpacked { - // Set the uncompressed label after the uncompressed - // digest has been verified through apply. - cinfo := content.Info{ - Digest: states[i].layer.Blob.Digest, - Labels: map[string]string{ - "containerd.io/uncompressed": states[i].layer.Diff.Digest.String(), - }, - } - if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil { - return err - } + diff, err := a.Apply(ctx, desc, mounts) + if err != nil { + abort() + return errors.Wrapf(err, "failed to extract layer %s", diffIDs[i]) + } + if diff.Digest != diffIDs[i] { + abort() + return errors.Errorf("wrong diff id calculated on extraction %q", diffIDs[i]) + } + + if err = sn.Commit(ctx, chainID, key, labelOpt); err != nil { + abort() + if errdefs.IsAlreadyExists(err) { + continue } + return errors.Wrapf(err, "failed to commit snapshot %s", key) + } - chain = append(chain, states[i].layer.Diff.Digest) - states[i].unpacked = true - log.G(ctx).WithFields(logrus.Fields{ - "desc": states[i].layer.Blob, - "diff": states[i].layer.Diff, - }).Debug("layer unpacked") + // Set the uncompressed label after the uncompressed + // digest has been verified through apply. + cinfo := content.Info{ + Digest: desc.Digest, + Labels: map[string]string{ + "containerd.io/uncompressed": diff.Digest.String(), + }, } - // Check whether all layers are unpacked. - if states[len(states)-1].unpacked { - break + if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil { + return err } + } chainID := identity.ChainID(chain).String() @@ -183,40 +230,45 @@ func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers "config": config.Digest, "chainID": chainID, }).Debug("image unpacked") + return nil } -type errGroup struct { - *errgroup.Group - cancel context.CancelFunc +func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error { + eg, ctx2 := errgroup.WithContext(ctx) + for i, desc := range layers { + desc := desc + i := i + + if u.limiter != nil { + if err := u.limiter.Acquire(ctx, 1); err != nil { + return err + } + } + + eg.Go(func() error { + _, err := h.Handle(ctx2, desc) + if u.limiter != nil { + u.limiter.Release(1) + } + if err != nil && errors.Cause(err) != images.ErrSkipDesc { + return err + } + close(done[i]) + + return nil + }) + } + + return eg.Wait() } -func newErrGroup(ctx context.Context) (*errGroup, context.Context) { - ctx, cancel := context.WithCancel(ctx) - eg, ctx := errgroup.WithContext(ctx) - return &errGroup{ - Group: eg, - cancel: cancel, - }, ctx -} - -func (e *errGroup) Cancel() { - e.cancel() -} - -func (e *errGroup) Wait() error { - err := e.Group.Wait() - e.cancel() - return err -} - -func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errGroup) { - eg, uctx := newErrGroup(uctx) +func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) { + eg, uctx := errgroup.WithContext(uctx) return func(f images.Handler) images.Handler { var ( - lock sync.Mutex - layers []ocispec.Descriptor - schema1 bool + lock sync.Mutex + layers = map[digest.Digest][]ocispec.Descriptor{} ) return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { children, err := f.Handle(ctx, desc) @@ -224,56 +276,48 @@ func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(im return children, err } - // `Pull` only supports one platform, so there is only - // one manifest to handle, and manifest list can be - // safely skipped. - // TODO: support multi-platform unpack. - switch mt := desc.MediaType; { - case mt == images.MediaTypeDockerSchema1Manifest: - lock.Lock() - schema1 = true - lock.Unlock() - case mt == images.MediaTypeDockerSchema2Manifest || mt == ocispec.MediaTypeImageManifest: - lock.Lock() + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + var nonLayers []ocispec.Descriptor + var manifestLayers []ocispec.Descriptor + + // Split layers from non-layers, layers will be handled after + // the config for _, child := range children { - if child.MediaType == images.MediaTypeDockerSchema2Config || - child.MediaType == ocispec.MediaTypeImageConfig { - continue + if images.IsLayerType(child.MediaType) { + manifestLayers = append(manifestLayers, child) + } else { + nonLayers = append(nonLayers, child) } - layers = append(layers, child) + } + + lock.Lock() + for _, nl := range nonLayers { + layers[nl.Digest] = manifestLayers } lock.Unlock() - case mt == images.MediaTypeDockerSchema2Config || mt == ocispec.MediaTypeImageConfig: + + children = nonLayers + case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: lock.Lock() - l := append([]ocispec.Descriptor{}, layers...) + l := layers[desc.Digest] lock.Unlock() if len(l) > 0 { atomic.AddInt32(unpacks, 1) eg.Go(func() error { - return u.unpack(uctx, desc, l) + return u.unpack(uctx, f, desc, l) }) } - case images.IsLayerType(mt): - lock.Lock() - update := !schema1 - lock.Unlock() - if update { - select { - case <-uctx.Done(): - // Do not send update if unpacker is not running. - default: - select { - case u.updateCh <- desc: - case <-uctx.Done(): - // Do not send update if unpacker is not running. - } - } - // Checking ctx.Done() prevents the case that unpacker - // exits unexpectedly, but update continues to be generated, - // and eventually fills up updateCh and blocks forever. - } } return children, nil }) }, eg } + +func uniquePart() string { + t := time.Now() + var b [3]byte + // Ignore read failures, just decreases uniqueness + rand.Read(b[:]) + return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:])) +}