From e96ac2040db90a7d9e4225b78ccc6bff373f8aeb Mon Sep 17 00:00:00 2001 From: Peter Wagner Date: Thu, 28 Mar 2019 12:59:48 -0700 Subject: [PATCH 1/2] runtime: log IO error when copying output streams Signed-off-by: Peter Wagner --- runtime/v1/linux/proc/io.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/runtime/v1/linux/proc/io.go b/runtime/v1/linux/proc/io.go index b37c3c6aa..34f6800e2 100644 --- a/runtime/v1/linux/proc/io.go +++ b/runtime/v1/linux/proc/io.go @@ -29,6 +29,7 @@ import ( "sync" "syscall" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/runtime/proc" "github.com/containerd/fifo" @@ -136,7 +137,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - io.CopyBuffer(wc, rio.Stdout(), *p) + if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil { + log.G(ctx).Warn("error copying stdout") + } wg.Done() wc.Close() if rc != nil { @@ -153,7 +156,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) - io.CopyBuffer(wc, rio.Stderr(), *p) + if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil { + log.G(ctx).Warn("error copying stderr") + } wg.Done() wc.Close() if rc != nil { From ae04c16607840a4d4977d9db19057ca74cfd4762 Mon Sep 17 00:00:00 2001 From: Peter Wagner Date: Thu, 28 Mar 2019 13:34:32 -0700 Subject: [PATCH 2/2] runtime: guard Close() until both streams are complete Signed-off-by: Peter Wagner --- runtime/v1/linux/proc/io.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/runtime/v1/linux/proc/io.go b/runtime/v1/linux/proc/io.go index 34f6800e2..15f6ed8ca 100644 --- a/runtime/v1/linux/proc/io.go +++ b/runtime/v1/linux/proc/io.go @@ -27,6 +27,7 @@ import ( "os/exec" "path/filepath" "sync" + "sync/atomic" "syscall" "github.com/containerd/containerd/log" @@ -123,7 +124,7 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio } func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { - var sameFile io.WriteCloser + var sameFile *countingWriteCloser for _, i := range []struct { name string dest func(wc io.WriteCloser, rc io.Closer) @@ -185,6 +186,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w } } else { if sameFile != nil { + sameFile.count++ i.dest(sameFile, nil) continue } @@ -192,7 +194,10 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err) } if stdout == stderr { - sameFile = fw + sameFile = &countingWriteCloser{ + WriteCloser: fw, + count: 1, + } } } i.dest(fw, fr) @@ -217,6 +222,19 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w return nil } +// countingWriteCloser masks io.Closer() until close has been invoked a certain number of times. +type countingWriteCloser struct { + io.WriteCloser + count int64 +} + +func (c *countingWriteCloser) Close() error { + if atomic.AddInt64(&c.count, -1) > 0 { + return nil + } + return c.WriteCloser.Close() +} + // isFifo checks if a file is a fifo // if the file does not exist then it returns false func isFifo(path string) (bool, error) {