Refactor cio.DirectIO

New code duplication
Better re-use from consumers of the cio package

TODO: io_windows.go

Signed-off-by: Daniel Nephin <dnephin@gmail.com>
This commit is contained in:
Daniel Nephin 2017-12-06 17:28:17 -05:00
parent f79ec5b55f
commit 3146019918
4 changed files with 170 additions and 194 deletions

118
cio/io.go
View File

@ -33,38 +33,6 @@ type IO interface {
Close() error Close() error
} }
// cio is a basic container IO implementation.
type cio struct {
config Config
closer *wgCloser
}
func (c *cio) Config() Config {
return c.config
}
func (c *cio) Cancel() {
if c.closer == nil {
return
}
c.closer.Cancel()
}
func (c *cio) Wait() {
if c.closer == nil {
return
}
c.closer.Wait()
}
func (c *cio) Close() error {
if c.closer == nil {
return nil
}
return c.closer.Close()
}
// Creation creates new IO sets for a task // Creation creates new IO sets for a task
type Creation func(id string) (IO, error) type Creation func(id string) (IO, error)
@ -100,27 +68,15 @@ func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal // NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
return func(id string) (_ IO, err error) { return func(id string) (IO, error) {
fifos, err := newFIFOSetTempDir(id) fifos, err := newFIFOSetInTempDir(id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer func() {
if err != nil { fifos.Terminal = terminal
fifos.Close() set := &ioSet{in: stdin, out: stdout, err: stderr}
} return copyIO(fifos, set)
}()
cfg := fifos.Config
cfg.Terminal = terminal
i := &cio{config: cfg}
set := &ioSet{
in: stdin,
out: stdout,
err: stderr,
}
closer, err := copyIO(fifos, set, cfg.Terminal)
i.closer = closer
return i, err
} }
} }
@ -128,17 +84,10 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool)
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
return func(fifos *FIFOSet) (IO, error) { return func(fifos *FIFOSet) (IO, error) {
if fifos == nil { if fifos == nil {
return nil, fmt.Errorf("cannot attach to existing fifos") return nil, fmt.Errorf("cannot attach, missing fifos")
} }
i := &cio{config: fifos.Config} set := &ioSet{in: stdin, out: stdout, err: stderr}
set := &ioSet{ return copyIO(fifos, set)
in: stdin,
out: stdout,
err: stderr,
}
closer, err := copyIO(fifos, set, fifos.Terminal)
i.closer = closer
return i, err
} }
} }
@ -154,7 +103,7 @@ func StdioTerminal(id string) (IO, error) {
} }
// NullIO redirects the container's IO into /dev/null // NullIO redirects the container's IO into /dev/null
func NullIO(id string) (IO, error) { func NullIO(_ string) (IO, error) {
return &cio{}, nil return &cio{}, nil
} }
@ -163,24 +112,49 @@ type ioSet struct {
out, err io.Writer out, err io.Writer
} }
type wgCloser struct { type pipes struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
func (p *pipes) closers() []io.Closer {
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
}
// cio is a basic container IO implementation.
type cio struct {
config Config
wg *sync.WaitGroup wg *sync.WaitGroup
set []io.Closer closers []io.Closer
cancel context.CancelFunc cancel context.CancelFunc
} }
func (g *wgCloser) Wait() { func (c *cio) Config() Config {
g.wg.Wait() return c.config
} }
func (g *wgCloser) Close() error { func (c *cio) Wait() {
// TODO: this should return all errors, not mask them if c.wg != nil {
for _, f := range g.set { c.wg.Wait()
f.Close()
} }
return nil
} }
func (g *wgCloser) Cancel() { func (c *cio) Close() error {
g.cancel() var lastErr error
for _, closer := range c.closers {
if closer == nil {
continue
}
if err := closer.Close(); err != nil {
lastErr = err
}
}
return lastErr
}
func (c *cio) Cancel() {
if c.cancel != nil {
c.cancel()
}
} }

View File

