diff --git a/content/helpers.go b/content/helpers.go index a093c210b..cfe309126 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -20,7 +20,9 @@ import ( "context" "io" "io/ioutil" + "math/rand" "sync" + "time" "github.com/containerd/containerd/errdefs" "github.com/opencontainers/go-digest" @@ -64,7 +66,7 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt // // Copy is buffered, so no need to wrap reader in buffered io. func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { - cw, err := cs.Writer(ctx, ref, size, expected) + cw, err := OpenWriter(ctx, cs, ref, size, expected) if err != nil { if !errdefs.IsAlreadyExists(err) { return err @@ -77,6 +79,43 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i return Copy(ctx, cw, r, size, expected, opts...) } +// OpenWriter opens a new writer for the given reference, retrying if the writer +// is locked until the reference is available or returns an error. +func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) { + var ( + cw Writer + err error + retry = 16 + ) + for { + cw, err = cs.Writer(ctx, ref, size, expected) + if err != nil { + if !errdefs.IsUnavailable(err) { + return nil, err + } + + // TODO: Check status to determine if the writer is active, + // continue waiting while active, otherwise return lock + // error or abort. Requires asserting for an ingest manager + + select { + case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))): + if retry < 2048 { + retry = retry << 1 + } + continue + case <-ctx.Done(): + // Propagate lock error + return nil, err + } + + } + break + } + + return cw, err +} + // Copy copies data with the expected digest from the reader into the // provided content store writer. This copy commits the writer. // diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index 1cf4dd7a1..19a0d9856 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -25,7 +25,6 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "strings" "sync" "time" @@ -256,10 +255,9 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro log.G(ctx).Debug("fetch blob") var ( - ref = remotes.MakeRefKey(ctx, desc) - calc = newBlobStateCalculator() - retry = 16 - size = desc.Size + ref = remotes.MakeRefKey(ctx, desc) + calc = newBlobStateCalculator() + size = desc.Size ) // size may be unknown, set to zero for content ingest @@ -267,20 +265,9 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro size = 0 } -tryit: - cw, err := c.contentStore.Writer(ctx, ref, size, desc.Digest) + cw, err := content.OpenWriter(ctx, c.contentStore, ref, size, desc.Digest) if err != nil { - if errdefs.IsUnavailable(err) { - select { - case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))): - if retry < 2048 { - retry = retry << 1 - } - goto tryit - case <-ctx.Done(): - return err - } - } else if !errdefs.IsAlreadyExists(err) { + if !errdefs.IsAlreadyExists(err) { return err } diff --git a/remotes/handlers.go b/remotes/handlers.go index 38b4bcd45..f0334d516 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -20,10 +20,8 @@ import ( "context" "fmt" "io" - "math/rand" "strings" "sync" - "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" @@ -83,38 +81,14 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { log.G(ctx).Debug("fetch") - var ( - ref = MakeRefKey(ctx, desc) - cw content.Writer - err error - retry = 16 - ) - for { - cw, err = ingester.Writer(ctx, ref, desc.Size, desc.Digest) - if err != nil { - if errdefs.IsAlreadyExists(err) { - return nil - } else if !errdefs.IsUnavailable(err) { - return err - } - - // TODO: On first time locked is encountered, get status - // of writer and abort if not updated recently. - - select { - case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))): - if retry < 2048 { - retry = retry << 1 - } - continue - case <-ctx.Done(): - // Propagate lock error - return err - } + cw, err := content.OpenWriter(ctx, ingester, MakeRefKey(ctx, desc), desc.Size, desc.Digest) + if err != nil { + if errdefs.IsAlreadyExists(err) { + return nil } - defer cw.Close() - break + return err } + defer cw.Close() ws, err := cw.Status() if err != nil {