Merge pull request #3153 from thepwagner/issue-3118
runtime/v1/linux/proc/io: io race
This commit is contained in:
commit
341b99d6e2
@ -27,8 +27,10 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
"github.com/containerd/containerd/runtime/proc"
|
"github.com/containerd/containerd/runtime/proc"
|
||||||
"github.com/containerd/fifo"
|
"github.com/containerd/fifo"
|
||||||
@ -122,7 +124,7 @@ func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio
|
|||||||
}
|
}
|
||||||
|
|
||||||
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
|
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
|
||||||
var sameFile io.WriteCloser
|
var sameFile *countingWriteCloser
|
||||||
for _, i := range []struct {
|
for _, i := range []struct {
|
||||||
name string
|
name string
|
||||||
dest func(wc io.WriteCloser, rc io.Closer)
|
dest func(wc io.WriteCloser, rc io.Closer)
|
||||||
@ -136,7 +138,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
|
|||||||
cwg.Done()
|
cwg.Done()
|
||||||
p := bufPool.Get().(*[]byte)
|
p := bufPool.Get().(*[]byte)
|
||||||
defer bufPool.Put(p)
|
defer bufPool.Put(p)
|
||||||
io.CopyBuffer(wc, rio.Stdout(), *p)
|
if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
|
||||||
|
log.G(ctx).Warn("error copying stdout")
|
||||||
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
wc.Close()
|
wc.Close()
|
||||||
if rc != nil {
|
if rc != nil {
|
||||||
@ -153,7 +157,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
|
|||||||
cwg.Done()
|
cwg.Done()
|
||||||
p := bufPool.Get().(*[]byte)
|
p := bufPool.Get().(*[]byte)
|
||||||
defer bufPool.Put(p)
|
defer bufPool.Put(p)
|
||||||
io.CopyBuffer(wc, rio.Stderr(), *p)
|
if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
|
||||||
|
log.G(ctx).Warn("error copying stderr")
|
||||||
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
wc.Close()
|
wc.Close()
|
||||||
if rc != nil {
|
if rc != nil {
|
||||||
@ -180,6 +186,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if sameFile != nil {
|
if sameFile != nil {
|
||||||
|
sameFile.count++
|
||||||
i.dest(sameFile, nil)
|
i.dest(sameFile, nil)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -187,7 +194,10 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
|
|||||||
return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err)
|
return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err)
|
||||||
}
|
}
|
||||||
if stdout == stderr {
|
if stdout == stderr {
|
||||||
sameFile = fw
|
sameFile = &countingWriteCloser{
|
||||||
|
WriteCloser: fw,
|
||||||
|
count: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
i.dest(fw, fr)
|
i.dest(fw, fr)
|
||||||
@ -212,6 +222,19 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// countingWriteCloser masks io.Closer() until close has been invoked a certain number of times.
|
||||||
|
type countingWriteCloser struct {
|
||||||
|
io.WriteCloser
|
||||||
|
count int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *countingWriteCloser) Close() error {
|
||||||
|
if atomic.AddInt64(&c.count, -1) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.WriteCloser.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// isFifo checks if a file is a fifo
|
// isFifo checks if a file is a fifo
|
||||||
// if the file does not exist then it returns false
|
// if the file does not exist then it returns false
|
||||||
func isFifo(path string) (bool, error) {
|
func isFifo(path string) (bool, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user