537 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			537 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package unpack
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/base64"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/containerd/containerd/content"
 | |
| 	"github.com/containerd/containerd/diff"
 | |
| 	"github.com/containerd/containerd/errdefs"
 | |
| 	"github.com/containerd/containerd/images"
 | |
| 	"github.com/containerd/containerd/log"
 | |
| 	"github.com/containerd/containerd/mount"
 | |
| 	"github.com/containerd/containerd/pkg/kmutex"
 | |
| 	"github.com/containerd/containerd/platforms"
 | |
| 	"github.com/containerd/containerd/snapshots"
 | |
| 	"github.com/containerd/containerd/tracing"
 | |
| 	"github.com/opencontainers/go-digest"
 | |
| 	"github.com/opencontainers/image-spec/identity"
 | |
| 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 | |
| 	"github.com/sirupsen/logrus"
 | |
| 	"golang.org/x/sync/errgroup"
 | |
| 	"golang.org/x/sync/semaphore"
 | |
| )
 | |
| 
 | |
const (
	// labelSnapshotRef is the snapshot label key under which the target chain
	// ID is recorded when an extraction snapshot is prepared.
	labelSnapshotRef = "containerd.io/snapshot.ref"

	// unpackSpanPrefix is the common name prefix for tracing spans started by
	// this package.
	unpackSpanPrefix = "pkg.unpack.unpacker"
)
 | |
| 
 | |
// Result reports what an Unpacker did; it is returned by Unpacker.Wait once
// all unpack processes have finished without error.
type Result struct {
	// Unpacks is the number of image unpacks that were run.
	Unpacks int
}
 | |
| 
 | |
| type unpackerConfig struct {
 | |
| 	platforms []*Platform
 | |
| 
 | |
| 	content content.Store
 | |
| 
 | |
| 	limiter               *semaphore.Weighted
 | |
| 	duplicationSuppressor kmutex.KeyedLocker
 | |
| }
 | |
| 
 | |
| // Platform represents a platform-specific unpack configuration which includes
 | |
| // the platform matcher as well as snapshotter and applier.
 | |
| type Platform struct {
 | |
| 	Platform platforms.Matcher
 | |
| 
 | |
| 	SnapshotterKey string
 | |
| 	Snapshotter    snapshots.Snapshotter
 | |
| 	SnapshotOpts   []snapshots.Opt
 | |
| 
 | |
| 	Applier   diff.Applier
 | |
| 	ApplyOpts []diff.ApplyOpt
 | |
| }
 | |
| 
 | |
| type UnpackerOpt func(*unpackerConfig) error
 | |
| 
 | |
