Add image pull concurrency limit.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
parent
5abeeff84f
commit
d7ed403072
10
client.go
10
client.go
@ -61,6 +61,7 @@ import (
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
@ -292,6 +293,9 @@ type RemoteContext struct {
|
||||
// platforms will be used to create a PlatformMatcher with no ordering
|
||||
// preference.
|
||||
Platforms []string
|
||||
|
||||
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
|
||||
MaxConcurrentDownloads int
|
||||
}
|
||||
|
||||
func defaultRemoteContext() *RemoteContext {
|
||||
@ -407,6 +411,7 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
|
||||
|
||||
isConvertible bool
|
||||
converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
|
||||
limiter *semaphore.Weighted
|
||||
)
|
||||
|
||||
if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
|
||||
@ -453,7 +458,10 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
|
||||
}
|
||||
}
|
||||
|
||||
if err := images.Dispatch(ctx, handler, desc); err != nil {
|
||||
if rCtx.MaxConcurrentDownloads > 0 {
|
||||
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
|
||||
}
|
||||
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
|
@ -178,3 +178,11 @@ func WithImageHandler(h images.Handler) RemoteOpt {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxConcurrentDownloads sets max concurrent download limit.
|
||||
func WithMaxConcurrentDownloads(max int) RemoteOpt {
|
||||
return func(client *Client, c *RemoteContext) error {
|
||||
c.MaxConcurrentDownloads = max
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -327,6 +327,23 @@ func TestImagePullSchema1(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestImagePullWithConcurrencyLimit(t *testing.T) {
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
ctx, cancel := testContext()
|
||||
defer cancel()
|
||||
_, err = client.Pull(ctx, testImage,
|
||||
WithPlatformMatcher(platforms.Default()),
|
||||
WithMaxConcurrentDownloads(2))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientReconnect(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -108,19 +109,30 @@ func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) err
|
||||
// handler may return `ErrSkipDesc` to signal to the dispatcher to not traverse
|
||||
// any children.
|
||||
//
|
||||
// A concurrency limiter can be passed in to limit the number of concurrent
|
||||
// handlers running. When limiter is nil, there is no limit.
|
||||
//
|
||||
// Typically, this function will be used with `FetchHandler`, often composed
|
||||
// with other handlers.
|
||||
//
|
||||
// If any handler returns an error, the dispatch session will be canceled.
|
||||
func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error {
|
||||
func Dispatch(ctx context.Context, handler Handler, limiter *semaphore.Weighted, descs ...ocispec.Descriptor) error {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for _, desc := range descs {
|
||||
desc := desc
|
||||
|
||||
if limiter != nil {
|
||||
if err := limiter.Acquire(ctx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
eg.Go(func() error {
|
||||
desc := desc
|
||||
|
||||
children, err := handler.Handle(ctx, desc)
|
||||
if limiter != nil {
|
||||
limiter.Release(1)
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Cause(err) == ErrSkipDesc {
|
||||
return nil // don't traverse the children.
|
||||
@ -129,7 +141,7 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor)
|
||||
}
|
||||
|
||||
if len(children) > 0 {
|
||||
return Dispatch(ctx, handler, children...)
|
||||
return Dispatch(ctx, handler, limiter, children...)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -181,7 +181,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr
|
||||
pushHandler,
|
||||
)
|
||||
|
||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
|
||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ github.com/pkg/errors v0.8.0
|
||||
github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7
|
||||
golang.org/x/sys 1b2967e3c290b7c545b3db0deeda16e9be4f98a2 https://github.com/golang/sys
|
||||
github.com/opencontainers/image-spec v1.0.1
|
||||
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
|
||||
golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e
|
||||
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
|
||||
github.com/Microsoft/go-winio v0.4.11
|
||||
|
2
vendor/golang.org/x/sync/README
generated
vendored
2
vendor/golang.org/x/sync/README
generated
vendored
@ -1,2 +0,0 @@
|
||||
This repository provides Go concurrency primitives in addition to the
|
||||
ones provided by the language and "sync" and "sync/atomic" packages.
|
18
vendor/golang.org/x/sync/README.md
generated
vendored
Normal file
18
vendor/golang.org/x/sync/README.md
generated
vendored
Normal file
@ -0,0 +1,18 @@
|
||||
# Go Sync
|
||||
|
||||
This repository provides Go concurrency primitives in addition to the
|
||||
ones provided by the language and "sync" and "sync/atomic" packages.
|
||||
|
||||
## Download/Install
|
||||
|
||||
The easiest way to install is to run `go get -u golang.org/x/sync`. You can
|
||||
also manually git clone the repository to `$GOPATH/src/golang.org/x/sync`.
|
||||
|
||||
## Report Issues / Send Patches
|
||||
|
||||
This repository uses Gerrit for code changes. To learn how to submit changes to
|
||||
this repository, see https://golang.org/doc/contribute.html.
|
||||
|
||||
The main issue tracker for the sync repository is located at
|
||||
https://github.com/golang/go/issues. Prefix your issue with "x/sync:" in the
|
||||
subject line, so it is easy to find.
|
3
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
3
vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
@ -7,9 +7,8 @@
|
||||
package errgroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// A Group is a collection of goroutines working on subtasks that are part of
|
||||
|
127
vendor/golang.org/x/sync/semaphore/semaphore.go
generated
vendored
Normal file
127
vendor/golang.org/x/sync/semaphore/semaphore.go
generated
vendored
Normal file
@ -0,0 +1,127 @@
|
||||
// Copyright 2017 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package semaphore provides a weighted semaphore implementation.
|
||||
package semaphore // import "golang.org/x/sync/semaphore"
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type waiter struct {
|
||||
n int64
|
||||
ready chan<- struct{} // Closed when semaphore acquired.
|
||||
}
|
||||
|
||||
// NewWeighted creates a new weighted semaphore with the given
|
||||
// maximum combined weight for concurrent access.
|
||||
func NewWeighted(n int64) *Weighted {
|
||||
w := &Weighted{size: n}
|
||||
return w
|
||||
}
|
||||
|
||||
// Weighted provides a way to bound concurrent access to a resource.
|
||||
// The callers can request access with a given weight.
|
||||
type Weighted struct {
|
||||
size int64
|
||||
cur int64
|
||||
mu sync.Mutex
|
||||
waiters list.List
|
||||
}
|
||||
|
||||
// Acquire acquires the semaphore with a weight of n, blocking only until ctx
|
||||
// is done. On success, returns nil. On failure, returns ctx.Err() and leaves
|
||||
// the semaphore unchanged.
|
||||
//
|
||||
// If ctx is already done, Acquire may still succeed without blocking.
|
||||
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
|
||||
s.mu.Lock()
|
||||
if s.size-s.cur >= n && s.waiters.Len() == 0 {
|
||||
s.cur += n
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
if n > s.size {
|
||||
// Don't make other Acquire calls block on one that's doomed to fail.
|
||||
s.mu.Unlock()
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
ready := make(chan struct{})
|
||||
w := waiter{n: n, ready: ready}
|
||||
elem := s.waiters.PushBack(w)
|
||||
s.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := ctx.Err()
|
||||
s.mu.Lock()
|
||||
select {
|
||||
case <-ready:
|
||||
// Acquired the semaphore after we were canceled. Rather than trying to
|
||||
// fix up the queue, just pretend we didn't notice the cancelation.
|
||||
err = nil
|
||||
default:
|
||||
s.waiters.Remove(elem)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return err
|
||||
|
||||
case <-ready:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TryAcquire acquires the semaphore with a weight of n without blocking.
|
||||
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
|
||||
func (s *Weighted) TryAcquire(n int64) bool {
|
||||
s.mu.Lock()
|
||||
success := s.size-s.cur >= n && s.waiters.Len() == 0
|
||||
if success {
|
||||
s.cur += n
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return success
|
||||
}
|
||||
|
||||
// Release releases the semaphore with a weight of n.
|
||||
func (s *Weighted) Release(n int64) {
|
||||
s.mu.Lock()
|
||||
s.cur -= n
|
||||
if s.cur < 0 {
|
||||
s.mu.Unlock()
|
||||
panic("semaphore: bad release")
|
||||
}
|
||||
for {
|
||||
next := s.waiters.Front()
|
||||
if next == nil {
|
||||
break // No more waiters blocked.
|
||||
}
|
||||
|
||||
w := next.Value.(waiter)
|
||||
if s.size-s.cur < w.n {
|
||||
// Not enough tokens for the next waiter. We could keep going (to try to
|
||||
// find a waiter with a smaller request), but under load that could cause
|
||||
// starvation for large requests; instead, we leave all remaining waiters
|
||||
// blocked.
|
||||
//
|
||||
// Consider a semaphore used as a read-write lock, with N tokens, N
|
||||
// readers, and one writer. Each reader can Acquire(1) to obtain a read
|
||||
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
|
||||
// of the readers. If we allow the readers to jump ahead in the queue,
|
||||
// the writer will starve — there is always one token available for every
|
||||
// reader.
|
||||
break
|
||||
}
|
||||
|
||||
s.cur += w.n
|
||||
s.waiters.Remove(next)
|
||||
close(w.ready)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
Loading…
Reference in New Issue
Block a user