
When an upstream client (e.g. kubelet) stops or restarts, the CRI connection to the containerd gets interrupted which is treated as a cancellation of context which subsequently cancels an ongoing operation, including an image pull. This generally gets followed by containerd's GC routine that tries to delete the prepared snapshots for the image layer(s) corresponding to the image in the pull operation that got cancelled. However, if the upstream client immediately retries (or starts a new) image pull operation, containerd initiates a new image pull and starts unpacking the image layers into snapshots. This may create a race condition: the GC routine (corresponding to the failed image pull operation) trying to clean up the same snapshot that the new image pull operation is preparing, thus leading to the "parent snapshot does not exist: not found" error. Race Condition Scenario: Assume an image consisting of 2 layers (L1 and L2, L1 being the bottom layer) that are supposed to get unpacked into snapshots S1 and S2 respectively. During an image pull operation, containerd unpacks(L1) which involves Stat()'ing the chainID. This Stat() fails as the chainID does not exist and Prepare(L1) gets called. Once S1 gets prepared, containerd processes L2 - unpack(L2) which again involves Stat()'ing the chainID which fails as the chainID for S2 does not exist which results in the call to Prepare(L2). However, if the image pull operation gets cancelled before Prepare(L2) is called, then the GC routine tries to clean up S1. When the image pull operation is retried by the upstream client, containerd follows the same series of operations. unpack(L1) gets called which then calls Stat(chainID) for L1. However, this time, Stat(L1) succedes as S1 already exists (from the previous image pull operation) and thus containerd goes to the next iteration to unpack(L2). Now, GC cleans up S1 and when Prepare(L2) gets called, it returns back the "parent snapshot does not exist: not found" error. Fix: Removing the "Stat() + early return" fixes the race condition. Now during the image pull operation corresponding to the client retry, although the chainID (for L1) already exists, containerd does not return early and goes on to Prepare(L1). Since L1 is already prepared, it adds a new lease to S1 and then returns `ErrAlreadyExists`. This new lease prevents GC from cleaning up S1 when containerd processes L2 (unpack(L2) -> Prepare(L2)). Fixes: https://github.com/containerd/containerd/issues/3787 Signed-off-by: Saket Jajoo <saketjajoo@google.com>
545 lines
14 KiB
Go
545 lines
14 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package unpack
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/v2/core/content"
|
|
"github.com/containerd/containerd/v2/core/diff"
|
|
"github.com/containerd/containerd/v2/core/images"
|
|
"github.com/containerd/containerd/v2/core/mount"
|
|
"github.com/containerd/containerd/v2/core/snapshots"
|
|
"github.com/containerd/containerd/v2/internal/cleanup"
|
|
"github.com/containerd/containerd/v2/internal/kmutex"
|
|
"github.com/containerd/containerd/v2/pkg/labels"
|
|
"github.com/containerd/containerd/v2/pkg/tracing"
|
|
"github.com/containerd/errdefs"
|
|
"github.com/containerd/log"
|
|
"github.com/containerd/platforms"
|
|
"github.com/opencontainers/go-digest"
|
|
"github.com/opencontainers/image-spec/identity"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
const (
|
|
labelSnapshotRef = "containerd.io/snapshot.ref"
|
|
unpackSpanPrefix = "pkg.unpack.unpacker"
|
|
)
|
|
|
|
// Result returns information about the unpacks which were completed.
|
|
type Result struct {
|
|
Unpacks int
|
|
}
|
|
|
|
type unpackerConfig struct {
|
|
platforms []*Platform
|
|
|
|
content content.Store
|
|
|
|
limiter *semaphore.Weighted
|
|
duplicationSuppressor kmutex.KeyedLocker
|
|
}
|
|
|
|
// Platform represents a platform-specific unpack configuration which includes
|
|
// the platform matcher as well as snapshotter and applier.
|
|
type Platform struct {
|
|
Platform platforms.Matcher
|
|
|
|
SnapshotterKey string
|
|
Snapshotter snapshots.Snapshotter
|
|
SnapshotOpts []snapshots.Opt
|
|
|
|
Applier diff.Applier
|
|
ApplyOpts []diff.ApplyOpt
|
|
}
|
|
|
|
type UnpackerOpt func(*unpackerConfig) error
|
|
|
|
func WithUnpackPlatform(u Platform) UnpackerOpt {
|
|
return UnpackerOpt(func(c *unpackerConfig) error {
|
|
if u.Platform == nil {
|
|
u.Platform = platforms.All
|
|
}
|
|
if u.Snapshotter == nil {
|
|
return fmt.Errorf("snapshotter must be provided to unpack")
|
|
}
|
|
if u.SnapshotterKey == "" {
|
|
if s, ok := u.Snapshotter.(fmt.Stringer); ok {
|
|
u.SnapshotterKey = s.String()
|
|
} else {
|
|
u.SnapshotterKey = "unknown"
|
|
}
|
|
}
|
|
if u.Applier == nil {
|
|
return fmt.Errorf("applier must be provided to unpack")
|
|
}
|
|
|
|
c.platforms = append(c.platforms, &u)
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func WithLimiter(l *semaphore.Weighted) UnpackerOpt {
|
|
return UnpackerOpt(func(c *unpackerConfig) error {
|
|
c.limiter = l
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func WithDuplicationSuppressor(d kmutex.KeyedLocker) UnpackerOpt {
|
|
return UnpackerOpt(func(c *unpackerConfig) error {
|
|
c.duplicationSuppressor = d
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Unpacker unpacks images by hooking into the image handler process.
|
|
// Unpacks happen in the backgrounds and waited on to complete.
|
|
type Unpacker struct {
|
|
unpackerConfig
|
|
|
|
unpacks int32
|
|
ctx context.Context
|
|
eg *errgroup.Group
|
|
}
|
|
|
|
// NewUnpacker creates a new instance of the unpacker which can be used to wrap an
|
|
// image handler and unpack in parallel to handling. The unpacker will handle
|
|
// calling the block handlers when they are needed by the unpack process.
|
|
func NewUnpacker(ctx context.Context, cs content.Store, opts ...UnpackerOpt) (*Unpacker, error) {
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
|
|
u := &Unpacker{
|
|
unpackerConfig: unpackerConfig{
|
|
content: cs,
|
|
duplicationSuppressor: kmutex.NewNoop(),
|
|
},
|
|
ctx: ctx,
|
|
eg: eg,
|
|
}
|
|
for _, opt := range opts {
|
|
if err := opt(&u.unpackerConfig); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if len(u.platforms) == 0 {
|
|
return nil, fmt.Errorf("no unpack platforms defined: %w", errdefs.ErrInvalidArgument)
|
|
}
|
|
return u, nil
|
|
}
|
|
|
|
// Unpack wraps an image handler to filter out blob handling and scheduling them
|
|
// during the unpack process. When an image config is encountered, the unpack
|
|
// process will be started in a goroutine.
|
|
func (u *Unpacker) Unpack(h images.Handler) images.Handler {
|
|
var (
|
|
lock sync.Mutex
|
|
layers = map[digest.Digest][]ocispec.Descriptor{}
|
|
)
|
|
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
ctx, span := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "UnpackHandler"))
|
|
defer span.End()
|
|
span.SetAttributes(
|
|
tracing.Attribute("descriptor.media.type", desc.MediaType),
|
|
tracing.Attribute("descriptor.digest", desc.Digest.String()))
|
|
unlock, err := u.lockBlobDescriptor(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
children, err := h.Handle(ctx, desc)
|
|
unlock()
|
|
if err != nil {
|
|
return children, err
|
|
}
|
|
|
|
if images.IsManifestType(desc.MediaType) {
|
|
var nonLayers []ocispec.Descriptor
|
|
var manifestLayers []ocispec.Descriptor
|
|
// Split layers from non-layers, layers will be handled after
|
|
// the config
|
|
for i, child := range children {
|
|
span.SetAttributes(
|
|
tracing.Attribute("descriptor.child."+strconv.Itoa(i), []string{child.MediaType, child.Digest.String()}),
|
|
)
|
|
if images.IsLayerType(child.MediaType) {
|
|
manifestLayers = append(manifestLayers, child)
|
|
} else {
|
|
nonLayers = append(nonLayers, child)
|
|
}
|
|
}
|
|
|
|
lock.Lock()
|
|
for _, nl := range nonLayers {
|
|
layers[nl.Digest] = manifestLayers
|
|
}
|
|
lock.Unlock()
|
|
|
|
children = nonLayers
|
|
} else if images.IsConfigType(desc.MediaType) {
|
|
lock.Lock()
|
|
l := layers[desc.Digest]
|
|
lock.Unlock()
|
|
if len(l) > 0 {
|
|
u.eg.Go(func() error {
|
|
return u.unpack(h, desc, l)
|
|
})
|
|
}
|
|
}
|
|
return children, nil
|
|
})
|
|
}
|
|
|
|
// Wait waits for any ongoing unpack processes to complete then will return
|
|
// the result.
|
|
func (u *Unpacker) Wait() (Result, error) {
|
|
if err := u.eg.Wait(); err != nil {
|
|
return Result{}, err
|
|
}
|
|
return Result{
|
|
Unpacks: int(u.unpacks),
|
|
}, nil
|
|
}
|
|
|
|
func (u *Unpacker) unpack(
|
|
h images.Handler,
|
|
config ocispec.Descriptor,
|
|
layers []ocispec.Descriptor,
|
|
) error {
|
|
ctx := u.ctx
|
|
ctx, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpack"))
|
|
defer layerSpan.End()
|
|
unpackStart := time.Now()
|
|
p, err := content.ReadBlob(ctx, u.content, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var i ocispec.Image
|
|
if err := json.Unmarshal(p, &i); err != nil {
|
|
return fmt.Errorf("unmarshal image config: %w", err)
|
|
}
|
|
diffIDs := i.RootFS.DiffIDs
|
|
if len(layers) != len(diffIDs) {
|
|
return fmt.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
|
|
}
|
|
|
|
// TODO: Support multiple unpacks rather than just first match
|
|
var unpack *Platform
|
|
|
|
imgPlatform := platforms.Normalize(i.Platform)
|
|
for _, up := range u.platforms {
|
|
if up.Platform.Match(imgPlatform) {
|
|
unpack = up
|
|
break
|
|
}
|
|
}
|
|
|
|
if unpack == nil {
|
|
log.G(ctx).WithField("image", config.Digest).WithField("platform", platforms.Format(imgPlatform)).Debugf("unpacker does not support platform, only fetching layers")
|
|
return u.fetch(ctx, h, layers, nil)
|
|
}
|
|
|
|
atomic.AddInt32(&u.unpacks, 1)
|
|
|
|
var (
|
|
sn = unpack.Snapshotter
|
|
a = unpack.Applier
|
|
cs = u.content
|
|
|
|
chain []digest.Digest
|
|
|
|
fetchOffset int
|
|
fetchC []chan struct{}
|
|
fetchErr chan error
|
|
)
|
|
|
|
// If there is an early return, ensure any ongoing
|
|
// fetches get their context cancelled
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
doUnpackFn := func(i int, desc ocispec.Descriptor) error {
|
|
parent := identity.ChainID(chain)
|
|
chain = append(chain, diffIDs[i])
|
|
chainID := identity.ChainID(chain).String()
|
|
|
|
unlock, err := u.lockSnChainID(ctx, chainID, unpack.SnapshotterKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer unlock()
|
|
|
|
// inherits annotations which are provided as snapshot labels.
|
|
snapshotLabels := snapshots.FilterInheritedLabels(desc.Annotations)
|
|
if snapshotLabels == nil {
|
|
snapshotLabels = make(map[string]string)
|
|
}
|
|
snapshotLabels[labelSnapshotRef] = chainID
|
|
|
|
var (
|
|
key string
|
|
mounts []mount.Mount
|
|
opts = append(unpack.SnapshotOpts, snapshots.WithLabels(snapshotLabels))
|
|
)
|
|
|
|
for try := 1; try <= 3; try++ {
|
|
// Prepare snapshot with from parent, label as root
|
|
key = fmt.Sprintf(snapshots.UnpackKeyFormat, uniquePart(), chainID)
|
|
mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
|
|
if err != nil {
|
|
if errdefs.IsAlreadyExists(err) {
|
|
if _, err := sn.Stat(ctx, chainID); err != nil {
|
|
if !errdefs.IsNotFound(err) {
|
|
return fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
|
|
}
|
|
// Try again, this should be rare, log it
|
|
log.G(ctx).WithField("key", key).WithField("chainid", chainID).Debug("extraction snapshot already exists, chain id not found")
|
|
} else {
|
|
// no need to handle, snapshot now found with chain id
|
|
return nil
|
|
}
|
|
} else {
|
|
return fmt.Errorf("failed to prepare extraction snapshot %q: %w", key, err)
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("unable to prepare extraction snapshot: %w", err)
|
|
}
|
|
|
|
// Abort the snapshot if commit does not happen
|
|
abort := func(ctx context.Context) {
|
|
if err := sn.Remove(ctx, key); err != nil {
|
|
log.G(ctx).WithError(err).Errorf("failed to cleanup %q", key)
|
|
}
|
|
}
|
|
|
|
if fetchErr == nil {
|
|
fetchErr = make(chan error, 1)
|
|
fetchOffset = i
|
|
fetchC = make([]chan struct{}, len(layers)-fetchOffset)
|
|
for i := range fetchC {
|
|
fetchC[i] = make(chan struct{})
|
|
}
|
|
|
|
go func(i int) {
|
|
err := u.fetch(ctx, h, layers[i:], fetchC)
|
|
if err != nil {
|
|
fetchErr <- err
|
|
}
|
|
close(fetchErr)
|
|
}(i)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
cleanup.Do(ctx, abort)
|
|
return ctx.Err()
|
|
case err := <-fetchErr:
|
|
if err != nil {
|
|
cleanup.Do(ctx, abort)
|
|
return err
|
|
}
|
|
case <-fetchC[i-fetchOffset]:
|
|
}
|
|
|
|
diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...)
|
|
if err != nil {
|
|
cleanup.Do(ctx, abort)
|
|
return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err)
|
|
}
|
|
if diff.Digest != diffIDs[i] {
|
|
cleanup.Do(ctx, abort)
|
|
return fmt.Errorf("wrong diff id calculated on extraction %q", diffIDs[i])
|
|
}
|
|
|
|
if err = sn.Commit(ctx, chainID, key, opts...); err != nil {
|
|
cleanup.Do(ctx, abort)
|
|
if errdefs.IsAlreadyExists(err) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to commit snapshot %s: %w", key, err)
|
|
}
|
|
|
|
// Set the uncompressed label after the uncompressed
|
|
// digest has been verified through apply.
|
|
cinfo := content.Info{
|
|
Digest: desc.Digest,
|
|
Labels: map[string]string{
|
|
labels.LabelUncompressed: diff.Digest.String(),
|
|
},
|
|
}
|
|
if _, err := cs.Update(ctx, cinfo, "labels."+labels.LabelUncompressed); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
for i, desc := range layers {
|
|
_, layerSpan := tracing.StartSpan(ctx, tracing.Name(unpackSpanPrefix, "unpackLayer"))
|
|
unpackLayerStart := time.Now()
|
|
layerSpan.SetAttributes(
|
|
tracing.Attribute("layer.media.type", desc.MediaType),
|
|
tracing.Attribute("layer.media.size", desc.Size),
|
|
tracing.Attribute("layer.media.digest", desc.Digest.String()),
|
|
)
|
|
if err := doUnpackFn(i, desc); err != nil {
|
|
layerSpan.SetStatus(err)
|
|
layerSpan.End()
|
|
return err
|
|
}
|
|
layerSpan.End()
|
|
log.G(ctx).WithFields(log.Fields{
|
|
"layer": desc.Digest,
|
|
"duration": time.Since(unpackLayerStart),
|
|
}).Debug("layer unpacked")
|
|
}
|
|
|
|
chainID := identity.ChainID(chain).String()
|
|
cinfo := content.Info{
|
|
Digest: config.Digest,
|
|
Labels: map[string]string{
|
|
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey): chainID,
|
|
},
|
|
}
|
|
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", unpack.SnapshotterKey))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.G(ctx).WithFields(log.Fields{
|
|
"config": config.Digest,
|
|
"chainID": chainID,
|
|
"duration": time.Since(unpackStart),
|
|
}).Debug("image unpacked")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *Unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
|
|
eg, ctx2 := errgroup.WithContext(ctx)
|
|
for i, desc := range layers {
|
|
ctx2, layerSpan := tracing.StartSpan(ctx2, tracing.Name(unpackSpanPrefix, "fetchLayer"))
|
|
layerSpan.SetAttributes(
|
|
tracing.Attribute("layer.media.type", desc.MediaType),
|
|
tracing.Attribute("layer.media.size", desc.Size),
|
|
tracing.Attribute("layer.media.digest", desc.Digest.String()),
|
|
)
|
|
desc := desc
|
|
var ch chan struct{}
|
|
if done != nil {
|
|
ch = done[i]
|
|
}
|
|
|
|
if err := u.acquire(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
eg.Go(func() error {
|
|
defer layerSpan.End()
|
|
|
|
unlock, err := u.lockBlobDescriptor(ctx2, desc)
|
|
if err != nil {
|
|
u.release()
|
|
return err
|
|
}
|
|
|
|
_, err = h.Handle(ctx2, desc)
|
|
|
|
unlock()
|
|
u.release()
|
|
|
|
if err != nil && !errors.Is(err, images.ErrSkipDesc) {
|
|
return err
|
|
}
|
|
if ch != nil {
|
|
close(ch)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return eg.Wait()
|
|
}
|
|
|
|
func (u *Unpacker) acquire(ctx context.Context) error {
|
|
if u.limiter == nil {
|
|
return nil
|
|
}
|
|
return u.limiter.Acquire(ctx, 1)
|
|
}
|
|
|
|
func (u *Unpacker) release() {
|
|
if u.limiter == nil {
|
|
return
|
|
}
|
|
u.limiter.Release(1)
|
|
}
|
|
|
|
func (u *Unpacker) lockSnChainID(ctx context.Context, chainID, snapshotter string) (func(), error) {
|
|
key := u.makeChainIDKeyWithSnapshotter(chainID, snapshotter)
|
|
|
|
if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
|
|
return nil, err
|
|
}
|
|
return func() {
|
|
u.duplicationSuppressor.Unlock(key)
|
|
}, nil
|
|
}
|
|
|
|
func (u *Unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
|
|
key := u.makeBlobDescriptorKey(desc)
|
|
|
|
if err := u.duplicationSuppressor.Lock(ctx, key); err != nil {
|
|
return nil, err
|
|
}
|
|
return func() {
|
|
u.duplicationSuppressor.Unlock(key)
|
|
}, nil
|
|
}
|
|
|
|
func (u *Unpacker) makeChainIDKeyWithSnapshotter(chainID, snapshotter string) string {
|
|
return fmt.Sprintf("sn://%s/%v", snapshotter, chainID)
|
|
}
|
|
|
|
func (u *Unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string {
|
|
return fmt.Sprintf("blob://%v", desc.Digest)
|
|
}
|
|
|
|
func uniquePart() string {
|
|
t := time.Now()
|
|
var b [3]byte
|
|
// Ignore read failures, just decreases uniqueness
|
|
rand.Read(b[:])
|
|
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
|
|
}
|