@ -12,10 +12,11 @@ import (
"syscall" "syscall"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"github.com/pkg/errors"
) )
// newFIFOSetTempDir returns a new set of fifos for the task // newFIFOSetInTempDir returns a new set of fifos for the task
func newFIFOSetTempDir(id string) (*FIFOSet, error) { func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
root := "/run/containerd/fifo" root := "/run/containerd/fifo"
if err := os.MkdirAll(root, 0700); err != nil { if err := os.MkdirAll(root, 0700); err != nil {
return nil, err return nil, err
@ -34,142 +35,99 @@ func newFIFOSetTempDir(id string) (*FIFOSet, error) {
}, closer), nil }, closer), nil
} }
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
var ( var (
f io.ReadWriteCloser
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{} wg = &sync.WaitGroup{}
) )
set := []io.Closer{fifos}
defer func() { pipes, err := openFifos(ctx, fifos)
if err != nil { if err != nil {
for _, f := range set {
f.Close()
}
cancel() cancel()
return nil, err
} }
if fifos.Stdin != "" {
go func() {
io.Copy(pipes.Stdin, ioset.in)
pipes.Stdin.Close()
}()
}
wg.Add(1)
go func() {
io.Copy(ioset.out, pipes.Stdout)
pipes.Stdout.Close()
wg.Done()
}() }()
if f, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if !fifos.Terminal {
return nil, err
}
set = append(set, f)
go func(w io.WriteCloser) {
io.Copy(w, ioset.in)
w.Close()
}(f)
if f, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
wg.Add(1) wg.Add(1)
go func(r io.ReadCloser) { go func() {
io.Copy(ioset.out, r) io.Copy(ioset.err, pipes.Stderr)
r.Close() pipes.Stderr.Close()
wg.Done() wg.Done()
}(f) }()
if f, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
} }
set = append(set, f) return &cio{
if !tty {
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.err, r)
r.Close()
wg.Done()
}(f)
}
return &wgCloser{
wg: wg, wg: wg,
set: set, closers: append(pipes.closers(), fifos),
cancel: cancel, cancel: cancel,
}, nil }, nil
} }
// NewDirectIO returns an IO implementation that exposes the pipes directly func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) {
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { var err error
set, err := newFIFOSetTempDir("") f := new(pipes)
if err != nil {
return nil, err
}
set.Terminal = terminal
f := &DirectIO{set: set}
defer func() { defer func() {
if err != nil { if err != nil {
f.Delete() fifos.Close()
} }
}() }()
if f.Stdin, err = fifo.OpenFifo(ctx, set.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err if fifos.Stdin != "" {
if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return pipes{}, errors.Wrapf(err, "failed to open stdin fifo")
} }
if f.Stdout, err = fifo.OpenFifo(ctx, set.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { }
if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close() f.Stdin.Close()
return nil, err return pipes{}, errors.Wrapf(err, "failed to open stdout fifo")
} }
if f.Stderr, err = fifo.OpenFifo(ctx, set.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close() f.Stdin.Close()
f.Stdout.Close() f.Stdout.Close()
return pipes{}, errors.Wrapf(err, "failed to open stderr fifo")
}
return pipes{}, nil
}
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
// and io.WriteCloser. FIFOs are created in /run/containerd/fifo.
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
fifos, err := newFIFOSetInTempDir("")
if err != nil {
return nil, err return nil, err
} }
return f, nil fifos.Terminal = terminal
ctx, cancel := context.WithCancel(context.Background())
pipes, err := openFifos(ctx, fifos)
return &DirectIO{
pipes: pipes,
cio: cio{
config: fifos.Config,
closers: append(pipes.closers(), fifos),
cancel: cancel,
},
}, err
} }
// DirectIO allows task IO to be handled externally by the caller // DirectIO allows task IO to be handled externally by the caller
type DirectIO struct { type DirectIO struct {
Stdin io.WriteCloser pipes
Stdout io.ReadCloser cio
Stderr io.ReadCloser
set *FIFOSet
} }
// IOCreate returns IO avaliable for use with task creation var _ IO = &DirectIO{}
func (f *DirectIO) IOCreate(id string) (IO, error) {
return f, nil
}
// IOAttach returns IO avaliable for use with task attachment
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}
// Config returns the Config
func (f *DirectIO) Config() Config {
return f.set.Config
}
// Cancel stops any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
// Wait on any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Wait() {
// nothing to wait on as all operations are handled externally
}
// Close closes all open fds
func (f *DirectIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
err = err2
}
if err2 := f.Stderr.Close(); err == nil {
err = err2
}
return err
}
// Delete removes the underlying directory containing fifos
func (f *DirectIO) Delete() error {
return f.set.Close()
}

