Reduce the number of IO constructors
Signed-off-by: Daniel Nephin <dnephin@gmail.com>
This commit is contained in:
108
cio/io.go
108
cio/io.go
@@ -8,7 +8,7 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Config holds the io configurations.
|
||||
// Config holds the IO configurations.
|
||||
type Config struct {
|
||||
// Terminal is true if one has been allocated
|
||||
Terminal bool
|
||||
@@ -49,6 +49,7 @@ type FIFOSet struct {
|
||||
close func() error
|
||||
}
|
||||
|
||||
// Close the FIFOSet
|
||||
func (f *FIFOSet) Close() error {
|
||||
if f.close != nil {
|
||||
return f.close()
|
||||
@@ -61,67 +62,72 @@ func NewFIFOSet(config Config, close func() error) *FIFOSet {
|
||||
return &FIFOSet{Config: config, close: close}
|
||||
}
|
||||
|
||||
// NewIO returns an Creator that will provide IO sets without a terminal
|
||||
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creator {
|
||||
return NewIOWithTerminal(stdin, stdout, stderr, false)
|
||||
// Streams used to configure a Creator or Attach
|
||||
type Streams struct {
|
||||
Stdin io.Reader
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
Terminal bool
|
||||
}
|
||||
|
||||
// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal
|
||||
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creator {
|
||||
return func(id string) (IO, error) {
|
||||
fifos, err := newFIFOSetInTempDir(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Opt customize options for creating a Creator or Attach
|
||||
type Opt func(*Streams)
|
||||
|
||||
fifos.Terminal = terminal
|
||||
set := &ioSet{in: stdin, out: stdout, err: stderr}
|
||||
return copyIO(fifos, set)
|
||||
// WithStdio sets stream options to the standard input/output streams
|
||||
func WithStdio(opt *Streams) {
|
||||
WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
|
||||
}
|
||||
|
||||
// WithTerminal sets the terminal option
|
||||
func WithTerminal(opt *Streams) {
|
||||
opt.Terminal = true
|
||||
}
|
||||
|
||||
// WithStreams sets the stream options to the specified Reader and Writers
|
||||
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
|
||||
return func(opt *Streams) {
|
||||
opt.Stdin = stdin
|
||||
opt.Stdout = stdout
|
||||
opt.Stderr = stderr
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
|
||||
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
|
||||
// NewCreator returns an IO creator from the options
|
||||
func NewCreator(opts ...Opt) Creator {
|
||||
streams := &Streams{}
|
||||
for _, opt := range opts {
|
||||
opt(streams)
|
||||
}
|
||||
return func(id string) (IO, error) {
|
||||
// TODO: accept root as a param
|
||||
root := "/run/containerd/fifo"
|
||||
fifos, err := NewFIFOSetInDir(root, id, streams.Terminal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return copyIO(fifos, streams)
|
||||
}
|
||||
}
|
||||
|
||||
// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
|
||||
func NewAttach(opts ...Opt) Attach {
|
||||
streams := &Streams{}
|
||||
for _, opt := range opts {
|
||||
opt(streams)
|
||||
}
|
||||
return func(fifos *FIFOSet) (IO, error) {
|
||||
if fifos == nil {
|
||||
return nil, fmt.Errorf("cannot attach, missing fifos")
|
||||
}
|
||||
set := &ioSet{in: stdin, out: stdout, err: stderr}
|
||||
return copyIO(fifos, set)
|
||||
return copyIO(fifos, streams)
|
||||
}
|
||||
}
|
||||
|
||||
// Stdio returns an IO set to be used for a task
|
||||
// that outputs the container's IO as the current processes Stdio
|
||||
func Stdio(id string) (IO, error) {
|
||||
return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
|
||||
}
|
||||
|
||||
// StdioTerminal will setup the IO for the task to use a terminal
|
||||
func StdioTerminal(id string) (IO, error) {
|
||||
return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
|
||||
}
|
||||
|
||||
// NullIO redirects the container's IO into /dev/null
|
||||
func NullIO(_ string) (IO, error) {
|
||||
return &cio{}, nil
|
||||
}
|
||||
|
||||
type ioSet struct {
|
||||
in io.Reader
|
||||
out, err io.Writer
|
||||
}
|
||||
|
||||
type pipes struct {
|
||||
Stdin io.WriteCloser
|
||||
Stdout io.ReadCloser
|
||||
Stderr io.ReadCloser
|
||||
}
|
||||
|
||||
func (p *pipes) closers() []io.Closer {
|
||||
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
|
||||
}
|
||||
|
||||
// cio is a basic container IO implementation.
|
||||
type cio struct {
|
||||
config Config
|
||||
@@ -158,3 +164,17 @@ func (c *cio) Cancel() {
|
||||
c.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
type pipes struct {
|
||||
Stdin io.WriteCloser
|
||||
Stdout io.ReadCloser
|
||||
Stderr io.ReadCloser
|
||||
}
|
||||
|
||||
// DirectIO allows task IO to be handled externally by the caller
|
||||
type DirectIO struct {
|
||||
pipes
|
||||
cio
|
||||
}
|
||||
|
||||
var _ IO = &DirectIO{}
|
||||
|
||||
@@ -15,11 +15,12 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// newFIFOSetInTempDir returns a new set of fifos for the task
|
||||
func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
|
||||
root := "/run/containerd/fifo"
|
||||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
return nil, err
|
||||
// NewFIFOSetInDir returns a new FIFOSet with paths in a temporary directory under root
|
||||
func NewFIFOSetInDir(root, id string, terminal bool) (*FIFOSet, error) {
|
||||
if root != "" {
|
||||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
dir, err := ioutil.TempDir(root, "")
|
||||
if err != nil {
|
||||
@@ -29,18 +30,15 @@ func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
|
||||
return os.RemoveAll(dir)
|
||||
}
|
||||
return NewFIFOSet(Config{
|
||||
Stdin: filepath.Join(dir, id+"-stdin"),
|
||||
Stdout: filepath.Join(dir, id+"-stdout"),
|
||||
Stderr: filepath.Join(dir, id+"-stderr"),
|
||||
Stdin: filepath.Join(dir, id+"-stdin"),
|
||||
Stdout: filepath.Join(dir, id+"-stdout"),
|
||||
Stderr: filepath.Join(dir, id+"-stderr"),
|
||||
Terminal: terminal,
|
||||
}, closer), nil
|
||||
}
|
||||
|
||||
func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
var (
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
wg = &sync.WaitGroup{}
|
||||
)
|
||||
|
||||
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
|
||||
var ctx, cancel = context.WithCancel(context.Background())
|
||||
pipes, err := openFifos(ctx, fifos)
|
||||
if err != nil {
|
||||
cancel()
|
||||
@@ -49,14 +47,15 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
|
||||
if fifos.Stdin != "" {
|
||||
go func() {
|
||||
io.Copy(pipes.Stdin, ioset.in)
|
||||
io.Copy(pipes.Stdin, ioset.Stdin)
|
||||
pipes.Stdin.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
var wg = &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
io.Copy(ioset.out, pipes.Stdout)
|
||||
io.Copy(ioset.Stdout, pipes.Stdout)
|
||||
pipes.Stdout.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -64,12 +63,13 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
if !fifos.Terminal {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
io.Copy(ioset.err, pipes.Stderr)
|
||||
io.Copy(ioset.Stderr, pipes.Stderr)
|
||||
pipes.Stderr.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
return &cio{
|
||||
config: fifos.Config,
|
||||
wg: wg,
|
||||
closers: append(pipes.closers(), fifos),
|
||||
cancel: cancel,
|
||||
@@ -78,41 +78,38 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
|
||||
func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) {
|
||||
var err error
|
||||
f := new(pipes)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
fifos.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
var f pipes
|
||||
if fifos.Stdin != "" {
|
||||
if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
return pipes{}, errors.Wrapf(err, "failed to open stdin fifo")
|
||||
return f, errors.Wrapf(err, "failed to open stdin fifo")
|
||||
}
|
||||
}
|
||||
if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
f.Stdin.Close()
|
||||
return pipes{}, errors.Wrapf(err, "failed to open stdout fifo")
|
||||
if fifos.Stdout != "" {
|
||||
if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
f.Stdin.Close()
|
||||
return f, errors.Wrapf(err, "failed to open stdout fifo")
|
||||
}
|
||||
}
|
||||
if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
f.Stdin.Close()
|
||||
f.Stdout.Close()
|
||||
return pipes{}, errors.Wrapf(err, "failed to open stderr fifo")
|
||||
if fifos.Stderr != "" {
|
||||
if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
f.Stdin.Close()
|
||||
f.Stdout.Close()
|
||||
return f, errors.Wrapf(err, "failed to open stderr fifo")
|
||||
}
|
||||
}
|
||||
return pipes{}, nil
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
|
||||
// and io.WriteCloser. FIFOs are created in /run/containerd/fifo.
|
||||
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
|
||||
fifos, err := newFIFOSetInTempDir("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fifos.Terminal = terminal
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// and io.WriteCloser.
|
||||
func NewDirectIO(ctx context.Context, fifos *FIFOSet) (*DirectIO, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
pipes, err := openFifos(ctx, fifos)
|
||||
return &DirectIO{
|
||||
pipes: pipes,
|
||||
@@ -124,10 +121,6 @@ func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
|
||||
}, err
|
||||
}
|
||||
|
||||
// DirectIO allows task IO to be handled externally by the caller
|
||||
type DirectIO struct {
|
||||
pipes
|
||||
cio
|
||||
func (p *pipes) closers() []io.Closer {
|
||||
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
|
||||
}
|
||||
|
||||
var _ IO = &DirectIO{}
|
||||
|
||||
@@ -13,26 +13,26 @@ import (
|
||||
|
||||
const pipeRoot = `\\.\pipe`
|
||||
|
||||
// newFIFOSetInTempDir returns a new set of fifos for the task
|
||||
func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
|
||||
return &FIFOSet{
|
||||
StdIn: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
|
||||
StdOut: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
|
||||
StdErr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
|
||||
}, nil
|
||||
// NewFIFOSetInDir returns a new set of fifos for the task
|
||||
func NewFIFOSetInDir(_, id string, terminal bool) (*FIFOSet, error) {
|
||||
return NewFIFOSet(Config{
|
||||
Terminal: terminal,
|
||||
Stdin: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
|
||||
Stdout: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
|
||||
Stderr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
|
||||
}, nil), nil
|
||||
}
|
||||
|
||||
func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
|
||||
var (
|
||||
err error
|
||||
wg sync.WaitGroup
|
||||
set []io.Closer
|
||||
)
|
||||
|
||||
if fifos.StdIn != "" {
|
||||
l, err := winio.ListenPipe(fifos.StdIn, nil)
|
||||
if fifos.Stdin != "" {
|
||||
l, err := winio.ListenPipe(fifos.Stdin, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdIn)
|
||||
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdin)
|
||||
}
|
||||
defer func(l net.Listener) {
|
||||
if err != nil {
|
||||
@@ -44,19 +44,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
go func() {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.StdIn)
|
||||
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin)
|
||||
return
|
||||
}
|
||||
io.Copy(c, ioset.in)
|
||||
io.Copy(c, ioset.Stdin)
|
||||
c.Close()
|
||||
l.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
if fifos.StdOut != "" {
|
||||
l, err := winio.ListenPipe(fifos.StdOut, nil)
|
||||
if fifos.Stdout != "" {
|
||||
l, err := winio.ListenPipe(fifos.Stdout, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.StdOut)
|
||||
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdout)
|
||||
}
|
||||
defer func(l net.Listener) {
|
||||
if err != nil {
|
||||
@@ -70,19 +70,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
defer wg.Done()
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.StdOut)
|
||||
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Stdout)
|
||||
return
|
||||
}
|
||||
io.Copy(ioset.out, c)
|
||||
io.Copy(ioset.Stdout, c)
|
||||
c.Close()
|
||||
l.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
if !fifos.Terminal && fifos.StdErr != "" {
|
||||
l, err := winio.ListenPipe(fifos.StdErr, nil)
|
||||
if !fifos.Terminal && fifos.Stderr != "" {
|
||||
l, err := winio.ListenPipe(fifos.Stderr, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.StdErr)
|
||||
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Stderr)
|
||||
}
|
||||
defer func(l net.Listener) {
|
||||
if err != nil {
|
||||
@@ -96,23 +96,29 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
|
||||
defer wg.Done()
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.StdErr)
|
||||
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr)
|
||||
return
|
||||
}
|
||||
io.Copy(ioset.err, c)
|
||||
io.Copy(ioset.Stderr, c)
|
||||
c.Close()
|
||||
l.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
return &cio{
|
||||
wg: &wg,
|
||||
dir: fifos.Dir,
|
||||
set: set,
|
||||
cancel: func() {
|
||||
for _, l := range set {
|
||||
l.Close()
|
||||
}
|
||||
},
|
||||
}, nil
|
||||
return &cio{config: fifos.Config, closers: set}, nil
|
||||
}
|
||||
|
||||
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
|
||||
// and io.WriteCloser.
|
||||
func NewDirectIO(stdin io.WriteCloser, stdout, stderr io.ReadCloser, terminal bool) *DirectIO {
|
||||
return &DirectIO{
|
||||
pipes: pipes{
|
||||
Stdin: stdin,
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
},
|
||||
cio: cio{
|
||||
config: Config{Terminal: terminal},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user