Cleanup cio.FIFOSet interface

Remove duplication with cio.Config
unexport newFIFOSetTempDir() since it includes hardcoded paths
Expose os.RemoveAll() as part of FIFOSet instead of a Dir

Signed-off-by: Daniel Nephin <dnephin@gmail.com>
This commit is contained in:
Daniel Nephin 2017-12-06 15:56:46 -05:00
parent 6393165b09
commit f79ec5b55f
4 changed files with 89 additions and 105 deletions

View File

@ -75,68 +75,70 @@ type Creation func(id string) (IO, error)
// will be sent only to the first reads // will be sent only to the first reads
type Attach func(*FIFOSet) (IO, error) type Attach func(*FIFOSet) (IO, error)
// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
type FIFOSet struct {
Config
close func() error
}
func (f *FIFOSet) Close() error {
if f.close != nil {
return f.close()
}
return nil
}
// NewFIFOSet returns a new FIFOSet from a Config and a close function
func NewFIFOSet(config Config, close func() error) *FIFOSet {
return &FIFOSet{Config: config, close: close}
}
// NewIO returns an Creation that will provide IO sets without a terminal // NewIO returns an Creation that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
return NewIOWithTerminal(stdin, stdout, stderr, false) return NewIOWithTerminal(stdin, stdout, stderr, false)
} }
// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal // NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
return func(id string) (_ IO, err error) { return func(id string) (_ IO, err error) {
paths, err := NewFifos(id) fifos, err := newFIFOSetTempDir(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer func() { defer func() {
if err != nil && paths.Dir != "" { if err != nil {
os.RemoveAll(paths.Dir) fifos.Close()
} }
}() }()
cfg := Config{ cfg := fifos.Config
Terminal: terminal, cfg.Terminal = terminal
Stdout: paths.Out,
Stderr: paths.Err,
Stdin: paths.In,
}
i := &cio{config: cfg} i := &cio{config: cfg}
set := &ioSet{ set := &ioSet{
in: stdin, in: stdin,
out: stdout, out: stdout,
err: stderr, err: stderr,
} }
closer, err := copyIO(paths, set, cfg.Terminal) closer, err := copyIO(fifos, set, cfg.Terminal)
if err != nil {
return nil, err
}
i.closer = closer i.closer = closer
return i, nil return i, err
} }
} }
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers // WithAttach attaches the existing io for a task to the provided io.Reader/Writers
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
return func(paths *FIFOSet) (IO, error) { return func(fifos *FIFOSet) (IO, error) {
if paths == nil { if fifos == nil {
return nil, fmt.Errorf("cannot attach to existing fifos") return nil, fmt.Errorf("cannot attach to existing fifos")
} }
cfg := Config{ i := &cio{config: fifos.Config}
Terminal: paths.Terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Stdin: paths.In,
}
i := &cio{config: cfg}
set := &ioSet{ set := &ioSet{
in: stdin, in: stdin,
out: stdout, out: stdout,
err: stderr, err: stderr,
} }
closer, err := copyIO(paths, set, cfg.Terminal) closer, err := copyIO(fifos, set, fifos.Terminal)
if err != nil {
return nil, err
}
i.closer = closer i.closer = closer
return i, nil return i, err
} }
} }
@ -156,16 +158,6 @@ func NullIO(id string) (IO, error) {
return &cio{}, nil return &cio{}, nil
} }
// FIFOSet is a set of fifos for use with tasks
type FIFOSet struct {
// Dir is the directory holding the task fifos
Dir string
// In, Out, and Err fifo paths
In, Out, Err string
// Terminal returns true if a terminal is being used for the task
Terminal bool
}
type ioSet struct { type ioSet struct {
in io.Reader in io.Reader
out, err io.Writer out, err io.Writer
@ -173,7 +165,6 @@ type ioSet struct {
type wgCloser struct { type wgCloser struct {
wg *sync.WaitGroup wg *sync.WaitGroup
dir string
set []io.Closer set []io.Closer
cancel context.CancelFunc cancel context.CancelFunc
} }
@ -183,12 +174,10 @@ func (g *wgCloser) Wait() {
} }
func (g *wgCloser) Close() error { func (g *wgCloser) Close() error {
// TODO: this should return all errors, not mask them
for _, f := range g.set { for _, f := range g.set {
f.Close() f.Close()
} }
if g.dir != "" {
return os.RemoveAll(g.dir)
}
return nil return nil
} }

View File

@ -14,8 +14,8 @@ import (
"github.com/containerd/fifo" "github.com/containerd/fifo"
) )
// NewFifos returns a new set of fifos for the task // newFIFOSetTempDir returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) { func newFIFOSetTempDir(id string) (*FIFOSet, error) {
root := "/run/containerd/fifo" root := "/run/containerd/fifo"
if err := os.MkdirAll(root, 0700); err != nil { if err := os.MkdirAll(root, 0700); err != nil {
return nil, err return nil, err
@ -24,21 +24,23 @@ func NewFifos(id string) (*FIFOSet, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &FIFOSet{ closer := func() error {
Dir: dir, return os.RemoveAll(dir)
In: filepath.Join(dir, id+"-stdin"), }
Out: filepath.Join(dir, id+"-stdout"), return NewFIFOSet(Config{
Err: filepath.Join(dir, id+"-stderr"), Stdin: filepath.Join(dir, id+"-stdin"),
}, nil Stdout: filepath.Join(dir, id+"-stdout"),
Stderr: filepath.Join(dir, id+"-stderr"),
}, closer), nil
} }
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
var ( var (
f io.ReadWriteCloser f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{} wg = &sync.WaitGroup{}
) )
set := []io.Closer{fifos}
defer func() { defer func() {
if err != nil { if err != nil {
for _, f := range set { for _, f := range set {
@ -48,7 +50,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
} }
}() }()
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err return nil, err
} }
set = append(set, f) set = append(set, f)
@ -57,7 +59,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
w.Close() w.Close()
}(f) }(f)
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err return nil, err
} }
set = append(set, f) set = append(set, f)
@ -68,7 +70,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
wg.Done() wg.Done()
}(f) }(f)
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err return nil, err
} }
set = append(set, f) set = append(set, f)
@ -83,7 +85,6 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
} }
return &wgCloser{ return &wgCloser{
wg: wg, wg: wg,
dir: fifos.Dir,
set: set, set: set,
cancel: cancel, cancel: cancel,
}, nil }, nil
@ -91,27 +92,26 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
// NewDirectIO returns an IO implementation that exposes the pipes directly // NewDirectIO returns an IO implementation that exposes the pipes directly
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
set, err := NewFifos("") set, err := newFIFOSetTempDir("")
if err != nil { if err != nil {
return nil, err return nil, err
} }
f := &DirectIO{ set.Terminal = terminal
set: set, f := &DirectIO{set: set}
terminal: terminal,
}
defer func() { defer func() {
if err != nil { if err != nil {
f.Delete() f.Delete()
} }
}() }()
if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err return nil, err
} }
if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close() f.Stdin.Close()
return nil, err return nil, err
} }
if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close() f.Stdin.Close()
f.Stdout.Close() f.Stdout.Close()
return nil, err return nil, err
@ -125,8 +125,7 @@ type DirectIO struct {
Stdout io.ReadCloser Stdout io.ReadCloser
Stderr io.ReadCloser Stderr io.ReadCloser
set *FIFOSet set *FIFOSet
terminal bool
} }
// IOCreate returns IO avaliable for use with task creation // IOCreate returns IO avaliable for use with task creation
@ -141,12 +140,7 @@ func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
// Config returns the Config // Config returns the Config
func (f *DirectIO) Config() Config { func (f *DirectIO) Config() Config {
return Config{ return f.set.Config
Terminal: f.terminal,
Stdin: f.set.In,
Stdout: f.set.Out,
Stderr: f.set.Err,
}
} }
// Cancel stops any IO copy operations // Cancel stops any IO copy operations
@ -177,8 +171,5 @@ func (f *DirectIO) Close() error {
// Delete removes the underlying directory containing fifos // Delete removes the underlying directory containing fifos
func (f *DirectIO) Delete() error { func (f *DirectIO) Delete() error {
if f.set.Dir == "" { return f.set.Close()
return nil
}
return os.RemoveAll(f.set.Dir)
} }

View File

@ -13,12 +13,12 @@ import (
const pipeRoot = `\\.\pipe` const pipeRoot = `\\.\pipe`
// NewFifos returns a new set of fifos for the task // newFIFOSetTempDir returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) { func newFIFOSetTempDir(id string) (*FIFOSet, error) {
return &FIFOSet{ return &FIFOSet{
In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id), StdErr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
}, nil }, nil
} }
@ -28,10 +28,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
set []io.Closer set []io.Closer
) )
if fifos.In != "" { if fifos.StdIn != "" {
l, err := winio.ListenPipe(fifos.In, nil) l, err := winio.ListenPipe(fifos.StdIn, nil)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In) return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdIn)
} }
defer func(l net.Listener) { defer func(l net.Listener) {
if err != nil { if err != nil {
@ -43,7 +43,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
go func() { go func() {
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In) log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.StdIn)
return return
} }
io.Copy(c, ioset.in) io.Copy(c, ioset.in)
@ -52,10 +52,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
}() }()
} }
if fifos.Out != "" { if fifos.StdOut != "" {
l, err := winio.ListenPipe(fifos.Out, nil) l, err := winio.ListenPipe(fifos.StdOut, nil)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out) return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdOut)
} }
defer func(l net.Listener) { defer func(l net.Listener) {
if err != nil { if err != nil {
@ -69,7 +69,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
defer wg.Done() defer wg.Done()
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out) log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.StdOut)
return return
} }
io.Copy(ioset.out, c) io.Copy(ioset.out, c)
@ -78,10 +78,10 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
}() }()
} }
if !tty && fifos.Err != "" { if !tty && fifos.StdErr != "" {
l, err := winio.ListenPipe(fifos.Err, nil) l, err := winio.ListenPipe(fifos.StdErr, nil)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err) return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr)
} }
defer func(l net.Listener) { defer func(l net.Listener) {
if err != nil { if err != nil {
@ -95,7 +95,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
defer wg.Done() defer wg.Done()
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err) log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.StdErr)
return return
} }
io.Copy(ioset.err, c) io.Copy(ioset.err, c)

