Merge pull request #836 from crosbymichael/wait-copygroup

Wait for copy routines to be scheduled
This commit is contained in:
Michael Crosby 2017-05-10 15:54:09 -07:00 committed by GitHub
commit 79e389e549
3 changed files with 23 additions and 7 deletions

View File

@ -86,20 +86,22 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest
e.closers = append(e.closers, sc) e.closers = append(e.closers, sc)
e.stdin = sc e.stdin = sc
} }
var copyWaitGroup sync.WaitGroup
if socket != nil { if socket != nil {
console, err := socket.ReceiveMaster() console, err := socket.ReceiveMaster()
if err != nil { if err != nil {
return nil, err return nil, err
} }
e.console = console e.console = console
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup); err != nil { if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return nil, err return nil, err
} }
} else { } else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup); err != nil { if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return nil, err return nil, err
} }
} }
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile) pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -93,20 +93,22 @@ func newInitProcess(context context.Context, path string, r *shimapi.CreateReque
p.stdin = sc p.stdin = sc
p.closers = append(p.closers, sc) p.closers = append(p.closers, sc)
} }
var copyWaitGroup sync.WaitGroup
if socket != nil { if socket != nil {
console, err := socket.ReceiveMaster() console, err := socket.ReceiveMaster()
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.console = console p.console = console
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup); err != nil { if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
return nil, err return nil, err
} }
} else { } else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup); err != nil { if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
return nil, err return nil, err
} }
} }
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile) pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -14,13 +14,17 @@ import (
"github.com/tonistiigi/fifo" "github.com/tonistiigi/fifo"
) )
func copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) error { func copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
if stdin != "" { if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil { if err != nil {
return err return err
} }
go io.Copy(console, in) cwg.Add(1)
go func() {
cwg.Done()
io.Copy(console, in)
}()
} }
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil { if err != nil {
@ -31,7 +35,9 @@ func copyConsole(ctx context.Context, console console.Console, stdin, stdout, st
return err return err
} }
wg.Add(1) wg.Add(1)
cwg.Add(1)
go func() { go func() {
cwg.Done()
io.Copy(outw, console) io.Copy(outw, console)
console.Close() console.Close()
outr.Close() outr.Close()
@ -41,11 +47,13 @@ func copyConsole(ctx context.Context, console console.Console, stdin, stdout, st
return nil return nil
} }
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error { func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
for name, dest := range map[string]func(wc io.WriteCloser, rc io.Closer){ for name, dest := range map[string]func(wc io.WriteCloser, rc io.Closer){
stdout: func(wc io.WriteCloser, rc io.Closer) { stdout: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1) wg.Add(1)
cwg.Add(1)
go func() { go func() {
cwg.Done()
io.Copy(wc, rio.Stdout()) io.Copy(wc, rio.Stdout())
wg.Done() wg.Done()
wc.Close() wc.Close()
@ -54,7 +62,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
}, },
stderr: func(wc io.WriteCloser, rc io.Closer) { stderr: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1) wg.Add(1)
cwg.Add(1)
go func() { go func() {
cwg.Done()
io.Copy(wc, rio.Stderr()) io.Copy(wc, rio.Stderr())
wg.Done() wg.Done()
wc.Close() wc.Close()
@ -79,7 +89,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
if err != nil { if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err) return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
} }
cwg.Add(1)
go func() { go func() {
cwg.Done()
io.Copy(rio.Stdin(), f) io.Copy(rio.Stdin(), f)
rio.Stdin().Close() rio.Stdin().Close()
f.Close() f.Close()