diff --git a/cio/io.go b/cio/io.go index c5e112d9c..343e57b79 100644 --- a/cio/io.go +++ b/cio/io.go @@ -75,68 +75,70 @@ type Creation func(id string) (IO, error) // will be sent only to the first reads type Attach func(*FIFOSet) (IO, error) +// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams +type FIFOSet struct { + Config + close func() error +} + +func (f *FIFOSet) Close() error { + if f.close != nil { + return f.close() + } + return nil +} + +// NewFIFOSet returns a new FIFOSet from a Config and a close function +func NewFIFOSet(config Config, close func() error) *FIFOSet { + return &FIFOSet{Config: config, close: close} +} + // NewIO returns an Creation that will provide IO sets without a terminal func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { return NewIOWithTerminal(stdin, stdout, stderr, false) } -// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal +// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { return func(id string) (_ IO, err error) { - paths, err := NewFifos(id) + fifos, err := newFIFOSetTempDir(id) if err != nil { return nil, err } defer func() { - if err != nil && paths.Dir != "" { - os.RemoveAll(paths.Dir) + if err != nil { + fifos.Close() } }() - cfg := Config{ - Terminal: terminal, - Stdout: paths.Out, - Stderr: paths.Err, - Stdin: paths.In, - } + cfg := fifos.Config + cfg.Terminal = terminal i := &cio{config: cfg} set := &ioSet{ in: stdin, out: stdout, err: stderr, } - closer, err := copyIO(paths, set, cfg.Terminal) - if err != nil { - return nil, err - } + closer, err := copyIO(fifos, set, cfg.Terminal) i.closer = closer - return i, nil + return i, err } } // WithAttach attaches the existing io for a task to the provided io.Reader/Writers func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { - return func(paths *FIFOSet) (IO, error) { - if paths == nil { + return func(fifos *FIFOSet) (IO, error) { + if fifos == nil { return nil, fmt.Errorf("cannot attach to existing fifos") } - cfg := Config{ - Terminal: paths.Terminal, - Stdout: paths.Out, - Stderr: paths.Err, - Stdin: paths.In, - } - i := &cio{config: cfg} + i := &cio{config: fifos.Config} set := &ioSet{ in: stdin, out: stdout, err: stderr, } - closer, err := copyIO(paths, set, cfg.Terminal) - if err != nil { - return nil, err - } + closer, err := copyIO(fifos, set, fifos.Terminal) i.closer = closer - return i, nil + return i, err } } @@ -156,16 +158,6 @@ func NullIO(id string) (IO, error) { return &cio{}, nil } -// FIFOSet is a set of fifos for use with tasks -type FIFOSet struct { - // Dir is the directory holding the task fifos - Dir string - // In, Out, and Err fifo paths - In, Out, Err string - // Terminal returns true if a terminal is being used for the task - Terminal bool -} - type ioSet struct { in io.Reader out, err io.Writer @@ -173,7 +165,6 @@ type ioSet struct { type wgCloser struct { wg *sync.WaitGroup - dir string set []io.Closer cancel context.CancelFunc } @@ -183,12 +174,10 @@ func (g *wgCloser) Wait() { } func (g *wgCloser) Close() error { + // TODO: this should return all errors, not mask them for _, f := range g.set { f.Close() } - if g.dir != "" { - return os.RemoveAll(g.dir) - } return nil } diff --git a/cio/io_unix.go b/cio/io_unix.go index c18f7ecf9..6144ee04c 100644 --- a/cio/io_unix.go +++ b/cio/io_unix.go @@ -14,8 +14,8 @@ import ( "github.com/containerd/fifo" ) -// NewFifos returns a new set of fifos for the task -func NewFifos(id string) (*FIFOSet, error) { +// newFIFOSetTempDir returns a new set of fifos for the task +func newFIFOSetTempDir(id string) (*FIFOSet, error) { root := "/run/containerd/fifo" if err := os.MkdirAll(root, 0700); err != nil { return nil, err @@ -24,21 +24,23 @@ func NewFifos(id string) (*FIFOSet, error) { if err != nil { return nil, err } - return &FIFOSet{ - Dir: dir, - In: filepath.Join(dir, id+"-stdin"), - Out: filepath.Join(dir, id+"-stdout"), - Err: filepath.Join(dir, id+"-stderr"), - }, nil + closer := func() error { + return os.RemoveAll(dir) + } + return NewFIFOSet(Config{ + Stdin: filepath.Join(dir, id+"-stdin"), + Stdout: filepath.Join(dir, id+"-stdout"), + Stderr: filepath.Join(dir, id+"-stderr"), + }, closer), nil } func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { var ( f io.ReadWriteCloser - set []io.Closer ctx, cancel = context.WithCancel(context.Background()) wg = &sync.WaitGroup{} ) + set := []io.Closer{fifos} defer func() { if err != nil { for _, f := range set { @@ -48,7 +50,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { } }() - if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } set = append(set, f) @@ -57,7 +59,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { w.Close() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } set = append(set, f) @@ -68,7 +70,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { wg.Done() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } set = append(set, f) @@ -83,7 +85,6 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { } return &wgCloser{ wg: wg, - dir: fifos.Dir, set: set, cancel: cancel, }, nil @@ -91,27 +92,26 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { // NewDirectIO returns an IO implementation that exposes the pipes directly func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { - set, err := NewFifos("") + set, err := newFIFOSetTempDir("") if err != nil { return nil, err } - f := &DirectIO{ - set: set, - terminal: terminal, - } + set.Terminal = terminal + f := &DirectIO{set: set} + defer func() { if err != nil { f.Delete() } }() - if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } - if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() return nil, err } - if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { f.Stdin.Close() f.Stdout.Close() return nil, err @@ -125,8 +125,7 @@ type DirectIO struct { Stdout io.ReadCloser Stderr io.ReadCloser - set *FIFOSet - terminal bool + set *FIFOSet } // IOCreate returns IO avaliable for use with task creation @@ -141,12 +140,7 @@ func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { // Config returns the Config func (f *DirectIO) Config() Config { - return Config{ - Terminal: f.terminal, - Stdin: f.set.In, - Stdout: f.set.Out, - Stderr: f.set.Err, - } + return f.set.Config } // Cancel stops any IO copy operations @@ -177,8 +171,5 @@ func (f *DirectIO) Close() error { // Delete removes the underlying directory containing fifos func (f *DirectIO) Delete() error { - if f.set.Dir == "" { - return nil - } - return os.RemoveAll(f.set.Dir) + return f.set.Close() } diff --git a/cio/io_windows.go b/cio/io_windows.go index 1458c3173..3d3fffa6a 100644 --- a/cio/io_windows.go +++ b/cio/io_windows.go @@ -13,12 +13,12 @@ import ( const pipeRoot = `\\.\pipe` -// NewFifos returns a new set of fifos for the task -func NewFifos(id string) (*FIFOSet, error) { +// newFIFOSetTempDir returns a new set of fifos for the task +func newFIFOSetTempDir(id string) (*FIFOSet, error) { return &FIFOSet{ - In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), - Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), - Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), + StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), + StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), + StdErr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), }, nil } @@ -28,10 +28,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { set []io.Closer ) - if fifos.In != "" { - l, err := winio.ListenPipe(fifos.In, nil) + if fifos.StdIn != "" { + l, err := winio.ListenPipe(fifos.StdIn, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdIn) } defer func(l net.Listener) { if err != nil { @@ -43,7 +43,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { go func() { c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In) + log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.StdIn) return } io.Copy(c, ioset.in) @@ -52,10 +52,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - if fifos.Out != "" { - l, err := winio.ListenPipe(fifos.Out, nil) + if fifos.StdOut != "" { + l, err := winio.ListenPipe(fifos.StdOut, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdOut) } defer func(l net.Listener) { if err != nil { @@ -69,7 +69,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out) + log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.StdOut) return } io.Copy(ioset.out, c) @@ -78,10 +78,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { }() } - if !tty && fifos.Err != "" { - l, err := winio.ListenPipe(fifos.Err, nil) + if !tty && fifos.StdErr != "" { + l, err := winio.ListenPipe(fifos.StdErr, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err) + return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr) } defer func(l net.Listener) { if err != nil { @@ -95,7 +95,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err) + log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.StdErr) return } io.Copy(ioset.err, c) diff --git a/container.go b/container.go index 716e0456c..1f0e9be2a 100644 --- a/container.go +++ b/container.go @@ -3,6 +3,7 @@ package containerd import ( "context" "encoding/json" + "os" "path/filepath" "strings" @@ -288,20 +289,23 @@ func (c *container) get(ctx context.Context) (containers.Container, error) { return c.client.ContainerService().Get(ctx, c.id) } +// get the existing fifo paths from the task information stored by the daemon func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) { - // get the existing fifo paths from the task information stored by the daemon - paths := &cio.FIFOSet{ - Dir: getFifoDir([]string{ - response.Process.Stdin, - response.Process.Stdout, - response.Process.Stderr, - }), - In: response.Process.Stdin, - Out: response.Process.Stdout, - Err: response.Process.Stderr, - Terminal: response.Process.Terminal, + path := getFifoDir([]string{ + response.Process.Stdin, + response.Process.Stdout, + response.Process.Stderr, + }) + closer := func() error { + return os.RemoveAll(path) } - return ioAttach(paths) + fifoSet := cio.NewFIFOSet(cio.Config{ + Stdin: response.Process.Stdin, + Stdout: response.Process.Stdout, + Stderr: response.Process.Stderr, + Terminal: response.Process.Terminal, + }, closer) + return ioAttach(fifoSet) } // getFifoDir looks for any non-empty path for a stdio fifo