diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index d38b0c70f..27f5a94d7 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -331,10 +331,12 @@ type pushWriter struct { pipe *io.PipeWriter - pipeC chan *io.PipeWriter - respC chan *http.Response + done chan struct{} closeOnce sync.Once - errC chan error + + pipeC chan *io.PipeWriter + respC chan *http.Response + errC chan error isManifest bool @@ -352,19 +354,30 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S pipeC: make(chan *io.PipeWriter, 1), respC: make(chan *http.Response, 1), errC: make(chan error, 1), + done: make(chan struct{}), isManifest: isManifest, } } func (pw *pushWriter) setPipe(p *io.PipeWriter) { - pw.pipeC <- p + select { + case <-pw.done: + case pw.pipeC <- p: + } } func (pw *pushWriter) setError(err error) { - pw.errC <- err + select { + case <-pw.done: + case pw.errC <- err: + } } + func (pw *pushWriter) setResponse(resp *http.Response) { - pw.respC <- resp + select { + case <-pw.done: + case pw.respC <- resp: + } } func (pw *pushWriter) Write(p []byte) (n int, err error) { @@ -374,22 +387,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { } if pw.pipe == nil { - p, ok := <-pw.pipeC - if !ok { + select { + case <-pw.done: return 0, io.ErrClosedPipe + case p := <-pw.pipeC: + pw.pipe = p } - pw.pipe = p } else { select { - case p, ok := <-pw.pipeC: - if !ok { - return 0, io.ErrClosedPipe - } + case <-pw.done: + return 0, io.ErrClosedPipe + case p := <-pw.pipeC: pw.pipe.CloseWithError(content.ErrReset) pw.pipe = p // If content has already been written, the bytes - // cannot be written and the caller must reset + // cannot be written again and the caller must reset + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return 0, err + } status.Offset = 0 status.UpdatedAt = time.Now() pw.tracker.SetStatus(pw.ref, status) @@ -418,7 +435,7 @@ func (pw *pushWriter) Close() error { // Ensure pipeC is closed but handle `Close()` being // called multiple times without panicking pw.closeOnce.Do(func() { - close(pw.pipeC) + close(pw.done) }) if pw.pipe != nil { status, err := pw.tracker.GetStatus(pw.ref) @@ -458,17 +475,16 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di // TODO: timeout waiting for response var resp *http.Response select { + case <-pw.done: + return io.ErrClosedPipe case err := <-pw.errC: return err case resp = <-pw.respC: defer resp.Body.Close() - case p, ok := <-pw.pipeC: + case p := <-pw.pipeC: // check whether the pipe has changed in the commit, because sometimes Write // can complete successfully, but the pipe may have changed. In that case, the // content needs to be reset. - if !ok { - return io.ErrClosedPipe - } pw.pipe.CloseWithError(content.ErrReset) pw.pipe = p