View File

@ -13,8 +13,8 @@ import (
const pipeRoot = `\\.\pipe` const pipeRoot = `\\.\pipe`
// newFIFOSetTempDir returns a new set of fifos for the task // newFIFOSetInTempDir returns a new set of fifos for the task
func newFIFOSetTempDir(id string) (*FIFOSet, error) { func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
return &FIFOSet{ return &FIFOSet{
StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id), StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id), StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
@ -22,8 +22,9 @@ func newFIFOSetTempDir(id string) (*FIFOSet, error) {
}, nil }, nil
} }
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
var ( var (
err error
wg sync.WaitGroup wg sync.WaitGroup
set []io.Closer set []io.Closer
) )
@ -78,7 +79,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
}() }()
} }
if !tty && fifos.StdErr != "" { if !fifos.Terminal && fifos.StdErr != "" {
l, err := winio.ListenPipe(fifos.StdErr, nil) l, err := winio.ListenPipe(fifos.StdErr, nil)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr) return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr)
@ -104,7 +105,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
}() }()
} }
return &wgCloser{ return &cio{
wg: &wg, wg: &wg,
dir: fifos.Dir, dir: fifos.Dir,
set: set, set: set,

View File

@ -298,7 +298,7 @@ func TestContainerAttach(t *testing.T) {
expected := "hello" + newLine expected := "hello" + newLine
direct, err := cio.NewDirectIO(ctx, false) direct, err := newDirectIO(ctx)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -372,6 +372,49 @@ func TestContainerAttach(t *testing.T) {
} }
} }
func newDirectIO(ctx context.Context) (*directIO, error) {
dio, err := cio.NewDirectIO(ctx, false)
if err != nil {
return nil, err
}
return &directIO{DirectIO: *dio}, nil
}
type directIO struct {
cio.DirectIO
}
// ioCreate returns IO avaliable for use with task creation
func (f *directIO) IOCreate(id string) (cio.IO, error) {
return f, nil
}
// ioAttach returns IO avaliable for use with task attachment
func (f *directIO) IOAttach(set *cio.FIFOSet) (cio.IO, error) {
return f, nil
}
func (f *directIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
// Close closes all open fds
func (f *directIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
err = err2
}
if err2 := f.Stderr.Close(); err == nil {
err = err2
}
return err
}
// Delete removes the underlying directory containing fifos
func (f *directIO) Delete() error {
return f.DirectIO.Close()
}
func TestContainerUsername(t *testing.T) { func TestContainerUsername(t *testing.T) {
t.Parallel() t.Parallel()
@ -393,7 +436,7 @@ func TestContainerUsername(t *testing.T) {
t.Error(err) t.Error(err)
return return
} }
direct, err := cio.NewDirectIO(ctx, false) direct, err := newDirectIO(ctx)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -486,7 +529,7 @@ func TestContainerAttachProcess(t *testing.T) {
expected := "hello" + newLine expected := "hello" + newLine
// creating IO early for easy resource cleanup // creating IO early for easy resource cleanup
direct, err := cio.NewDirectIO(ctx, false) direct, err := newDirectIO(ctx)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -602,7 +645,7 @@ func TestContainerUserID(t *testing.T) {
t.Error(err) t.Error(err)
return return
} }
direct, err := cio.NewDirectIO(ctx, false) direct, err := newDirectIO(ctx)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return