
Background: With current design, the content backend uses key-lock for long-lived write transaction. If the content reference has been marked for write transaction, the other requestes on the same reference will fail fast with unavailable error. Since the metadata plugin is based on boltbd which only supports single-writer, the content backend can't block or handle the request too long. It requires the client to handle retry by itself, like OpenWriter - backoff retry helper. But the maximum retry interval can be up to 2 seconds. If there are several concurrent requestes fo the same image, the waiters maybe wakeup at the same time and there is only one waiter can continue. A lot of waiters will get into sleep and we will take long time to finish all the pulling jobs and be worse if the image has many more layers, which mentioned in issue #4937. After fetching, containerd.Pull API allows several hanlers to commit same ChainID snapshotter but only one can be done successfully. Since unpack tar.gz is time-consuming job, it can impact the performance on unpacking for same ChainID snapshotter in parallel. For instance, the Request 2 doesn't need to prepare and commit, it should just wait for Request 1 finish, which mentioned in pull request #6318. ```text Request 1 Request 2 Prepare | | | | Prepare Commit | | | | Commit(failed on exist) ``` Both content backoff retry and unnecessary unpack impacts the performance. Solution: Introduced the duplicate suppression in fetch and unpack context. The deplicate suppression uses key-mutex and single-waiter-notify to support singleflight. The caller can use the duplicate suppression in different PullImage handlers so that we can avoid unnecessary unpack and spin-lock in OpenWriter. Test Result: Before enhancement: ```bash ➜ /tmp sudo bash testing.sh "localhost:5000/redis:latest" 20 crictl pull localhost:5000/redis:latest (x20) takes ... real 1m6.172s user 0m0.268s sys 0m0.193s docker pull localhost:5000/redis:latest (x20) takes ... real 0m1.324s user 0m0.441s sys 0m0.316s ➜ /tmp sudo bash testing.sh "localhost:5000/golang:latest" 20 crictl pull localhost:5000/golang:latest (x20) takes ... real 1m47.657s user 0m0.284s sys 0m0.224s docker pull localhost:5000/golang:latest (x20) takes ... real 0m6.381s user 0m0.488s sys 0m0.358s ``` With this enhancement: ```bash ➜ /tmp sudo bash testing.sh "localhost:5000/redis:latest" 20 crictl pull localhost:5000/redis:latest (x20) takes ... real 0m1.140s user 0m0.243s sys 0m0.178s docker pull localhost:5000/redis:latest (x20) takes ... real 0m1.239s user 0m0.463s sys 0m0.275s ➜ /tmp sudo bash testing.sh "localhost:5000/golang:latest" 20 crictl pull localhost:5000/golang:latest (x20) takes ... real 0m5.546s user 0m0.217s sys 0m0.219s docker pull localhost:5000/golang:latest (x20) takes ... real 0m6.090s user 0m0.501s sys 0m0.331s ``` Test Script: localhost:5000/{redis|golang}:latest is equal to docker.io/library/{redis|golang}:latest. The image is hold in local registry service by `docker run -d -p 5000:5000 --name registry registry:2`. ```bash image_name="${1}" pull_times="${2:-10}" cleanup() { ctr image rmi "${image_name}" ctr -n k8s.io image rmi "${image_name}" crictl rmi "${image_name}" docker rmi "${image_name}" sleep 2 } crictl_testing() { for idx in $(seq 1 ${pull_times}); do crictl pull "${image_name}" > /dev/null 2>&1 & done wait } docker_testing() { for idx in $(seq 1 ${pull_times}); do docker pull "${image_name}" > /dev/null 2>&1 & done wait } cleanup > /dev/null 2>&1 echo 3 > /proc/sys/vm/drop_caches sleep 3 echo "crictl pull $image_name (x${pull_times}) takes ..." time crictl_testing echo echo 3 > /proc/sys/vm/drop_caches sleep 3 echo "docker pull $image_name (x${pull_times}) takes ..." time docker_testing ``` Fixes: #4937 Close: #4985 Close: #6318 Signed-off-by: Wei Fu <fuweid89@gmail.com>
428 lines
11 KiB
Go
428 lines
11 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package containerd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/mount"
|
|
"github.com/containerd/containerd/pkg/kmutex"
|
|
"github.com/containerd/containerd/platforms"
|
|
"github.com/containerd/containerd/snapshots"
|
|
"github.com/opencontainers/go-digest"
|
|
"github.com/opencontainers/image-spec/identity"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/sirupsen/logrus"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
const (
|
|
labelSnapshotRef = "containerd.io/snapshot.ref"
|
|
)
|
|
|
|
type unpacker struct {
|
|
updateCh chan ocispec.Descriptor
|
|
snapshotter string
|
|
config UnpackConfig
|
|
c *Client
|
|
limiter *semaphore.Weighted
|
|
}
|
|
|
|
func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) {
|
|
snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var config = UnpackConfig{
|
|
DuplicationSuppressor: kmutex.NewNoop(),
|
|
}
|
|
for _, o := range rCtx.UnpackOpts {
|
|
if err := o(ctx, &config); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
var limiter *semaphore.Weighted
|
|
if rCtx.MaxConcurrentDownloads > 0 {
|
|
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
|
|
}
|
|
return &unpacker{
|
|
updateCh: make(chan ocispec.Descriptor, 128),
|
|
snapshotter: snapshotter,
|
|
config: config,
|
|
c: c,
|
|
limiter: limiter,
|
|
}, nil
|
|
}
|
|
|
|
func (u *unpacker) unpack(
|
|
ctx context.Context,
|
|
rCtx *RemoteContext,
|
|
h images.Handler,
|
|
config ocispec.Descriptor,
|
|
layers []ocispec.Descriptor,
|
|
) error {
|
|
p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var i ocispec.Image
|
|
if err := json.Unmarshal(p, &i); err != nil {
|
|
return fmt.Errorf("unmarshal image config: %w", err)
|
|
}
|
|
diffIDs := i.RootFS.DiffIDs
|
|
if len(layers) != len(diffIDs) {
|
|
return fmt.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
|
|
}
|
|
|
|
if u.config.CheckPlatformSupported {
|
|
imgPlatform := platforms.Normalize(ocispec.Platform{OS: i.OS, Architecture: i.Architecture})
|
|
snapshotterPlatformMatcher, err := u.c.GetSnapshotterSupportedPlatforms(ctx, u.snapshotter)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find supported platforms for snapshotter %s: %w", u.snapshotter, err)
|
|
}
|
|
if !snapshotterPlatformMatcher.Match(imgPlatform) {
|
|
return fmt.Errorf("snapshotter %s does not support platform %s for image %s", u.snapshotter, imgPlatform, config.Digest)
|
|
}
|
|
}
|
|
|
|
var (
|
|
sn = u.c.SnapshotService(u.snapshotter)
|
|
a = u.c.DiffService()
|
|
cs = u.c.ContentStore()
|
|
|
|
chain []digest.Digest
|
|
|
|
fetchOffset int
|
|
fetchC []chan struct{}
|
|
fetchErr chan error
|
|
)
|
|
|
|
// If there is an early return, ensure any ongoing
|
|
// fetches get their context cancelled
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
doUnpackFn := func(i int, desc ocispec.Descriptor) error {
|
|
parent := identity.ChainID(chain)
|
|
chain = append(chain, diffIDs[i])
|
|
chainID := identity.ChainID(chain).String()
|
|
|
|
unlock, err := u.lockSnChainID(ctx, chainID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer unlock()
|
|
|
|
if _, err := sn.Stat(ctx, chainID); err == nil {
|
|
// no need to handle
|
|
return nil
|
|
} else if !errdefs.IsNotFound(err) {
|
|
return fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
|
|
}
|
|
|
|
// inherits annotations which are provided as snapshot labels.
|
|
labels := snapshots.FilterInheritedLabels(desc.Annotations)
|
|
if labels == nil {
|
|
labels = make(map[string]string)
|
|
}
|
|
labels[labelSnapshotRef] = chainID
|
|
|
|
var (
|
|
key string
|
|
mounts []mount.Mount
|
|
opts = append(rCtx.SnapshotterOpts, snapshots.WithLabels(labels))
|
|
)
|
|
|
|
for try := 1; try <= 3; try++ {
|
|
// Prepare snapshot with from parent, label as root
|
|
key = fmt.Sprintf(snapshots.UnpackKeyFormat, uniquePart(), chainID)
|
|
mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
|
|
if err != nil {
|
|
if errdefs.IsAlreadyExists(err) {
|
|
if _, err := sn.Stat(ctx, chainID); err != nil {
|
|
if !errdefs.IsNotFound(err) {
|
|
return fmt.Errorf("failed to stat snapshot %s: %w", chainID, err)
|
|
}
|
|
// Try again, this should be rare, log it
|
|
log.G(ctx).WithField("key", key).WithField("chainid", chainID).Debug("extraction snapshot already exists, chain id not found")
|
|
} else {
|
|
// no need to handle, snapshot now found with chain id
|
|
return nil
|
|
}
|
|
} else {
|
|
return fmt.Errorf("failed to prepare extraction snapshot %q: %w", key, err)
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("unable to prepare extraction snapshot: %w", err)
|
|
}
|
|
|
|
// Abort the snapshot if commit does not happen
|
|
abort := func() {
|
|
if err := sn.Remove(ctx, key); err != nil {
|
|
log.G(ctx).WithError(err).Errorf("failed to cleanup %q", key)
|
|
}
|
|
}
|
|
|
|
if fetchErr == nil {
|
|
fetchErr = make(chan error, 1)
|
|
fetchOffset = i
|
|
fetchC = make([]chan struct{}, len(layers)-fetchOffset)
|
|
for i := range fetchC {
|
|
fetchC[i] = make(chan struct{})
|
|
}
|
|
|
|
go func(i int) {
|
|
err := u.fetch(ctx, h, layers[i:], fetchC)
|
|
if err != nil {
|
|
fetchErr <- err
|
|
}
|
|
close(fetchErr)
|
|
}(i)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-fetchErr:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case <-fetchC[i-fetchOffset]:
|
|
}
|
|
|
|
diff, err := a.Apply(ctx, desc, mounts, u.config.ApplyOpts...)
|
|
if err != nil {
|
|
abort()
|
|
return fmt.Errorf("failed to extract layer %s: %w", diffIDs[i], err)
|
|
}
|
|
if diff.Digest != diffIDs[i] {
|
|
abort()
|
|
return fmt.Errorf("wrong diff id calculated on extraction %q", diffIDs[i])
|
|
}
|
|
|
|
if err = sn.Commit(ctx, chainID, key, opts...); err != nil {
|
|
abort()
|
|
if errdefs.IsAlreadyExists(err) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to commit snapshot %s: %w", key, err)
|
|
}
|
|
|
|
// Set the uncompressed label after the uncompressed
|
|
// digest has been verified through apply.
|
|
cinfo := content.Info{
|
|
Digest: desc.Digest,
|
|
Labels: map[string]string{
|
|
"containerd.io/uncompressed": diff.Digest.String(),
|
|
},
|
|
}
|
|
if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
for i, desc := range layers {
|
|
if err := doUnpackFn(i, desc); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
chainID := identity.ChainID(chain).String()
|
|
cinfo := content.Info{
|
|
Digest: config.Digest,
|
|
Labels: map[string]string{
|
|
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID,
|
|
},
|
|
}
|
|
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.G(ctx).WithFields(logrus.Fields{
|
|
"config": config.Digest,
|
|
"chainID": chainID,
|
|
}).Debug("image unpacked")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (u *unpacker) fetch(ctx context.Context, h images.Handler, layers []ocispec.Descriptor, done []chan struct{}) error {
|
|
eg, ctx2 := errgroup.WithContext(ctx)
|
|
for i, desc := range layers {
|
|
desc := desc
|
|
i := i
|
|
|
|
if err := u.acquire(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
eg.Go(func() error {
|
|
unlock, err := u.lockBlobDescriptor(ctx2, desc)
|
|
if err != nil {
|
|
u.release()
|
|
return err
|
|
}
|
|
|
|
_, err = h.Handle(ctx2, desc)
|
|
|
|
unlock()
|
|
u.release()
|
|
|
|
if err != nil && !errors.Is(err, images.ErrSkipDesc) {
|
|
return err
|
|
}
|
|
close(done[i])
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return eg.Wait()
|
|
}
|
|
|
|
func (u *unpacker) handlerWrapper(
|
|
uctx context.Context,
|
|
rCtx *RemoteContext,
|
|
unpacks *int32,
|
|
) (func(images.Handler) images.Handler, *errgroup.Group) {
|
|
eg, uctx := errgroup.WithContext(uctx)
|
|
return func(f images.Handler) images.Handler {
|
|
var (
|
|
lock sync.Mutex
|
|
layers = map[digest.Digest][]ocispec.Descriptor{}
|
|
)
|
|
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
unlock, err := u.lockBlobDescriptor(ctx, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
children, err := f.Handle(ctx, desc)
|
|
unlock()
|
|
if err != nil {
|
|
return children, err
|
|
}
|
|
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
|
var nonLayers []ocispec.Descriptor
|
|
var manifestLayers []ocispec.Descriptor
|
|
|
|
// Split layers from non-layers, layers will be handled after
|
|
// the config
|
|
for _, child := range children {
|
|
if images.IsLayerType(child.MediaType) {
|
|
manifestLayers = append(manifestLayers, child)
|
|
} else {
|
|
nonLayers = append(nonLayers, child)
|
|
}
|
|
}
|
|
|
|
lock.Lock()
|
|
for _, nl := range nonLayers {
|
|
layers[nl.Digest] = manifestLayers
|
|
}
|
|
lock.Unlock()
|
|
|
|
children = nonLayers
|
|
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
|
|
lock.Lock()
|
|
l := layers[desc.Digest]
|
|
lock.Unlock()
|
|
if len(l) > 0 {
|
|
atomic.AddInt32(unpacks, 1)
|
|
eg.Go(func() error {
|
|
return u.unpack(uctx, rCtx, f, desc, l)
|
|
})
|
|
}
|
|
}
|
|
return children, nil
|
|
})
|
|
}, eg
|
|
}
|
|
|
|
func (u *unpacker) acquire(ctx context.Context) error {
|
|
if u.limiter == nil {
|
|
return nil
|
|
}
|
|
return u.limiter.Acquire(ctx, 1)
|
|
}
|
|
|
|
func (u *unpacker) release() {
|
|
if u.limiter == nil {
|
|
return
|
|
}
|
|
u.limiter.Release(1)
|
|
}
|
|
|
|
func (u *unpacker) lockSnChainID(ctx context.Context, chainID string) (func(), error) {
|
|
key := u.makeChainIDKeyWithSnapshotter(chainID)
|
|
|
|
if err := u.config.DuplicationSuppressor.Lock(ctx, key); err != nil {
|
|
return nil, err
|
|
}
|
|
return func() {
|
|
u.config.DuplicationSuppressor.Unlock(key)
|
|
}, nil
|
|
}
|
|
|
|
func (u *unpacker) lockBlobDescriptor(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
|
|
key := u.makeBlobDescriptorKey(desc)
|
|
|
|
if err := u.config.DuplicationSuppressor.Lock(ctx, key); err != nil {
|
|
return nil, err
|
|
}
|
|
return func() {
|
|
u.config.DuplicationSuppressor.Unlock(key)
|
|
}, nil
|
|
}
|
|
|
|
func (u *unpacker) makeChainIDKeyWithSnapshotter(chainID string) string {
|
|
return fmt.Sprintf("sn://%s/%v", u.snapshotter, chainID)
|
|
}
|
|
|
|
func (u *unpacker) makeBlobDescriptorKey(desc ocispec.Descriptor) string {
|
|
return fmt.Sprintf("blob://%v", desc.Digest)
|
|
}
|
|
|
|
func uniquePart() string {
|
|
t := time.Now()
|
|
var b [3]byte
|
|
// Ignore read failures, just decreases uniqueness
|
|
rand.Read(b[:])
|
|
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
|
|
}
|