diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index 9efe553b4..cca118d39 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -380,6 +380,27 @@ func (pw *pushWriter) setResponse(resp *http.Response) { } } +func (pw *pushWriter) replacePipe(p *io.PipeWriter) error { + if pw.pipe == nil { + pw.pipe = p + return nil + } + + pw.pipe.CloseWithError(content.ErrReset) + pw.pipe = p + + // If content has already been written, the bytes + // cannot be written again and the caller must reset + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return err + } + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return content.ErrReset +} + func (pw *pushWriter) Write(p []byte) (n int, err error) { status, err := pw.tracker.GetStatus(pw.ref) if err != nil { @@ -391,26 +412,14 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { case <-pw.done: return 0, io.ErrClosedPipe case p := <-pw.pipeC: - pw.pipe = p + pw.replacePipe(p) } } else { select { case <-pw.done: return 0, io.ErrClosedPipe case p := <-pw.pipeC: - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return 0, err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset + return 0, pw.replacePipe(p) default: } } @@ -423,19 +432,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { case <-pw.done: case err = <-pw.errC: case p := <-pw.pipeC: - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return 0, err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return 0, content.ErrReset + return 0, pw.replacePipe(p) } } status.Offset += int64(n) @@ -498,19 +495,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di // check whether the pipe has changed in the commit, because sometimes Write // can complete successfully, but the pipe may have changed. In that case, the // content needs to be reset. - pw.pipe.CloseWithError(content.ErrReset) - pw.pipe = p - - // If content has already been written, the bytes - // cannot be written again and the caller must reset - status, err := pw.tracker.GetStatus(pw.ref) - if err != nil { - return err - } - status.Offset = 0 - status.UpdatedAt = time.Now() - pw.tracker.SetStatus(pw.ref, status) - return content.ErrReset + return pw.replacePipe(p) } // 201 is specified return status, some registries return