diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index a509cfe3c..ef6e8056a 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -380,17 +380,24 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { // If content has already been written, the bytes // cannot be written and the caller must reset - if status.Offset > 0 { - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset - } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return 0, content.ErrReset default: } } n, err = pw.pipe.Write(p) + if errors.Is(err, io.ErrClosedPipe) { + // if the pipe is closed, we might have the original error on the error + // channel - so we should try and get it + select { + case err2 := <-pw.errC: + err = err2 + default: + } + } status.Offset += int64(n) status.UpdatedAt = time.Now() pw.tracker.SetStatus(pw.ref, status) @@ -431,7 +438,7 @@ func (pw *pushWriter) Digest() digest.Digest { func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { // Check whether read has already thrown an error - if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { + if _, err := pw.pipe.Write([]byte{}); err != nil && !errors.Is(err, io.ErrClosedPipe) { return fmt.Errorf("pipe error before commit: %w", err) } @@ -442,9 +449,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di var resp *http.Response select { case err := <-pw.errC: - if err != nil { - return err - } + return err case resp = <-pw.respC: defer resp.Body.Close() case p, ok := <-pw.pipeC: @@ -456,18 +461,17 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di } pw.pipe.CloseWithError(content.ErrReset) pw.pipe = p + + // If content has already been written, the bytes + // cannot be written again and the caller must reset status, err := pw.tracker.GetStatus(pw.ref) if err != nil { return err } - // If content has already been written, the bytes - // cannot be written again and the caller must reset - if status.Offset > 0 { - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return content.ErrReset - } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return content.ErrReset } // 201 is specified return status, some registries return diff --git a/remotes/docker/pusher_test.go b/remotes/docker/pusher_test.go index e74f5c4d7..d982a7de5 100644 --- a/remotes/docker/pusher_test.go +++ b/remotes/docker/pusher_test.go @@ -117,26 +117,19 @@ func TestPusherErrReset(t *testing.T) { } w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false) - assert.Equal(t, err, nil, "no error should be there") + assert.NoError(t, err) - w.Write(ct) + // first push should fail with ErrReset + _, err = w.Write(ct) + assert.NoError(t, err) + err = w.Commit(context.Background(), desc.Size, desc.Digest) + assert.Equal(t, content.ErrReset, err) - pw, _ := w.(*pushWriter) - - select { - case p := <-pw.pipeC: - p.Write(ct) - case e := <-pw.errC: - assert.Failf(t, "error: %v while retrying request", e.Error()) - } - - select { - case resp := <-pw.respC: - assert.Equalf(t, resp.StatusCode, http.StatusCreated, - "201 should be the response code when uploading new content") - case <-pw.errC: - assert.Fail(t, "should not give error") - } + // second push should succeed + _, err = w.Write(ct) + assert.NoError(t, err) + err = w.Commit(context.Background(), desc.Size, desc.Digest) + assert.NoError(t, err) } func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error {