diff --git a/client.go b/client.go index 5b34799f3..eefeb4e94 100644 --- a/client.go +++ b/client.go @@ -61,6 +61,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -292,6 +293,9 @@ type RemoteContext struct { // platforms will be used to create a PlatformMatcher with no ordering // preference. Platforms []string + + // MaxConcurrentDownloads is the max concurrent content downloads for each pull. + MaxConcurrentDownloads int } func defaultRemoteContext() *RemoteContext { @@ -407,6 +411,7 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim isConvertible bool converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error) + limiter *semaphore.Weighted ) if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 { @@ -453,7 +458,10 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim } } - if err := images.Dispatch(ctx, handler, desc); err != nil { + if rCtx.MaxConcurrentDownloads > 0 { + limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) + } + if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { return images.Image{}, err } diff --git a/client_opts.go b/client_opts.go index b7431ad29..cb3792fb3 100644 --- a/client_opts.go +++ b/client_opts.go @@ -178,3 +178,11 @@ func WithImageHandler(h images.Handler) RemoteOpt { return nil } } + +// WithMaxConcurrentDownloads sets max concurrent download limit. +func WithMaxConcurrentDownloads(max int) RemoteOpt { + return func(client *Client, c *RemoteContext) error { + c.MaxConcurrentDownloads = max + return nil + } +} diff --git a/client_test.go b/client_test.go index 1a4cf3962..83081a6c9 100644 --- a/client_test.go +++ b/client_test.go @@ -327,6 +327,23 @@ func TestImagePullSchema1(t *testing.T) { } } +func TestImagePullWithConcurrencyLimit(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + ctx, cancel := testContext() + defer cancel() + _, err = client.Pull(ctx, testImage, + WithPlatformMatcher(platforms.Default()), + WithMaxConcurrentDownloads(2)) + if err != nil { + t.Fatal(err) + } +} + func TestClientReconnect(t *testing.T) { t.Parallel() diff --git a/images/handlers.go b/images/handlers.go index 230a9caf8..dac701bb8 100644 --- a/images/handlers.go +++ b/images/handlers.go @@ -26,6 +26,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) var ( @@ -108,19 +109,30 @@ func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) err // handler may return `ErrSkipDesc` to signal to the dispatcher to not traverse // any children. // +// A concurrency limiter can be passed in to limit the number of concurrent +// handlers running. When limiter is nil, there is no limit. +// // Typically, this function will be used with `FetchHandler`, often composed // with other handlers. // // If any handler returns an error, the dispatch session will be canceled. -func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error { +func Dispatch(ctx context.Context, handler Handler, limiter *semaphore.Weighted, descs ...ocispec.Descriptor) error { eg, ctx := errgroup.WithContext(ctx) for _, desc := range descs { desc := desc + if limiter != nil { + if err := limiter.Acquire(ctx, 1); err != nil { + return err + } + } eg.Go(func() error { desc := desc children, err := handler.Handle(ctx, desc) + if limiter != nil { + limiter.Release(1) + } if err != nil { if errors.Cause(err) == ErrSkipDesc { return nil // don't traverse the children. @@ -129,7 +141,7 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) } if len(children) > 0 { - return Dispatch(ctx, handler, children...) + return Dispatch(ctx, handler, limiter, children...) } return nil diff --git a/remotes/handlers.go b/remotes/handlers.go index 77310fb62..86922de99 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -181,7 +181,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr pushHandler, ) - if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil { + if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { return err } diff --git a/vendor.conf b/vendor.conf index 5abbc57b5..10f886342 100644 --- a/vendor.conf +++ b/vendor.conf @@ -29,7 +29,7 @@ github.com/pkg/errors v0.8.0 github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7 golang.org/x/sys 1b2967e3c290b7c545b3db0deeda16e9be4f98a2 https://github.com/golang/sys github.com/opencontainers/image-spec v1.0.1 -golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c +golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0 github.com/Microsoft/go-winio v0.4.11 diff --git a/vendor/golang.org/x/sync/README b/vendor/golang.org/x/sync/README deleted file mode 100644 index 59c9dcb49..000000000 --- a/vendor/golang.org/x/sync/README +++ /dev/null @@ -1,2 +0,0 @@ -This repository provides Go concurrency primitives in addition to the -ones provided by the language and "sync" and "sync/atomic" packages. diff --git a/vendor/golang.org/x/sync/README.md b/vendor/golang.org/x/sync/README.md new file mode 100644 index 000000000..1f8436cc9 --- /dev/null +++ b/vendor/golang.org/x/sync/README.md @@ -0,0 +1,18 @@ +# Go Sync + +This repository provides Go concurrency primitives in addition to the +ones provided by the language and "sync" and "sync/atomic" packages. + +## Download/Install + +The easiest way to install is to run `go get -u golang.org/x/sync`. You can +also manually git clone the repository to `$GOPATH/src/golang.org/x/sync`. + +## Report Issues / Send Patches + +This repository uses Gerrit for code changes. To learn how to submit changes to +this repository, see https://golang.org/doc/contribute.html. + +The main issue tracker for the sync repository is located at +https://github.com/golang/go/issues. Prefix your issue with "x/sync:" in the +subject line, so it is easy to find. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go index 533438d91..9857fe53d 100644 --- a/vendor/golang.org/x/sync/errgroup/errgroup.go +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -7,9 +7,8 @@ package errgroup import ( + "context" "sync" - - "golang.org/x/net/context" ) // A Group is a collection of goroutines working on subtasks that are part of diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 000000000..2096ca3a0 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,127 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking only until ctx +// is done. On success, returns nil. On failure, returns ctx.Err() and leaves +// the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + s.waiters.Remove(elem) + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: bad release") + } + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } + s.mu.Unlock() +}