Fix panic when bufio Reader called in 2 goroutines

A panic was seen related to the buffer being reset in
one goroutine while being read in another. In the case
of pigz an early cancellation will cause the reader to
close, resetting the buffer and signaling the process
to shut down, but races since the process must stop
reading before the reset otherwise the a panic may occur.
This fix guarantees that the bufio is always reset and
returned to the pool on the same goroutine that is
doing the read. If a buffer is not fully read the
buffered reader should just be discarded and not
returned back to the pool.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2018-09-26 18:11:30 -07:00
parent 4b1d56e240
commit db358a9fd2
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB

View File

@ -92,6 +92,36 @@ func (w *writeCloserWrapper) Close() error {
return nil
}
type bufferedReader struct {
buf *bufio.Reader
}
func newBufferedReader(r io.Reader) *bufferedReader {
buf := bufioReader32KPool.Get().(*bufio.Reader)
buf.Reset(r)
return &bufferedReader{buf}
}
func (r *bufferedReader) Read(p []byte) (n int, err error) {
if r.buf == nil {
return 0, io.EOF
}
n, err = r.buf.Read(p)
if err == io.EOF {
r.buf.Reset(nil)
bufioReader32KPool.Put(r.buf)
r.buf = nil
}
return
}
func (r *bufferedReader) Peek(n int) ([]byte, error) {
if r.buf == nil {
return nil, io.EOF
}
return r.buf.Peek(n)
}
// DetectCompression detects the compression algorithm of the source.
func DetectCompression(source []byte) Compression {
for compression, m := range map[Compression][]byte{
@ -110,8 +140,7 @@ func DetectCompression(source []byte) Compression {
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
buf := bufioReader32KPool.Get().(*bufio.Reader)
buf.Reset(archive)
buf := newBufferedReader(archive)
bs, err := buf.Peek(10)
if err != nil && err != io.EOF {
// Note: we'll ignore any io.EOF error because there are some odd
@ -123,15 +152,12 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
return nil, err
}
closer := func() error {
buf.Reset(nil)
bufioReader32KPool.Put(buf)
return nil
}
switch compression := DetectCompression(bs); compression {
case Uncompressed:
readBufWrapper := &readCloserWrapper{buf, compression, closer}
return readBufWrapper, nil
return &readCloserWrapper{
Reader: buf,
compression: compression,
}, nil
case Gzip:
ctx, cancel := context.WithCancel(context.Background())
gzReader, err := gzipDecompress(ctx, buf)
@ -140,12 +166,15 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
return nil, err
}
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error {
cancel()
return closer()
}}
return &readCloserWrapper{
Reader: gzReader,
compression: compression,
closer: func() error {
cancel()
return gzReader.Close()
},
}, nil
return readBufWrapper, nil
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}