Add unpack interface to be used by client

Move client unpacker to pkg/unpack

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-03-25 18:08:34 -07:00
parent 030c1ac1ca
commit 8017daa12d
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
2 changed files with 245 additions and 142 deletions

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package containerd
package unpack
import (
"context"
@ -28,6 +28,7 @@ import (
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
@ -47,48 +48,179 @@ const (
labelSnapshotRef = "containerd.io/snapshot.ref"
)
type unpacker struct {
updateCh chan ocispec.Descriptor
snapshotter string
config UnpackConfig
c *Client
limiter *semaphore.Weighted
// Result returns information about the unpacks which were completed.
type Result struct {
// Unpacks is the number of unpack operations that found a matching
// platform and were started; it is reported by Wait on success.
Unpacks int
}
func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) {
snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter)
// unpackerConfig holds the settings collected from UnpackerOpt values and
// embedded into Unpacker.
type unpackerConfig struct {
// platforms lists the unpack targets in the order they were added; the
// first one whose matcher accepts the image platform is used.
platforms []*Platform
// content is the store from which image config blobs are read.
content content.Store
// limiter bounds concurrent operations; nil means no limit is applied.
limiter *semaphore.Weighted
// duplicationSuppressor serializes work on identical blob and snapshot
// chain keys.
duplicationSuppressor kmutex.KeyedLocker
}
// Platform represents a platform-specific unpack configuration which includes
// the platform matcher as well as snapshotter and applier.
type Platform struct {
// Platform selects the image platforms this configuration applies to.
Platform platforms.Matcher
// SnapshotterKey names the snapshotter; it is used in lock keys and in
// the containerd.io/gc.ref.snapshot.<key> content label.
SnapshotterKey string
// Snapshotter is the snapshotter the layers are unpacked into.
Snapshotter snapshots.Snapshotter
// SnapshotOpts are passed along when snapshots are created.
SnapshotOpts []snapshots.Opt
// Applier applies each layer to the snapshot mounts.
Applier diff.Applier
// ApplyOpts are passed to every Applier.Apply call.
ApplyOpts []diff.ApplyOpt
}
// UnpackerOpt configures an Unpacker at construction time. A non-nil error
// returned by an option aborts NewUnpacker.
type UnpackerOpt func(*unpackerConfig) error
// WithUnpackPlatform registers a platform-specific unpack configuration.
//
// A nil Platform matcher defaults to platforms.All. An empty SnapshotterKey is
// taken from the snapshotter's String method when it implements fmt.Stringer
// and otherwise falls back to "unknown". The option fails when either the
// Snapshotter or the Applier is missing.
func WithUnpackPlatform(u Platform) UnpackerOpt {
	return func(cfg *unpackerConfig) error {
		if u.Platform == nil {
			u.Platform = platforms.All
		}
		if u.Snapshotter == nil {
			return fmt.Errorf("snapshotter must be provided to unpack")
		}
		if u.SnapshotterKey == "" {
			// Start from the fallback and override it when the
			// snapshotter can name itself.
			u.SnapshotterKey = "unknown"
			if named, ok := u.Snapshotter.(fmt.Stringer); ok {
				u.SnapshotterKey = named.String()
			}
		}
		if u.Applier == nil {
			return fmt.Errorf("applier must be provided to unpack")
		}

		cfg.platforms = append(cfg.platforms, &u)
		return nil
	}
}
// WithLimiter sets the weighted semaphore the Unpacker acquires from to bound
// concurrent operations. A nil limiter leaves the unpacker unlimited.
func WithLimiter(l *semaphore.Weighted) UnpackerOpt {
	return func(cfg *unpackerConfig) error {
		cfg.limiter = l
		return nil
	}
}
// WithDuplicationSuppressor sets the keyed locker used to serialize work on
// the same blob or snapshot chain, replacing the no-op locker installed by
// NewUnpacker.
func WithDuplicationSuppressor(d kmutex.KeyedLocker) UnpackerOpt {
	return func(cfg *unpackerConfig) error {
		cfg.duplicationSuppressor = d
		return nil
	}
}
// Unpacker unpacks images by hooking into the image handler process.
// Unpacks happen in the background and are waited on to complete via Wait.
type Unpacker struct {
unpackerConfig
// unpacks counts the unpack operations that were started; it is
// incremented atomically by the background unpack goroutines.
unpacks int32
// ctx is the errgroup-derived context used by the background unpacks.
ctx context.Context
// eg tracks the background unpack goroutines.
eg *errgroup.Group
}
// NewUnpacker creates a new instance of the unpacker which can be used to wrap
// an image handler and unpack in parallel to handling.
//
// The unpacker reads image configs from cs and starts with a no-op duplication
// suppressor; opts are applied in order and the first option error aborts
// construction. Background unpacks run under a context derived from ctx
// through an errgroup.
func NewUnpacker(ctx context.Context, cs content.Store, opts ...UnpackerOpt) (*Unpacker, error) {
	group, groupCtx := errgroup.WithContext(ctx)

	cfg := unpackerConfig{
		content:               cs,
		duplicationSuppressor: kmutex.NewNoop(),
	}
	for i := range opts {
		if err := opts[i](&cfg); err != nil {
			return nil, err
		}
	}

	return &Unpacker{
		unpackerConfig: cfg,
		ctx:            groupCtx,
		eg:             group,
	}, nil
}
// Unpack wraps an image handler so that layer blobs are filtered out of normal
// handling and are instead scheduled by the unpack process. When an image
// config is encountered, the unpack process is started in a goroutine.
func (u *Unpacker) Unpack(h images.Handler) images.Handler {
var (
lock sync.Mutex
layers = map[digest.Digest][]ocispec.Descriptor{}
)
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
unlock, err := u.lockBlobDescriptor(ctx, desc)
if err != nil {
return nil, err
}
var config = UnpackConfig{
DuplicationSuppressor: kmutex.NewNoop(),
children, err := h.Handle(ctx, desc)
unlock()
if err != nil {
return children, err
}
for _, o := range rCtx.UnpackOpts {
if err := o(ctx, &config); err != nil {
return nil, err
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
var nonLayers []ocispec.Descriptor
var manifestLayers []ocispec.Descriptor
// Split layers from non-layers, layers will be handled after
// the config
for _, child := range children {
if images.IsLayerType(child.MediaType) {
manifestLayers = append(manifestLayers, child)
} else {
nonLayers = append(nonLayers, child)
}
}
var limiter *semaphore.Weighted
if rCtx.MaxConcurrentDownloads > 0 {
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
lock.Lock()
for _, nl := range nonLayers {
layers[nl.Digest] = manifestLayers
}
return &unpacker{
updateCh: make(chan ocispec.Descriptor, 128),
snapshotter: snapshotter,
config: config,
c: c,
limiter: limiter,
lock.Unlock()
children = nonLayers
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
lock.Lock()
l := layers[desc.Digest]
lock.Unlock()
if len(l) > 0 {
u.eg.Go(func() error {
return u.unpack(h, desc, l)
})
}
}
return children, nil
})
}
// Wait waits for any ongoing unpack processes to complete and then returns
// the result.
//
// If any background unpack failed, the first error is returned together with
// a zero Result.
func (u *Unpacker) Wait() (Result, error) {
	if err := u.eg.Wait(); err != nil {
		return Result{}, err
	}
	return Result{
		// The counter is written with atomic.AddInt32 by the unpack
		// goroutines, so read it atomically as well.
		Unpacks: int(atomic.LoadInt32(&u.unpacks)),
	}, nil
}
func (u *unpacker) unpack(
ctx context.Context,
rCtx *RemoteContext,
func (u *Unpacker) unpack(
h images.Handler,
config ocispec.Descriptor,
layers []ocispec.Descriptor,
) error {
p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
ctx := u.ctx
p, err := content.ReadBlob(ctx, u.content, config)
if err != nil {
return err
}
@ -102,21 +234,27 @@ func (u *unpacker) unpack(
return fmt.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
}
if u.config.CheckPlatformSupported {
// TODO: Support multiple unpacks rather than just first match
var unpack *Platform
imgPlatform := platforms.Normalize(ocispec.Platform{OS: i.OS, Architecture: i.Architecture})
snapshotterPlatformMatcher, err := u.c.GetSnapshotterSupportedPlatforms(ctx, u.snapshotter)
if err != nil {
return fmt.Errorf("failed to find supported platforms for snapshotter %s: %w", u.snapshotter, err)
}
if !snapshotterPlatformMatcher.Match(imgPlatform) {
return fmt.Errorf("snapshotter %s does not support platform %s for image %s", u.snapshotter, imgPlatform, config.Digest)
for _, up := range u.platforms {
if up.Platform.Match(imgPlatform) {
unpack = up
break
}
}
if unpack == nil {
return fmt.Errorf("unpacker does not support platform %s for image %s", imgPlatform, config.Digest)
}
atomic.AddInt32(&u.unpacks, 1)
var (
sn = u.c.SnapshotService(u.snapshotter)
a = u.c.DiffService()
cs = u.c.ContentStore()
sn = unpack.Snapshotter
a = unpack.Applier
cs = u.content
chain []digest.Digest
@ -135,7 +273,7 @@ func (u *unpacker) unpack(
chain = append(chain, diffIDs[i])
chainID := identity.ChainID(chain).String()
unlock, err := u.lockSnChainID(ctx, chainID)
unlock, err := u.lockSnChainID(ctx, chainID, unpack.SnapshotterKey)
if err != nil {
return err
}
@ -158,7 +296,7 @@ func (u *unpacker) unpack(
var (
key string
mounts []mount.Mount
opts = append(rCtx.SnapshotterOpts, snapshots.WithLabels(labels))
opts = append(unpack.SnapshotOpts, snapshots.WithLabels(labels))
)
for try := 1; try <= 3; try++ {
@ -214,15 +352,17 @@ func (u *unpacker) unpack(
select {
case <-ctx.Done():
abort()
return ctx.Err()
case err := <-fetchErr:
if err != nil {
abort()
return err
}
case <-fetchC[i-fetchOffset]:
}
diff, err := a.Apply(ctx, desc, mounts, u.config.ApplyOpts...)
diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...)
if err != nil {
abort()
return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err)
@ -264,10 +404,10 @@ func (u *unpacker) unpack(
cinfo := content.Info{
Digest: config.Digest,
Labels: map[string]string{
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID,
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey): chainID,
},
}
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter))
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey))
if err != nil {
return err
}
@ -279,7 +419,7 @@ func (u *unpacker) unpack(
return nil
}
func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
eg, ctx2 := errgroup.WithContext(ctx)
for i, desc := range layers {
desc := desc
@ -313,108 +453,47 @@ func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec
return eg.Wait()
}
func (u *unpacker) handlerWrapper(
uctx context.Context,
rCtx *RemoteContext,
unpacks *int32,
) (func(images.Handler) images.Handler, *errgroup.Group) {
eg, uctx := errgroup.WithContext(uctx)
return func(f images.Handler) images.Handler {
var (
lock sync.Mutex
layers = map[digest.Digest][]ocispec.Descriptor{}
)
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
unlock, err := u.lockBlobDescriptor(ctx, desc)
if err != nil {
return nil, err
}
children, err := f.Handle(ctx, desc)
unlock()
if err != nil {
return children, err
}
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
var nonLayers []ocispec.Descriptor
var manifestLayers []ocispec.Descriptor
// Split layers from non-layers, layers will be handled after
// the config
for _, child := range children {
if images.IsLayerType(child.MediaType) {
manifestLayers = append(manifestLayers, child)
} else {
nonLayers = append(nonLayers, child)
}
}
lock.Lock()
for _, nl := range nonLayers {
layers[nl.Digest] = manifestLayers
}
lock.Unlock()
children = nonLayers
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
lock.Lock()
l := layers[desc.Digest]
lock.Unlock()
if len(l) > 0 {
atomic.AddInt32(unpacks, 1)
eg.Go(func() error {
return u.unpack(uctx, rCtx, f, desc, l)
})
}
}
return children, nil
})
}, eg
}
func (u *unpacker) acquire(ctx context.Context) error {
// acquire takes one unit from the configured limiter, blocking until it is
// available or ctx is done. It is a no-op when no limiter is configured.
func (u *Unpacker) acquire(ctx context.Context) error {
	if lim := u.limiter; lim != nil {
		return lim.Acquire(ctx, 1)
	}
	return nil
}
func (u *unpacker) release() {
// release returns one unit to the configured limiter. It is a no-op when no
// limiter is configured.
func (u *Unpacker) release() {
	if lim := u.limiter; lim != nil {
		lim.Release(1)
	}
}
func (u *unpacker) lockSnChainID(ctx context.Context, chainID string) (func(), error) {
key := u.makeChainIDKeyWithSnapshotter(chainID)
func (u *Unpacker) lockSnChainID(ctx context.Context, chainID, snapshotter string) (func(), error) {
key := u.makeChainIDKeyWithSnapshotter(chainID, snapshotter)
if err := u.config.DuplicationSuppressor.Lock(ctx, key); err != nil {
if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
return nil, err
}
return func() {
u.config.DuplicationSuppressor.Unlock(key)
u.duplicationSuppressor.Unlock(key)
}, nil
}
func (u *unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
func (u *Unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
key := u.makeBlobDescriptorKey(desc)
if err := u.config.DuplicationSuppressor.Lock(ctx, key); err != nil {
if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
return nil, err
}
return func() {
u.config.DuplicationSuppressor.Unlock(key)
u.duplicationSuppressor.Unlock(key)
}, nil
}
func (u *unpacker) makeChainIDKeyWithSnapshotter(chainID string) string {
return fmt.Sprintf("sn://%s/%v", u.snapshotter, chainID)
// makeChainIDKeyWithSnapshotter builds the lock key for a snapshot chain ID on
// a given snapshotter, in the form "sn://<snapshotter>/<chainID>".
func (u *Unpacker) makeChainIDKeyWithSnapshotter(chainID, snapshotter string) string {
	return "sn://" + snapshotter + "/" + chainID
}
func (u *unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string {
// makeBlobDescriptorKey builds the lock key for a blob from its digest, in the
// form "blob://<digest>".
func (u *Unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string {
	return "blob://" + desc.Digest.String()
}

62
pull.go
View File

@ -23,12 +23,12 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
@ -63,19 +63,46 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
}
defer done(ctx)
var unpacks int32
var unpackEg *errgroup.Group
var unpackWrapper func(f images.Handler) images.Handler
var unpacker *unpack.Unpacker
if pullCtx.Unpack {
// unpacker only supports schema 2 image, for schema 1 this is noop.
u, err := c.newUnpacker(ctx, pullCtx)
snapshotterName, err := c.resolveSnapshotterName(ctx, pullCtx.Snapshotter)
if err != nil {
return nil, fmt.Errorf("create unpacker: %w", err)
return nil, fmt.Errorf("unable to resolve snapshotter: %w", err)
}
var uconfig UnpackConfig
for _, opt := range pullCtx.UnpackOpts {
if err := opt(ctx, &uconfig); err != nil {
return nil, err
}
}
var platformMatcher platforms.Matcher
if !uconfig.CheckPlatformSupported {
platformMatcher = platforms.All
}
// Check client Unpack config
platform := unpack.Platform{
Platform: platformMatcher,
SnapshotterKey: snapshotterName,
Snapshotter: c.SnapshotService(snapshotterName),
SnapshotOpts: append(pullCtx.SnapshotterOpts, uconfig.SnapshotOpts...),
Applier: c.DiffService(),
ApplyOpts: uconfig.ApplyOpts,
}
uopts := []unpack.UnpackerOpt{unpack.WithUnpackPlatform(platform)}
// Limit concurrent layer downloads during unpack. The guard must test the
// same field that sizes the semaphore: a zero-capacity semaphore would
// block every acquire.
if pullCtx.MaxConcurrentDownloads > 0 {
	uopts = append(uopts, unpack.WithLimiter(semaphore.NewWeighted(int64(pullCtx.MaxConcurrentDownloads))))
}
if uconfig.DuplicationSuppressor != nil {
uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor))
}
unpacker, err = unpack.NewUnpacker(ctx, c.ContentStore(), uopts...)
if err != nil {
return nil, fmt.Errorf("unable to initialize unpacker: %w", err)
}
unpackWrapper, unpackEg = u.handlerWrapper(ctx, pullCtx, &unpacks)
defer func() {
if err := unpackEg.Wait(); err != nil {
if _, err := unpacker.Wait(); err != nil {
if retErr == nil {
retErr = fmt.Errorf("unpack: %w", err)
}
@ -84,9 +111,9 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
wrapper := pullCtx.HandlerWrapper
pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
if wrapper == nil {
return unpackWrapper(h)
return unpacker.Unpack(h)
}
return unpackWrapper(wrapper(h))
return unpacker.Unpack(wrapper(h))
}
}
@ -98,13 +125,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
// NOTE(fuweid): unpacker defers blobs download. before create image
// record in ImageService, should wait for unpacking(including blobs
// download).
if pullCtx.Unpack {
if unpackEg != nil {
if err := unpackEg.Wait(); err != nil {
var ur unpack.Result
if unpacker != nil {
if ur, err = unpacker.Wait(); err != nil {
return nil, err
}
}
}
img, err = c.createNewImage(ctx, img)
if err != nil {
@ -113,15 +139,13 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Ima
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
if pullCtx.Unpack {
if unpacks == 0 {
// Try to unpack is none is done previously.
if unpacker != nil && ur.Unpacks == 0 {
// Unpack was tried previously but nothing was unpacked
// This is at least required for schema 1 image.
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
return nil, fmt.Errorf("failed to unpack image on snapshotter %s: %w", pullCtx.Snapshotter, err)
}
}
}
return i, nil
}