diff --git a/unpacker.go b/pkg/unpack/unpacker.go similarity index 54% rename from unpacker.go rename to pkg/unpack/unpacker.go index 03cf7554e..136a128ea 100644 --- a/unpacker.go +++ b/pkg/unpack/unpacker.go @@ -14,7 +14,7 @@ limitations under the License. */ -package containerd +package unpack import ( "context" @@ -28,6 +28,7 @@ import ( "time" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" @@ -47,48 +48,179 @@ const ( labelSnapshotRef = "containerd.io/snapshot.ref" ) -type unpacker struct { - updateCh chan ocispec.Descriptor - snapshotter string - config UnpackConfig - c *Client - limiter *semaphore.Weighted +// Result returns information about the unpacks which were completed. +type Result struct { + Unpacks int } -func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) { - snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter) - if err != nil { - return nil, err +type unpackerConfig struct { + platforms []*Platform + + content content.Store + + limiter *semaphore.Weighted + duplicationSuppressor kmutex.KeyedLocker +} + +// Platform represents a platform-specific unpack configuration which includes +// the platform matcher as well as snapshotter and applier. +type Platform struct { + Platform platforms.Matcher + + SnapshotterKey string + Snapshotter snapshots.Snapshotter + SnapshotOpts []snapshots.Opt + + Applier diff.Applier + ApplyOpts []diff.ApplyOpt +} + +type UnpackerOpt func(*unpackerConfig) error + +func WithUnpackPlatform(u Platform) UnpackerOpt { + return UnpackerOpt(func(c *unpackerConfig) error { + if u.Platform == nil { + u.Platform = platforms.All + } + if u.Snapshotter == nil { + return fmt.Errorf("snapshotter must be provided to unpack") + } + if u.SnapshotterKey == "" { + if s, ok := u.Snapshotter.(fmt.Stringer); ok { + u.SnapshotterKey = s.String() + } else { + u.SnapshotterKey = "unknown" + } + } + if u.Applier == nil { + return fmt.Errorf("applier must be provided to unpack") + } + + c.platforms = append(c.platforms, &u) + + return nil + }) +} + +func WithLimiter(l *semaphore.Weighted) UnpackerOpt { + return UnpackerOpt(func(c *unpackerConfig) error { + c.limiter = l + return nil + }) +} + +func WithDuplicationSuppressor(d kmutex.KeyedLocker) UnpackerOpt { + return UnpackerOpt(func(c *unpackerConfig) error { + c.duplicationSuppressor = d + return nil + }) +} + +// Unpacker unpacks images by hooking into the image handler process. +// Unpacks happen in the backgrounds and waited on to complete. +type Unpacker struct { + unpackerConfig + + unpacks int32 + ctx context.Context + eg *errgroup.Group +} + +// NewUnpacker creates a new instance of the unpacker which can be used to wrap an +// image handler and unpack in parallel to handling. The unpacker will handle +// calling the block handlers when they are needed by the unpack process. +func NewUnpacker(ctx context.Context, cs content.Store, opts ...UnpackerOpt) (*Unpacker, error) { + eg, ctx := errgroup.WithContext(ctx) + + u := &Unpacker{ + unpackerConfig: unpackerConfig{ + content: cs, + duplicationSuppressor: kmutex.NewNoop(), + }, + ctx: ctx, + eg: eg, } - var config = UnpackConfig{ - DuplicationSuppressor: kmutex.NewNoop(), - } - for _, o := range rCtx.UnpackOpts { - if err := o(ctx, &config); err != nil { + for _, opt := range opts { + if err := opt(&u.unpackerConfig); err != nil { return nil, err } } - var limiter *semaphore.Weighted - if rCtx.MaxConcurrentDownloads > 0 { - limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) + return u, nil +} + +// Unpack wraps an image handler to filter out blob handling and scheduling them +// during the unpack process. When an image config is encountered, the unpack +// process will be started in a goroutine. +func (u *Unpacker) Unpack(h images.Handler) images.Handler { + var ( + lock sync.Mutex + layers = map[digest.Digest][]ocispec.Descriptor{} + ) + return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + unlock, err := u.lockBlobDescriptor(ctx, desc) + if err != nil { + return nil, err + } + children, err := h.Handle(ctx, desc) + unlock() + if err != nil { + return children, err + } + + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + var nonLayers []ocispec.Descriptor + var manifestLayers []ocispec.Descriptor + + // Split layers from non-layers, layers will be handled after + // the config + for _, child := range children { + if images.IsLayerType(child.MediaType) { + manifestLayers = append(manifestLayers, child) + } else { + nonLayers = append(nonLayers, child) + } + } + + lock.Lock() + for _, nl := range nonLayers { + layers[nl.Digest] = manifestLayers + } + lock.Unlock() + + children = nonLayers + case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: + lock.Lock() + l := layers[desc.Digest] + lock.Unlock() + if len(l) > 0 { + u.eg.Go(func() error { + return u.unpack(h, desc, l) + }) + } + } + return children, nil + }) +} + +// Wait waits for any ongoing unpack processes to complete then will return +// the result. +func (u *Unpacker) Wait() (Result, error) { + if err := u.eg.Wait(); err != nil { + return Result{}, err } - return &unpacker{ - updateCh: make(chan ocispec.Descriptor, 128), - snapshotter: snapshotter, - config: config, - c: c, - limiter: limiter, + return Result{ + Unpacks: int(u.unpacks), }, nil } -func (u *unpacker) unpack( - ctx context.Context, - rCtx *RemoteContext, +func (u *Unpacker) unpack( h images.Handler, config ocispec.Descriptor, layers []ocispec.Descriptor, ) error { - p, err := content.ReadBlob(ctx, u.c.ContentStore(), config) + ctx := u.ctx + p, err := content.ReadBlob(ctx, u.content, config) if err != nil { return err } @@ -102,21 +234,27 @@ func (u *unpacker) unpack( return fmt.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs)) } - if u.config.CheckPlatformSupported { - imgPlatform := platforms.Normalize(ocispec.Platform{OS: i.OS, Architecture: i.Architecture}) - snapshotterPlatformMatcher, err := u.c.GetSnapshotterSupportedPlatforms(ctx, u.snapshotter) - if err != nil { - return fmt.Errorf("failed to find supported platforms for snapshotter %s: %w", u.snapshotter, err) - } - if !snapshotterPlatformMatcher.Match(imgPlatform) { - return fmt.Errorf("snapshotter %s does not support platform %s for image %s", u.snapshotter, imgPlatform, config.Digest) + // TODO: Support multiple unpacks rather than just first match + var unpack *Platform + + imgPlatform := platforms.Normalize(ocispec.Platform{OS: i.OS, Architecture: i.Architecture}) + for _, up := range u.platforms { + if up.Platform.Match(imgPlatform) { + unpack = up + break } } + if unpack == nil { + return fmt.Errorf("unpacker does not support platform %s for image %s", imgPlatform, config.Digest) + } + + atomic.AddInt32(&u.unpacks, 1) + var ( - sn = u.c.SnapshotService(u.snapshotter) - a = u.c.DiffService() - cs = u.c.ContentStore() + sn = unpack.Snapshotter + a = unpack.Applier + cs = u.content chain []digest.Digest @@ -135,7 +273,7 @@ func (u *unpacker) unpack( chain = append(chain, diffIDs[i]) chainID := identity.ChainID(chain).String() - unlock, err := u.lockSnChainID(ctx, chainID) + unlock, err := u.lockSnChainID(ctx, chainID, unpack.SnapshotterKey) if err != nil { return err } @@ -158,7 +296,7 @@ func (u *unpacker) unpack( var ( key string mounts []mount.Mount - opts = append(rCtx.SnapshotterOpts, snapshots.WithLabels(labels)) + opts = append(unpack.SnapshotOpts, snapshots.WithLabels(labels)) ) for try := 1; try <= 3; try++ { @@ -214,15 +352,17 @@ func (u *unpacker) unpack( select { case <-ctx.Done(): + abort() return ctx.Err() case err := <-fetchErr: if err != nil { + abort() return err } case <-fetchC[i-fetchOffset]: } - diff, err := a.Apply(ctx, desc, mounts, u.config.ApplyOpts...) + diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...) if err != nil { abort() return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err) @@ -264,10 +404,10 @@ func (u *unpacker) unpack( cinfo := content.Info{ Digest: config.Digest, Labels: map[string]string{ - fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID, + fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey): chainID, }, } - _, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter)) + _, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey)) if err != nil { return err } @@ -279,7 +419,7 @@ func (u *unpacker) unpack( return nil } -func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error { +func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error { eg, ctx2 := errgroup.WithContext(ctx) for i, desc := range layers { desc := desc @@ -313,108 +453,47 @@ func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec return eg.Wait() } -func (u *unpacker) handlerWrapper( - uctx context.Context, - rCtx *RemoteContext, - unpacks *int32, -) (func(images.Handler) images.Handler, *errgroup.Group) { - eg, uctx := errgroup.WithContext(uctx) - return func(f images.Handler) images.Handler { - var ( - lock sync.Mutex - layers = map[digest.Digest][]ocispec.Descriptor{} - ) - return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - unlock, err := u.lockBlobDescriptor(ctx, desc) - if err != nil { - return nil, err - } - - children, err := f.Handle(ctx, desc) - unlock() - if err != nil { - return children, err - } - - switch desc.MediaType { - case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: - var nonLayers []ocispec.Descriptor - var manifestLayers []ocispec.Descriptor - - // Split layers from non-layers, layers will be handled after - // the config - for _, child := range children { - if images.IsLayerType(child.MediaType) { - manifestLayers = append(manifestLayers, child) - } else { - nonLayers = append(nonLayers, child) - } - } - - lock.Lock() - for _, nl := range nonLayers { - layers[nl.Digest] = manifestLayers - } - lock.Unlock() - - children = nonLayers - case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: - lock.Lock() - l := layers[desc.Digest] - lock.Unlock() - if len(l) > 0 { - atomic.AddInt32(unpacks, 1) - eg.Go(func() error { - return u.unpack(uctx, rCtx, f, desc, l) - }) - } - } - return children, nil - }) - }, eg -} - -func (u *unpacker) acquire(ctx context.Context) error { +func (u *Unpacker) acquire(ctx context.Context) error { if u.limiter == nil { return nil } return u.limiter.Acquire(ctx, 1) } -func (u *unpacker) release() { +func (u *Unpacker) release() { if u.limiter == nil { return } u.limiter.Release(1) } -func (u *unpacker) lockSnChainID(ctx context.Context, chainID string) (func(), error) { - key := u.makeChainIDKeyWithSnapshotter(chainID) +func (u *Unpacker) lockSnChainID(ctx context.Context, chainID, snapshotter string) (func(), error) { + key := u.makeChainIDKeyWithSnapshotter(chainID, snapshotter) - if err := u.config.DuplicationSuppressor.Lock(ctx, key); err != nil { + if err := u.duplicationSuppressor.Lock(ctx, key); err != nil { return nil, err } return func() { - u.config.DuplicationSuppressor.Unlock(key) + u.duplicationSuppressor.Unlock(key) }, nil } -func (u *unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) { +func (u *Unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) { key := u.makeBlobDescriptorKey(desc) - if err := u.config.DuplicationSuppressor.Lock(ctx, key); err != nil { + if err := u.duplicationSuppressor.Lock(ctx, key); err != nil { return nil, err } return func() { - u.config.DuplicationSuppressor.Unlock(key) + u.duplicationSuppressor.Unlock(key) }, nil } -func (u *unpacker) makeChainIDKeyWithSnapshotter(chainID string) string { - return fmt.Sprintf("sn://%s/%v", u.snapshotter, chainID) +func (u *Unpacker) makeChainIDKeyWithSnapshotter(chainID, snapshotter string) string { + return fmt.Sprintf("sn://%s/%v", snapshotter, chainID) } -func (u *unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string { +func (u *Unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string { return fmt.Sprintf("blob://%v", desc.Digest) } diff --git a/pull.go b/pull.go index 92f7719b1..a7fc83431 100644 --- a/pull.go +++ b/pull.go @@ -23,12 +23,12 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" + "github.com/containerd/containerd/pkg/unpack" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker/schema1" ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) @@ -63,19 +63,46 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima } defer done(ctx) - var unpacks int32 - var unpackEg *errgroup.Group - var unpackWrapper func(f images.Handler) images.Handler + var unpacker *unpack.Unpacker if pullCtx.Unpack { - // unpacker only supports schema 2 image, for schema 1 this is noop. - u, err := c.newUnpacker(ctx, pullCtx) + snapshotterName, err := c.resolveSnapshotterName(ctx, pullCtx.Snapshotter) if err != nil { - return nil, fmt.Errorf("create unpacker: %w", err) + return nil, fmt.Errorf("unable to resolve snapshotter: %w", err) + } + var uconfig UnpackConfig + for _, opt := range pullCtx.UnpackOpts { + if err := opt(ctx, &uconfig); err != nil { + return nil, err + } + } + var platformMatcher platforms.Matcher + if !uconfig.CheckPlatformSupported { + platformMatcher = platforms.All + } + + // Check client Unpack config + platform := unpack.Platform{ + Platform: platformMatcher, + SnapshotterKey: snapshotterName, + Snapshotter: c.SnapshotService(snapshotterName), + SnapshotOpts: append(pullCtx.SnapshotterOpts, uconfig.SnapshotOpts...), + Applier: c.DiffService(), + ApplyOpts: uconfig.ApplyOpts, + } + uopts := []unpack.UnpackerOpt{unpack.WithUnpackPlatform(platform)} + if pullCtx.MaxConcurrentUploadedLayers > 0 { + uopts = append(uopts, unpack.WithLimiter(semaphore.NewWeighted(int64(pullCtx.MaxConcurrentDownloads)))) + } + if uconfig.DuplicationSuppressor != nil { + uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor)) + } + unpacker, err = unpack.NewUnpacker(ctx, c.ContentStore(), uopts...) + if err != nil { + return nil, fmt.Errorf("unable to initialize unpacker: %w", err) } - unpackWrapper, unpackEg = u.handlerWrapper(ctx, pullCtx, &unpacks) defer func() { - if err := unpackEg.Wait(); err != nil { + if _, err := unpacker.Wait(); err != nil { if retErr == nil { retErr = fmt.Errorf("unpack: %w", err) } @@ -84,9 +111,9 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima wrapper := pullCtx.HandlerWrapper pullCtx.HandlerWrapper = func(h images.Handler) images.Handler { if wrapper == nil { - return unpackWrapper(h) + return unpacker.Unpack(h) } - return unpackWrapper(wrapper(h)) + return unpacker.Unpack(wrapper(h)) } } @@ -98,11 +125,10 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima // NOTE(fuweid): unpacker defers blobs download. before create image // record in ImageService, should wait for unpacking(including blobs // download). - if pullCtx.Unpack { - if unpackEg != nil { - if err := unpackEg.Wait(); err != nil { - return nil, err - } + var ur unpack.Result + if unpacker != nil { + if ur, err = unpacker.Wait(); err != nil { + return nil, err } } @@ -113,13 +139,11 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) - if pullCtx.Unpack { - if unpacks == 0 { - // Try to unpack is none is done previously. - // This is at least required for schema 1 image. - if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil { - return nil, fmt.Errorf("failed to unpack image on snapshotter %s: %w", pullCtx.Snapshotter, err) - } + if unpacker != nil && ur.Unpacks == 0 { + // Unpack was tried previously but nothing was unpacked + // This is at least required for schema 1 image. + if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil { + return nil, fmt.Errorf("failed to unpack image on snapshotter %s: %w", pullCtx.Snapshotter, err) } }