Add writer open helper to handle unavailable refs

Updates blob writer helper to use new open and ensure
unavailable errors are always handled.
Removes duplication of unavailable handling code.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2018-03-21 16:21:39 -07:00
parent 4c8bbb55b7
commit 5304ef294b
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
3 changed files with 51 additions and 51 deletions

View File

@ -20,7 +20,9 @@ import (
"context"
"io"
"io/ioutil"
"math/rand"
"sync"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest"
@ -64,7 +66,7 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt
//
// Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
cw, err := cs.Writer(ctx, ref, size, expected)
cw, err := OpenWriter(ctx, cs, ref, size, expected)
if err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
@ -77,6 +79,43 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
return Copy(ctx, cw, r, size, expected, opts...)
}
// OpenWriter opens a new writer for the given reference, retrying if the writer
// is locked until the reference is available or returns an error.
func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) {
var (
cw Writer
err error
retry = 16
)
for {
cw, err = cs.Writer(ctx, ref, size, expected)
if err != nil {
if !errdefs.IsUnavailable(err) {
return nil, err
}
// TODO: Check status to determine if the writer is active,
// continue waiting while active, otherwise return lock
// error or abort. Requires asserting for an ingest manager
select {
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}
continue
case <-ctx.Done():
// Propagate lock error
return nil, err
}
}
break
}
return cw, err
}
// Copy copies data with the expected digest from the reader into the
// provided content store writer. This copy commits the writer.
//

View File

@ -25,7 +25,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"strings"
"sync"
"time"
@ -256,10 +255,9 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
log.G(ctx).Debug("fetch blob")
var (
ref = remotes.MakeRefKey(ctx, desc)
calc = newBlobStateCalculator()
retry = 16
size = desc.Size
ref = remotes.MakeRefKey(ctx, desc)
calc = newBlobStateCalculator()
size = desc.Size
)
// size may be unknown, set to zero for content ingest
@ -267,20 +265,9 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
size = 0
}
tryit:
cw, err := c.contentStore.Writer(ctx, ref, size, desc.Digest)
cw, err := content.OpenWriter(ctx, c.contentStore, ref, size, desc.Digest)
if err != nil {
if errdefs.IsUnavailable(err) {
select {
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}
goto tryit
case <-ctx.Done():
return err
}
} else if !errdefs.IsAlreadyExists(err) {
if !errdefs.IsAlreadyExists(err) {
return err
}

View File

@ -20,10 +20,8 @@ import (
"context"
"fmt"
"io"
"math/rand"
"strings"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
@ -83,38 +81,14 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch")
var (
ref = MakeRefKey(ctx, desc)
cw content.Writer
err error
retry = 16
)
for {
cw, err = ingester.Writer(ctx, ref, desc.Size, desc.Digest)
if err != nil {
if errdefs.IsAlreadyExists(err) {
return nil
} else if !errdefs.IsUnavailable(err) {
return err
}
// TODO: On first time locked is encountered, get status
// of writer and abort if not updated recently.
select {
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}
continue
case <-ctx.Done():
// Propagate lock error
return err
}
cw, err := content.OpenWriter(ctx, ingester, MakeRefKey(ctx, desc), desc.Size, desc.Digest)
if err != nil {
if errdefs.IsAlreadyExists(err) {
return nil
}
defer cw.Close()
break
return err
}
defer cw.Close()
ws, err := cw.Status()
if err != nil {