Merge pull request #8379 from jedevc/docker-pusher-concurrency
Fix various timing issues with docker pusher
This commit is contained in:
commit
b8654e36f4
@ -31,9 +31,6 @@ import (
|
|||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// maxResets is the no.of times the Copy() method can tolerate a reset of the body
|
|
||||||
const maxResets = 5
|
|
||||||
|
|
||||||
var ErrReset = errors.New("writer has been reset")
|
var ErrReset = errors.New("writer has been reset")
|
||||||
|
|
||||||
var bufPool = sync.Pool{
|
var bufPool = sync.Pool{
|
||||||
@ -149,7 +146,7 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
|
|||||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||||
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
|
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
|
||||||
r := or
|
r := or
|
||||||
for i := 0; i < maxResets; i++ {
|
for i := 0; ; i++ {
|
||||||
if i >= 1 {
|
if i >= 1 {
|
||||||
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
|
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
|
||||||
}
|
}
|
||||||
@ -189,9 +186,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
|
|
||||||
return fmt.Errorf("failed to copy after %d retries", maxResets)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyReaderAt copies to a writer from a given reader at for the given
|
// CopyReaderAt copies to a writer from a given reader at for the given
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
_ "crypto/sha256" // required by go-digest
|
_ "crypto/sha256" // required by go-digest
|
||||||
"fmt"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -42,7 +42,7 @@ func TestCopy(t *testing.T) {
|
|||||||
cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
|
cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
|
||||||
i := 0
|
i := 0
|
||||||
return func() error {
|
return func() error {
|
||||||
// function resets the first time
|
// function resets the first time, but then succeeds after
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
// this is the case where, the pipewriter to which the data was being written has
|
// this is the case where, the pipewriter to which the data was being written has
|
||||||
// changed. which means we need to clear the buffer
|
// changed. which means we need to clear the buffer
|
||||||
@ -55,11 +55,28 @@ func TestCopy(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cf2err := errors.New("commit failed")
|
||||||
cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
|
cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
|
||||||
i := 0
|
i := 0
|
||||||
return func() error {
|
return func() error {
|
||||||
// function resets for more than the maxReset value
|
// function resets a lot of times, and eventually fails
|
||||||
if i < maxResets+1 {
|
if i < 10 {
|
||||||
|
// this is the case where, the pipewriter to which the data was being written has
|
||||||
|
// changed. which means we need to clear the buffer
|
||||||
|
i++
|
||||||
|
buf.Reset()
|
||||||
|
st.Offset = 0
|
||||||
|
return ErrReset
|
||||||
|
}
|
||||||
|
return cf2err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cf3 := func(buf *bytes.Buffer, st Status) commitFunction {
|
||||||
|
i := 0
|
||||||
|
return func() error {
|
||||||
|
// function resets a lot of times, and eventually succeeds
|
||||||
|
if i < 10 {
|
||||||
// this is the case where, the pipewriter to which the data was being written has
|
// this is the case where, the pipewriter to which the data was being written has
|
||||||
// changed. which means we need to clear the buffer
|
// changed. which means we need to clear the buffer
|
||||||
i++
|
i++
|
||||||
@ -73,8 +90,10 @@ func TestCopy(t *testing.T) {
|
|||||||
|
|
||||||
s1 := Status{}
|
s1 := Status{}
|
||||||
s2 := Status{}
|
s2 := Status{}
|
||||||
|
s3 := Status{}
|
||||||
b1 := bytes.Buffer{}
|
b1 := bytes.Buffer{}
|
||||||
b2 := bytes.Buffer{}
|
b2 := bytes.Buffer{}
|
||||||
|
b3 := bytes.Buffer{}
|
||||||
|
|
||||||
var testcases = []struct {
|
var testcases = []struct {
|
||||||
name string
|
name string
|
||||||
@ -130,7 +149,7 @@ func TestCopy(t *testing.T) {
|
|||||||
expected: "content to copy",
|
expected: "content to copy",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "write fails more than maxReset times due to reset",
|
name: "write fails after lots of resets",
|
||||||
source: newCopySource("content to copy"),
|
source: newCopySource("content to copy"),
|
||||||
writer: fakeWriter{
|
writer: fakeWriter{
|
||||||
Buffer: &b2,
|
Buffer: &b2,
|
||||||
@ -138,7 +157,17 @@ func TestCopy(t *testing.T) {
|
|||||||
commitFunc: cf2(&b2, s2),
|
commitFunc: cf2(&b2, s2),
|
||||||
},
|
},
|
||||||
expected: "",
|
expected: "",
|
||||||
expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets),
|
expectedErr: cf2err,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "write succeeds after lots of resets",
|
||||||
|
source: newCopySource("content to copy"),
|
||||||
|
writer: fakeWriter{
|
||||||
|
Buffer: &b3,
|
||||||
|
status: s3,
|
||||||
|
commitFunc: cf3(&b3, s3),
|
||||||
|
},
|
||||||
|
expected: "content to copy",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,7 +182,7 @@ func TestCopy(t *testing.T) {
|
|||||||
|
|
||||||
// if an error is expected then further comparisons are not required
|
// if an error is expected then further comparisons are not required
|
||||||
if testcase.expectedErr != nil {
|
if testcase.expectedErr != nil {
|
||||||
assert.Equal(t, testcase.expectedErr, err)
|
assert.ErrorIs(t, err, testcase.expectedErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +280,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
req.body = func() (io.ReadCloser, error) {
|
req.body = func() (io.ReadCloser, error) {
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
pushw.setPipe(pw)
|
pushw.setPipe(pw)
|
||||||
return io.NopCloser(pr), nil
|
return pr, nil
|
||||||
}
|
}
|
||||||
req.size = desc.Size
|
req.size = desc.Size
|
||||||
|
|
||||||
@ -288,7 +288,6 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
resp, err := req.doWithRetries(ctx, nil)
|
resp, err := req.doWithRetries(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pushw.setError(err)
|
pushw.setError(err)
|
||||||
pushw.Close()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,7 +297,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
err := remoteserrors.NewUnexpectedStatusErr(resp)
|
err := remoteserrors.NewUnexpectedStatusErr(resp)
|
||||||
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
|
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
|
||||||
pushw.setError(err)
|
pushw.setError(err)
|
||||||
pushw.Close()
|
return
|
||||||
}
|
}
|
||||||
pushw.setResponse(resp)
|
pushw.setResponse(resp)
|
||||||
}()
|
}()
|
||||||
@ -331,10 +330,12 @@ type pushWriter struct {
|
|||||||
|
|
||||||
pipe *io.PipeWriter
|
pipe *io.PipeWriter
|
||||||
|
|
||||||
pipeC chan *io.PipeWriter
|
done chan struct{}
|
||||||
respC chan *http.Response
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
errC chan error
|
|
||||||
|
pipeC chan *io.PipeWriter
|
||||||
|
respC chan *http.Response
|
||||||
|
errC chan error
|
||||||
|
|
||||||
isManifest bool
|
isManifest bool
|
||||||
|
|
||||||
@ -352,19 +353,51 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
|
|||||||
pipeC: make(chan *io.PipeWriter, 1),
|
pipeC: make(chan *io.PipeWriter, 1),
|
||||||
respC: make(chan *http.Response, 1),
|
respC: make(chan *http.Response, 1),
|
||||||
errC: make(chan error, 1),
|
errC: make(chan error, 1),
|
||||||
|
done: make(chan struct{}),
|
||||||
isManifest: isManifest,
|
isManifest: isManifest,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
|
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
|
||||||
pw.pipeC <- p
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
case pw.pipeC <- p:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) setError(err error) {
|
func (pw *pushWriter) setError(err error) {
|
||||||
pw.errC <- err
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
case pw.errC <- err:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) setResponse(resp *http.Response) {
|
func (pw *pushWriter) setResponse(resp *http.Response) {
|
||||||
pw.respC <- resp
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
case pw.respC <- resp:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
|
||||||
|
if pw.pipe == nil {
|
||||||
|
pw.pipe = p
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pw.pipe.CloseWithError(content.ErrReset)
|
||||||
|
pw.pipe = p
|
||||||
|
|
||||||
|
// If content has already been written, the bytes
|
||||||
|
// cannot be written again and the caller must reset
|
||||||
|
status, err := pw.tracker.GetStatus(pw.ref)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
status.Offset = 0
|
||||||
|
status.UpdatedAt = time.Now()
|
||||||
|
pw.tracker.SetStatus(pw.ref, status)
|
||||||
|
return content.ErrReset
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
||||||
@ -374,26 +407,18 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if pw.pipe == nil {
|
if pw.pipe == nil {
|
||||||
p, ok := <-pw.pipeC
|
select {
|
||||||
if !ok {
|
case <-pw.done:
|
||||||
return 0, io.ErrClosedPipe
|
return 0, io.ErrClosedPipe
|
||||||
|
case p := <-pw.pipeC:
|
||||||
|
pw.replacePipe(p)
|
||||||
}
|
}
|
||||||
pw.pipe = p
|
|
||||||
} else {
|
} else {
|
||||||
select {
|
select {
|
||||||
case p, ok := <-pw.pipeC:
|
case <-pw.done:
|
||||||
if !ok {
|
return 0, io.ErrClosedPipe
|
||||||
return 0, io.ErrClosedPipe
|
case p := <-pw.pipeC:
|
||||||
}
|
return 0, pw.replacePipe(p)
|
||||||
pw.pipe.CloseWithError(content.ErrReset)
|
|
||||||
pw.pipe = p
|
|
||||||
|
|
||||||
// If content has already been written, the bytes
|
|
||||||
// cannot be written and the caller must reset
|
|
||||||
status.Offset = 0
|
|
||||||
status.UpdatedAt = time.Now()
|
|
||||||
pw.tracker.SetStatus(pw.ref, status)
|
|
||||||
return 0, content.ErrReset
|
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -403,9 +428,13 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
|||||||
// if the pipe is closed, we might have the original error on the error
|
// if the pipe is closed, we might have the original error on the error
|
||||||
// channel - so we should try and get it
|
// channel - so we should try and get it
|
||||||
select {
|
select {
|
||||||
case err2 := <-pw.errC:
|
case <-pw.done:
|
||||||
err = err2
|
case err = <-pw.errC:
|
||||||
default:
|
pw.Close()
|
||||||
|
case p := <-pw.pipeC:
|
||||||
|
return 0, pw.replacePipe(p)
|
||||||
|
case resp := <-pw.respC:
|
||||||
|
pw.setResponse(resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
status.Offset += int64(n)
|
status.Offset += int64(n)
|
||||||
@ -418,7 +447,7 @@ func (pw *pushWriter) Close() error {
|
|||||||
// Ensure pipeC is closed but handle `Close()` being
|
// Ensure pipeC is closed but handle `Close()` being
|
||||||
// called multiple times without panicking
|
// called multiple times without panicking
|
||||||
pw.closeOnce.Do(func() {
|
pw.closeOnce.Do(func() {
|
||||||
close(pw.pipeC)
|
close(pw.done)
|
||||||
})
|
})
|
||||||
if pw.pipe != nil {
|
if pw.pipe != nil {
|
||||||
status, err := pw.tracker.GetStatus(pw.ref)
|
status, err := pw.tracker.GetStatus(pw.ref)
|
||||||
@ -458,30 +487,18 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
|
|||||||
// TODO: timeout waiting for response
|
// TODO: timeout waiting for response
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
select {
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
return io.ErrClosedPipe
|
||||||
case err := <-pw.errC:
|
case err := <-pw.errC:
|
||||||
|
pw.Close()
|
||||||
return err
|
return err
|
||||||
case resp = <-pw.respC:
|
case resp = <-pw.respC:
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
case p, ok := <-pw.pipeC:
|
case p := <-pw.pipeC:
|
||||||
// check whether the pipe has changed in the commit, because sometimes Write
|
// check whether the pipe has changed in the commit, because sometimes Write
|
||||||
// can complete successfully, but the pipe may have changed. In that case, the
|
// can complete successfully, but the pipe may have changed. In that case, the
|
||||||
// content needs to be reset.
|
// content needs to be reset.
|
||||||
if !ok {
|
return pw.replacePipe(p)
|
||||||
return io.ErrClosedPipe
|
|
||||||
}
|
|
||||||
pw.pipe.CloseWithError(content.ErrReset)
|
|
||||||
pw.pipe = p
|
|
||||||
|
|
||||||
// If content has already been written, the bytes
|
|
||||||
// cannot be written again and the caller must reset
|
|
||||||
status, err := pw.tracker.GetStatus(pw.ref)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
status.Offset = 0
|
|
||||||
status.UpdatedAt = time.Now()
|
|
||||||
pw.tracker.SetStatus(pw.ref, status)
|
|
||||||
return content.ErrReset
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 201 is specified return status, some registries return
|
// 201 is specified return status, some registries return
|
||||||
|
Loading…
Reference in New Issue
Block a user