From 9f6058d029ebffdcd6a8a954143cd4070f62f720 Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Tue, 24 Jan 2023 11:15:52 +0000 Subject: [PATCH] pushWriter: correctly propagate errors In the refactor from 926b9c72f61b5be6bf8d952512f1d0932fbaf898, the error handling was substantially reworked, and changed the types of errors returned. Notably, in the case of a network error, instead of propogating the error through to return from pushWriter.Write (as previously), it would be propagated through to pushWriter.Commit - however, this is too late, since we've already closed the io.Pipe by the time we would have reached this function. Therefore, we get the generic error message "io: read/write on closed pipe" for *every network error*. This patch corrects this behavior to ensure that the correct error object is always returned as early as possible, by checking the error result after writing and detecting a closed pipe. Additionally, we do some additional hardening - specifically we prevent falling through when resetting the content or detecting errors, and update the tests to explicitly check for the ErrReset message. Signed-off-by: Justin Chadwell --- remotes/docker/pusher.go | 40 +++++++++++++++++++---------------- remotes/docker/pusher_test.go | 29 ++++++++++--------------- 2 files changed, 33 insertions(+), 36 deletions(-) diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index a509cfe3c..ef6e8056a 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -380,17 +380,24 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { // If content has already been written, the bytes // cannot be written and the caller must reset - if status.Offset > 0 { - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset - } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return 0, content.ErrReset default: } } n, err = pw.pipe.Write(p) + if errors.Is(err, io.ErrClosedPipe) { + // if the pipe is closed, we might have the original error on the error + // channel - so we should try and get it + select { + case err2 := <-pw.errC: + err = err2 + default: + } + } status.Offset += int64(n) status.UpdatedAt = time.Now() pw.tracker.SetStatus(pw.ref, status) @@ -431,7 +438,7 @@ func (pw *pushWriter) Digest() digest.Digest { func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { // Check whether read has already thrown an error - if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { + if _, err := pw.pipe.Write([]byte{}); err != nil && !errors.Is(err, io.ErrClosedPipe) { return fmt.Errorf("pipe error before commit: %w", err) } @@ -442,9 +449,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di var resp *http.Response select { case err := <-pw.errC: - if err != nil { - return err - } + return err case resp = <-pw.respC: defer resp.Body.Close() case p, ok := <-pw.pipeC: @@ -456,18 +461,17 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di } pw.pipe.CloseWithError(content.ErrReset) pw.pipe = p + + // If content has already been written, the bytes + // cannot be written again and the caller must reset status, err := pw.tracker.GetStatus(pw.ref) if err != nil { return err } - // If content has already been written, the bytes - // cannot be written again and the caller must reset - if status.Offset > 0 { - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return content.ErrReset - } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return content.ErrReset } // 201 is specified return status, some registries return diff --git a/remotes/docker/pusher_test.go b/remotes/docker/pusher_test.go index e74f5c4d7..d982a7de5 100644 --- a/remotes/docker/pusher_test.go +++ b/remotes/docker/pusher_test.go @@ -117,26 +117,19 @@ func TestPusherErrReset(t *testing.T) { } w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false) - assert.Equal(t, err, nil, "no error should be there") + assert.NoError(t, err) - w.Write(ct) + // first push should fail with ErrReset + _, err = w.Write(ct) + assert.NoError(t, err) + err = w.Commit(context.Background(), desc.Size, desc.Digest) + assert.Equal(t, content.ErrReset, err) - pw, _ := w.(*pushWriter) - - select { - case p := <-pw.pipeC: - p.Write(ct) - case e := <-pw.errC: - assert.Failf(t, "error: %v while retrying request", e.Error()) - } - - select { - case resp := <-pw.respC: - assert.Equalf(t, resp.StatusCode, http.StatusCreated, - "201 should be the response code when uploading new content") - case <-pw.errC: - assert.Fail(t, "should not give error") - } + // second push should succeed + _, err = w.Write(ct) + assert.NoError(t, err) + err = w.Commit(context.Background(), desc.Size, desc.Digest) + assert.NoError(t, err) } func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error {