pushWriter: refactor reset pipe logic into separate function

Signed-off-by: Justin Chadwell <me@jedevc.com>
This commit is contained in:
Justin Chadwell 2023-04-13 15:44:30 +01:00
parent 9d7641ff3e
commit 651cfa2a2c

View File

@ -380,6 +380,27 @@ func (pw *pushWriter) setResponse(resp *http.Response) {
} }
} }
func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
if pw.pipe == nil {
pw.pipe = p
return nil
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}
func (pw *pushWriter) Write(p []byte) (n int, err error) { func (pw *pushWriter) Write(p []byte) (n int, err error) {
status, err := pw.tracker.GetStatus(pw.ref) status, err := pw.tracker.GetStatus(pw.ref)
if err != nil { if err != nil {
@ -391,26 +412,14 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
case <-pw.done: case <-pw.done:
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
case p := <-pw.pipeC: case p := <-pw.pipeC:
pw.pipe = p pw.replacePipe(p)
} }
} else { } else {
select { select {
case <-pw.done: case <-pw.done:
return 0, io.ErrClosedPipe return 0, io.ErrClosedPipe
case p := <-pw.pipeC: case p := <-pw.pipeC:
pw.pipe.CloseWithError(content.ErrReset) return 0, pw.replacePipe(p)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return 0, err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
default: default:
} }
} }
@ -423,19 +432,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
case <-pw.done: case <-pw.done:
case err = <-pw.errC: case err = <-pw.errC:
case p := <-pw.pipeC: case p := <-pw.pipeC:
pw.pipe.CloseWithError(content.ErrReset) return 0, pw.replacePipe(p)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return 0, err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
} }
} }
status.Offset += int64(n) status.Offset += int64(n)
@ -498,19 +495,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
// check whether the pipe has changed in the commit, because sometimes Write // check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the // can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset. // content needs to be reset.
pw.pipe.CloseWithError(content.ErrReset) return pw.replacePipe(p)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
} }
// 201 is specified return status, some registries return // 201 is specified return status, some registries return