View File

@ -3,6 +3,7 @@ package containerd
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -288,20 +289,23 @@ func (c *container) get(ctx context.Context) (containers.Container, error) {
return c.client.ContainerService().Get(ctx, c.id) return c.client.ContainerService().Get(ctx, c.id)
} }
// get the existing fifo paths from the task information stored by the daemon
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) { func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {
// get the existing fifo paths from the task information stored by the daemon path := getFifoDir([]string{
paths := &cio.FIFOSet{ response.Process.Stdin,
Dir: getFifoDir([]string{ response.Process.Stdout,
response.Process.Stdin, response.Process.Stderr,
response.Process.Stdout, })
response.Process.Stderr, closer := func() error {
}), return os.RemoveAll(path)
In: response.Process.Stdin,
Out: response.Process.Stdout,
Err: response.Process.Stderr,
Terminal: response.Process.Terminal,
} }
return ioAttach(paths) fifoSet := cio.NewFIFOSet(cio.Config{
Stdin: response.Process.Stdin,
Stdout: response.Process.Stdout,
Stderr: response.Process.Stderr,
Terminal: response.Process.Terminal,
}, closer)
return ioAttach(fifoSet)
} }
// getFifoDir looks for any non-empty path for a stdio fifo // getFifoDir looks for any non-empty path for a stdio fifo