From 01493463dbde602f180aedae1634fa781a0f7552 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Sun, 12 Nov 2017 07:40:22 +0000 Subject: [PATCH] Fix streaming deadlock. Signed-off-by: Lantao Liu --- pkg/ioutil/read_closer.go | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/pkg/ioutil/read_closer.go b/pkg/ioutil/read_closer.go index 39d23be9e..b9d5591e6 100644 --- a/pkg/ioutil/read_closer.go +++ b/pkg/ioutil/read_closer.go @@ -16,39 +16,42 @@ limitations under the License. package ioutil -import ( - "io" - "sync" -) +import "io" // writeCloseInformer wraps a reader with a close function. type wrapReadCloser struct { - // TODO(random-liu): Evaluate whether the lock introduces - // performance regression. - sync.RWMutex - r io.Reader - closed bool + reader *io.PipeReader + writer *io.PipeWriter } // NewWrapReadCloser creates a wrapReadCloser from a reader. +// NOTE(random-liu): To avoid goroutine leakage, the reader passed in +// must be eventually closed by the caller. func NewWrapReadCloser(r io.Reader) io.ReadCloser { - return &wrapReadCloser{r: r} + pr, pw := io.Pipe() + go func() { + _, _ = io.Copy(pw, r) + pr.Close() + pw.Close() + }() + return &wrapReadCloser{ + reader: pr, + writer: pw, + } } // Read reads up to len(p) bytes into p. func (w *wrapReadCloser) Read(p []byte) (int, error) { - w.RLock() - defer w.RUnlock() - if w.closed { - return 0, io.EOF + n, err := w.reader.Read(p) + if err == io.ErrClosedPipe { + return n, io.EOF } - return w.r.Read(p) + return n, err } // Close closes read closer. func (w *wrapReadCloser) Close() error { - w.Lock() - defer w.Unlock() - w.closed = true + w.reader.Close() + w.writer.Close() return nil }