copy: check if writer was closed before setting a pipe
If Close is called externally before a request is attempted, then we will accidentally attempt to send to a closed channel, causing a panic. To avoid this, we can check to see if Close has been called, using a done channel. If this channel is ever done, we drop any incoming errors, requests or pipes - we don't need them, since we're done. Signed-off-by: Justin Chadwell <me@jedevc.com>
This commit is contained in:
parent
4660f63033
commit
91a50f70b7
@ -331,10 +331,12 @@ type pushWriter struct {
|
|||||||
|
|
||||||
pipe *io.PipeWriter
|
pipe *io.PipeWriter
|
||||||
|
|
||||||
pipeC chan *io.PipeWriter
|
done chan struct{}
|
||||||
respC chan *http.Response
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
errC chan error
|
|
||||||
|
pipeC chan *io.PipeWriter
|
||||||
|
respC chan *http.Response
|
||||||
|
errC chan error
|
||||||
|
|
||||||
isManifest bool
|
isManifest bool
|
||||||
|
|
||||||
@ -352,19 +354,30 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
|
|||||||
pipeC: make(chan *io.PipeWriter, 1),
|
pipeC: make(chan *io.PipeWriter, 1),
|
||||||
respC: make(chan *http.Response, 1),
|
respC: make(chan *http.Response, 1),
|
||||||
errC: make(chan error, 1),
|
errC: make(chan error, 1),
|
||||||
|
done: make(chan struct{}),
|
||||||
isManifest: isManifest,
|
isManifest: isManifest,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
|
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
|
||||||
pw.pipeC <- p
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
case pw.pipeC <- p:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) setError(err error) {
|
func (pw *pushWriter) setError(err error) {
|
||||||
pw.errC <- err
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
case pw.errC <- err:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) setResponse(resp *http.Response) {
|
func (pw *pushWriter) setResponse(resp *http.Response) {
|
||||||
pw.respC <- resp
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
case pw.respC <- resp:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
||||||
@ -374,22 +387,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if pw.pipe == nil {
|
if pw.pipe == nil {
|
||||||
p, ok := <-pw.pipeC
|
select {
|
||||||
if !ok {
|
case <-pw.done:
|
||||||
return 0, io.ErrClosedPipe
|
return 0, io.ErrClosedPipe
|
||||||
|
case p := <-pw.pipeC:
|
||||||
|
pw.pipe = p
|
||||||
}
|
}
|
||||||
pw.pipe = p
|
|
||||||
} else {
|
} else {
|
||||||
select {
|
select {
|
||||||
case p, ok := <-pw.pipeC:
|
case <-pw.done:
|
||||||
if !ok {
|
return 0, io.ErrClosedPipe
|
||||||
return 0, io.ErrClosedPipe
|
case p := <-pw.pipeC:
|
||||||
}
|
|
||||||
pw.pipe.CloseWithError(content.ErrReset)
|
pw.pipe.CloseWithError(content.ErrReset)
|
||||||
pw.pipe = p
|
pw.pipe = p
|
||||||
|
|
||||||
// If content has already been written, the bytes
|
// If content has already been written, the bytes
|
||||||
// cannot be written and the caller must reset
|
// cannot be written again and the caller must reset
|
||||||
|
status, err := pw.tracker.GetStatus(pw.ref)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
status.Offset = 0
|
status.Offset = 0
|
||||||
status.UpdatedAt = time.Now()
|
status.UpdatedAt = time.Now()
|
||||||
pw.tracker.SetStatus(pw.ref, status)
|
pw.tracker.SetStatus(pw.ref, status)
|
||||||
@ -418,7 +435,7 @@ func (pw *pushWriter) Close() error {
|
|||||||
// Ensure pipeC is closed but handle `Close()` being
|
// Ensure pipeC is closed but handle `Close()` being
|
||||||
// called multiple times without panicking
|
// called multiple times without panicking
|
||||||
pw.closeOnce.Do(func() {
|
pw.closeOnce.Do(func() {
|
||||||
close(pw.pipeC)
|
close(pw.done)
|
||||||
})
|
})
|
||||||
if pw.pipe != nil {
|
if pw.pipe != nil {
|
||||||
status, err := pw.tracker.GetStatus(pw.ref)
|
status, err := pw.tracker.GetStatus(pw.ref)
|
||||||
@ -458,17 +475,16 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
|
|||||||
// TODO: timeout waiting for response
|
// TODO: timeout waiting for response
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
select {
|
select {
|
||||||
|
case <-pw.done:
|
||||||
|
return io.ErrClosedPipe
|
||||||
case err := <-pw.errC:
|
case err := <-pw.errC:
|
||||||
return err
|
return err
|
||||||
case resp = <-pw.respC:
|
case resp = <-pw.respC:
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
case p, ok := <-pw.pipeC:
|
case p := <-pw.pipeC:
|
||||||
// check whether the pipe has changed in the commit, because sometimes Write
|
// check whether the pipe has changed in the commit, because sometimes Write
|
||||||
// can complete successfully, but the pipe may have changed. In that case, the
|
// can complete successfully, but the pipe may have changed. In that case, the
|
||||||
// content needs to be reset.
|
// content needs to be reset.
|
||||||
if !ok {
|
|
||||||
return io.ErrClosedPipe
|
|
||||||
}
|
|
||||||
pw.pipe.CloseWithError(content.ErrReset)
|
pw.pipe.CloseWithError(content.ErrReset)
|
||||||
pw.pipe = p
|
pw.pipe = p
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user