Merge pull request #1895 from dnephin/refactor-cio
Refactor cio package
This commit is contained in:
commit
afbbe43745
277
cio/io.go
277
cio/io.go
@ -8,7 +8,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config holds the io configurations.
|
// Config holds the IO configurations.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// Terminal is true if one has been allocated
|
// Terminal is true if one has been allocated
|
||||||
Terminal bool
|
Terminal bool
|
||||||
@ -33,40 +33,8 @@ type IO interface {
|
|||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// cio is a basic container IO implementation.
|
// Creator creates new IO sets for a task
|
||||||
type cio struct {
|
type Creator func(id string) (IO, error)
|
||||||
config Config
|
|
||||||
|
|
||||||
closer *wgCloser
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cio) Config() Config {
|
|
||||||
return c.config
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cio) Cancel() {
|
|
||||||
if c.closer == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.closer.Cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cio) Wait() {
|
|
||||||
if c.closer == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.closer.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cio) Close() error {
|
|
||||||
if c.closer == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return c.closer.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Creation creates new IO sets for a task
|
|
||||||
type Creation func(id string) (IO, error)
|
|
||||||
|
|
||||||
// Attach allows callers to reattach to running tasks
|
// Attach allows callers to reattach to running tasks
|
||||||
//
|
//
|
||||||
@ -75,123 +43,138 @@ type Creation func(id string) (IO, error)
|
|||||||
// will be sent only to the first reads
|
// will be sent only to the first reads
|
||||||
type Attach func(*FIFOSet) (IO, error)
|
type Attach func(*FIFOSet) (IO, error)
|
||||||
|
|
||||||
// NewIO returns an Creation that will provide IO sets without a terminal
|
// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
|
||||||
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
|
|
||||||
return NewIOWithTerminal(stdin, stdout, stderr, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal
|
|
||||||
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
|
|
||||||
return func(id string) (_ IO, err error) {
|
|
||||||
paths, err := NewFifos(id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err != nil && paths.Dir != "" {
|
|
||||||
os.RemoveAll(paths.Dir)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
cfg := Config{
|
|
||||||
Terminal: terminal,
|
|
||||||
Stdout: paths.Out,
|
|
||||||
Stderr: paths.Err,
|
|
||||||
Stdin: paths.In,
|
|
||||||
}
|
|
||||||
i := &cio{config: cfg}
|
|
||||||
set := &ioSet{
|
|
||||||
in: stdin,
|
|
||||||
out: stdout,
|
|
||||||
err: stderr,
|
|
||||||
}
|
|
||||||
closer, err := copyIO(paths, set, cfg.Terminal)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
i.closer = closer
|
|
||||||
return i, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
|
|
||||||
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
|
|
||||||
return func(paths *FIFOSet) (IO, error) {
|
|
||||||
if paths == nil {
|
|
||||||
return nil, fmt.Errorf("cannot attach to existing fifos")
|
|
||||||
}
|
|
||||||
cfg := Config{
|
|
||||||
Terminal: paths.Terminal,
|
|
||||||
Stdout: paths.Out,
|
|
||||||
Stderr: paths.Err,
|
|
||||||
Stdin: paths.In,
|
|
||||||
}
|
|
||||||
i := &cio{config: cfg}
|
|
||||||
set := &ioSet{
|
|
||||||
in: stdin,
|
|
||||||
out: stdout,
|
|
||||||
err: stderr,
|
|
||||||
}
|
|
||||||
closer, err := copyIO(paths, set, cfg.Terminal)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
i.closer = closer
|
|
||||||
return i, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stdio returns an IO set to be used for a task
|
|
||||||
// that outputs the container's IO as the current processes Stdio
|
|
||||||
func Stdio(id string) (IO, error) {
|
|
||||||
return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// StdioTerminal will setup the IO for the task to use a terminal
|
|
||||||
func StdioTerminal(id string) (IO, error) {
|
|
||||||
return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NullIO redirects the container's IO into /dev/null
|
|
||||||
func NullIO(id string) (IO, error) {
|
|
||||||
return &cio{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIFOSet is a set of fifos for use with tasks
|
|
||||||
type FIFOSet struct {
|
type FIFOSet struct {
|
||||||
// Dir is the directory holding the task fifos
|
Config
|
||||||
Dir string
|
close func() error
|
||||||
// In, Out, and Err fifo paths
|
|
||||||
In, Out, Err string
|
|
||||||
// Terminal returns true if a terminal is being used for the task
|
|
||||||
Terminal bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ioSet struct {
|
// Close the FIFOSet
|
||||||
in io.Reader
|
func (f *FIFOSet) Close() error {
|
||||||
out, err io.Writer
|
if f.close != nil {
|
||||||
}
|
return f.close()
|
||||||
|
|
||||||
type wgCloser struct {
|
|
||||||
wg *sync.WaitGroup
|
|
||||||
dir string
|
|
||||||
set []io.Closer
|
|
||||||
cancel context.CancelFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *wgCloser) Wait() {
|
|
||||||
g.wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *wgCloser) Close() error {
|
|
||||||
for _, f := range g.set {
|
|
||||||
f.Close()
|
|
||||||
}
|
|
||||||
if g.dir != "" {
|
|
||||||
return os.RemoveAll(g.dir)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *wgCloser) Cancel() {
|
// NewFIFOSet returns a new FIFOSet from a Config and a close function
|
||||||
g.cancel()
|
func NewFIFOSet(config Config, close func() error) *FIFOSet {
|
||||||
|
return &FIFOSet{Config: config, close: close}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Streams used to configure a Creator or Attach
|
||||||
|
type Streams struct {
|
||||||
|
Stdin io.Reader
|
||||||
|
Stdout io.Writer
|
||||||
|
Stderr io.Writer
|
||||||
|
Terminal bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opt customize options for creating a Creator or Attach
|
||||||
|
type Opt func(*Streams)
|
||||||
|
|
||||||
|
// WithStdio sets stream options to the standard input/output streams
|
||||||
|
func WithStdio(opt *Streams) {
|
||||||
|
WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTerminal sets the terminal option
|
||||||
|
func WithTerminal(opt *Streams) {
|
||||||
|
opt.Terminal = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithStreams sets the stream options to the specified Reader and Writers
|
||||||
|
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
|
||||||
|
return func(opt *Streams) {
|
||||||
|
opt.Stdin = stdin
|
||||||
|
opt.Stdout = stdout
|
||||||
|
opt.Stderr = stderr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCreator returns an IO creator from the options
|
||||||
|
func NewCreator(opts ...Opt) Creator {
|
||||||
|
streams := &Streams{}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(streams)
|
||||||
|
}
|
||||||
|
return func(id string) (IO, error) {
|
||||||
|
// TODO: accept root as a param
|
||||||
|
root := "/run/containerd/fifo"
|
||||||
|
fifos, err := NewFIFOSetInDir(root, id, streams.Terminal)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return copyIO(fifos, streams)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
|
||||||
|
func NewAttach(opts ...Opt) Attach {
|
||||||
|
streams := &Streams{}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(streams)
|
||||||
|
}
|
||||||
|
return func(fifos *FIFOSet) (IO, error) {
|
||||||
|
if fifos == nil {
|
||||||
|
return nil, fmt.Errorf("cannot attach, missing fifos")
|
||||||
|
}
|
||||||
|
return copyIO(fifos, streams)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NullIO redirects the container's IO into /dev/null
|
||||||
|
func NullIO(_ string) (IO, error) {
|
||||||
|
return &cio{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cio is a basic container IO implementation.
|
||||||
|
type cio struct {
|
||||||
|
config Config
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
closers []io.Closer
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cio) Config() Config {
|
||||||
|
return c.config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cio) Wait() {
|
||||||
|
if c.wg != nil {
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cio) Close() error {
|
||||||
|
var lastErr error
|
||||||
|
for _, closer := range c.closers {
|
||||||
|
if closer == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := closer.Close(); err != nil {
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cio) Cancel() {
|
||||||
|
if c.cancel != nil {
|
||||||
|
c.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipes struct {
|
||||||
|
Stdin io.WriteCloser
|
||||||
|
Stdout io.ReadCloser
|
||||||
|
Stderr io.ReadCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
// DirectIO allows task IO to be handled externally by the caller
|
||||||
|
type DirectIO struct {
|
||||||
|
pipes
|
||||||
|
cio
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ IO = &DirectIO{}
|
||||||
|
132
cio/io_test.go
Normal file
132
cio/io_test.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
// +build !windows
|
||||||
|
|
||||||
|
package cio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/containerd/fifo"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func assertHasPrefix(t *testing.T, s, prefix string) {
|
||||||
|
t.Helper()
|
||||||
|
if !strings.HasPrefix(s, prefix) {
|
||||||
|
t.Fatalf("expected %s to start with %s", s, prefix)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewFIFOSetInDir(t *testing.T) {
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
t.Skip("NewFIFOSetInDir has different behaviour on windows")
|
||||||
|
}
|
||||||
|
|
||||||
|
root, err := ioutil.TempDir("", "test-new-fifo-set")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer os.RemoveAll(root)
|
||||||
|
|
||||||
|
fifos, err := NewFIFOSetInDir(root, "theid", true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assertHasPrefix(t, fifos.Stdin, root)
|
||||||
|
assertHasPrefix(t, fifos.Stdout, root)
|
||||||
|
assertHasPrefix(t, fifos.Stderr, root)
|
||||||
|
assert.Equal(t, "theid-stdin", filepath.Base(fifos.Stdin))
|
||||||
|
assert.Equal(t, "theid-stdout", filepath.Base(fifos.Stdout))
|
||||||
|
assert.Equal(t, "theid-stderr", filepath.Base(fifos.Stderr))
|
||||||
|
assert.Equal(t, true, fifos.Terminal)
|
||||||
|
|
||||||
|
files, err := ioutil.ReadDir(root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, files, 1)
|
||||||
|
|
||||||
|
require.NoError(t, fifos.Close())
|
||||||
|
files, err = ioutil.ReadDir(root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, files, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewAttach(t *testing.T) {
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
t.Skip("setupFIFOProducers not yet implemented on windows")
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
expectedStdin = "this is the stdin"
|
||||||
|
expectedStdout = "this is the stdout"
|
||||||
|
expectedStderr = "this is the stderr"
|
||||||
|
stdin = bytes.NewBufferString(expectedStdin)
|
||||||
|
stdout = new(bytes.Buffer)
|
||||||
|
stderr = new(bytes.Buffer)
|
||||||
|
)
|
||||||
|
|
||||||
|
withBytesBuffers := func(streams *Streams) {
|
||||||
|
*streams = Streams{Stdin: stdin, Stdout: stdout, Stderr: stderr}
|
||||||
|
}
|
||||||
|
attacher := NewAttach(withBytesBuffers)
|
||||||
|
|
||||||
|
fifos, err := NewFIFOSetInDir("", "theid", false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
io, err := attacher(fifos)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer io.Close()
|
||||||
|
|
||||||
|
producers := setupFIFOProducers(t, io.Config())
|
||||||
|
initProducers(t, producers, expectedStdout, expectedStderr)
|
||||||
|
|
||||||
|
actualStdin, err := ioutil.ReadAll(producers.Stdin)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
io.Cancel()
|
||||||
|
io.Wait()
|
||||||
|
assert.NoError(t, io.Close())
|
||||||
|
|
||||||
|
assert.Equal(t, expectedStdout, stdout.String())
|
||||||
|
assert.Equal(t, expectedStderr, stderr.String())
|
||||||
|
assert.Equal(t, expectedStdin, string(actualStdin))
|
||||||
|
}
|
||||||
|
|
||||||
|
type producers struct {
|
||||||
|
Stdin io.ReadCloser
|
||||||
|
Stdout io.WriteCloser
|
||||||
|
Stderr io.WriteCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupFIFOProducers(t *testing.T, fifos Config) producers {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
pipes producers
|
||||||
|
ctx = context.Background()
|
||||||
|
)
|
||||||
|
|
||||||
|
pipes.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_RDONLY, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pipes.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_WRONLY, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pipes.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_WRONLY, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return pipes
|
||||||
|
}
|
||||||
|
|
||||||
|
func initProducers(t *testing.T, producers producers, stdout, stderr string) {
|
||||||
|
_, err := producers.Stdout.Write([]byte(stdout))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, producers.Stdout.Close())
|
||||||
|
|
||||||
|
_, err = producers.Stderr.Write([]byte(stderr))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, producers.Stderr.Close())
|
||||||
|
}
|
230
cio/io_unix.go
230
cio/io_unix.go
@ -12,173 +12,115 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/containerd/fifo"
|
"github.com/containerd/fifo"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewFifos returns a new set of fifos for the task
|
// NewFIFOSetInDir returns a new FIFOSet with paths in a temporary directory under root
|
||||||
func NewFifos(id string) (*FIFOSet, error) {
|
func NewFIFOSetInDir(root, id string, terminal bool) (*FIFOSet, error) {
|
||||||
root := "/run/containerd/fifo"
|
if root != "" {
|
||||||
if err := os.MkdirAll(root, 0700); err != nil {
|
if err := os.MkdirAll(root, 0700); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dir, err := ioutil.TempDir(root, "")
|
dir, err := ioutil.TempDir(root, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &FIFOSet{
|
closer := func() error {
|
||||||
Dir: dir,
|
return os.RemoveAll(dir)
|
||||||
In: filepath.Join(dir, id+"-stdin"),
|
}
|
||||||
Out: filepath.Join(dir, id+"-stdout"),
|
return NewFIFOSet(Config{
|
||||||
Err: filepath.Join(dir, id+"-stderr"),
|
Stdin: filepath.Join(dir, id+"-stdin"),
|
||||||
}, nil
|
Stdout: filepath.Join(dir, id+"-stdout"),
|
||||||
|
Stderr: filepath.Join(dir, id+"-stderr"),
|
||||||
|
Terminal: terminal,
|
||||||
|
}, closer), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
|
||||||
var (
|
var ctx, cancel = context.WithCancel(context.Background())
|
||||||
f io.ReadWriteCloser
|
pipes, err := openFifos(ctx, fifos)
|
||||||
set []io.Closer
|
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
|
||||||
wg = &sync.WaitGroup{}
|
|
||||||
)
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
for _, f := range set {
|
|
||||||
f.Close()
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
set = append(set, f)
|
|
||||||
go func(w io.WriteCloser) {
|
|
||||||
io.Copy(w, ioset.in)
|
|
||||||
w.Close()
|
|
||||||
}(f)
|
|
||||||
|
|
||||||
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
set = append(set, f)
|
|
||||||
wg.Add(1)
|
|
||||||
go func(r io.ReadCloser) {
|
|
||||||
io.Copy(ioset.out, r)
|
|
||||||
r.Close()
|
|
||||||
wg.Done()
|
|
||||||
}(f)
|
|
||||||
|
|
||||||
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
set = append(set, f)
|
|
||||||
|
|
||||||
if !tty {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(r io.ReadCloser) {
|
|
||||||
io.Copy(ioset.err, r)
|
|
||||||
r.Close()
|
|
||||||
wg.Done()
|
|
||||||
}(f)
|
|
||||||
}
|
|
||||||
return &wgCloser{
|
|
||||||
wg: wg,
|
|
||||||
dir: fifos.Dir,
|
|
||||||
set: set,
|
|
||||||
cancel: cancel,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDirectIO returns an IO implementation that exposes the pipes directly
|
|
||||||
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
|
|
||||||
set, err := NewFifos("")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f := &DirectIO{
|
|
||||||
set: set,
|
if fifos.Stdin != "" {
|
||||||
terminal: terminal,
|
go func() {
|
||||||
|
io.Copy(pipes.Stdin, ioset.Stdin)
|
||||||
|
pipes.Stdin.Close()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var wg = &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
io.Copy(ioset.Stdout, pipes.Stdout)
|
||||||
|
pipes.Stdout.Close()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !fifos.Terminal {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
io.Copy(ioset.Stderr, pipes.Stderr)
|
||||||
|
pipes.Stderr.Close()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return &cio{
|
||||||
|
config: fifos.Config,
|
||||||
|
wg: wg,
|
||||||
|
closers: append(pipes.closers(), fifos),
|
||||||
|
cancel: cancel,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) {
|
||||||
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.Delete()
|
fifos.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
|
||||||
return nil, err
|
var f pipes
|
||||||
|
if fifos.Stdin != "" {
|
||||||
|
if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
|
return f, errors.Wrapf(err, "failed to open stdin fifo")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if fifos.Stdout != "" {
|
||||||
f.Stdin.Close()
|
if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
return nil, err
|
f.Stdin.Close()
|
||||||
|
return f, errors.Wrapf(err, "failed to open stdout fifo")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if fifos.Stderr != "" {
|
||||||
f.Stdin.Close()
|
if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
f.Stdout.Close()
|
f.Stdin.Close()
|
||||||
return nil, err
|
f.Stdout.Close()
|
||||||
|
return f, errors.Wrapf(err, "failed to open stderr fifo")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DirectIO allows task IO to be handled externally by the caller
|
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
|
||||||
type DirectIO struct {
|
// and io.WriteCloser.
|
||||||
Stdin io.WriteCloser
|
func NewDirectIO(ctx context.Context, fifos *FIFOSet) (*DirectIO, error) {
|
||||||
Stdout io.ReadCloser
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
Stderr io.ReadCloser
|
pipes, err := openFifos(ctx, fifos)
|
||||||
|
return &DirectIO{
|
||||||
set *FIFOSet
|
pipes: pipes,
|
||||||
terminal bool
|
cio: cio{
|
||||||
|
config: fifos.Config,
|
||||||
|
closers: append(pipes.closers(), fifos),
|
||||||
|
cancel: cancel,
|
||||||
|
},
|
||||||
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// IOCreate returns IO avaliable for use with task creation
|
func (p *pipes) closers() []io.Closer {
|
||||||
func (f *DirectIO) IOCreate(id string) (IO, error) {
|
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
|
||||||
return f, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IOAttach returns IO avaliable for use with task attachment
|
|
||||||
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
|
|
||||||
return f, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Config returns the Config
|
|
||||||
func (f *DirectIO) Config() Config {
|
|
||||||
return Config{
|
|
||||||
Terminal: f.terminal,
|
|
||||||
Stdin: f.set.In,
|
|
||||||
Stdout: f.set.Out,
|
|
||||||
Stderr: f.set.Err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cancel stops any IO copy operations
|
|
||||||
//
|
|
||||||
// Not applicable for DirectIO
|
|
||||||
func (f *DirectIO) Cancel() {
|
|
||||||
// nothing to cancel as all operations are handled externally
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait on any IO copy operations
|
|
||||||
//
|
|
||||||
// Not applicable for DirectIO
|
|
||||||
func (f *DirectIO) Wait() {
|
|
||||||
// nothing to wait on as all operations are handled externally
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes all open fds
|
|
||||||
func (f *DirectIO) Close() error {
|
|
||||||
err := f.Stdin.Close()
|
|
||||||
if err2 := f.Stdout.Close(); err == nil {
|
|
||||||
err = err2
|
|
||||||
}
|
|
||||||
if err2 := f.Stderr.Close(); err == nil {
|
|
||||||
err = err2
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete removes the underlying directory containing fifos
|
|
||||||
func (f *DirectIO) Delete() error {
|
|
||||||
if f.set.Dir == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return os.RemoveAll(f.set.Dir)
|
|
||||||
}
|
}
|
||||||
|
@ -13,25 +13,26 @@ import (
|
|||||||
|
|
||||||
const pipeRoot = `\\.\pipe`
|
const pipeRoot = `\\.\pipe`
|
||||||
|
|
||||||
// NewFifos returns a new set of fifos for the task
|
// NewFIFOSetInDir returns a new set of fifos for the task
|
||||||
func NewFifos(id string) (*FIFOSet, error) {
|
func NewFIFOSetInDir(_, id string, terminal bool) (*FIFOSet, error) {
|
||||||
return &FIFOSet{
|
return NewFIFOSet(Config{
|
||||||
In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
|
Terminal: terminal,
|
||||||
Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
|
Stdin: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
|
||||||
Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
|
Stdout: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
|
||||||
}, nil
|
Stderr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
|
||||||
|
}, nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
|
||||||
var (
|
var (
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
set []io.Closer
|
set []io.Closer
|
||||||
)
|
)
|
||||||
|
|
||||||
if fifos.In != "" {
|
if fifos.Stdin != "" {
|
||||||
l, err := winio.ListenPipe(fifos.In, nil)
|
l, err := winio.ListenPipe(fifos.Stdin, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In)
|
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdin)
|
||||||
}
|
}
|
||||||
defer func(l net.Listener) {
|
defer func(l net.Listener) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -43,19 +44,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
|||||||
go func() {
|
go func() {
|
||||||
c, err := l.Accept()
|
c, err := l.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In)
|
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
io.Copy(c, ioset.in)
|
io.Copy(c, ioset.Stdin)
|
||||||
c.Close()
|
c.Close()
|
||||||
l.Close()
|
l.Close()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
if fifos.Out != "" {
|
if fifos.Stdout != "" {
|
||||||
l, err := winio.ListenPipe(fifos.Out, nil)
|
l, err := winio.ListenPipe(fifos.Stdout, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out)
|
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdout)
|
||||||
}
|
}
|
||||||
defer func(l net.Listener) {
|
defer func(l net.Listener) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -69,19 +70,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
c, err := l.Accept()
|
c, err := l.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out)
|
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Stdout)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
io.Copy(ioset.out, c)
|
io.Copy(ioset.Stdout, c)
|
||||||
c.Close()
|
c.Close()
|
||||||
l.Close()
|
l.Close()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
if !tty && fifos.Err != "" {
|
if !fifos.Terminal && fifos.Stderr != "" {
|
||||||
l, err := winio.ListenPipe(fifos.Err, nil)
|
l, err := winio.ListenPipe(fifos.Stderr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err)
|
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Stderr)
|
||||||
}
|
}
|
||||||
defer func(l net.Listener) {
|
defer func(l net.Listener) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -95,23 +96,29 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
c, err := l.Accept()
|
c, err := l.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err)
|
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
io.Copy(ioset.err, c)
|
io.Copy(ioset.Stderr, c)
|
||||||
c.Close()
|
c.Close()
|
||||||
l.Close()
|
l.Close()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
return &wgCloser{
|
return &cio{config: fifos.Config, closers: set}, nil
|
||||||
wg: &wg,
|
}
|
||||||
dir: fifos.Dir,
|
|
||||||
set: set,
|
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
|
||||||
cancel: func() {
|
// and io.WriteCloser.
|
||||||
for _, l := range set {
|
func NewDirectIO(stdin io.WriteCloser, stdout, stderr io.ReadCloser, terminal bool) *DirectIO {
|
||||||
l.Close()
|
return &DirectIO{
|
||||||
}
|
pipes: pipes{
|
||||||
},
|
Stdin: stdin,
|
||||||
}, nil
|
Stdout: stdout,
|
||||||
|
Stderr: stderr,
|
||||||
|
},
|
||||||
|
cio: cio{
|
||||||
|
config: Config{Terminal: terminal},
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package tasks
|
package tasks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/containerd/console"
|
"github.com/containerd/console"
|
||||||
"github.com/containerd/containerd/cio"
|
"github.com/containerd/containerd/cio"
|
||||||
"github.com/containerd/containerd/cmd/ctr/commands"
|
"github.com/containerd/containerd/cmd/ctr/commands"
|
||||||
@ -39,7 +37,7 @@ var attachCommand = cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
task, err := container.Task(ctx, cio.WithAttach(os.Stdin, os.Stdout, os.Stderr))
|
task, err := container.Task(ctx, cio.NewAttach(cio.WithStdio))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -60,9 +60,9 @@ var execCommand = cli.Command{
|
|||||||
pspec.Terminal = tty
|
pspec.Terminal = tty
|
||||||
pspec.Args = args
|
pspec.Args = args
|
||||||
|
|
||||||
ioCreator := cio.Stdio
|
ioCreator := cio.NewCreator(cio.WithStdio)
|
||||||
if tty {
|
if tty {
|
||||||
ioCreator = cio.StdioTerminal
|
ioCreator = cio.NewCreator(cio.WithStdio, cio.WithTerminal)
|
||||||
}
|
}
|
||||||
process, err := task.Exec(ctx, context.String("exec-id"), pspec, ioCreator)
|
process, err := task.Exec(ctx, context.String("exec-id"), pspec, ioCreator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,10 +44,11 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
|
|||||||
|
|
||||||
// NewTask creates a new task
|
// NewTask creates a new task
|
||||||
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, tty, nullIO bool) (containerd.Task, error) {
|
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, tty, nullIO bool) (containerd.Task, error) {
|
||||||
|
stdio := cio.NewCreator(cio.WithStdio)
|
||||||
if checkpoint == "" {
|
if checkpoint == "" {
|
||||||
ioCreator := cio.Stdio
|
ioCreator := stdio
|
||||||
if tty {
|
if tty {
|
||||||
ioCreator = cio.StdioTerminal
|
ioCreator = cio.NewCreator(cio.WithStdio, cio.WithTerminal)
|
||||||
}
|
}
|
||||||
if nullIO {
|
if nullIO {
|
||||||
if tty {
|
if tty {
|
||||||
@ -61,5 +62,5 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container contain
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return container.NewTask(ctx, cio.Stdio, containerd.WithTaskCheckpoint(im))
|
return container.NewTask(ctx, stdio, containerd.WithTaskCheckpoint(im))
|
||||||
}
|
}
|
||||||
|
@ -42,9 +42,9 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
|
|||||||
|
|
||||||
// NewTask creates a new task
|
// NewTask creates a new task
|
||||||
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, tty, nullIO bool) (containerd.Task, error) {
|
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, tty, nullIO bool) (containerd.Task, error) {
|
||||||
ioCreator := cio.Stdio
|
ioCreator := cio.NewCreator(cio.WithStdio)
|
||||||
if tty {
|
if tty {
|
||||||
ioCreator = cio.StdioTerminal
|
ioCreator = cio.NewCreator(cio.WithStdio, cio.WithTerminal)
|
||||||
}
|
}
|
||||||
if nullIO {
|
if nullIO {
|
||||||
if tty {
|
if tty {
|
||||||
|
32
container.go
32
container.go
@ -3,6 +3,7 @@ package containerd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@ -26,7 +27,7 @@ type Container interface {
|
|||||||
// Delete removes the container
|
// Delete removes the container
|
||||||
Delete(context.Context, ...DeleteOpts) error
|
Delete(context.Context, ...DeleteOpts) error
|
||||||
// NewTask creates a new task based on the container metadata
|
// NewTask creates a new task based on the container metadata
|
||||||
NewTask(context.Context, cio.Creation, ...NewTaskOpts) (Task, error)
|
NewTask(context.Context, cio.Creator, ...NewTaskOpts) (Task, error)
|
||||||
// Spec returns the OCI runtime specification
|
// Spec returns the OCI runtime specification
|
||||||
Spec(context.Context) (*specs.Spec, error)
|
Spec(context.Context) (*specs.Spec, error)
|
||||||
// Task returns the current task for the container
|
// Task returns the current task for the container
|
||||||
@ -162,7 +163,7 @@ func (c *container) Image(ctx context.Context) (Image, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (_ Task, err error) {
|
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
|
||||||
i, err := ioCreate(c.id)
|
i, err := ioCreate(c.id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -288,20 +289,23 @@ func (c *container) get(ctx context.Context) (containers.Container, error) {
|
|||||||
return c.client.ContainerService().Get(ctx, c.id)
|
return c.client.ContainerService().Get(ctx, c.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the existing fifo paths from the task information stored by the daemon
|
||||||
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {
|
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {
|
||||||
// get the existing fifo paths from the task information stored by the daemon
|
path := getFifoDir([]string{
|
||||||
paths := &cio.FIFOSet{
|
response.Process.Stdin,
|
||||||
Dir: getFifoDir([]string{
|
response.Process.Stdout,
|
||||||
response.Process.Stdin,
|
response.Process.Stderr,
|
||||||
response.Process.Stdout,
|
})
|
||||||
response.Process.Stderr,
|
closer := func() error {
|
||||||
}),
|
return os.RemoveAll(path)
|
||||||
In: response.Process.Stdin,
|
|
||||||
Out: response.Process.Stdout,
|
|
||||||
Err: response.Process.Stderr,
|
|
||||||
Terminal: response.Process.Terminal,
|
|
||||||
}
|
}
|
||||||
return ioAttach(paths)
|
fifoSet := cio.NewFIFOSet(cio.Config{
|
||||||
|
Stdin: response.Process.Stdin,
|
||||||
|
Stdout: response.Process.Stdout,
|
||||||
|
Stderr: response.Process.Stderr,
|
||||||
|
Terminal: response.Process.Terminal,
|
||||||
|
}, closer)
|
||||||
|
return ioAttach(fifoSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getFifoDir looks for any non-empty path for a stdio fifo
|
// getFifoDir looks for any non-empty path for a stdio fifo
|
||||||
|
@ -298,7 +298,7 @@ func TestContainerAttach(t *testing.T) {
|
|||||||
|
|
||||||
expected := "hello" + newLine
|
expected := "hello" + newLine
|
||||||
|
|
||||||
direct, err := cio.NewDirectIO(ctx, false)
|
direct, err := newDirectIO(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -372,6 +372,53 @@ func TestContainerAttach(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newDirectIO(ctx context.Context) (*directIO, error) {
|
||||||
|
fifos, err := cio.NewFIFOSetInDir("", "", false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dio, err := cio.NewDirectIO(ctx, fifos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &directIO{DirectIO: *dio}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type directIO struct {
|
||||||
|
cio.DirectIO
|
||||||
|
}
|
||||||
|
|
||||||
|
// ioCreate returns IO avaliable for use with task creation
|
||||||
|
func (f *directIO) IOCreate(id string) (cio.IO, error) {
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ioAttach returns IO avaliable for use with task attachment
|
||||||
|
func (f *directIO) IOAttach(set *cio.FIFOSet) (cio.IO, error) {
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *directIO) Cancel() {
|
||||||
|
// nothing to cancel as all operations are handled externally
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes all open fds
|
||||||
|
func (f *directIO) Close() error {
|
||||||
|
err := f.Stdin.Close()
|
||||||
|
if err2 := f.Stdout.Close(); err == nil {
|
||||||
|
err = err2
|
||||||
|
}
|
||||||
|
if err2 := f.Stderr.Close(); err == nil {
|
||||||
|
err = err2
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes the underlying directory containing fifos
|
||||||
|
func (f *directIO) Delete() error {
|
||||||
|
return f.DirectIO.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func TestContainerUsername(t *testing.T) {
|
func TestContainerUsername(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
@ -393,7 +440,7 @@ func TestContainerUsername(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
direct, err := cio.NewDirectIO(ctx, false)
|
direct, err := newDirectIO(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -486,7 +533,7 @@ func TestContainerAttachProcess(t *testing.T) {
|
|||||||
expected := "hello" + newLine
|
expected := "hello" + newLine
|
||||||
|
|
||||||
// creating IO early for easy resource cleanup
|
// creating IO early for easy resource cleanup
|
||||||
direct, err := cio.NewDirectIO(ctx, false)
|
direct, err := newDirectIO(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -602,7 +649,7 @@ func TestContainerUserID(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
direct, err := cio.NewDirectIO(ctx, false)
|
direct, err := newDirectIO(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -692,7 +739,7 @@ func TestContainerKillAll(t *testing.T) {
|
|||||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||||
|
|
||||||
stdout := bytes.NewBuffer(nil)
|
stdout := bytes.NewBuffer(nil)
|
||||||
task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
|
task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -944,7 +991,7 @@ func TestContainerKillInitPidHost(t *testing.T) {
|
|||||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||||
|
|
||||||
stdout := bytes.NewBuffer(nil)
|
stdout := bytes.NewBuffer(nil)
|
||||||
task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
|
task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -1043,7 +1090,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) {
|
|||||||
}
|
}
|
||||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||||
|
|
||||||
task, err := container.NewTask(ctx, cio.Stdio, func(_ context.Context, client *Client, r *TaskInfo) error {
|
task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStdio), func(_ context.Context, client *Client, r *TaskInfo) error {
|
||||||
r.Options = &runctypes.CreateOptions{
|
r.Options = &runctypes.CreateOptions{
|
||||||
IoUid: 1000,
|
IoUid: 1000,
|
||||||
IoGid: 1000,
|
IoGid: 1000,
|
||||||
|
@ -3,6 +3,7 @@ package containerd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -23,11 +24,11 @@ import (
|
|||||||
gogotypes "github.com/gogo/protobuf/types"
|
gogotypes "github.com/gogo/protobuf/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func empty() cio.Creation {
|
func empty() cio.Creator {
|
||||||
// TODO (@mlaventure) windows searches for pipes
|
// TODO (@mlaventure) windows searches for pipes
|
||||||
// when none are provided
|
// when none are provided
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
return cio.Stdio
|
return cio.NewCreator(cio.WithStdio)
|
||||||
}
|
}
|
||||||
return cio.NullIO
|
return cio.NullIO
|
||||||
}
|
}
|
||||||
@ -187,7 +188,7 @@ func TestContainerOutput(t *testing.T) {
|
|||||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||||
|
|
||||||
stdout := bytes.NewBuffer(nil)
|
stdout := bytes.NewBuffer(nil)
|
||||||
task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
|
task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -223,6 +224,15 @@ func TestContainerOutput(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func withByteBuffers(stdout io.Writer) cio.Opt {
|
||||||
|
// TODO: could this use ioutil.Discard?
|
||||||
|
return func(streams *cio.Streams) {
|
||||||
|
streams.Stdin = new(bytes.Buffer)
|
||||||
|
streams.Stdout = stdout
|
||||||
|
streams.Stderr = new(bytes.Buffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestContainerExec(t *testing.T) {
|
func TestContainerExec(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
@ -534,7 +544,7 @@ func TestContainerCloseIO(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
task, err := container.NewTask(ctx, cio.NewIO(r, stdout, ioutil.Discard))
|
task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStreams(r, stdout, ioutil.Discard)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -1145,7 +1155,7 @@ func TestContainerHostname(t *testing.T) {
|
|||||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||||
|
|
||||||
stdout := bytes.NewBuffer(nil)
|
stdout := bytes.NewBuffer(nil)
|
||||||
task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil)))
|
task, err := container.NewTask(ctx, cio.NewCreator(withByteBuffers(stdout)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
4
task.go
4
task.go
@ -123,7 +123,7 @@ type Task interface {
|
|||||||
// Resume the execution of the task
|
// Resume the execution of the task
|
||||||
Resume(context.Context) error
|
Resume(context.Context) error
|
||||||
// Exec creates a new process inside the task
|
// Exec creates a new process inside the task
|
||||||
Exec(context.Context, string, *specs.Process, cio.Creation) (Process, error)
|
Exec(context.Context, string, *specs.Process, cio.Creator) (Process, error)
|
||||||
// Pids returns a list of system specific process ids inside the task
|
// Pids returns a list of system specific process ids inside the task
|
||||||
Pids(context.Context) ([]ProcessInfo, error)
|
Pids(context.Context) ([]ProcessInfo, error)
|
||||||
// Checkpoint serializes the runtime and memory information of a task into an
|
// Checkpoint serializes the runtime and memory information of a task into an
|
||||||
@ -278,7 +278,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat
|
|||||||
return &ExitStatus{code: r.ExitStatus, exitedAt: r.ExitedAt}, nil
|
return &ExitStatus{code: r.ExitStatus, exitedAt: r.ExitedAt}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creation) (_ Process, err error) {
|
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creator) (_ Process, err error) {
|
||||||
if id == "" {
|
if id == "" {
|
||||||
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "exec id must not be empty")
|
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "exec id must not be empty")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user