diff --git a/client.go b/client.go index 011c0c323..ace9bf6d1 100644 --- a/client.go +++ b/client.go @@ -63,6 +63,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/health/grpc_health_v1" @@ -355,6 +356,9 @@ type RemoteContext struct { // MaxConcurrentDownloads is the max concurrent content downloads for each pull. MaxConcurrentDownloads int + // MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push. + MaxConcurrentUploadedLayers int + // AllMetadata downloads all manifests and known-configuration files AllMetadata bool @@ -463,7 +467,12 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, wrapper = pushCtx.HandlerWrapper } - return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, wrapper) + var limiter *semaphore.Weighted + if pushCtx.MaxConcurrentUploadedLayers > 0 { + limiter = semaphore.NewWeighted(int64(pushCtx.MaxConcurrentUploadedLayers)) + } + + return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), limiter, pushCtx.PlatformMatcher, wrapper) } // GetImage returns an existing image diff --git a/client_opts.go b/client_opts.go index 5271b94ee..44feaa30a 100644 --- a/client_opts.go +++ b/client_opts.go @@ -228,6 +228,14 @@ func WithMaxConcurrentDownloads(max int) RemoteOpt { } } +// WithMaxConcurrentUploadedLayers sets max concurrent uploaded layer limit. +func WithMaxConcurrentUploadedLayers(max int) RemoteOpt { + return func(client *Client, c *RemoteContext) error { + c.MaxConcurrentUploadedLayers = max + return nil + } +} + // WithAllMetadata downloads all manifests and known-configuration files func WithAllMetadata() RemoteOpt { return func(_ *Client, c *RemoteContext) error { diff --git a/cmd/ctr/commands/content/fetch.go b/cmd/ctr/commands/content/fetch.go index 873637add..4dc10875e 100644 --- a/cmd/ctr/commands/content/fetch.go +++ b/cmd/ctr/commands/content/fetch.go @@ -109,7 +109,7 @@ type FetchConfig struct { Platforms []string // Whether or not download all metadata AllMetadata bool - // RemoteOpts is not used by ctr, but can be used by other CLI tools + // RemoteOpts to configure object resolutions and transfers with remote content providers RemoteOpts []containerd.RemoteOpt // TraceHTTP writes DNS and connection information to the log when dealing with a container registry TraceHTTP bool @@ -145,6 +145,16 @@ func NewFetchConfig(ctx context.Context, clicontext *cli.Context) (*FetchConfig, config.AllMetadata = true } + if clicontext.IsSet("max-concurrent-downloads") { + mcd := clicontext.Int("max-concurrent-downloads") + config.RemoteOpts = append(config.RemoteOpts, containerd.WithMaxConcurrentDownloads(mcd)) + } + + if clicontext.IsSet("max-concurrent-uploaded-layers") { + mcu := clicontext.Int("max-concurrent-uploaded-layers") + config.RemoteOpts = append(config.RemoteOpts, containerd.WithMaxConcurrentUploadedLayers(mcu)) + } + return config, nil } diff --git a/cmd/ctr/commands/images/pull.go b/cmd/ctr/commands/images/pull.go index 57888c73a..7a5db210f 100644 --- a/cmd/ctr/commands/images/pull.go +++ b/cmd/ctr/commands/images/pull.go @@ -63,6 +63,10 @@ command. As part of this process, we do the following: Name: "print-chainid", Usage: "Print the resulting image's chain ID", }, + cli.IntFlag{ + Name: "max-concurrent-downloads", + Usage: "Set the max concurrent downloads for each pull", + }, ), Action: func(context *cli.Context) error { var ( diff --git a/cmd/ctr/commands/images/push.go b/cmd/ctr/commands/images/push.go index 095ea7f7f..da9e8968b 100644 --- a/cmd/ctr/commands/images/push.go +++ b/cmd/ctr/commands/images/push.go @@ -64,6 +64,9 @@ var pushCommand = cli.Command{ Name: "platform", Usage: "push content from a specific platform", Value: &cli.StringSlice{}, + }, cli.IntFlag{ + Name: "max-concurrent-uploaded-layers", + Usage: "Set the max concurrent uploaded layers for each push", }), Action: func(context *cli.Context) error { var ( @@ -144,10 +147,17 @@ var pushCommand = cli.Command{ return nil, nil }) - return client.Push(ctx, ref, desc, + ropts := []containerd.RemoteOpt{ containerd.WithResolver(resolver), containerd.WithImageHandler(jobHandler), - ) + } + + if context.IsSet("max-concurrent-uploaded-layers") { + mcu := context.Int("max-concurrent-uploaded-layers") + ropts = append(ropts, containerd.WithMaxConcurrentUploadedLayers(mcu)) + } + + return client.Push(ctx, ref, desc, ropts...) }) // don't show progress if debug mode is set diff --git a/remotes/handlers.go b/remotes/handlers.go index 9c864ec72..5ae6113aa 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -31,6 +31,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) type refKeyPrefix struct{} @@ -181,7 +182,8 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc // // Base handlers can be provided which will be called before any push specific // handlers. -func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, store content.Store, platform platforms.MatchComparer, wrapper func(h images.Handler) images.Handler) error { +func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, store content.Store, limiter *semaphore.Weighted, platform platforms.MatchComparer, wrapper func(h images.Handler) images.Handler) error { + var m sync.Mutex manifestStack := []ocispec.Descriptor{} @@ -213,7 +215,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, st handler = wrapper(handler) } - if err := images.Dispatch(ctx, handler, nil, desc); err != nil { + if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { return err }