Move io.go into cio package

Signed-off-by: Daniel Nephin <dnephin@gmail.com>
This commit is contained in:
Daniel Nephin
2017-11-17 14:18:30 -05:00
parent bc063f263c
commit 298dabc6c2
13 changed files with 77 additions and 69 deletions

196
cio/io.go Normal file
View File

@@ -0,0 +1,196 @@
package cio
import (
"context"
"fmt"
"io"
"os"
"sync"
)
// Config holds the io configurations.
type Config struct {
// Terminal is true if one has been allocated
Terminal bool
// Stdin path
Stdin string
// Stdout path
Stdout string
// Stderr path
Stderr string
}
// IO holds the io information for a task or process
type IO interface {
// Config returns the IO configuration.
Config() Config
// Cancel aborts all current io operations
Cancel()
// Wait blocks until all io copy operations have completed
Wait()
// Close cleans up all open io resources
Close() error
}
// cio is a basic container IO implementation.
type cio struct {
config Config
closer *wgCloser
}
func (c *cio) Config() Config {
return c.config
}
func (c *cio) Cancel() {
if c.closer == nil {
return
}
c.closer.Cancel()
}
func (c *cio) Wait() {
if c.closer == nil {
return
}
c.closer.Wait()
}
func (c *cio) Close() error {
if c.closer == nil {
return nil
}
return c.closer.Close()
}
// Creation creates new IO sets for a task
type Creation func(id string) (IO, error)
// Attach allows callers to reattach to running tasks
//
// There should only be one reader for a task's IO set
// because fifo's can only be read from one reader or the output
// will be sent only to the first reads
type Attach func(*FIFOSet) (IO, error)
// NewIO returns an Creation that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
return NewIOWithTerminal(stdin, stdout, stderr, false)
}
// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
return func(id string) (_ IO, err error) {
paths, err := NewFifos(id)
if err != nil {
return nil, err
}
defer func() {
if err != nil && paths.Dir != "" {
os.RemoveAll(paths.Dir)
}
}()
cfg := Config{
Terminal: terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Stdin: paths.In,
}
i := &cio{config: cfg}
set := &ioSet{
in: stdin,
out: stdout,
err: stderr,
}
closer, err := copyIO(paths, set, cfg.Terminal)
if err != nil {
return nil, err
}
i.closer = closer
return i, nil
}
}
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
return func(paths *FIFOSet) (IO, error) {
if paths == nil {
return nil, fmt.Errorf("cannot attach to existing fifos")
}
cfg := Config{
Terminal: paths.Terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Stdin: paths.In,
}
i := &cio{config: cfg}
set := &ioSet{
in: stdin,
out: stdout,
err: stderr,
}
closer, err := copyIO(paths, set, cfg.Terminal)
if err != nil {
return nil, err
}
i.closer = closer
return i, nil
}
}
// Stdio returns an IO set to be used for a task
// that outputs the container's IO as the current processes Stdio
func Stdio(id string) (IO, error) {
return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
}
// StdioTerminal will setup the IO for the task to use a terminal
func StdioTerminal(id string) (IO, error) {
return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
}
// NullIO redirects the container's IO into /dev/null
func NullIO(id string) (IO, error) {
return &cio{}, nil
}
// FIFOSet is a set of fifos for use with tasks
type FIFOSet struct {
// Dir is the directory holding the task fifos
Dir string
// In, Out, and Err fifo paths
In, Out, Err string
// Terminal returns true if a terminal is being used for the task
Terminal bool
}
type ioSet struct {
in io.Reader
out, err io.Writer
}
type wgCloser struct {
wg *sync.WaitGroup
dir string
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() error {
for _, f := range g.set {
f.Close()
}
if g.dir != "" {
return os.RemoveAll(g.dir)
}
return nil
}
func (g *wgCloser) Cancel() {
g.cancel()
}

184
cio/io_unix.go Normal file
View File

@@ -0,0 +1,184 @@
// +build !windows
package cio
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"github.com/containerd/fifo"
)
// NewFifos returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) {
root := "/run/containerd/fifo"
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
dir, err := ioutil.TempDir(root, "")
if err != nil {
return nil, err
}
return &FIFOSet{
Dir: dir,
In: filepath.Join(dir, id+"-stdin"),
Out: filepath.Join(dir, id+"-stdout"),
Err: filepath.Join(dir, id+"-stderr"),
}, nil
}
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
go func(w io.WriteCloser) {
io.Copy(w, ioset.in)
w.Close()
}(f)
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.out, r)
r.Close()
wg.Done()
}(f)
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
if !tty {
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.err, r)
r.Close()
wg.Done()
}(f)
}
return &wgCloser{
wg: wg,
dir: fifos.Dir,
set: set,
cancel: cancel,
}, nil
}
// NewDirectIO returns an IO implementation that exposes the pipes directly
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
set, err := NewFifos("")
if err != nil {
return nil, err
}
f := &DirectIO{
set: set,
terminal: terminal,
}
defer func() {
if err != nil {
f.Delete()
}
}()
if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
return nil, err
}
if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
f.Stdout.Close()
return nil, err
}
return f, nil
}
// DirectIO allows task IO to be handled externally by the caller
type DirectIO struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
set *FIFOSet
terminal bool
}
// IOCreate returns IO avaliable for use with task creation
func (f *DirectIO) IOCreate(id string) (IO, error) {
return f, nil
}
// IOAttach returns IO avaliable for use with task attachment
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}
// Config returns the Config
func (f *DirectIO) Config() Config {
return Config{
Terminal: f.terminal,
Stdin: f.set.In,
Stdout: f.set.Out,
Stderr: f.set.Err,
}
}
// Cancel stops any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
// Wait on any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Wait() {
// nothing to wait on as all operations are handled externally
}
// Close closes all open fds
func (f *DirectIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
err = err2
}
if err2 := f.Stderr.Close(); err == nil {
err = err2
}
return err
}
// Delete removes the underlying directory containing fifos
func (f *DirectIO) Delete() error {
if f.set.Dir == "" {
return nil
}
return os.RemoveAll(f.set.Dir)
}

