From 3146019918cce8103fcc42a7b3deffa03ad87d47 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 6 Dec 2017 17:28:17 -0500 Subject: [PATCH] Refactor cio.DirectIO New code duplication Better re-use from consumers of the cio package TODO: io_windows.go Signed-off-by: Daniel Nephin --- cio/io.go | 122 +++++++++++---------------- cio/io_unix.go | 180 +++++++++++++++------------------------- cio/io_windows.go | 11 +-- container_linux_test.go | 51 +++++++++++- 4 files changed, 170 insertions(+), 194 deletions(-) diff --git a/cio/io.go b/cio/io.go index 343e57b79..8e94686bc 100644 --- a/cio/io.go +++ b/cio/io.go @@ -33,38 +33,6 @@ type IO interface { Close() error } -// cio is a basic container IO implementation. -type cio struct { - config Config - - closer *wgCloser -} - -func (c *cio) Config() Config { - return c.config -} - -func (c *cio) Cancel() { - if c.closer == nil { - return - } - c.closer.Cancel() -} - -func (c *cio) Wait() { - if c.closer == nil { - return - } - c.closer.Wait() -} - -func (c *cio) Close() error { - if c.closer == nil { - return nil - } - return c.closer.Close() -} - // Creation creates new IO sets for a task type Creation func(id string) (IO, error) @@ -100,27 +68,15 @@ func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { // NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { - return func(id string) (_ IO, err error) { - fifos, err := newFIFOSetTempDir(id) + return func(id string) (IO, error) { + fifos, err := newFIFOSetInTempDir(id) if err != nil { return nil, err } - defer func() { - if err != nil { - fifos.Close() - } - }() - cfg := fifos.Config - cfg.Terminal = terminal - i := &cio{config: cfg} - set := &ioSet{ - in: stdin, - out: stdout, - err: stderr, - } - closer, err := copyIO(fifos, set, cfg.Terminal) - i.closer = closer - return i, err + + fifos.Terminal = terminal + set := &ioSet{in: stdin, out: stdout, err: stderr} + return copyIO(fifos, set) } } @@ -128,17 +84,10 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { return func(fifos *FIFOSet) (IO, error) { if fifos == nil { - return nil, fmt.Errorf("cannot attach to existing fifos") + return nil, fmt.Errorf("cannot attach, missing fifos") } - i := &cio{config: fifos.Config} - set := &ioSet{ - in: stdin, - out: stdout, - err: stderr, - } - closer, err := copyIO(fifos, set, fifos.Terminal) - i.closer = closer - return i, err + set := &ioSet{in: stdin, out: stdout, err: stderr} + return copyIO(fifos, set) } } @@ -154,7 +103,7 @@ func StdioTerminal(id string) (IO, error) { } // NullIO redirects the container's IO into /dev/null -func NullIO(id string) (IO, error) { +func NullIO(_ string) (IO, error) { return &cio{}, nil } @@ -163,24 +112,49 @@ type ioSet struct { out, err io.Writer } -type wgCloser struct { - wg *sync.WaitGroup - set []io.Closer - cancel context.CancelFunc +type pipes struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser } -func (g *wgCloser) Wait() { - g.wg.Wait() +func (p *pipes) closers() []io.Closer { + return []io.Closer{p.Stdin, p.Stdout, p.Stderr} } -func (g *wgCloser) Close() error { - // TODO: this should return all errors, not mask them - for _, f := range g.set { - f.Close() +// cio is a basic container IO implementation. +type cio struct { + config Config + wg *sync.WaitGroup + closers []io.Closer + cancel context.CancelFunc +} + +func (c *cio) Config() Config { + return c.config +} + +func (c *cio) Wait() { + if c.wg != nil { + c.wg.Wait() } - return nil } -func (g *wgCloser) Cancel() { - g.cancel() +func (c *cio) Close() error { + var lastErr error + for _, closer := range c.closers { + if closer == nil { + continue + } + if err := closer.Close(); err != nil { + lastErr = err + } + } + return lastErr +} + +func (c *cio) Cancel() { + if c.cancel != nil { + c.cancel() + } } diff --git a/cio/io_unix.go b/cio/io_unix.go index 6144ee04c..e745af4c8 100644 --- a/cio/io_unix.go +++ b/cio/io_unix.go @@ -12,10 +12,11 @@ import ( "syscall" "github.com/containerd/fifo" + "github.com/pkg/errors" ) -// newFIFOSetTempDir returns a new set of fifos for the task -func newFIFOSetTempDir(id string) (*FIFOSet, error) { +// newFIFOSetInTempDir returns a new set of fifos for the task +func newFIFOSetInTempDir(id string) (*FIFOSet, error) { root := "/run/containerd/fifo" if err := os.MkdirAll(root, 0700); err != nil { return nil, err @@ -34,142 +35,99 @@ func newFIFOSetTempDir(id string) (*FIFOSet, error) { }, closer), nil } -func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { +func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { var ( - f io.ReadWriteCloser ctx, cancel = context.WithCancel(context.Background()) wg = &sync.WaitGroup{} ) - set := []io.Closer{fifos} - defer func() { - if err != nil { - for _, f := range set { - f.Close() - } - cancel() - } + + pipes, err := openFifos(ctx, fifos) + if err != nil { + cancel() + return nil, err + } + + if fifos.Stdin != "" { + go func() { + io.Copy(pipes.Stdin, ioset.in) + pipes.Stdin.Close() + }() + } + + wg.Add(1) + go func() { + io.Copy(ioset.out, pipes.Stdout) + pipes.Stdout.Close() + wg.Done() }() - if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - go func(w io.WriteCloser) { - io.Copy(w, ioset.in) - w.Close() - }(f) - - if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - wg.Add(1) - go func(r io.ReadCloser) { - io.Copy(ioset.out, r) - r.Close() - wg.Done() - }(f) - - if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err - } - set = append(set, f) - - if !tty { + if !fifos.Terminal { wg.Add(1) - go func(r io.ReadCloser) { - io.Copy(ioset.err, r) - r.Close() + go func() { + io.Copy(ioset.err, pipes.Stderr) + pipes.Stderr.Close() wg.Done() - }(f) + }() } - return &wgCloser{ - wg: wg, - set: set, - cancel: cancel, + return &cio{ + wg: wg, + closers: append(pipes.closers(), fifos), + cancel: cancel, }, nil } -// NewDirectIO returns an IO implementation that exposes the pipes directly -func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { - set, err := newFIFOSetTempDir("") - if err != nil { - return nil, err - } - set.Terminal = terminal - f := &DirectIO{set: set} +func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) { + var err error + f := new(pipes) defer func() { if err != nil { - f.Delete() + fifos.Close() } }() - if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return nil, err + + if fifos.Stdin != "" { + if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return pipes{}, errors.Wrapf(err, "failed to open stdin fifo") + } } - if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() - return nil, err + return pipes{}, errors.Wrapf(err, "failed to open stdout fifo") } - if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() f.Stdout.Close() + return pipes{}, errors.Wrapf(err, "failed to open stderr fifo") + } + return pipes{}, nil +} + +// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser +// and io.WriteCloser. FIFOs are created in /run/containerd/fifo. +func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { + fifos, err := newFIFOSetInTempDir("") + if err != nil { return nil, err } - return f, nil + fifos.Terminal = terminal + + ctx, cancel := context.WithCancel(context.Background()) + pipes, err := openFifos(ctx, fifos) + return &DirectIO{ + pipes: pipes, + cio: cio{ + config: fifos.Config, + closers: append(pipes.closers(), fifos), + cancel: cancel, + }, + }, err } // DirectIO allows task IO to be handled externally by the caller type DirectIO struct { - Stdin io.WriteCloser - Stdout io.ReadCloser - Stderr io.ReadCloser - - set *FIFOSet + pipes + cio } -// IOCreate returns IO avaliable for use with task creation -func (f *DirectIO) IOCreate(id string) (IO, error) { - return f, nil -} - -// IOAttach returns IO avaliable for use with task attachment -func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { - return f, nil -} - -// Config returns the Config -func (f *DirectIO) Config() Config { - return f.set.Config -} - -// Cancel stops any IO copy operations -// -// Not applicable for DirectIO -func (f *DirectIO) Cancel() { - // nothing to cancel as all operations are handled externally -} - -// Wait on any IO copy operations -// -// Not applicable for DirectIO -func (f *DirectIO) Wait() { - // nothing to wait on as all operations are handled externally -} - -// Close closes all open fds -func (f *DirectIO) Close() error { - err := f.Stdin.Close() - if err2 := f.Stdout.Close(); err == nil { - err = err2 - } - if err2 := f.Stderr.Close(); err == nil { - err = err2 - } - return err -} - -// Delete removes the underlying directory containing fifos -func (f *DirectIO) Delete() error { - return f.set.Close() -} +var _ IO = &DirectIO{} diff --git a/cio/io_windows.go b/cio/io_windows.go index 3d3fffa6a..6ecfba43c 100644 --- a/cio/io_windows.go +++ b/cio/io_windows.go @@ -13,8 +13,8 @@ import ( const pipeRoot = `\\.\pipe` -// newFIFOSetTempDir returns a new set of fifos for the task -func newFIFOSetTempDir(id string) (*FIFOSet, error) { +// newFIFOSetInTempDir returns a new set of fifos for the task +func newFIFOSetInTempDir(id string) (*FIFOSet, error) { return &FIFOSet{ StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), @@ -22,8 +22,9 @@ func newFIFOSetTempDir(id string) (*FIFOSet, error) { }, nil } -func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { +func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) { var ( + err error wg sync.WaitGroup set []io.Closer ) @@ -78,7 +79,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - if !tty && fifos.StdErr != "" { + if !fifos.Terminal && fifos.StdErr != "" { l, err := winio.ListenPipe(fifos.StdErr, nil) if err != nil { return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr) @@ -104,7 +105,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - return &wgCloser{ + return &cio{ wg: &wg, dir: fifos.Dir, set: set, diff --git a/container_linux_test.go b/container_linux_test.go index 85e934f52..5fc298b95 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -298,7 +298,7 @@ func TestContainerAttach(t *testing.T) { expected := "hello" + newLine - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return @@ -372,6 +372,49 @@ func TestContainerAttach(t *testing.T) { } } +func newDirectIO(ctx context.Context) (*directIO, error) { + dio, err := cio.NewDirectIO(ctx, false) + if err != nil { + return nil, err + } + return &directIO{DirectIO: *dio}, nil +} + +type directIO struct { + cio.DirectIO +} + +// ioCreate returns IO avaliable for use with task creation +func (f *directIO) IOCreate(id string) (cio.IO, error) { + return f, nil +} + +// ioAttach returns IO avaliable for use with task attachment +func (f *directIO) IOAttach(set *cio.FIFOSet) (cio.IO, error) { + return f, nil +} + +func (f *directIO) Cancel() { + // nothing to cancel as all operations are handled externally +} + +// Close closes all open fds +func (f *directIO) Close() error { + err := f.Stdin.Close() + if err2 := f.Stdout.Close(); err == nil { + err = err2 + } + if err2 := f.Stderr.Close(); err == nil { + err = err2 + } + return err +} + +// Delete removes the underlying directory containing fifos +func (f *directIO) Delete() error { + return f.DirectIO.Close() +} + func TestContainerUsername(t *testing.T) { t.Parallel() @@ -393,7 +436,7 @@ func TestContainerUsername(t *testing.T) { t.Error(err) return } - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return @@ -486,7 +529,7 @@ func TestContainerAttachProcess(t *testing.T) { expected := "hello" + newLine // creating IO early for easy resource cleanup - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return @@ -602,7 +645,7 @@ func TestContainerUserID(t *testing.T) { t.Error(err) return } - direct, err := cio.NewDirectIO(ctx, false) + direct, err := newDirectIO(ctx) if err != nil { t.Error(err) return