diff --git a/core/content/helpers.go b/core/content/helpers.go index 424db0fd8..101af94a9 100644 --- a/core/content/helpers.go +++ b/core/content/helpers.go @@ -31,9 +31,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) -// maxResets is the no.of times the Copy() method can tolerate a reset of the body -const maxResets = 5 - var ErrReset = errors.New("writer has been reset") var bufPool = sync.Pool{ @@ -149,7 +146,7 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er // Copy is buffered, so no need to wrap reader in buffered io. func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error { r := or - for i := 0; i < maxResets; i++ { + for i := 0; ; i++ { if i >= 1 { log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset") } @@ -189,9 +186,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig } return nil } - - log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets) - return fmt.Errorf("failed to copy after %d retries", maxResets) } // CopyReaderAt copies to a writer from a given reader at for the given diff --git a/core/content/helpers_test.go b/core/content/helpers_test.go index 8601dc7b0..b23d32d9d 100644 --- a/core/content/helpers_test.go +++ b/core/content/helpers_test.go @@ -20,7 +20,7 @@ import ( "bytes" "context" _ "crypto/sha256" // required by go-digest - "fmt" + "errors" "io" "strings" "testing" @@ -42,7 +42,7 @@ func TestCopy(t *testing.T) { cf1 := func(buf *bytes.Buffer, st Status) commitFunction { i := 0 return func() error { - // function resets the first time + // function resets the first time, but then succeeds after if i == 0 { // this is the case where, the pipewriter to which the data was being written has // changed. which means we need to clear the buffer @@ -55,11 +55,28 @@ func TestCopy(t *testing.T) { } } + cf2err := errors.New("commit failed") cf2 := func(buf *bytes.Buffer, st Status) commitFunction { i := 0 return func() error { - // function resets for more than the maxReset value - if i < maxResets+1 { + // function resets a lot of times, and eventually fails + if i < 10 { + // this is the case where, the pipewriter to which the data was being written has + // changed. which means we need to clear the buffer + i++ + buf.Reset() + st.Offset = 0 + return ErrReset + } + return cf2err + } + } + + cf3 := func(buf *bytes.Buffer, st Status) commitFunction { + i := 0 + return func() error { + // function resets a lot of times, and eventually succeeds + if i < 10 { // this is the case where, the pipewriter to which the data was being written has // changed. which means we need to clear the buffer i++ @@ -73,8 +90,10 @@ func TestCopy(t *testing.T) { s1 := Status{} s2 := Status{} + s3 := Status{} b1 := bytes.Buffer{} b2 := bytes.Buffer{} + b3 := bytes.Buffer{} var testcases = []struct { name string @@ -130,7 +149,7 @@ func TestCopy(t *testing.T) { expected: "content to copy", }, { - name: "write fails more than maxReset times due to reset", + name: "write fails after lots of resets", source: newCopySource("content to copy"), writer: fakeWriter{ Buffer: &b2, @@ -138,7 +157,17 @@ func TestCopy(t *testing.T) { commitFunc: cf2(&b2, s2), }, expected: "", - expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets), + expectedErr: cf2err, + }, + { + name: "write succeeds after lots of resets", + source: newCopySource("content to copy"), + writer: fakeWriter{ + Buffer: &b3, + status: s3, + commitFunc: cf3(&b3, s3), + }, + expected: "content to copy", }, } @@ -153,7 +182,7 @@ func TestCopy(t *testing.T) { // if an error is expected then further comparisons are not required if testcase.expectedErr != nil { - assert.Equal(t, testcase.expectedErr, err) + assert.ErrorIs(t, err, testcase.expectedErr) return } diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index c3d0c6069..f994fff5a 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -280,7 +280,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str req.body = func() (io.ReadCloser, error) { pr, pw := io.Pipe() pushw.setPipe(pw) - return io.NopCloser(pr), nil + return pr, nil } req.size = desc.Size @@ -288,7 +288,6 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str resp, err := req.doWithRetries(ctx, nil) if err != nil { pushw.setError(err) - pushw.Close() return } @@ -298,7 +297,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str err := remoteserrors.NewUnexpectedStatusErr(resp) log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response") pushw.setError(err) - pushw.Close() + return } pushw.setResponse(resp) }() @@ -331,10 +330,12 @@ type pushWriter struct { pipe *io.PipeWriter - pipeC chan *io.PipeWriter - respC chan *http.Response + done chan struct{} closeOnce sync.Once - errC chan error + + pipeC chan *io.PipeWriter + respC chan *http.Response + errC chan error isManifest bool @@ -352,19 +353,51 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S pipeC: make(chan *io.PipeWriter, 1), respC: make(chan *http.Response, 1), errC: make(chan error, 1), + done: make(chan struct{}), isManifest: isManifest, } } func (pw *pushWriter) setPipe(p *io.PipeWriter) { - pw.pipeC <- p + select { + case <-pw.done: + case pw.pipeC <- p: + } } func (pw *pushWriter) setError(err error) { - pw.errC <- err + select { + case <-pw.done: + case pw.errC <- err: + } } + func (pw *pushWriter) setResponse(resp *http.Response) { - pw.respC <- resp + select { + case <-pw.done: + case pw.respC <- resp: + } +} + +func (pw *pushWriter) replacePipe(p *io.PipeWriter) error { + if pw.pipe == nil { + pw.pipe = p + return nil + } + + pw.pipe.CloseWithError(content.ErrReset) + pw.pipe = p + + // If content has already been written, the bytes + // cannot be written again and the caller must reset + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return err + } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return content.ErrReset } func (pw *pushWriter) Write(p []byte) (n int, err error) { @@ -374,26 +407,18 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { } if pw.pipe == nil { - p, ok := <-pw.pipeC - if !ok { + select { + case <-pw.done: return 0, io.ErrClosedPipe + case p := <-pw.pipeC: + pw.replacePipe(p) } - pw.pipe = p } else { select { - case p, ok := <-pw.pipeC: - if !ok { - return 0, io.ErrClosedPipe - } - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written and the caller must reset - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset + case <-pw.done: + return 0, io.ErrClosedPipe + case p := <-pw.pipeC: + return 0, pw.replacePipe(p) default: } } @@ -403,9 +428,13 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { // if the pipe is closed, we might have the original error on the error // channel - so we should try and get it select { - case err2 := <-pw.errC: - err = err2 - default: + case <-pw.done: + case err = <-pw.errC: + pw.Close() + case p := <-pw.pipeC: + return 0, pw.replacePipe(p) + case resp := <-pw.respC: + pw.setResponse(resp) } } status.Offset += int64(n) @@ -418,7 +447,7 @@ func (pw *pushWriter) Close() error { // Ensure pipeC is closed but handle `Close()` being // called multiple times without panicking pw.closeOnce.Do(func() { - close(pw.pipeC) + close(pw.done) }) if pw.pipe != nil { status, err := pw.tracker.GetStatus(pw.ref) @@ -458,30 +487,18 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di // TODO: timeout waiting for response var resp *http.Response select { + case <-pw.done: + return io.ErrClosedPipe case err := <-pw.errC: + pw.Close() return err case resp = <-pw.respC: defer resp.Body.Close() - case p, ok := <-pw.pipeC: + case p := <-pw.pipeC: // check whether the pipe has changed in the commit, because sometimes Write // can complete successfully, but the pipe may have changed. In that case, the // content needs to be reset. - if !ok { - return io.ErrClosedPipe - } - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return content.ErrReset + return pw.replacePipe(p) } // 201 is specified return status, some registries return