| func WithUnpackPlatform(u Platform) UnpackerOpt {
 | |
| 	return UnpackerOpt(func(c *unpackerConfig) error {
 | |
| 		if u.Platform == nil {
 | |
| 			u.Platform = platforms.All
 | |
| 		}
 | |
| 		if u.Snapshotter == nil {
 | |
| 			return fmt.Errorf("snapshotter must be provided to unpack")
 | |
| 		}
 | |
| 		if u.SnapshotterKey == "" {
 | |
| 			if s, ok := u.Snapshotter.(fmt.Stringer); ok {
 | |
| 				u.SnapshotterKey = s.String()
 | |
| 			} else {
 | |
| 				u.SnapshotterKey = "unknown"
 | |
| 			}
 | |
| 		}
 | |
| 		if u.Applier == nil {
 | |
| 			return fmt.Errorf("applier must be provided to unpack")
 | |
| 		}
 | |
| 
 | |
| 		c.platforms = append(c.platforms, &u)
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func WithLimiter(l *semaphore.Weighted) UnpackerOpt {
 | |
| 	return UnpackerOpt(func(c *unpackerConfig) error {
 | |
| 		c.limiter = l
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func WithDuplicationSuppressor(d kmutex.KeyedLocker) UnpackerOpt {
 | |
| 	return UnpackerOpt(func(c *unpackerConfig) error {
 | |
| 		c.duplicationSuppressor = d
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Unpacker unpacks images by hooking into the image handler process.
 | |
| // Unpacks happen in the backgrounds and waited on to complete.
 | |
| type Unpacker struct {
 | |
| 	unpackerConfig
 | |
| 
 | |
| 	unpacks int32
 | |
| 	ctx     context.Context
 | |
| 	eg      *errgroup.Group
 | |
| }
 | |
| 
 | |
| // NewUnpacker creates a new instance of the unpacker which can be used to wrap an
 | |
| // image handler and unpack in parallel to handling. The unpacker will handle
 | |
| // calling the block handlers when they are needed by the unpack process.
 | |
| func NewUnpacker(ctx context.Context, cs content.Store, opts ...UnpackerOpt) (*Unpacker, error) {
 | |
| 	eg, ctx := errgroup.WithContext(ctx)
 | |
| 
 | |
| 	u := &Unpacker{
 | |
| 		unpackerConfig: unpackerConfig{
 | |
| 			content:               cs,
 | |
| 			duplicationSuppressor: kmutex.NewNoop(),
 | |
| 		},
 | |
| 		ctx: ctx,
 | |
| 		eg:  eg,
 | |
| 	}
 | |
| 	for _, opt := range opts {
 | |
| 		if err := opt(&u.unpackerConfig); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	if len(u.platforms) == 0 {
 | |
| 		return nil, fmt.Errorf("no unpack platforms defined: %w", errdefs.ErrInvalidArgument)
 | |
| 	}
 | |
| 	return u, nil
 | |
| }
 | |
| 
 | |
| // Unpack wraps an image handler to filter out blob handling and scheduling them
 | |
| // during the unpack process. When an image config is encountered, the unpack
 | |
| // process will be started in a goroutine.
 | |
| func (u *Unpacker) Unpack(h images.Handler) images.Handler {
 | |
| 	var (
 | |
| 		lock   sync.Mutex
 | |
| 		layers = map[digest.Digest][]ocispec.Descriptor{}
 | |
| 	)
 | |
| 	return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
 | |
| 		ctx, span := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "UnpackHandler"))
 | |
| 		defer span.End()
 | |
| 		span.SetAttributes(
 | |
| 			tracing.Attribute("descriptor.media.type", desc.MediaType),
 | |
| 			tracing.Attribute("descriptor.digest", desc.Digest.String()))
 | |
| 		unlock, err := u.lockBlobDescriptor(ctx, desc)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		children, err := h.Handle(ctx, desc)
 | |
| 		unlock()
 | |
| 		if err != nil {
 | |
| 			return children, err
 | |
| 		}
 | |
| 
 | |
| 		switch desc.MediaType {
 | |
| 		case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
 | |
| 			var nonLayers []ocispec.Descriptor
 | |
| 			var manifestLayers []ocispec.Descriptor
 | |
| 			// Split layers from non-layers, layers will be handled after
 | |
| 			// the config
 | |
| 			for i, child := range children {
 | |
| 				span.SetAttributes(
 | |
| 					tracing.Attribute("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}),
 | |
| 				)
 | |
| 				if images.IsLayerType(child.MediaType) {
 | |
| 					manifestLayers = append(manifestLayers, child)
 | |
| 				} else {
 | |
| 					nonLayers = append(nonLayers, child)
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			lock.Lock()
 | |
| 			for _, nl := range nonLayers {
 | |
| 				layers[nl.Digest] = manifestLayers
 | |
| 			}
 | |
| 			lock.Unlock()
 | |
| 
 | |
| 			children = nonLayers
 | |
| 		case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
 | |
| 			lock.Lock()
 | |
| 			l := layers[desc.Digest]
 | |
| 			lock.Unlock()
 | |
| 			if len(l) > 0 {
 | |
| 				u.eg.Go(func() error {
 | |
| 					return u.unpack(h, desc, l)
 | |
| 				})
 | |
| 			}
 | |
| 		}
 | |
| 		return children, nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Wait waits for any ongoing unpack processes to complete then will return
 | |
| // the result.
 | |
| func (u *Unpacker) Wait() (Result, error) {
 | |
| 	if err := u.eg.Wait(); err != nil {
 | |
| 		return Result{}, err
 | |
| 	}
 | |
| 	return Result{
 | |
| 		Unpacks: int(u.unpacks),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) unpack(
 | |
| 	h images.Handler,
 | |
| 	config ocispec.Descriptor,
 | |
| 	layers []ocispec.Descriptor,
 | |
| ) error {
 | |
| 	ctx := u.ctx
 | |
| 	ctx, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpack"))
 | |
| 	defer layerSpan.End()
 | |
| 	p, err := content.ReadBlob(ctx, u.content, config)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var i ocispec.Image
 | |
| 	if err := json.Unmarshal(p, &i); err != nil {
 | |
| 		return fmt.Errorf("unmarshal image config: %w", err)
 | |
| 	}
 | |
| 	diffIDs := i.RootFS.DiffIDs
 | |
| 	if len(layers) != len(diffIDs) {
 | |
| 		return fmt.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
 | |
| 	}
 | |
| 
 | |
| 	// TODO: Support multiple unpacks rather than just first match
 | |
| 	var unpack *Platform
 | |
| 
 | |
| 	imgPlatform := platforms.Normalize(ocispec.Platform{OS: i.OS, Architecture: i.Architecture})
 | |
| 	for _, up := range u.platforms {
 | |
| 		if up.Platform.Match(imgPlatform) {
 | |
| 			unpack = up
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if unpack == nil {
 | |
| 		return fmt.Errorf("unpacker does not support platform %s for image %s", imgPlatform, config.Digest)
 | |
| 	}
 | |
| 
 | |
| 	atomic.AddInt32(&u.unpacks, 1)
 | |
| 
 | |
| 	var (
 | |
| 		sn = unpack.Snapshotter
 | |
| 		a  = unpack.Applier
 | |
| 		cs = u.content
 | |
| 
 | |
| 		chain []digest.Digest
 | |
| 
 | |
| 		fetchOffset int
 | |
| 		fetchC      []chan struct{}
 | |
| 		fetchErr    chan error
 | |
| 	)
 | |
| 
 | |
| 	// If there is an early return, ensure any ongoing
 | |
| 	// fetches get their context cancelled
 | |
| 	ctx, cancel := context.WithCancel(ctx)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	doUnpackFn := func(i int, desc ocispec.Descriptor) error {
 | |
| 		parent := identity.ChainID(chain)
 | |
| 		chain = append(chain, diffIDs[i])
 | |
| 		chainID := identity.ChainID(chain).String()
 | |
| 
 | |
| 		unlock, err := u.lockSnChainID(ctx, chainID, unpack.SnapshotterKey)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		defer unlock()
 | |
| 
 | |
| 		if _, err := sn.Stat(ctx, chainID); err == nil {
 | |
| 			// no need to handle
 | |
| 			return nil
 | |
| 		} else if !errdefs.IsNotFound(err) {
 | |
| 			return fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
 | |
| 		}
 | |
| 
 | |
| 		// inherits annotations which are provided as snapshot labels.
 | |
| 		labels := snapshots.FilterInheritedLabels(desc.Annotations)
 | |
| 		if labels == nil {
 | |
| 			labels = make(map[string]string)
 | |
| 		}
 | |
| 		labels[labelSnapshotRef] = chainID
 | |
| 
 | |
| 		var (
 | |
| 			key    string
 | |
| 			mounts []mount.Mount
 | |
| 			opts   = append(unpack.SnapshotOpts, snapshots.WithLabels(labels))
 | |
| 		)
 | |
| 
 | |
| 		for try := 1; try <= 3; try++ {
 | |
| 			// Prepare snapshot with from parent, label as root
 | |
| 			key = fmt.Sprintf(snapshots.UnpackKeyFormat, uniquePart(), chainID)
 | |
| 			mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
 | |
| 			if err != nil {
 | |
| 				if errdefs.IsAlreadyExists(err) {
 | |
| 					if _, err := sn.Stat(ctx, chainID); err != nil {
 | |
| 						if !errdefs.IsNotFound(err) {
 | |
| 							return fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
 | |
| 						}
 | |
| 						// Try again, this should be rare, log it
 | |
| 						log.G(ctx).WithField("key", key).WithField("chainid", chainID).Debug("extraction snapshot already exists, chain id not found")
 | |
| 					} else {
 | |
| 						// no need to handle, snapshot now found with chain id
 | |
| 						return nil
 | |
| 					}
 | |
| 				} else {
 | |
| 					return fmt.Errorf("failed to prepare extraction snapshot %q: %w", key, err)
 | |
| 				}
 | |
| 			} else {
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("unable to prepare extraction snapshot: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		// Abort the snapshot if commit does not happen
 | |
| 		abort := func() {
 | |
| 			if err := sn.Remove(ctx, key); err != nil {
 | |
| 				log.G(ctx).WithError(err).Errorf("failed to cleanup %q", key)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if fetchErr == nil {
 | |
| 			fetchErr = make(chan error, 1)
 | |
| 			fetchOffset = i
 | |
| 			fetchC = make([]chan struct{}, len(layers)-fetchOffset)
 | |
| 			for i := range fetchC {
 | |
| 				fetchC[i] = make(chan struct{})
 | |
| 			}
 | |
| 
 | |
| 			go func(i int) {
 | |
| 				err := u.fetch(ctx, h, layers[i:], fetchC)
 | |
| 				if err != nil {
 | |
| 					fetchErr <- err
 | |
| 				}
 | |
| 				close(fetchErr)
 | |
| 			}(i)
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			abort()
 | |
| 			return ctx.Err()
 | |
| 		case err := <-fetchErr:
 | |
| 			if err != nil {
 | |
| 				abort()
 | |
| 				return err
 | |
| 			}
 | |
| 		case <-fetchC[i-fetchOffset]:
 | |
| 		}
 | |
| 
 | |
| 		diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...)
 | |
| 		if err != nil {
 | |
| 			abort()
 | |
| 			return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err)
 | |
| 		}
 | |
| 		if diff.Digest != diffIDs[i] {
 | |
| 			abort()
 | |
| 			return fmt.Errorf("wrong diff id calculated on extraction %q", diffIDs[i])
 | |
| 		}
 | |
| 
 | |
| 		if err = sn.Commit(ctx, chainID, key, opts...); err != nil {
 | |
| 			abort()
 | |
| 			if errdefs.IsAlreadyExists(err) {
 | |
| 				return nil
 | |
| 			}
 | |
| 			return fmt.Errorf("failed to commit snapshot %s: %w", key, err)
 | |
| 		}
 | |
| 
 | |
| 		// Set the uncompressed label after the uncompressed
 | |
| 		// digest has been verified through apply.
 | |
| 		cinfo := content.Info{
 | |
| 			Digest: desc.Digest,
 | |
| 			Labels: map[string]string{
 | |
| 				"containerd.io/uncompressed": diff.Digest.String(),
 | |
| 			},
 | |
| 		}
 | |
| 		if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	for i, desc := range layers {
 | |
| 		_, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpackLayer"))
 | |
| 		layerSpan.SetAttributes(
 | |
| 			tracing.Attribute("layer.media.type", desc.MediaType),
 | |
| 			tracing.Attribute("layer.media.size", desc.Size),
 | |
| 			tracing.Attribute("layer.media.digest", desc.Digest.String()),
 | |
| 		)
 | |
| 		if err := doUnpackFn(i, desc); err != nil {
 | |
| 			layerSpan.SetStatus(err)
 | |
| 			layerSpan.End()
 | |
| 			return err
 | |
| 		}
 | |
| 		layerSpan.End()
 | |
| 	}
 | |
| 
 | |
| 	chainID := identity.ChainID(chain).String()
 | |
| 	cinfo := content.Info{
 | |
| 		Digest: config.Digest,
 | |
| 		Labels: map[string]string{
 | |
| 			fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey): chainID,
 | |
| 		},
 | |
| 	}
 | |
| 	_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	log.G(ctx).WithFields(logrus.Fields{
 | |
| 		"config":  config.Digest,
 | |
| 		"chainID": chainID,
 | |
| 	}).Debug("image unpacked")
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
 | |
| 	eg, ctx2 := errgroup.WithContext(ctx)
 | |
| 	for i, desc := range layers {
 | |
| 		ctx2, layerSpan := tracing.StartSpan(ctx2, tracing.Name(unpackSpanPrefix, "fetchLayer"))
 | |
| 		layerSpan.SetAttributes(
 | |
| 			tracing.Attribute("layer.media.type", desc.MediaType),
 | |
| 			tracing.Attribute("layer.media.size", desc.Size),
 | |
| 			tracing.Attribute("layer.media.digest", desc.Digest.String()),
 | |
| 		)
 | |
| 		desc := desc
 | |
| 		i := i
 | |
| 		if err := u.acquire(ctx); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		eg.Go(func() error {
 | |
| 			unlock, err := u.lockBlobDescriptor(ctx2, desc)
 | |
| 			if err != nil {
 | |
| 				u.release()
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			_, err = h.Handle(ctx2, desc)
 | |
| 
 | |
| 			unlock()
 | |
| 			u.release()
 | |
| 
 | |
| 			if err != nil && !errors.Is(err, images.ErrSkipDesc) {
 | |
| 				return err
 | |
| 			}
 | |
| 			close(done[i])
 | |
| 
 | |
| 			return nil
 | |
| 		})
 | |
| 		layerSpan.End()
 | |
| 	}
 | |
| 
 | |
| 	return eg.Wait()
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) acquire(ctx context.Context) error {
 | |
| 	if u.limiter == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return u.limiter.Acquire(ctx, 1)
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) release() {
 | |
| 	if u.limiter == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	u.limiter.Release(1)
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) lockSnChainID(ctx context.Context, chainID, snapshotter string) (func(), error) {
 | |
| 	key := u.makeChainIDKeyWithSnapshotter(chainID, snapshotter)
 | |
| 
 | |
| 	if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return func() {
 | |
| 		u.duplicationSuppressor.Unlock(key)
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
 | |
| 	key := u.makeBlobDescriptorKey(desc)
 | |
| 
 | |
| 	if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return func() {
 | |
| 		u.duplicationSuppressor.Unlock(key)
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) makeChainIDKeyWithSnapshotter(chainID, snapshotter string) string {
 | |
| 	return fmt.Sprintf("sn://%s/%v", snapshotter, chainID)
 | |
| }
 | |
| 
 | |
| func (u *Unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string {
 | |
| 	return fmt.Sprintf("blob://%v", desc.Digest)
 | |
| }
 | |
| 
 | |
// uniquePart returns a short string used to make extraction snapshot keys
// distinct: the nanosecond component of the current time, a dash, and three
// random bytes in URL-safe base64.
func uniquePart() string {
	nanos := time.Now().Nanosecond()

	raw := make([]byte, 3)
	// Ignore read failures, just decreases uniqueness
	rand.Read(raw)
	suffix := base64.URLEncoding.EncodeToString(raw)

	return fmt.Sprintf("%d-%s", nanos, suffix)
}
 | 
