Add configuration options to local transfer service

Signed-off-by: Tony Fang <nhfang@amazon.com>
This commit is contained in:
Tony Fang
2023-02-07 20:58:03 +00:00
parent e366facb87
commit 47305392c6
14 changed files with 209 additions and 82 deletions

View File

@@ -19,13 +19,16 @@ package local
import (
"context"
"encoding/json"
"fmt"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/pkg/unpack"
)
func (ts *localTransferService) importStream(ctx context.Context, i transfer.ImageImporter, is transfer.ImageStorer, tops *transfer.Config) error {
@@ -46,12 +49,16 @@ func (ts *localTransferService) importStream(ctx context.Context, i transfer.Ima
return err
}
var descriptors []ocispec.Descriptor
var (
descriptors []ocispec.Descriptor
handler images.Handler
unpacker *unpack.Unpacker
)
// If save index, add index
descriptors = append(descriptors, index)
var handler images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
var handlerFunc images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// Only save images at top level
if desc.Digest != index.Digest {
return images.Children(ctx, ts.content, desc)
@@ -76,7 +83,33 @@ func (ts *localTransferService) importStream(ctx context.Context, i transfer.Ima
}
if f, ok := is.(transfer.ImageFilterer); ok {
handler = f.ImageFilter(handler, ts.content)
handlerFunc = f.ImageFilter(handlerFunc, ts.content)
}
handler = images.Handlers(handlerFunc)
// First find suitable platforms to unpack into
//If image storer is also an unpacker type, i.e implemented UnpackPlatforms() func
if iu, ok := is.(transfer.ImageUnpacker); ok {
unpacks := iu.UnpackPlatforms()
if len(unpacks) > 0 {
uopts := []unpack.UnpackerOpt{}
for _, u := range unpacks {
matched, mu := getSupportedPlatform(u, ts.config.UnpackPlatforms)
if matched {
uopts = append(uopts, unpack.WithUnpackPlatform(mu))
}
}
if ts.config.DuplicationSuppressor != nil {
uopts = append(uopts, unpack.WithDuplicationSuppressor(ts.config.DuplicationSuppressor))
}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
handler = unpacker.Unpack(handler)
}
}
if err := images.WalkNotEmpty(ctx, handler, index); err != nil {

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
@@ -73,6 +74,8 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
var (
handler images.Handler
baseHandlers []images.Handler
unpacker *unpack.Unpacker
// has a config media type bug (distribution#1622)
@@ -97,12 +100,6 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
childrenHandler = f.ImageFilter(childrenHandler, store)
}
// Sort and limit manifests if a finite number is needed
//if limit > 0 {
// childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
//}
//SetChildrenMappedLabels(manager content.Manager, f HandlerFunc, labelMap func(ocispec.Descriptor) []string) HandlerFunc {
checkNeedsFix := images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// set to true if there is application/octet-stream media type
@@ -119,8 +116,12 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
return err
}
// TODO: Allow initialization from configuration
baseHandlers := []images.Handler{}
//Set up baseHandlers from service configuration if present or create a new one
if ts.config.BaseHandlers != nil {
baseHandlers = ts.config.BaseHandlers
} else {
baseHandlers = []images.Handler{}
}
if tops.Progress != nil {
baseHandlers = append(baseHandlers, images.HandlerFunc(
@@ -149,22 +150,28 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
appendDistSrcLabelHandler,
)...)
// TODO: Should available platforms be a configuration of the service?
// First find suitable platforms to unpack into
//if unpacker, ok := is.
//If image storer is also an unpacker type, i.e implemented UnpackPlatforms() func
if iu, ok := is.(transfer.ImageUnpacker); ok {
unpacks := iu.UnpackPlatforms()
if len(unpacks) > 0 {
uopts := []unpack.UnpackerOpt{}
//Only unpack if requested unpackconfig matches default/supported unpackconfigs
for _, u := range unpacks {
uopts = append(uopts, unpack.WithUnpackPlatform(u))
matched, mu := getSupportedPlatform(u, ts.config.UnpackPlatforms)
if matched {
uopts = append(uopts, unpack.WithUnpackPlatform(mu))
}
}
if ts.limiter != nil {
uopts = append(uopts, unpack.WithLimiter(ts.limiter))
if ts.limiterD != nil {
uopts = append(uopts, unpack.WithLimiter(ts.limiterD))
}
//if uconfig.DuplicationSuppressor != nil {
// uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor))
//}
if ts.config.DuplicationSuppressor != nil {
uopts = append(uopts, unpack.WithDuplicationSuppressor(ts.config.DuplicationSuppressor))
}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
@@ -173,7 +180,7 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
}
}
if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil {
if err := images.Dispatch(ctx, handler, ts.limiterD, desc); err != nil {
if unpacker != nil {
// wait for unpacker to cleanup
unpacker.Wait()
@@ -241,3 +248,23 @@ func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *Progre
}
}
}
// getSupportedPlatform returns a matched platform comparing input UnpackConfiguration to the supported platform/snapshotter combinations
// If input platform didn't specify snapshotter, default will be used if there is a match on platform.
func getSupportedPlatform(uc transfer.UnpackConfiguration, supportedPlatforms []unpack.Platform) (bool, unpack.Platform) {
var u unpack.Platform
for _, sp := range supportedPlatforms {
//If both platform and snapshotter match, return the supportPlatform
//If platform matched and SnapshotterKey is empty, we assume client didn't pass SnapshotterKey
//use default Snapshotter
if sp.Platform.Match(uc.Platform) {
//assuming sp.SnapshotterKey is not empty
if uc.Snapshotter == sp.SnapshotterKey {
return true, sp
} else if uc.Snapshotter == "" && sp.SnapshotterKey == containerd.DefaultSnapshotter {
return true, sp
}
}
}
return false, u
}

View File

@@ -105,8 +105,7 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette
wrapper = pushCtx.HandlerWrapper
}
*/
if err := remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper); err != nil {
if err := remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiterU, matcher, wrapper); err != nil {
return err
}
if tops.Progress != nil {

View File

@@ -22,11 +22,15 @@ import (
"io"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/platforms"
"github.com/containerd/typeurl/v2"
"golang.org/x/sync/semaphore"
)
@@ -35,25 +39,21 @@ type localTransferService struct {
leases leases.Manager
content content.Store
images images.Store
// semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
limiter *semaphore.Weighted
// TODO: Duplication suppressor
// Configuration
// - Max downloads
// - Max uploads
// Supported platforms
// - Platform -> snapshotter defaults?
//limiter for upload
limiterU *semaphore.Weighted
//limiter for download operation
limiterD *semaphore.Weighted
config TransferConfig
}
func NewTransferService(lm leases.Manager, cs content.Store, is images.Store) transfer.Transferrer {
func NewTransferService(lm leases.Manager, cs content.Store, is images.Store, tc *TransferConfig) transfer.Transferrer {
return &localTransferService{
leases: lm,
content: cs,
images: is,
leases: lm,
content: cs,
images: is,
limiterU: semaphore.NewWeighted(int64(tc.MaxConcurrentUploadedLayers)),
limiterD: semaphore.NewWeighted(int64(tc.MaxConcurrentDownloads)),
config: *tc,
}
}
@@ -150,3 +150,40 @@ func (ts *localTransferService) withLease(ctx context.Context, opts ...leases.Op
return ls.Delete(ctx, l)
}, nil
}
type TransferConfig struct {
// MaxConcurrentDownloads is the max concurrent content downloads for pull.
MaxConcurrentDownloads int `toml:"max_concurrent_downloads"`
// MaxConcurrentUploadedLayers is the max concurrent uploads for push
MaxConcurrentUploadedLayers int `toml:"max_concurrent_uploaded_layers"`
// DuplicationSuppressor is used to make sure that there is only one
// in-flight fetch request or unpack handler for a given descriptor's
// digest or chain ID.
DuplicationSuppressor kmutex.KeyedLocker
// BaseHandlers are a set of handlers which get are called on dispatch.
// These handlers always get called before any operation specific
// handlers.
BaseHandlers []images.Handler
//UnpackPlatforms are used to specify supported combination of platforms and snapshotters
UnpackPlatforms []unpack.Platform `toml:"unpack_platforms"`
// RegistryConfigPath is a path to the root directory containing registry-specific configurations
RegistryConfigPath string `toml:"config_path"`
}
func DefaultConfig() *TransferConfig {
return &TransferConfig{
MaxConcurrentDownloads: 3,
MaxConcurrentUploadedLayers: 3,
UnpackPlatforms: []unpack.Platform{
{
Platform: platforms.Only(platforms.DefaultSpec()),
SnapshotterKey: containerd.DefaultSnapshotter,
},
},
}
}