
Background: With current design, the content backend uses key-lock for long-lived write transaction. If the content reference has been marked for write transaction, the other requestes on the same reference will fail fast with unavailable error. Since the metadata plugin is based on boltbd which only supports single-writer, the content backend can't block or handle the request too long. It requires the client to handle retry by itself, like OpenWriter - backoff retry helper. But the maximum retry interval can be up to 2 seconds. If there are several concurrent requestes fo the same image, the waiters maybe wakeup at the same time and there is only one waiter can continue. A lot of waiters will get into sleep and we will take long time to finish all the pulling jobs and be worse if the image has many more layers, which mentioned in issue #4937. After fetching, containerd.Pull API allows several hanlers to commit same ChainID snapshotter but only one can be done successfully. Since unpack tar.gz is time-consuming job, it can impact the performance on unpacking for same ChainID snapshotter in parallel. For instance, the Request 2 doesn't need to prepare and commit, it should just wait for Request 1 finish, which mentioned in pull request #6318. ```text Request 1 Request 2 Prepare | | | | Prepare Commit | | | | Commit(failed on exist) ``` Both content backoff retry and unnecessary unpack impacts the performance. Solution: Introduced the duplicate suppression in fetch and unpack context. The deplicate suppression uses key-mutex and single-waiter-notify to support singleflight. The caller can use the duplicate suppression in different PullImage handlers so that we can avoid unnecessary unpack and spin-lock in OpenWriter. Test Result: Before enhancement: ```bash ➜ /tmp sudo bash testing.sh "localhost:5000/redis:latest" 20 crictl pull localhost:5000/redis:latest (x20) takes ... real 1m6.172s user 0m0.268s sys 0m0.193s docker pull localhost:5000/redis:latest (x20) takes ... real 0m1.324s user 0m0.441s sys 0m0.316s ➜ /tmp sudo bash testing.sh "localhost:5000/golang:latest" 20 crictl pull localhost:5000/golang:latest (x20) takes ... real 1m47.657s user 0m0.284s sys 0m0.224s docker pull localhost:5000/golang:latest (x20) takes ... real 0m6.381s user 0m0.488s sys 0m0.358s ``` With this enhancement: ```bash ➜ /tmp sudo bash testing.sh "localhost:5000/redis:latest" 20 crictl pull localhost:5000/redis:latest (x20) takes ... real 0m1.140s user 0m0.243s sys 0m0.178s docker pull localhost:5000/redis:latest (x20) takes ... real 0m1.239s user 0m0.463s sys 0m0.275s ➜ /tmp sudo bash testing.sh "localhost:5000/golang:latest" 20 crictl pull localhost:5000/golang:latest (x20) takes ... real 0m5.546s user 0m0.217s sys 0m0.219s docker pull localhost:5000/golang:latest (x20) takes ... real 0m6.090s user 0m0.501s sys 0m0.331s ``` Test Script: localhost:5000/{redis|golang}:latest is equal to docker.io/library/{redis|golang}:latest. The image is hold in local registry service by `docker run -d -p 5000:5000 --name registry registry:2`. ```bash image_name="${1}" pull_times="${2:-10}" cleanup() { ctr image rmi "${image_name}" ctr -n k8s.io image rmi "${image_name}" crictl rmi "${image_name}" docker rmi "${image_name}" sleep 2 } crictl_testing() { for idx in $(seq 1 ${pull_times}); do crictl pull "${image_name}" > /dev/null 2>&1 & done wait } docker_testing() { for idx in $(seq 1 ${pull_times}); do docker pull "${image_name}" > /dev/null 2>&1 & done wait } cleanup > /dev/null 2>&1 echo 3 > /proc/sys/vm/drop_caches sleep 3 echo "crictl pull $image_name (x${pull_times}) takes ..." time crictl_testing echo echo 3 > /proc/sys/vm/drop_caches sleep 3 echo "docker pull $image_name (x${pull_times}) takes ..." time docker_testing ``` Fixes: #4937 Close: #4985 Close: #6318 Signed-off-by: Wei Fu <fuweid89@gmail.com>
470 lines
13 KiB
Go
470 lines
13 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package containerd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync/atomic"
|
|
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/diff"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/pkg/kmutex"
|
|
"github.com/containerd/containerd/platforms"
|
|
"github.com/containerd/containerd/rootfs"
|
|
"github.com/containerd/containerd/snapshots"
|
|
"github.com/opencontainers/go-digest"
|
|
"github.com/opencontainers/image-spec/identity"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
// Image describes an image used by containers
|
|
type Image interface {
|
|
// Name of the image
|
|
Name() string
|
|
// Target descriptor for the image content
|
|
Target() ocispec.Descriptor
|
|
// Labels of the image
|
|
Labels() map[string]string
|
|
// Unpack unpacks the image's content into a snapshot
|
|
Unpack(context.Context, string, ...UnpackOpt) error
|
|
// RootFS returns the unpacked diffids that make up images rootfs.
|
|
RootFS(ctx context.Context) ([]digest.Digest, error)
|
|
// Size returns the total size of the image's packed resources.
|
|
Size(ctx context.Context) (int64, error)
|
|
// Usage returns a usage calculation for the image.
|
|
Usage(context.Context, ...UsageOpt) (int64, error)
|
|
// Config descriptor for the image.
|
|
Config(ctx context.Context) (ocispec.Descriptor, error)
|
|
// IsUnpacked returns whether or not an image is unpacked.
|
|
IsUnpacked(context.Context, string) (bool, error)
|
|
// ContentStore provides a content store which contains image blob data
|
|
ContentStore() content.Store
|
|
// Metadata returns the underlying image metadata
|
|
Metadata() images.Image
|
|
// Platform returns the platform match comparer. Can be nil.
|
|
Platform() platforms.MatchComparer
|
|
}
|
|
|
|
type usageOptions struct {
|
|
manifestLimit *int
|
|
manifestOnly bool
|
|
snapshots bool
|
|
}
|
|
|
|
// UsageOpt is used to configure the usage calculation
|
|
type UsageOpt func(*usageOptions) error
|
|
|
|
// WithUsageManifestLimit sets the limit to the number of manifests which will
|
|
// be walked for usage. Setting this value to 0 will require all manifests to
|
|
// be walked, returning ErrNotFound if manifests are missing.
|
|
// NOTE: By default all manifests which exist will be walked
|
|
// and any non-existent manifests and their subobjects will be ignored.
|
|
func WithUsageManifestLimit(i int) UsageOpt {
|
|
// If 0 then don't filter any manifests
|
|
// By default limits to current platform
|
|
return func(o *usageOptions) error {
|
|
o.manifestLimit = &i
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithSnapshotUsage will check for referenced snapshots from the image objects
|
|
// and include the snapshot size in the total usage.
|
|
func WithSnapshotUsage() UsageOpt {
|
|
return func(o *usageOptions) error {
|
|
o.snapshots = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithManifestUsage is used to get the usage for an image based on what is
|
|
// reported by the manifests rather than what exists in the content store.
|
|
// NOTE: This function is best used with the manifest limit set to get a
|
|
// consistent value, otherwise non-existent manifests will be excluded.
|
|
func WithManifestUsage() UsageOpt {
|
|
return func(o *usageOptions) error {
|
|
o.manifestOnly = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
var _ = (Image)(&image{})
|
|
|
|
// NewImage returns a client image object from the metadata image
|
|
func NewImage(client *Client, i images.Image) Image {
|
|
return &image{
|
|
client: client,
|
|
i: i,
|
|
platform: client.platform,
|
|
}
|
|
}
|
|
|
|
// NewImageWithPlatform returns a client image object from the metadata image
|
|
func NewImageWithPlatform(client *Client, i images.Image, platform platforms.MatchComparer) Image {
|
|
return &image{
|
|
client: client,
|
|
i: i,
|
|
platform: platform,
|
|
}
|
|
}
|
|
|
|
type image struct {
|
|
client *Client
|
|
|
|
i images.Image
|
|
platform platforms.MatchComparer
|
|
}
|
|
|
|
func (i *image) Metadata() images.Image {
|
|
return i.i
|
|
}
|
|
|
|
func (i *image) Name() string {
|
|
return i.i.Name
|
|
}
|
|
|
|
func (i *image) Target() ocispec.Descriptor {
|
|
return i.i.Target
|
|
}
|
|
|
|
func (i *image) Labels() map[string]string {
|
|
return i.i.Labels
|
|
}
|
|
|
|
func (i *image) RootFS(ctx context.Context) ([]digest.Digest, error) {
|
|
provider := i.client.ContentStore()
|
|
return i.i.RootFS(ctx, provider, i.platform)
|
|
}
|
|
|
|
func (i *image) Size(ctx context.Context) (int64, error) {
|
|
return i.Usage(ctx, WithUsageManifestLimit(1), WithManifestUsage())
|
|
}
|
|
|
|
func (i *image) Usage(ctx context.Context, opts ...UsageOpt) (int64, error) {
|
|
var config usageOptions
|
|
for _, opt := range opts {
|
|
if err := opt(&config); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
var (
|
|
provider = i.client.ContentStore()
|
|
handler = images.ChildrenHandler(provider)
|
|
size int64
|
|
mustExist bool
|
|
)
|
|
|
|
if config.manifestLimit != nil {
|
|
handler = images.LimitManifests(handler, i.platform, *config.manifestLimit)
|
|
mustExist = true
|
|
}
|
|
|
|
var wh images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
var usage int64
|
|
children, err := handler(ctx, desc)
|
|
if err != nil {
|
|
if !errdefs.IsNotFound(err) || mustExist {
|
|
return nil, err
|
|
}
|
|
if !config.manifestOnly {
|
|
// Do not count size of non-existent objects
|
|
desc.Size = 0
|
|
}
|
|
} else if config.snapshots || !config.manifestOnly {
|
|
info, err := provider.Info(ctx, desc.Digest)
|
|
if err != nil {
|
|
if !errdefs.IsNotFound(err) {
|
|
return nil, err
|
|
}
|
|
if !config.manifestOnly {
|
|
// Do not count size of non-existent objects
|
|
desc.Size = 0
|
|
}
|
|
} else if info.Size > desc.Size {
|
|
// Count actual usage, Size may be unset or -1
|
|
desc.Size = info.Size
|
|
}
|
|
|
|
if config.snapshots {
|
|
for k, v := range info.Labels {
|
|
const prefix = "containerd.io/gc.ref.snapshot."
|
|
if !strings.HasPrefix(k, prefix) {
|
|
continue
|
|
}
|
|
|
|
sn := i.client.SnapshotService(k[len(prefix):])
|
|
if sn == nil {
|
|
continue
|
|
}
|
|
|
|
u, err := sn.Usage(ctx, v)
|
|
if err != nil {
|
|
if !errdefs.IsNotFound(err) && !errdefs.IsInvalidArgument(err) {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
usage += u.Size
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ignore unknown sizes. Generally unknown sizes should
|
|
// never be set in manifests, however, the usage
|
|
// calculation does not need to enforce this.
|
|
if desc.Size >= 0 {
|
|
usage += desc.Size
|
|
}
|
|
|
|
atomic.AddInt64(&size, usage)
|
|
|
|
return children, nil
|
|
}
|
|
|
|
l := semaphore.NewWeighted(3)
|
|
if err := images.Dispatch(ctx, wh, l, i.i.Target); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return size, nil
|
|
}
|
|
|
|
func (i *image) Config(ctx context.Context) (ocispec.Descriptor, error) {
|
|
provider := i.client.ContentStore()
|
|
return i.i.Config(ctx, provider, i.platform)
|
|
}
|
|
|
|
func (i *image) IsUnpacked(ctx context.Context, snapshotterName string) (bool, error) {
|
|
sn, err := i.client.getSnapshotter(ctx, snapshotterName)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
cs := i.client.ContentStore()
|
|
|
|
diffs, err := i.i.RootFS(ctx, cs, i.platform)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
chainID := identity.ChainID(diffs)
|
|
_, err = sn.Stat(ctx, chainID.String())
|
|
if err == nil {
|
|
return true, nil
|
|
} else if !errdefs.IsNotFound(err) {
|
|
return false, err
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// UnpackConfig provides configuration for the unpack of an image
|
|
type UnpackConfig struct {
|
|
// ApplyOpts for applying a diff to a snapshotter
|
|
ApplyOpts []diff.ApplyOpt
|
|
// SnapshotOpts for configuring a snapshotter
|
|
SnapshotOpts []snapshots.Opt
|
|
// CheckPlatformSupported is whether to validate that a snapshotter
|
|
// supports an image's platform before unpacking
|
|
CheckPlatformSupported bool
|
|
// DuplicationSuppressor is used to make sure that there is only one
|
|
// in-flight fetch request or unpack handler for a given descriptor's
|
|
// digest or chain ID.
|
|
DuplicationSuppressor kmutex.KeyedLocker
|
|
}
|
|
|
|
// UnpackOpt provides configuration for unpack
|
|
type UnpackOpt func(context.Context, *UnpackConfig) error
|
|
|
|
// WithSnapshotterPlatformCheck sets `CheckPlatformSupported` on the UnpackConfig
|
|
func WithSnapshotterPlatformCheck() UnpackOpt {
|
|
return func(ctx context.Context, uc *UnpackConfig) error {
|
|
uc.CheckPlatformSupported = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithUnpackDuplicationSuppressor sets `DuplicationSuppressor` on the UnpackConfig.
|
|
func WithUnpackDuplicationSuppressor(suppressor kmutex.KeyedLocker) UnpackOpt {
|
|
return func(ctx context.Context, uc *UnpackConfig) error {
|
|
uc.DuplicationSuppressor = suppressor
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error {
|
|
ctx, done, err := i.client.WithLease(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer done(ctx)
|
|
|
|
var config UnpackConfig
|
|
for _, o := range opts {
|
|
if err := o(ctx, &config); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
manifest, err := i.getManifest(ctx, i.platform)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
layers, err := i.getLayers(ctx, i.platform, manifest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
a = i.client.DiffService()
|
|
cs = i.client.ContentStore()
|
|
|
|
chain []digest.Digest
|
|
unpacked bool
|
|
)
|
|
snapshotterName, err = i.client.resolveSnapshotterName(ctx, snapshotterName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sn, err := i.client.getSnapshotter(ctx, snapshotterName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if config.CheckPlatformSupported {
|
|
if err := i.checkSnapshotterSupport(ctx, snapshotterName, manifest); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, layer := range layers {
|
|
unpacked, err = rootfs.ApplyLayerWithOpts(ctx, layer, chain, sn, a, config.SnapshotOpts, config.ApplyOpts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if unpacked {
|
|
// Set the uncompressed label after the uncompressed
|
|
// digest has been verified through apply.
|
|
cinfo := content.Info{
|
|
Digest: layer.Blob.Digest,
|
|
Labels: map[string]string{
|
|
"containerd.io/uncompressed": layer.Diff.Digest.String(),
|
|
},
|
|
}
|
|
if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
chain = append(chain, layer.Diff.Digest)
|
|
}
|
|
|
|
desc, err := i.i.Config(ctx, cs, i.platform)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rootfs := identity.ChainID(chain).String()
|
|
|
|
cinfo := content.Info{
|
|
Digest: desc.Digest,
|
|
Labels: map[string]string{
|
|
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snapshotterName): rootfs,
|
|
},
|
|
}
|
|
|
|
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName))
|
|
return err
|
|
}
|
|
|
|
func (i *image) getManifest(ctx context.Context, platform platforms.MatchComparer) (ocispec.Manifest, error) {
|
|
cs := i.ContentStore()
|
|
manifest, err := images.Manifest(ctx, cs, i.i.Target, platform)
|
|
if err != nil {
|
|
return ocispec.Manifest{}, err
|
|
}
|
|
return manifest, nil
|
|
}
|
|
|
|
func (i *image) getLayers(ctx context.Context, platform platforms.MatchComparer, manifest ocispec.Manifest) ([]rootfs.Layer, error) {
|
|
cs := i.ContentStore()
|
|
diffIDs, err := i.i.RootFS(ctx, cs, platform)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to resolve rootfs: %w", err)
|
|
}
|
|
if len(diffIDs) != len(manifest.Layers) {
|
|
return nil, errors.New("mismatched image rootfs and manifest layers")
|
|
}
|
|
layers := make([]rootfs.Layer, len(diffIDs))
|
|
for i := range diffIDs {
|
|
layers[i].Diff = ocispec.Descriptor{
|
|
// TODO: derive media type from compressed type
|
|
MediaType: ocispec.MediaTypeImageLayer,
|
|
Digest: diffIDs[i],
|
|
}
|
|
layers[i].Blob = manifest.Layers[i]
|
|
}
|
|
return layers, nil
|
|
}
|
|
|
|
func (i *image) getManifestPlatform(ctx context.Context, manifest ocispec.Manifest) (ocispec.Platform, error) {
|
|
cs := i.ContentStore()
|
|
p, err := content.ReadBlob(ctx, cs, manifest.Config)
|
|
if err != nil {
|
|
return ocispec.Platform{}, err
|
|
}
|
|
|
|
var image ocispec.Image
|
|
if err := json.Unmarshal(p, &image); err != nil {
|
|
return ocispec.Platform{}, err
|
|
}
|
|
return platforms.Normalize(ocispec.Platform{OS: image.OS, Architecture: image.Architecture}), nil
|
|
}
|
|
|
|
func (i *image) checkSnapshotterSupport(ctx context.Context, snapshotterName string, manifest ocispec.Manifest) error {
|
|
snapshotterPlatformMatcher, err := i.client.GetSnapshotterSupportedPlatforms(ctx, snapshotterName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
manifestPlatform, err := i.getManifestPlatform(ctx, manifest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if snapshotterPlatformMatcher.Match(manifestPlatform) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("snapshotter %s does not support platform %s for image %s", snapshotterName, manifestPlatform, manifest.Config.Digest)
|
|
}
|
|
|
|
func (i *image) ContentStore() content.Store {
|
|
return i.client.ContentStore()
|
|
}
|
|
|
|
func (i *image) Platform() platforms.MatchComparer {
|
|
return i.platform
|
|
}
|