pushWriter: correctly propagate errors

In the refactor from 926b9c72f61b5be6bf8d952512f1d0932fbaf898, the error
handling was substantially reworked, and changed the types of errors
returned.

Notably, in the case of a network error, instead of propogating the
error through to return from pushWriter.Write (as previously), it would
be propagated through to pushWriter.Commit - however, this is too late,
since we've already closed the io.Pipe by the time we would have reached
this function. Therefore, we get the generic error message  "io:
read/write on closed pipe" for *every network error*.

This patch corrects this behavior to ensure that the correct error
object is always returned as early as possible, by checking the error
result after writing and detecting a closed pipe.

Additionally, we do some additional hardening - specifically we prevent
falling through when resetting the content or detecting errors, and
update the tests to explicitly check for the ErrReset message.

Signed-off-by: Justin Chadwell <me@jedevc.com>
This commit is contained in:
Justin Chadwell 2023-01-24 11:15:52 +00:00
parent 3f565daf68
commit 9f6058d029
2 changed files with 33 additions and 36 deletions

View File

@ -380,17 +380,24 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
// If content has already been written, the bytes // If content has already been written, the bytes
// cannot be written and the caller must reset // cannot be written and the caller must reset
if status.Offset > 0 { status.Offset = 0
status.Offset = 0 status.UpdatedAt = time.Now()
status.UpdatedAt = time.Now() pw.tracker.SetStatus(pw.ref, status)
pw.tracker.SetStatus(pw.ref, status) return 0, content.ErrReset
return 0, content.ErrReset
}
default: default:
} }
} }
n, err = pw.pipe.Write(p) n, err = pw.pipe.Write(p)
if errors.Is(err, io.ErrClosedPipe) {
// if the pipe is closed, we might have the original error on the error
// channel - so we should try and get it
select {
case err2 := <-pw.errC:
err = err2
default:
}
}
status.Offset += int64(n) status.Offset += int64(n)
status.UpdatedAt = time.Now() status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status) pw.tracker.SetStatus(pw.ref, status)
@ -431,7 +438,7 @@ func (pw *pushWriter) Digest() digest.Digest {
func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
// Check whether read has already thrown an error // Check whether read has already thrown an error
if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { if _, err := pw.pipe.Write([]byte{}); err != nil && !errors.Is(err, io.ErrClosedPipe) {
return fmt.Errorf("pipe error before commit: %w", err) return fmt.Errorf("pipe error before commit: %w", err)
} }
@ -442,9 +449,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
var resp *http.Response var resp *http.Response
select { select {
case err := <-pw.errC: case err := <-pw.errC:
if err != nil { return err
return err
}
case resp = <-pw.respC: case resp = <-pw.respC:
defer resp.Body.Close() defer resp.Body.Close()
case p, ok := <-pw.pipeC: case p, ok := <-pw.pipeC:
@ -456,18 +461,17 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
} }
pw.pipe.CloseWithError(content.ErrReset) pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref) status, err := pw.tracker.GetStatus(pw.ref)
if err != nil { if err != nil {
return err return err
} }
// If content has already been written, the bytes status.Offset = 0
// cannot be written again and the caller must reset status.UpdatedAt = time.Now()
if status.Offset > 0 { pw.tracker.SetStatus(pw.ref, status)
status.Offset = 0 return content.ErrReset
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}
} }
// 201 is specified return status, some registries return // 201 is specified return status, some registries return

View File

@ -117,26 +117,19 @@ func TestPusherErrReset(t *testing.T) {
} }
w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false) w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false)
assert.Equal(t, err, nil, "no error should be there") assert.NoError(t, err)
w.Write(ct) // first push should fail with ErrReset
_, err = w.Write(ct)
assert.NoError(t, err)
err = w.Commit(context.Background(), desc.Size, desc.Digest)
assert.Equal(t, content.ErrReset, err)
pw, _ := w.(*pushWriter) // second push should succeed
_, err = w.Write(ct)
select { assert.NoError(t, err)
case p := <-pw.pipeC: err = w.Commit(context.Background(), desc.Size, desc.Digest)
p.Write(ct) assert.NoError(t, err)
case e := <-pw.errC:
assert.Failf(t, "error: %v while retrying request", e.Error())
}
select {
case resp := <-pw.respC:
assert.Equalf(t, resp.StatusCode, http.StatusCreated,
"201 should be the response code when uploading new content")
case <-pw.errC:
assert.Fail(t, "should not give error")
}
} }
func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error { func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error {