From 4660f63033cf8036a21f97021ea20c8565b0cde7 Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Wed, 12 Apr 2023 17:02:09 +0100 Subject: [PATCH 1/7] copy: remove wrapping io.NopCloser from push writer pipe io.Pipe produces a PipeReader and a PipeWriter - a close on the write side, causes an error on both the read and write sides, while a close on the read side causes an error on only the read side. Previously, we explicitly prohibited closing from the read side. However, http.Request.Body requires that "calling Close should unblock a Read waiting for input". Our reader will not do this - calling close becomes a no-op. This can cause a deadlock because client.Do may never terminate in some circumstances. We need the Reader side to close its side of the pipe as well, which it already does using the go standard library - otherwise, we can hang forever, writing to a pipe that will never be closed. Allowing the requester to close the body should be safe - we never reuse the same reader between requests, as the result of body() will never be reused by the guarantees of the standard library. Signed-off-by: Justin Chadwell --- core/remotes/docker/pusher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index c3d0c6069..d38b0c70f 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -280,7 +280,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str req.body = func() (io.ReadCloser, error) { pr, pw := io.Pipe() pushw.setPipe(pw) - return io.NopCloser(pr), nil + return pr, nil } req.size = desc.Size From 91a50f70b7024c4a40d22135b06dab639377f546 Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Thu, 13 Apr 2023 15:49:20 +0100 Subject: [PATCH 2/7] copy: check if writer was closed before setting a pipe If Close is called externally before a request is attempted, then we will accidentally attempt to send to a closed channel, causing a panic. To avoid this, we can check to see if Close has been called, using a done channel. If this channel is ever done, we drop any incoming errors, requests or pipes - we don't need them, since we're done. Signed-off-by: Justin Chadwell --- core/remotes/docker/pusher.go | 54 +++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index d38b0c70f..27f5a94d7 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -331,10 +331,12 @@ type pushWriter struct { pipe *io.PipeWriter - pipeC chan *io.PipeWriter - respC chan *http.Response + done chan struct{} closeOnce sync.Once - errC chan error + + pipeC chan *io.PipeWriter + respC chan *http.Response + errC chan error isManifest bool @@ -352,19 +354,30 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S pipeC: make(chan *io.PipeWriter, 1), respC: make(chan *http.Response, 1), errC: make(chan error, 1), + done: make(chan struct{}), isManifest: isManifest, } } func (pw *pushWriter) setPipe(p *io.PipeWriter) { - pw.pipeC <- p + select { + case <-pw.done: + case pw.pipeC <- p: + } } func (pw *pushWriter) setError(err error) { - pw.errC <- err + select { + case <-pw.done: + case pw.errC <- err: + } } + func (pw *pushWriter) setResponse(resp *http.Response) { - pw.respC <- resp + select { + case <-pw.done: + case pw.respC <- resp: + } } func (pw *pushWriter) Write(p []byte) (n int, err error) { @@ -374,22 +387,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { } if pw.pipe == nil { - p, ok := <-pw.pipeC - if !ok { + select { + case <-pw.done: return 0, io.ErrClosedPipe + case p := <-pw.pipeC: + pw.pipe = p } - pw.pipe = p } else { select { - case p, ok := <-pw.pipeC: - if !ok { - return 0, io.ErrClosedPipe - } + case <-pw.done: + return 0, io.ErrClosedPipe + case p := <-pw.pipeC: pw.pipe.CloseWithError(content.ErrReset) pw.pipe = p // If content has already been written, the bytes - // cannot be written and the caller must reset + // cannot be written again and the caller must reset + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return 0, err + } status.Offset = 0 status.UpdatedAt = time.Now() pw.tracker.SetStatus(pw.ref, status) @@ -418,7 +435,7 @@ func (pw *pushWriter) Close() error { // Ensure pipeC is closed but handle `Close()` being // called multiple times without panicking pw.closeOnce.Do(func() { - close(pw.pipeC) + close(pw.done) }) if pw.pipe != nil { status, err := pw.tracker.GetStatus(pw.ref) @@ -458,17 +475,16 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di // TODO: timeout waiting for response var resp *http.Response select { + case <-pw.done: + return io.ErrClosedPipe case err := <-pw.errC: return err case resp = <-pw.respC: defer resp.Body.Close() - case p, ok := <-pw.pipeC: + case p := <-pw.pipeC: // check whether the pipe has changed in the commit, because sometimes Write // can complete successfully, but the pipe may have changed. In that case, the // content needs to be reset. - if !ok { - return io.ErrClosedPipe - } pw.pipe.CloseWithError(content.ErrReset) pw.pipe = p From 9d7641ff3e64537ad7a2f4b2f01c422b251fdccd Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Thu, 13 Apr 2023 15:49:44 +0100 Subject: [PATCH 3/7] copy: improve error detection from closed pipes If we get io.ErrClosedPipe in pushWriter.Write, there are three possible scenarios: - The request has failed, we need to attempt a reset, so we can expect a new pipe incoming on pipeC. - The request has failed, we don't need to attempt a reset, so we can expect an incoming error on errC. - Something else externally has called Close, so we can expect the done channel to be closed. This patch ensures that we block for as long as possible (while still handling each of the above cases, so we avoid hanging), to make sure that we properly return an appropriate error message each time. Signed-off-by: Justin Chadwell --- core/remotes/docker/pusher.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index 27f5a94d7..9efe553b4 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -420,9 +420,22 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { // if the pipe is closed, we might have the original error on the error // channel - so we should try and get it select { - case err2 := <-pw.errC: - err = err2 - default: + case <-pw.done: + case err = <-pw.errC: + case p := <-pw.pipeC: + pw.pipe.CloseWithError(content.ErrReset) + pw.pipe = p + + // If content has already been written, the bytes + // cannot be written again and the caller must reset + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return 0, err + } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return 0, content.ErrReset } } status.Offset += int64(n) From 651cfa2a2c62b2f0b97d101707512e2b15950e43 Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Thu, 13 Apr 2023 15:44:30 +0100 Subject: [PATCH 4/7] pushWriter: refactor reset pipe logic into separate function Signed-off-by: Justin Chadwell --- core/remotes/docker/pusher.go | 65 ++++++++++++++--------------------- 1 file changed, 25 insertions(+), 40 deletions(-) diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index 9efe553b4..cca118d39 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -380,6 +380,27 @@ func (pw *pushWriter) setResponse(resp *http.Response) { } } +func (pw *pushWriter) replacePipe(p *io.PipeWriter) error { + if pw.pipe == nil { + pw.pipe = p + return nil + } + + pw.pipe.CloseWithError(content.ErrReset) + pw.pipe = p + + // If content has already been written, the bytes + // cannot be written again and the caller must reset + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return err + } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return content.ErrReset +} + func (pw *pushWriter) Write(p []byte) (n int, err error) { status, err := pw.tracker.GetStatus(pw.ref) if err != nil { @@ -391,26 +412,14 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { case <-pw.done: return 0, io.ErrClosedPipe case p := <-pw.pipeC: - pw.pipe = p + pw.replacePipe(p) } } else { select { case <-pw.done: return 0, io.ErrClosedPipe case p := <-pw.pipeC: - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return 0, err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset + return 0, pw.replacePipe(p) default: } } @@ -423,19 +432,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { case <-pw.done: case err = <-pw.errC: case p := <-pw.pipeC: - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return 0, err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset + return 0, pw.replacePipe(p) } } status.Offset += int64(n) @@ -498,19 +495,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di // check whether the pipe has changed in the commit, because sometimes Write // can complete successfully, but the pipe may have changed. In that case, the // content needs to be reset. - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return content.ErrReset + return pw.replacePipe(p) } // 201 is specified return status, some registries return From e4f91c2df06840af35bfb15af7afca1b7ba9902f Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Wed, 12 Apr 2023 14:49:52 +0100 Subject: [PATCH 5/7] copy: remove max number of ErrResets If a writer continually asks to be reset then it should always succeed - it should be the responsibility of the underlying content.Writer to stop producing ErrReset after some amount of time and to instead return the underlying issue - which pushWriter already does today, using the doWithRetries function. doWithRetries already has a separate cap for retries of 6 requests (5 retries after the original failure), and it seems like this would be previously overridden by content.Copy's max number of 5 attempts, hiding the original error. Signed-off-by: Justin Chadwell --- core/content/helpers.go | 8 +------ core/content/helpers_test.go | 43 ++++++++++++++++++++++++++++++------ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/core/content/helpers.go b/core/content/helpers.go index 424db0fd8..101af94a9 100644 --- a/core/content/helpers.go +++ b/core/content/helpers.go @@ -31,9 +31,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) -// maxResets is the no.of times the Copy() method can tolerate a reset of the body -const maxResets = 5 - var ErrReset = errors.New("writer has been reset") var bufPool = sync.Pool{ @@ -149,7 +146,7 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er // Copy is buffered, so no need to wrap reader in buffered io. func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error { r := or - for i := 0; i < maxResets; i++ { + for i := 0; ; i++ { if i >= 1 { log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset") } @@ -189,9 +186,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig } return nil } - - log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets) - return fmt.Errorf("failed to copy after %d retries", maxResets) } // CopyReaderAt copies to a writer from a given reader at for the given diff --git a/core/content/helpers_test.go b/core/content/helpers_test.go index 8601dc7b0..b23d32d9d 100644 --- a/core/content/helpers_test.go +++ b/core/content/helpers_test.go @@ -20,7 +20,7 @@ import ( "bytes" "context" _ "crypto/sha256" // required by go-digest - "fmt" + "errors" "io" "strings" "testing" @@ -42,7 +42,7 @@ func TestCopy(t *testing.T) { cf1 := func(buf *bytes.Buffer, st Status) commitFunction { i := 0 return func() error { - // function resets the first time + // function resets the first time, but then succeeds after if i == 0 { // this is the case where, the pipewriter to which the data was being written has // changed. which means we need to clear the buffer @@ -55,11 +55,28 @@ func TestCopy(t *testing.T) { } } + cf2err := errors.New("commit failed") cf2 := func(buf *bytes.Buffer, st Status) commitFunction { i := 0 return func() error { - // function resets for more than the maxReset value - if i < maxResets+1 { + // function resets a lot of times, and eventually fails + if i < 10 { + // this is the case where, the pipewriter to which the data was being written has + // changed. which means we need to clear the buffer + i++ + buf.Reset() + st.Offset = 0 + return ErrReset + } + return cf2err + } + } + + cf3 := func(buf *bytes.Buffer, st Status) commitFunction { + i := 0 + return func() error { + // function resets a lot of times, and eventually succeeds + if i < 10 { // this is the case where, the pipewriter to which the data was being written has // changed. which means we need to clear the buffer i++ @@ -73,8 +90,10 @@ func TestCopy(t *testing.T) { s1 := Status{} s2 := Status{} + s3 := Status{} b1 := bytes.Buffer{} b2 := bytes.Buffer{} + b3 := bytes.Buffer{} var testcases = []struct { name string @@ -130,7 +149,7 @@ func TestCopy(t *testing.T) { expected: "content to copy", }, { - name: "write fails more than maxReset times due to reset", + name: "write fails after lots of resets", source: newCopySource("content to copy"), writer: fakeWriter{ Buffer: &b2, @@ -138,7 +157,17 @@ func TestCopy(t *testing.T) { commitFunc: cf2(&b2, s2), }, expected: "", - expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets), + expectedErr: cf2err, + }, + { + name: "write succeeds after lots of resets", + source: newCopySource("content to copy"), + writer: fakeWriter{ + Buffer: &b3, + status: s3, + commitFunc: cf3(&b3, s3), + }, + expected: "content to copy", }, } @@ -153,7 +182,7 @@ func TestCopy(t *testing.T) { // if an error is expected then further comparisons are not required if testcase.expectedErr != nil { - assert.Equal(t, testcase.expectedErr, err) + assert.ErrorIs(t, err, testcase.expectedErr) return } From b48e1141ebdeb82c3e575d3820a92cbd9250bb28 Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Thu, 13 Apr 2023 17:12:23 +0100 Subject: [PATCH 6/7] copy: setError should imply Close If sending two messages from goroutine X: a <- 1 b <- 2 And receiving them in goroutine Y: select { case <- a: case <- b: } Either branch of the select can trigger first - so when we call .setError and .Close next to each other, we don't know whether the done channel will close first or the error channel will receive first - so sometimes, we get an incorrect error message. We resolve this by not sending both signals - instead, we can have .setError *imply* .Close, by having the pushWriter call .Close on itself, after receiving an error. Signed-off-by: Justin Chadwell --- core/remotes/docker/pusher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index cca118d39..128d2fd03 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -288,7 +288,6 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str resp, err := req.doWithRetries(ctx, nil) if err != nil { pushw.setError(err) - pushw.Close() return } @@ -298,7 +297,6 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str err := remoteserrors.NewUnexpectedStatusErr(resp) log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response") pushw.setError(err) - pushw.Close() } pushw.setResponse(resp) }() @@ -431,6 +429,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { select { case <-pw.done: case err = <-pw.errC: + pw.Close() case p := <-pw.pipeC: return 0, pw.replacePipe(p) } @@ -488,6 +487,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di case <-pw.done: return io.ErrClosedPipe case err := <-pw.errC: + pw.Close() return err case resp = <-pw.respC: defer resp.Body.Close() From a9152ebf895b03f9be4eb00624b0f4aff92396ed Mon Sep 17 00:00:00 2001 From: Justin Chadwell Date: Thu, 11 May 2023 15:44:24 +0100 Subject: [PATCH 7/7] copy: prevent potential deadlock if close before fully written We also need an additional check to avoid setting both the error and response which can create a race where they can arrive in the receiving thread in either order. If we hit an error, we don't need to send the response. > There is a condition where the registry (unexpectedly, not to spec) > returns 201 or 204 on the put before the body is fully written. I would > expect that the http library would issue close and could fall into a > deadlock here. We could just read respC and call setResponse. In that > case ErrClosedPipe would get returned and Commit shouldn't be called > anyway. Signed-off-by: Justin Chadwell --- core/remotes/docker/pusher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index 128d2fd03..f994fff5a 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -297,6 +297,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str err := remoteserrors.NewUnexpectedStatusErr(resp) log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response") pushw.setError(err) + return } pushw.setResponse(resp) }() @@ -432,6 +433,8 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { pw.Close() case p := <-pw.pipeC: return 0, pw.replacePipe(p) + case resp := <-pw.respC: + pw.setResponse(resp) } } status.Offset += int64(n)