From f79ec5b55ffdc43672596ba126d51beb0aba5b84 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 6 Dec 2017 15:56:46 -0500 Subject: [PATCH 1/5] Cleanup cio.FIFOSet interface Remove duplication with cio.Config unexport newFIFOSetTempDir() since it includes hardcoded paths Expose os.RemoveAll() as part of FIFOSet instead of a Dir Signed-off-by: Daniel Nephin --- cio/io.go | 75 ++++++++++++++++++++--------------------------- cio/io_unix.go | 57 +++++++++++++++-------------------- cio/io_windows.go | 34 ++++++++++----------- container.go | 28 ++++++++++-------- 4 files changed, 89 insertions(+), 105 deletions(-) diff --git a/cio/io.go b/cio/io.go index c5e112d9c..343e57b79 100644 --- a/cio/io.go +++ b/cio/io.go @@ -75,68 +75,70 @@ type Creation func(id string) (IO, error) // will be sent only to the first reads type Attach func(*FIFOSet) (IO, error) +// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams +type FIFOSet struct { + Config + close func() error +} + +func (f *FIFOSet) Close() error { + if f.close != nil { + return f.close() + } + return nil +} + +// NewFIFOSet returns a new FIFOSet from a Config and a close function +func NewFIFOSet(config Config, close func() error) *FIFOSet { + return &FIFOSet{Config: config, close: close} +} + // NewIO returns an Creation that will provide IO sets without a terminal func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { return NewIOWithTerminal(stdin, stdout, stderr, false) } -// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal +// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { return func(id string) (_ IO, err error) { - paths, err := NewFifos(id) + fifos, err := newFIFOSetTempDir(id) if err != nil { return nil, err } defer func() { - if err != nil && paths.Dir != "" { - os.RemoveAll(paths.Dir) + if err != nil { + fifos.Close() } }() - cfg := Config{ - Terminal: terminal, - Stdout: paths.Out, - Stderr: paths.Err, - Stdin: paths.In, - } + cfg := fifos.Config + cfg.Terminal = terminal i := &cio{config: cfg} set := &ioSet{ in: stdin, out: stdout, err: stderr, } - closer, err := copyIO(paths, set, cfg.Terminal) - if err != nil { - return nil, err - } + closer, err := copyIO(fifos, set, cfg.Terminal) i.closer = closer - return i, nil + return i, err } } // WithAttach attaches the existing io for a task to the provided io.Reader/Writers func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { - return func(paths *FIFOSet) (IO, error) { - if paths == nil { + return func(fifos *FIFOSet) (IO, error) { + if fifos == nil { return nil, fmt.Errorf("cannot attach to existing fifos") } - cfg := Config{ - Terminal: paths.Terminal, - Stdout: paths.Out, - Stderr: paths.Err, - Stdin: paths.In, - } - i := &cio{config: cfg} + i := &cio{config: fifos.Config} set := &ioSet{ in: stdin, out: stdout, err: stderr, } - closer, err := copyIO(paths, set, cfg.Terminal) - if err != nil { - return nil, err - } + closer, err := copyIO(fifos, set, fifos.Terminal) i.closer = closer - return i, nil + return i, err } } @@ -156,16 +158,6 @@ func NullIO(id string) (IO, error) { return &cio{}, nil } -// FIFOSet is a set of fifos for use with tasks -type FIFOSet struct { - // Dir is the directory holding the task fifos - Dir string - // In, Out, and Err fifo paths - In, Out, Err string - // Terminal returns true if a terminal is being used for the task - Terminal bool -} - type ioSet struct { in io.Reader out, err io.Writer @@ -173,7 +165,6 @@ type ioSet struct { type wgCloser struct { wg *sync.WaitGroup - dir string set []io.Closer cancel context.CancelFunc } @@ -183,12 +174,10 @@ func (g *wgCloser) Wait() { } func (g *wgCloser) Close() error { + // TODO: this should return all errors, not mask them for _, f := range g.set { f.Close() } - if g.dir != "" { - return os.RemoveAll(g.dir) - } return nil } diff --git a/cio/io_unix.go b/cio/io_unix.go index c18f7ecf9..6144ee04c 100644 --- a/cio/io_unix.go +++ b/cio/io_unix.go @@ -14,8 +14,8 @@ import ( "github.com/containerd/fifo" ) -// NewFifos returns a new set of fifos for the task -func NewFifos(id string) (*FIFOSet, error) { +// newFIFOSetTempDir returns a new set of fifos for the task +func newFIFOSetTempDir(id string) (*FIFOSet, error) { root := "/run/containerd/fifo" if err := os.MkdirAll(root, 0700); err != nil { return nil, err @@ -24,21 +24,23 @@ func NewFifos(id string) (*FIFOSet, error) { if err != nil { return nil, err } - return &FIFOSet{ - Dir: dir, - In: filepath.Join(dir, id+"-stdin"), - Out: filepath.Join(dir, id+"-stdout"), - Err: filepath.Join(dir, id+"-stderr"), - }, nil + closer := func() error { + return os.RemoveAll(dir) + } + return NewFIFOSet(Config{ + Stdin: filepath.Join(dir, id+"-stdin"), + Stdout: filepath.Join(dir, id+"-stdout"), + Stderr: filepath.Join(dir, id+"-stderr"), + }, closer), nil } func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { var ( f io.ReadWriteCloser - set []io.Closer ctx, cancel = context.WithCancel(context.Background()) wg = &sync.WaitGroup{} ) + set := []io.Closer{fifos} defer func() { if err != nil { for _, f := range set { @@ -48,7 +50,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { } }() - if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } set = append(set, f) @@ -57,7 +59,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { w.Close() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } set = append(set, f) @@ -68,7 +70,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { wg.Done() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } set = append(set, f) @@ -83,7 +85,6 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { } return &wgCloser{ wg: wg, - dir: fifos.Dir, set: set, cancel: cancel, }, nil @@ -91,27 +92,26 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { // NewDirectIO returns an IO implementation that exposes the pipes directly func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { - set, err := NewFifos("") + set, err := newFIFOSetTempDir("") if err != nil { return nil, err } - f := &DirectIO{ - set: set, - terminal: terminal, - } + set.Terminal = terminal + f := &DirectIO{set: set} + defer func() { if err != nil { f.Delete() } }() - if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } - if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() return nil, err } - if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() f.Stdout.Close() return nil, err @@ -125,8 +125,7 @@ type DirectIO struct { Stdout io.ReadCloser Stderr io.ReadCloser - set *FIFOSet - terminal bool + set *FIFOSet } // IOCreate returns IO avaliable for use with task creation @@ -141,12 +140,7 @@ func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { // Config returns the Config func (f *DirectIO) Config() Config { - return Config{ - Terminal: f.terminal, - Stdin: f.set.In, - Stdout: f.set.Out, - Stderr: f.set.Err, - } + return f.set.Config } // Cancel stops any IO copy operations @@ -177,8 +171,5 @@ func (f *DirectIO) Close() error { // Delete removes the underlying directory containing fifos func (f *DirectIO) Delete() error { - if f.set.Dir == "" { - return nil - } - return os.RemoveAll(f.set.Dir) + return f.set.Close() } diff --git a/cio/io_windows.go b/cio/io_windows.go index 1458c3173..3d3fffa6a 100644 --- a/cio/io_windows.go +++ b/cio/io_windows.go @@ -13,12 +13,12 @@ import ( const pipeRoot = `\\.\pipe` -// NewFifos returns a new set of fifos for the task -func NewFifos(id string) (*FIFOSet, error) { +// newFIFOSetTempDir returns a new set of fifos for the task +func newFIFOSetTempDir(id string) (*FIFOSet, error) { return &FIFOSet{ - In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), - Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), - Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), + StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), + StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), + StdErr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), }, nil } @@ -28,10 +28,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { set []io.Closer ) - if fifos.In != "" { - l, err := winio.ListenPipe(fifos.In, nil) + if fifos.StdIn != "" { + l, err := winio.ListenPipe(fifos.StdIn, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdIn) } defer func(l net.Listener) { if err != nil { @@ -43,7 +43,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { go func() { c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In) + log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.StdIn) return } io.Copy(c, ioset.in) @@ -52,10 +52,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - if fifos.Out != "" { - l, err := winio.ListenPipe(fifos.Out, nil) + if fifos.StdOut != "" { + l, err := winio.ListenPipe(fifos.StdOut, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdOut) } defer func(l net.Listener) { if err != nil { @@ -69,7 +69,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out) + log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.StdOut) return } io.Copy(ioset.out, c) @@ -78,10 +78,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - if !tty && fifos.Err != "" { - l, err := winio.ListenPipe(fifos.Err, nil) + if !tty && fifos.StdErr != "" { + l, err := winio.ListenPipe(fifos.StdErr, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err) + return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr) } defer func(l net.Listener) { if err != nil { @@ -95,7 +95,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err) + log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.StdErr) return } io.Copy(ioset.err, c) diff --git a/container.go b/container.go index 716e0456c..1f0e9be2a 100644 --- a/container.go +++ b/container.go @@ -3,6 +3,7 @@ package containerd import ( "context" "encoding/json" + "os" "path/filepath" "strings" @@ -288,20 +289,23 @@ func (c *container) get(ctx context.Context) (containers.Container, error) { return c.client.ContainerService().Get(ctx, c.id) } +// get the existing fifo paths from the task information stored by the daemon func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) { - // get the existing fifo paths from the task information stored by the daemon - paths := &cio.FIFOSet{ - Dir: getFifoDir([]string{ - response.Process.Stdin, - response.Process.Stdout, - response.Process.Stderr, - }), - In: response.Process.Stdin, - Out: response.Process.Stdout, - Err: response.Process.Stderr, - Terminal: response.Process.Terminal, + path := getFifoDir([]string{ + response.Process.Stdin, + response.Process.Stdout, + response.Process.Stderr, + }) + closer := func() error { + return os.RemoveAll(path) } - return ioAttach(paths) + fifoSet := cio.NewFIFOSet(cio.Config{ + Stdin: response.Process.Stdin, + Stdout: response.Process.Stdout, + Stderr: response.Process.Stderr, + Terminal: response.Process.Terminal, + }, closer) + return ioAttach(fifoSet) } // getFifoDir looks for any non-empty path for a stdio fifo From 3146019918cce8103fcc42a7b3deffa03ad87d47 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 6 Dec 2017 17:28:17 -0500 Subject: [PATCH 2/5] Refactor cio.DirectIO New code duplication Better re-use from consumers of the cio package TODO: io_windows.go Signed-off-by: Daniel Nephin --- cio/io.go | 122 +++++++++++---------------- cio/io_unix.go | 180 +++++++++++++++------------------------- cio/io_windows.go | 11 +-- container_linux_test.go | 51 +++++++++++- 4 files changed, 170 insertions(+), 194 deletions(-) diff --git a/cio/io.go b/cio/io.go index 343e57b79..8e94686bc 100644 --- a/cio/io.go +++ b/cio/io.go @@ -33,38 +33,6 @@ type IO interface { Close() error } -// cio is a basic container IO implementation. -type cio struct { - config Config - - closer *wgCloser -} - -func (c *cio) Config() Config { - return c.config -} - -func (c *cio) Cancel() { - if c.closer == nil { - return - } - c.closer.Cancel() -} - -func (c *cio) Wait() { - if c.closer == nil { - return - } - c.closer.Wait() -} - -func (c *cio) Close() error { - if c.closer == nil { - return nil - } - return c.closer.Close() -} - // Creation creates new IO sets for a task type Creation func(id string) (IO, error) @@ -100,27 +68,15 @@ func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { // NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { - return func(id string) (_ IO, err error) { - fifos, err := newFIFOSetTempDir(id) + return func(id string) (IO, error) { + fifos, err := newFIFOSetInTempDir(id) if err != nil { return nil, err } - defer func() { - if err != nil { - fifos.Close() - } - }() - cfg := fifos.Config - cfg.Terminal = terminal - i := &cio{config: cfg} - set := &ioSet{ - in: stdin, - out: stdout, - err: stderr, - } - closer, err := copyIO(fifos, set, cfg.Terminal) - i.closer = closer - return i, err + + fifos.Terminal = terminal + set := &ioSet{in: stdin, out: stdout, err: stderr} + return copyIO(fifos, set) } } @@ -128,17 +84,10 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { return func(fifos *FIFOSet) (IO, error) { if fifos == nil { - return nil, fmt.Errorf("cannot attach to existing fifos") + return nil, fmt.Errorf("cannot attach, missing fifos") } - i := &cio{config: fifos.Config} - set := &ioSet{ - in: stdin, - out: stdout, - err: stderr, - } - closer, err := copyIO(fifos, set, fifos.Terminal) - i.closer = closer - return i, err + set := &ioSet{in: stdin, out: stdout, err: stderr} + return copyIO(fifos, set) } } @@ -154,7 +103,7 @@ func StdioTerminal(id string) (IO, error) { } // NullIO redirects the container's IO into /dev/null -func NullIO(id string) (IO, error) { +func NullIO(_ string) (IO, error) { return &cio{}, nil } @@ -163,24 +112,49 @@ type ioSet struct { out, err io.Writer } -type wgCloser struct { - wg *sync.WaitGroup - set []io.Closer - cancel context.CancelFunc +type pipes struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser } -func (g *wgCloser) Wait() { - g.wg.Wait() +func (p *pipes) closers() []io.Closer { + return []io.Closer{p.Stdin, p.Stdout, p.Stderr} } -func (g *wgCloser) Close() error { - // TODO: this should return all errors, not mask them - for _, f := range g.set { - f.Close() +// cio is a basic container IO implementation. +type cio struct { + config Config + wg *sync.WaitGroup + closers []io.Closer + cancel context.CancelFunc +} + +func (c *cio) Config() Config { + return c.config +} + +func (c *cio) Wait() { + if c.wg != nil { + c.wg.Wait() } - return nil } -func (g *wgCloser) Cancel() { - g.cancel() +func (c *cio) Close() error { + var lastErr error + for _, closer := range c.closers { + if closer == nil { + continue + } + if err := closer.Close(); err != nil { + lastErr = err + } + } + return lastErr +} + +func (c *cio) Cancel() { + if c.cancel != nil { + c.cancel() + } } diff --git a/cio/io_unix.go b/cio/io_unix.go index 6144ee04c..e745af4c8 100644 --- a/cio/io_unix.go +++ b/cio/io_unix.go @@ -12,10 +12,11 @@ import ( "syscall" "github.com/containerd/fifo" + "github.com/pkg/errors" ) -// newFIFOSetTempDir returns a new set of fifos for the task -func newFIFOSetTempDir(id string) (*FIFOSet, error) { +// newFIFOSetInTempDir returns a new set of fifos for the task +func newFIFOSetInTempDir(id string) (*FIFOSet, error) { root := "/run/containerd/fifo" if err := os.MkdirAll(root, 0700); err != nil { return nil, err @@ -34,142 +35,99 @@ func newFIFOSetTempDir(id string) (*FIFOSet, error) { }, closer), nil } -func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { +func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { var ( - f io.ReadWriteCloser ctx, cancel = context.WithCancel(context.Background()) wg = &sync.WaitGroup{} ) - set := []io.Closer{fifos} - defer func() { - if err != nil { - for _, f := range set { - f.Close() - } - cancel() - } + + pipes, err := openFifos(ctx, fifos) + if err != nil { + cancel() + return nil, err + } + + if fifos.Stdin != "" { + go func() { + io.Copy(pipes.Stdin, ioset.in) + pipes.Stdin.Close() + }() + } + + wg.Add(1) + go func() { + io.Copy(ioset.out, pipes.Stdout) + pipes.Stdout.Close() + wg.Done() }() - if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - go func(w io.WriteCloser) { - io.Copy(w, ioset.in) - w.Close() - }(f) - - if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - wg.Add(1) - go func(r io.ReadCloser) { - io.Copy(ioset.out, r) - r.Close() - wg.Done() - }(f) - - if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - - if !tty { + if !fifos.Terminal { wg.Add(1) - go func(r io.ReadCloser) { - io.Copy(ioset.err, r) - r.Close() + go func() { + io.Copy(ioset.err, pipes.Stderr) + pipes.Stderr.Close() wg.Done() - }(f) + }() } - return &wgCloser{ - wg: wg, - set: set, - cancel: cancel, + return &cio{ + wg: wg, + closers: append(pipes.closers(), fifos), + cancel: cancel, }, nil } -// NewDirectIO returns an IO implementation that exposes the pipes directly -func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { - set, err := newFIFOSetTempDir("") - if err != nil { - return nil, err - } - set.Terminal = terminal - f := &DirectIO{set: set} +func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) { + var err error + f := new(pipes) defer func() { if err != nil { - f.Delete() + fifos.Close() } }() - if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err + + if fifos.Stdin != "" { + if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return pipes{}, errors.Wrapf(err, "failed to open stdin fifo") + } } - if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() - return nil, err + return pipes{}, errors.Wrapf(err, "failed to open stdout fifo") } - if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() f.Stdout.Close() + return pipes{}, errors.Wrapf(err, "failed to open stderr fifo") + } + return pipes{}, nil +} + +// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser +// and io.WriteCloser. FIFOs are created in /run/containerd/fifo. +func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { + fifos, err := newFIFOSetInTempDir("") + if err != nil { return nil, err } - return f, nil + fifos.Terminal = terminal + + ctx, cancel := context.WithCancel(context.Background()) + pipes, err := openFifos(ctx, fifos) + return &DirectIO{ + pipes: pipes, + cio: cio{ + config: fifos.Config, + closers: append(pipes.closers(), fifos), + cancel: cancel, + }, + }, err } // DirectIO allows task IO to be handled externally by the caller type DirectIO struct { - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser - - set *FIFOSet + pipes + cio } -// IOCreate returns IO avaliable for use with task creation -func (f *DirectIO) IOCreate(id string) (IO, error) { - return f, nil -} - -// IOAttach returns IO avaliable for use with task attachment -func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { - return f, nil -} - -// Config returns the Config -func (f *DirectIO) Config() Config { - return f.set.Config -} - -// Cancel stops any IO copy operations -// -// Not applicable for DirectIO -func (f *DirectIO) Cancel() { - // nothing to cancel as all operations are handled externally -} - -// Wait on any IO copy operations -// -// Not applicable for DirectIO -func (f *DirectIO) Wait() { - // nothing to wait on as all operations are handled externally -} - -// Close closes all open fds -func (f *DirectIO) Close() error { - err := f.Stdin.Close() - if err2 := f.Stdout.Close(); err == nil { - err = err2 - } - if err2 := f.Stderr.Close(); err == nil { - err = err2 - } - return err -} - -// Delete removes the underlying directory containing fifos -func (f *DirectIO) Delete() error { - return f.set.Close() -} +var _ IO = &DirectIO{} diff --git a/cio/io_windows.go b/cio/io_windows.go index 3d3fffa6a..6ecfba43c 100644 --- a/cio/io_windows.go +++ b/cio/io_windows.go @@ -13,8 +13,8 @@ import ( const pipeRoot = `\\.\pipe` -// newFIFOSetTempDir returns a new set of fifos for the task -func newFIFOSetTempDir(id string) (*FIFOSet, error) { +// newFIFOSetInTempDir returns a new set of fifos for the task +func newFIFOSetInTempDir(id string) (*FIFOSet, error) { return &FIFOSet{ StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), @@ -22,8 +22,9 @@ func newFIFOSetTempDir(id string) (*FIFOSet, error) { }, nil } -func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { +func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { var ( + err error wg sync.WaitGroup set []io.Closer ) @@ -78,7 +79,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - if !tty && fifos.StdErr != "" { + if !fifos.Terminal && fifos.StdErr != "" { l, err := winio.ListenPipe(fifos.StdErr, nil) if err != nil { return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr) @@ -104,7 +105,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - return &wgCloser{ + return &cio{ wg: &wg, dir: fifos.Dir, set: set, diff --git a/container_linux_test.go b/container_linux_test.go index 85e934f52..5fc298b95 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -298,7 +298,7 @@ func TestContainerAttach(t *testing.T) { expected := "hello" + newLine - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return @@ -372,6 +372,49 @@ func TestContainerAttach(t *testing.T) { } } +func newDirectIO(ctx context.Context) (*directIO, error) { + dio, err := cio.NewDirectIO(ctx, false) + if err != nil { + return nil, err + } + return &directIO{DirectIO: *dio}, nil +} + +type directIO struct { + cio.DirectIO +} + +// ioCreate returns IO avaliable for use with task creation +func (f *directIO) IOCreate(id string) (cio.IO, error) { + return f, nil +} + +// ioAttach returns IO avaliable for use with task attachment +func (f *directIO) IOAttach(set *cio.FIFOSet) (cio.IO, error) { + return f, nil +} + +func (f *directIO) Cancel() { + // nothing to cancel as all operations are handled externally +} + +// Close closes all open fds +func (f *directIO) Close() error { + err := f.Stdin.Close() + if err2 := f.Stdout.Close(); err == nil { + err = err2 + } + if err2 := f.Stderr.Close(); err == nil { + err = err2 + } + return err +} + +// Delete removes the underlying directory containing fifos +func (f *directIO) Delete() error { + return f.DirectIO.Close() +} + func TestContainerUsername(t *testing.T) { t.Parallel() @@ -393,7 +436,7 @@ func TestContainerUsername(t *testing.T) { t.Error(err) return } - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return @@ -486,7 +529,7 @@ func TestContainerAttachProcess(t *testing.T) { expected := "hello" + newLine // creating IO early for easy resource cleanup - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return @@ -602,7 +645,7 @@ func TestContainerUserID(t *testing.T) { t.Error(err) return } - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return From a901091f7c536c504981423bc785fa6322bc874e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 7 Dec 2017 12:25:27 -0500 Subject: [PATCH 3/5] Rename cio.Creation to cio.Creator Signed-off-by: Daniel Nephin --- cio/io.go | 10 +++++----- container.go | 4 ++-- container_test.go | 2 +- task.go | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cio/io.go b/cio/io.go index 8e94686bc..43476966b 100644 --- a/cio/io.go +++ b/cio/io.go @@ -33,8 +33,8 @@ type IO interface { Close() error } -// Creation creates new IO sets for a task -type Creation func(id string) (IO, error) +// Creator creates new IO sets for a task +type Creator func(id string) (IO, error) // Attach allows callers to reattach to running tasks // @@ -61,13 +61,13 @@ func NewFIFOSet(config Config, close func() error) *FIFOSet { return &FIFOSet{Config: config, close: close} } -// NewIO returns an Creation that will provide IO sets without a terminal -func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { +// NewIO returns an Creator that will provide IO sets without a terminal +func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creator { return NewIOWithTerminal(stdin, stdout, stderr, false) } // NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal -func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { +func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creator { return func(id string) (IO, error) { fifos, err := newFIFOSetInTempDir(id) if err != nil { diff --git a/container.go b/container.go index 1f0e9be2a..ad60c69ea 100644 --- a/container.go +++ b/container.go @@ -27,7 +27,7 @@ type Container interface { // Delete removes the container Delete(context.Context, ...DeleteOpts) error // NewTask creates a new task based on the container metadata - NewTask(context.Context, cio.Creation, ...NewTaskOpts) (Task, error) + NewTask(context.Context, cio.Creator, ...NewTaskOpts) (Task, error) // Spec returns the OCI runtime specification Spec(context.Context) (*specs.Spec, error) // Task returns the current task for the container @@ -163,7 +163,7 @@ func (c *container) Image(ctx context.Context) (Image, error) { }, nil } -func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (_ Task, err error) { +func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) { i, err := ioCreate(c.id) if err != nil { return nil, err diff --git a/container_test.go b/container_test.go index 3a1324bfc..e3ab0cf27 100644 --- a/container_test.go +++ b/container_test.go @@ -23,7 +23,7 @@ import ( gogotypes "github.com/gogo/protobuf/types" ) -func empty() cio.Creation { +func empty() cio.Creator { // TODO (@mlaventure) windows searches for pipes // when none are provided if runtime.GOOS == "windows" { diff --git a/task.go b/task.go index 28f49fa6c..121da9af5 100644 --- a/task.go +++ b/task.go @@ -123,7 +123,7 @@ type Task interface { // Resume the execution of the task Resume(context.Context) error // Exec creates a new process inside the task - Exec(context.Context, string, *specs.Process, cio.Creation) (Process, error) + Exec(context.Context, string, *specs.Process, cio.Creator) (Process, error) // Pids returns a list of system specific process ids inside the task Pids(context.Context) ([]ProcessInfo, error) // Checkpoint serializes the runtime and memory information of a task into an @@ -278,7 +278,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat return &ExitStatus{code: r.ExitStatus, exitedAt: r.ExitedAt}, nil } -func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creation) (_ Process, err error) { +func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creator) (_ Process, err error) { if id == "" { return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "exec id must not be empty") } From 7d4337e73871d6093cb6e68c78a3b545c4dce871 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 7 Dec 2017 14:09:08 -0500 Subject: [PATCH 4/5] Reduce the number of IO constructors Signed-off-by: Daniel Nephin --- cio/io.go | 108 ++++++++++++++---------- cio/io_unix.go | 79 ++++++++--------- cio/io_windows.go | 74 ++++++++-------- cmd/ctr/commands/tasks/attach.go | 4 +- cmd/ctr/commands/tasks/exec.go | 4 +- cmd/ctr/commands/tasks/tasks_unix.go | 7 +- cmd/ctr/commands/tasks/tasks_windows.go | 4 +- container_linux_test.go | 12 ++- container_test.go | 18 +++- 9 files changed, 171 insertions(+), 139 deletions(-) diff --git a/cio/io.go b/cio/io.go index 43476966b..1b4a4dc25 100644 --- a/cio/io.go +++ b/cio/io.go @@ -8,7 +8,7 @@ import ( "sync" ) -// Config holds the io configurations. +// Config holds the IO configurations. type Config struct { // Terminal is true if one has been allocated Terminal bool @@ -49,6 +49,7 @@ type FIFOSet struct { close func() error } +// Close the FIFOSet func (f *FIFOSet) Close() error { if f.close != nil { return f.close() @@ -61,67 +62,72 @@ func NewFIFOSet(config Config, close func() error) *FIFOSet { return &FIFOSet{Config: config, close: close} } -// NewIO returns an Creator that will provide IO sets without a terminal -func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creator { - return NewIOWithTerminal(stdin, stdout, stderr, false) +// Streams used to configure a Creator or Attach +type Streams struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + Terminal bool } -// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal -func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creator { - return func(id string) (IO, error) { - fifos, err := newFIFOSetInTempDir(id) - if err != nil { - return nil, err - } +// Opt customize options for creating a Creator or Attach +type Opt func(*Streams) - fifos.Terminal = terminal - set := &ioSet{in: stdin, out: stdout, err: stderr} - return copyIO(fifos, set) +// WithStdio sets stream options to the standard input/output streams +func WithStdio(opt *Streams) { + WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt) +} + +// WithTerminal sets the terminal option +func WithTerminal(opt *Streams) { + opt.Terminal = true +} + +// WithStreams sets the stream options to the specified Reader and Writers +func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt { + return func(opt *Streams) { + opt.Stdin = stdin + opt.Stdout = stdout + opt.Stderr = stderr } } -// WithAttach attaches the existing io for a task to the provided io.Reader/Writers -func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { +// NewCreator returns an IO creator from the options +func NewCreator(opts ...Opt) Creator { + streams := &Streams{} + for _, opt := range opts { + opt(streams) + } + return func(id string) (IO, error) { + // TODO: accept root as a param + root := "/run/containerd/fifo" + fifos, err := NewFIFOSetInDir(root, id, streams.Terminal) + if err != nil { + return nil, err + } + return copyIO(fifos, streams) + } +} + +// NewAttach attaches the existing io for a task to the provided io.Reader/Writers +func NewAttach(opts ...Opt) Attach { + streams := &Streams{} + for _, opt := range opts { + opt(streams) + } return func(fifos *FIFOSet) (IO, error) { if fifos == nil { return nil, fmt.Errorf("cannot attach, missing fifos") } - set := &ioSet{in: stdin, out: stdout, err: stderr} - return copyIO(fifos, set) + return copyIO(fifos, streams) } } -// Stdio returns an IO set to be used for a task -// that outputs the container's IO as the current processes Stdio -func Stdio(id string) (IO, error) { - return NewIO(os.Stdin, os.Stdout, os.Stderr)(id) -} - -// StdioTerminal will setup the IO for the task to use a terminal -func StdioTerminal(id string) (IO, error) { - return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id) -} - // NullIO redirects the container's IO into /dev/null func NullIO(_ string) (IO, error) { return &cio{}, nil } -type ioSet struct { - in io.Reader - out, err io.Writer -} - -type pipes struct { - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser -} - -func (p *pipes) closers() []io.Closer { - return []io.Closer{p.Stdin, p.Stdout, p.Stderr} -} - // cio is a basic container IO implementation. type cio struct { config Config @@ -158,3 +164,17 @@ func (c *cio) Cancel() { c.cancel() } } + +type pipes struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +// DirectIO allows task IO to be handled externally by the caller +type DirectIO struct { + pipes + cio +} + +var _ IO = &DirectIO{} diff --git a/cio/io_unix.go b/cio/io_unix.go index e745af4c8..005fb0ce9 100644 --- a/cio/io_unix.go +++ b/cio/io_unix.go @@ -15,11 +15,12 @@ import ( "github.com/pkg/errors" ) -// newFIFOSetInTempDir returns a new set of fifos for the task -func newFIFOSetInTempDir(id string) (*FIFOSet, error) { - root := "/run/containerd/fifo" - if err := os.MkdirAll(root, 0700); err != nil { - return nil, err +// NewFIFOSetInDir returns a new FIFOSet with paths in a temporary directory under root +func NewFIFOSetInDir(root, id string, terminal bool) (*FIFOSet, error) { + if root != "" { + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } } dir, err := ioutil.TempDir(root, "") if err != nil { @@ -29,18 +30,15 @@ func newFIFOSetInTempDir(id string) (*FIFOSet, error) { return os.RemoveAll(dir) } return NewFIFOSet(Config{ - Stdin: filepath.Join(dir, id+"-stdin"), - Stdout: filepath.Join(dir, id+"-stdout"), - Stderr: filepath.Join(dir, id+"-stderr"), + Stdin: filepath.Join(dir, id+"-stdin"), + Stdout: filepath.Join(dir, id+"-stdout"), + Stderr: filepath.Join(dir, id+"-stderr"), + Terminal: terminal, }, closer), nil } -func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { - var ( - ctx, cancel = context.WithCancel(context.Background()) - wg = &sync.WaitGroup{} - ) - +func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { + var ctx, cancel = context.WithCancel(context.Background()) pipes, err := openFifos(ctx, fifos) if err != nil { cancel() @@ -49,14 +47,15 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { if fifos.Stdin != "" { go func() { - io.Copy(pipes.Stdin, ioset.in) + io.Copy(pipes.Stdin, ioset.Stdin) pipes.Stdin.Close() }() } + var wg = &sync.WaitGroup{} wg.Add(1) go func() { - io.Copy(ioset.out, pipes.Stdout) + io.Copy(ioset.Stdout, pipes.Stdout) pipes.Stdout.Close() wg.Done() }() @@ -64,12 +63,13 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { if !fifos.Terminal { wg.Add(1) go func() { - io.Copy(ioset.err, pipes.Stderr) + io.Copy(ioset.Stderr, pipes.Stderr) pipes.Stderr.Close() wg.Done() }() } return &cio{ + config: fifos.Config, wg: wg, closers: append(pipes.closers(), fifos), cancel: cancel, @@ -78,41 +78,38 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) { var err error - f := new(pipes) - defer func() { if err != nil { fifos.Close() } }() + var f pipes if fifos.Stdin != "" { if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return pipes{}, errors.Wrapf(err, "failed to open stdin fifo") + return f, errors.Wrapf(err, "failed to open stdin fifo") } } - if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - f.Stdin.Close() - return pipes{}, errors.Wrapf(err, "failed to open stdout fifo") + if fifos.Stdout != "" { + if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + f.Stdin.Close() + return f, errors.Wrapf(err, "failed to open stdout fifo") + } } - if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - f.Stdin.Close() - f.Stdout.Close() - return pipes{}, errors.Wrapf(err, "failed to open stderr fifo") + if fifos.Stderr != "" { + if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + f.Stdin.Close() + f.Stdout.Close() + return f, errors.Wrapf(err, "failed to open stderr fifo") + } } - return pipes{}, nil + return f, nil } // NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser -// and io.WriteCloser. FIFOs are created in /run/containerd/fifo. -func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { - fifos, err := newFIFOSetInTempDir("") - if err != nil { - return nil, err - } - fifos.Terminal = terminal - - ctx, cancel := context.WithCancel(context.Background()) +// and io.WriteCloser. +func NewDirectIO(ctx context.Context, fifos *FIFOSet) (*DirectIO, error) { + ctx, cancel := context.WithCancel(ctx) pipes, err := openFifos(ctx, fifos) return &DirectIO{ pipes: pipes, @@ -124,10 +121,6 @@ func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { }, err } -// DirectIO allows task IO to be handled externally by the caller -type DirectIO struct { - pipes - cio +func (p *pipes) closers() []io.Closer { + return []io.Closer{p.Stdin, p.Stdout, p.Stderr} } - -var _ IO = &DirectIO{} diff --git a/cio/io_windows.go b/cio/io_windows.go index 6ecfba43c..017c9a11f 100644 --- a/cio/io_windows.go +++ b/cio/io_windows.go @@ -13,26 +13,26 @@ import ( const pipeRoot = `\\.\pipe` -// newFIFOSetInTempDir returns a new set of fifos for the task -func newFIFOSetInTempDir(id string) (*FIFOSet, error) { - return &FIFOSet{ - StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), - StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), - StdErr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), - }, nil +// NewFIFOSetInDir returns a new set of fifos for the task +func NewFIFOSetInDir(_, id string, terminal bool) (*FIFOSet, error) { + return NewFIFOSet(Config{ + Terminal: terminal, + Stdin: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), + Stdout: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), + Stderr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), + }, nil), nil } -func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { +func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) { var ( - err error wg sync.WaitGroup set []io.Closer ) - if fifos.StdIn != "" { - l, err := winio.ListenPipe(fifos.StdIn, nil) + if fifos.Stdin != "" { + l, err := winio.ListenPipe(fifos.Stdin, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdIn) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdin) } defer func(l net.Listener) { if err != nil { @@ -44,19 +44,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { go func() { c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.StdIn) + log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin) return } - io.Copy(c, ioset.in) + io.Copy(c, ioset.Stdin) c.Close() l.Close() }() } - if fifos.StdOut != "" { - l, err := winio.ListenPipe(fifos.StdOut, nil) + if fifos.Stdout != "" { + l, err := winio.ListenPipe(fifos.Stdout, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdOut) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdout) } defer func(l net.Listener) { if err != nil { @@ -70,19 +70,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.StdOut) + log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Stdout) return } - io.Copy(ioset.out, c) + io.Copy(ioset.Stdout, c) c.Close() l.Close() }() } - if !fifos.Terminal && fifos.StdErr != "" { - l, err := winio.ListenPipe(fifos.StdErr, nil) + if !fifos.Terminal && fifos.Stderr != "" { + l, err := winio.ListenPipe(fifos.Stderr, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr) + return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Stderr) } defer func(l net.Listener) { if err != nil { @@ -96,23 +96,29 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.StdErr) + log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr) return } - io.Copy(ioset.err, c) + io.Copy(ioset.Stderr, c) c.Close() l.Close() }() } - return &cio{ - wg: &wg, - dir: fifos.Dir, - set: set, - cancel: func() { - for _, l := range set { - l.Close() - } - }, - }, nil + return &cio{config: fifos.Config, closers: set}, nil +} + +// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser +// and io.WriteCloser. +func NewDirectIO(stdin io.WriteCloser, stdout, stderr io.ReadCloser, terminal bool) *DirectIO { + return &DirectIO{ + pipes: pipes{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + }, + cio: cio{ + config: Config{Terminal: terminal}, + }, + } } diff --git a/cmd/ctr/commands/tasks/attach.go b/cmd/ctr/commands/tasks/attach.go index ec4da1005..517bd5764 100644 --- a/cmd/ctr/commands/tasks/attach.go +++ b/cmd/ctr/commands/tasks/attach.go @@ -1,8 +1,6 @@ package tasks import ( - "os" - "github.com/containerd/console" "github.com/containerd/containerd/cio" "github.com/containerd/containerd/cmd/ctr/commands" @@ -39,7 +37,7 @@ var attachCommand = cli.Command{ return err } } - task, err := container.Task(ctx, cio.WithAttach(os.Stdin, os.Stdout, os.Stderr)) + task, err := container.Task(ctx, cio.NewAttach(cio.WithStdio)) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/exec.go b/cmd/ctr/commands/tasks/exec.go index a7301136b..67011db49 100644 --- a/cmd/ctr/commands/tasks/exec.go +++ b/cmd/ctr/commands/tasks/exec.go @@ -60,9 +60,9 @@ var execCommand = cli.Command{ pspec.Terminal = tty pspec.Args = args - ioCreator := cio.Stdio + ioCreator := cio.NewCreator(cio.WithStdio) if tty { - ioCreator = cio.StdioTerminal + ioCreator = cio.NewCreator(cio.WithStdio, cio.WithTerminal) } process, err := task.Exec(ctx, context.String("exec-id"), pspec, ioCreator) if err != nil { diff --git a/cmd/ctr/commands/tasks/tasks_unix.go b/cmd/ctr/commands/tasks/tasks_unix.go index 0621f9566..2a1d5e832 100644 --- a/cmd/ctr/commands/tasks/tasks_unix.go +++ b/cmd/ctr/commands/tasks/tasks_unix.go @@ -44,10 +44,11 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol // NewTask creates a new task func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, tty, nullIO bool) (containerd.Task, error) { + stdio := cio.NewCreator(cio.WithStdio) if checkpoint == "" { - ioCreator := cio.Stdio + ioCreator := stdio if tty { - ioCreator = cio.StdioTerminal + ioCreator = cio.NewCreator(cio.WithStdio, cio.WithTerminal) } if nullIO { if tty { @@ -61,5 +62,5 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container contain if err != nil { return nil, err } - return container.NewTask(ctx, cio.Stdio, containerd.WithTaskCheckpoint(im)) + return container.NewTask(ctx, stdio, containerd.WithTaskCheckpoint(im)) } diff --git a/cmd/ctr/commands/tasks/tasks_windows.go b/cmd/ctr/commands/tasks/tasks_windows.go index 083ad49e6..93ca166e1 100644 --- a/cmd/ctr/commands/tasks/tasks_windows.go +++ b/cmd/ctr/commands/tasks/tasks_windows.go @@ -42,9 +42,9 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol // NewTask creates a new task func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, tty, nullIO bool) (containerd.Task, error) { - ioCreator := cio.Stdio + ioCreator := cio.NewCreator(cio.WithStdio) if tty { - ioCreator = cio.StdioTerminal + ioCreator = cio.NewCreator(cio.WithStdio, cio.WithTerminal) } if nullIO { if tty { diff --git a/container_linux_test.go b/container_linux_test.go index 5fc298b95..b7d648fba 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -373,7 +373,11 @@ func TestContainerAttach(t *testing.T) { } func newDirectIO(ctx context.Context) (*directIO, error) { - dio, err := cio.NewDirectIO(ctx, false) + fifos, err := cio.NewFIFOSetInDir("", "", false) + if err != nil { + return nil, err + } + dio, err := cio.NewDirectIO(ctx, fifos) if err != nil { return nil, err } @@ -735,7 +739,7 @@ func TestContainerKillAll(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout))) if err != nil { t.Error(err) return @@ -987,7 +991,7 @@ func TestContainerKillInitPidHost(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout))) if err != nil { t.Error(err) return @@ -1086,7 +1090,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) { } defer container.Delete(ctx, WithSnapshotCleanup) - task, err := container.NewTask(ctx, cio.Stdio, func(_ context.Context, client *Client, r *TaskInfo) error { + task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStdio), func(_ context.Context, client *Client, r *TaskInfo) error { r.Options = &runctypes.CreateOptions{ IoUid: 1000, IoGid: 1000, diff --git a/container_test.go b/container_test.go index e3ab0cf27..8513b0469 100644 --- a/container_test.go +++ b/container_test.go @@ -3,6 +3,7 @@ package containerd import ( "bytes" "context" + "io" "io/ioutil" "os" "runtime" @@ -27,7 +28,7 @@ func empty() cio.Creator { // TODO (@mlaventure) windows searches for pipes // when none are provided if runtime.GOOS == "windows" { - return cio.Stdio + return cio.NewCreator(cio.WithStdio) } return cio.NullIO } @@ -187,7 +188,7 @@ func TestContainerOutput(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout))) if err != nil { t.Error(err) return @@ -223,6 +224,15 @@ func TestContainerOutput(t *testing.T) { } } +func withByteBuffers(stdout io.Writer) cio.Opt { + // TODO: could this use ioutil.Discard? + return func(streams *cio.Streams) { + streams.Stdin = new(bytes.Buffer) + streams.Stdout = stdout + streams.Stderr = new(bytes.Buffer) + } +} + func TestContainerExec(t *testing.T) { t.Parallel() @@ -534,7 +544,7 @@ func TestContainerCloseIO(t *testing.T) { return } - task, err := container.NewTask(ctx, cio.NewIO(r, stdout, ioutil.Discard)) + task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStreams(r, stdout, ioutil.Discard))) if err != nil { t.Error(err) return @@ -1145,7 +1155,7 @@ func TestContainerHostname(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout))) if err != nil { t.Error(err) return From f34d03017800bb4fd3f1e1dff5fb493b124b5bf5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 12 Dec 2017 14:45:39 -0500 Subject: [PATCH 5/5] Add unit test for cio Signed-off-by: Daniel Nephin --- cio/io_test.go | 132 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 cio/io_test.go diff --git a/cio/io_test.go b/cio/io_test.go new file mode 100644 index 000000000..434c0af5f --- /dev/null +++ b/cio/io_test.go @@ -0,0 +1,132 @@ +// +build !windows + +package cio + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strings" + "syscall" + "testing" + + "github.com/containerd/fifo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func assertHasPrefix(t *testing.T, s, prefix string) { + t.Helper() + if !strings.HasPrefix(s, prefix) { + t.Fatalf("expected %s to start with %s", s, prefix) + } +} + +func TestNewFIFOSetInDir(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("NewFIFOSetInDir has different behaviour on windows") + } + + root, err := ioutil.TempDir("", "test-new-fifo-set") + require.NoError(t, err) + defer os.RemoveAll(root) + + fifos, err := NewFIFOSetInDir(root, "theid", true) + require.NoError(t, err) + + assertHasPrefix(t, fifos.Stdin, root) + assertHasPrefix(t, fifos.Stdout, root) + assertHasPrefix(t, fifos.Stderr, root) + assert.Equal(t, "theid-stdin", filepath.Base(fifos.Stdin)) + assert.Equal(t, "theid-stdout", filepath.Base(fifos.Stdout)) + assert.Equal(t, "theid-stderr", filepath.Base(fifos.Stderr)) + assert.Equal(t, true, fifos.Terminal) + + files, err := ioutil.ReadDir(root) + require.NoError(t, err) + assert.Len(t, files, 1) + + require.NoError(t, fifos.Close()) + files, err = ioutil.ReadDir(root) + require.NoError(t, err) + assert.Len(t, files, 0) +} + +func TestNewAttach(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("setupFIFOProducers not yet implemented on windows") + } + var ( + expectedStdin = "this is the stdin" + expectedStdout = "this is the stdout" + expectedStderr = "this is the stderr" + stdin = bytes.NewBufferString(expectedStdin) + stdout = new(bytes.Buffer) + stderr = new(bytes.Buffer) + ) + + withBytesBuffers := func(streams *Streams) { + *streams = Streams{Stdin: stdin, Stdout: stdout, Stderr: stderr} + } + attacher := NewAttach(withBytesBuffers) + + fifos, err := NewFIFOSetInDir("", "theid", false) + require.NoError(t, err) + + io, err := attacher(fifos) + require.NoError(t, err) + defer io.Close() + + producers := setupFIFOProducers(t, io.Config()) + initProducers(t, producers, expectedStdout, expectedStderr) + + actualStdin, err := ioutil.ReadAll(producers.Stdin) + require.NoError(t, err) + + io.Cancel() + io.Wait() + assert.NoError(t, io.Close()) + + assert.Equal(t, expectedStdout, stdout.String()) + assert.Equal(t, expectedStderr, stderr.String()) + assert.Equal(t, expectedStdin, string(actualStdin)) +} + +type producers struct { + Stdin io.ReadCloser + Stdout io.WriteCloser + Stderr io.WriteCloser +} + +func setupFIFOProducers(t *testing.T, fifos Config) producers { + var ( + err error + pipes producers + ctx = context.Background() + ) + + pipes.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_RDONLY, 0) + require.NoError(t, err) + + pipes.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY, 0) + require.NoError(t, err) + + pipes.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY, 0) + require.NoError(t, err) + + return pipes +} + +func initProducers(t *testing.T, producers producers, stdout, stderr string) { + _, err := producers.Stdout.Write([]byte(stdout)) + require.NoError(t, err) + require.NoError(t, producers.Stdout.Close()) + + _, err = producers.Stderr.Write([]byte(stderr)) + require.NoError(t, err) + require.NoError(t, producers.Stderr.Close()) +}