117
cio/io_windows.go Normal file
View File

@@ -0,0 +1,117 @@
package cio
import (
"fmt"
"io"
"net"
"sync"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
)
const pipeRoot = `\\.\pipe`
// NewFifos returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) {
return &FIFOSet{
In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
}, nil
}
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
var (
wg sync.WaitGroup
set []io.Closer
)
if fifos.In != "" {
l, err := winio.ListenPipe(fifos.In, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In)
}
defer func(l net.Listener) {
if err != nil {
l.Close()
}
}(l)
set = append(set, l)
go func() {
c, err := l.Accept()
if err != nil {
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In)
return
}
io.Copy(c, ioset.in)
c.Close()
l.Close()
}()
}
if fifos.Out != "" {
l, err := winio.ListenPipe(fifos.Out, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out)
}
defer func(l net.Listener) {
if err != nil {
l.Close()
}
}(l)
set = append(set, l)
wg.Add(1)
go func() {
defer wg.Done()
c, err := l.Accept()
if err != nil {
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out)
return
}
io.Copy(ioset.out, c)
c.Close()
l.Close()
}()
}
if !tty && fifos.Err != "" {
l, err := winio.ListenPipe(fifos.Err, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err)
}
defer func(l net.Listener) {
if err != nil {
l.Close()
}
}(l)
set = append(set, l)
wg.Add(1)
go func() {
defer wg.Done()
c, err := l.Accept()
if err != nil {
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err)
return
}
io.Copy(ioset.err, c)
c.Close()
l.Close()
}()
}
return &wgCloser{
wg: &wg,
dir: fifos.Dir,
set: set,
cancel: func() {
for _, l := range set {
l.Close()
}
},
}, nil
}