From 43fb19e01c2966bffb015073a55ca923b09db7cd Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 31 May 2017 16:29:41 -0700 Subject: [PATCH] Add Load for container and Task with Attach This adds both container and task loading of running tasks as well as reattaching to the IO of the task after load. Signed-off-by: Michael Crosby --- client.go | 10 ++++ container.go | 22 ++++++++ container_test.go | 136 +++++++++++++++++++++++++++++++++++++++++++++- io.go | 64 +++++++++++++--------- io_unix.go | 10 ++-- io_windows.go | 28 +++++----- task.go | 5 ++ 7 files changed, 228 insertions(+), 47 deletions(-) diff --git a/client.go b/client.go index 2047c373e..eacb1024b 100644 --- a/client.go +++ b/client.go @@ -203,6 +203,16 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, return containerFromProto(c, r.Container), nil } +func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) { + response, err := c.ContainerService().Get(ctx, &containers.GetContainerRequest{ + ID: id, + }) + if err != nil { + return nil, err + } + return containerFromProto(c, response.Container), nil +} + type RemoteOpts func(*Client, *RemoteContext) error // RemoteContext is used to configure object resolutions and transfers with diff --git a/container.go b/container.go index 7b9bffd8d..1deec2602 100644 --- a/container.go +++ b/container.go @@ -16,6 +16,7 @@ type Container interface { NewTask(context.Context, IOCreation) (Task, error) Spec() (*specs.Spec, error) Task() Task + LoadTask(context.Context, IOCreation) (Task, error) } func containerFromProto(client *Client, c containers.Container) *container { @@ -108,3 +109,24 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, err c.task = t return t, nil } + +func (c *container) LoadTask(ctx context.Context, ioCreate IOCreation) (Task, error) { + i, err := ioCreate() + if err != nil { + return nil, err + } + response, err := c.client.TaskService().Info(ctx, &execution.InfoRequest{ + ContainerID: c.c.ID, + }) + if err != nil { + return nil, err + } + t := &task{ + client: c.client, + io: i, + containerID: response.Task.ContainerID, + pid: response.Task.Pid, + } + c.task = t + return t, nil +} diff --git a/container_test.go b/container_test.go index 44ac5cf5b..f2e791072 100644 --- a/container_test.go +++ b/container_test.go @@ -4,14 +4,17 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "os" + "sync" "syscall" "testing" ) func empty() IOCreation { - return BufferedIO(bytes.NewBuffer(nil), bytes.NewBuffer(nil), bytes.NewBuffer(nil)) + null := ioutil.Discard + return NewIO(bytes.NewBuffer(nil), null, null) } func TestContainerList(t *testing.T) { @@ -170,7 +173,7 @@ func TestContainerOutput(t *testing.T) { defer container.Delete(ctx) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, BufferedIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) if err != nil { t.Error(err) return @@ -445,3 +448,132 @@ func TestContainerCloseStdin(t *testing.T) { t.Errorf("expected output %q but received %q", expected, output) } } + +func TestContainerAttach(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + ctx = context.Background() + id = "ContainerAttach" + ) + image, err := client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("cat")) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx) + + expected := "hello\n" + stdout := bytes.NewBuffer(nil) + + r, w, err := os.Pipe() + if err != nil { + t.Error(err) + return + } + or, ow, err := os.Pipe() + if err != nil { + t.Error(err) + return + } + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + io.Copy(stdout, or) + wg.Done() + }() + + // TODO: return fifo information from shim based on task/process + dir, err := ioutil.TempDir("", "attach") + if err != nil { + t.Error(err) + return + } + task, err := container.NewTask(ctx, WithIO(r, ow, ioutil.Discard, dir)) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + originalIO := task.IO() + + statusC := make(chan uint32, 1) + go func() { + status, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + statusC <- status + }() + + if err := task.Start(ctx); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(w, expected); err != nil { + t.Error(err) + } + w.Close() + + // load the container and re-load the task + if container, err = client.LoadContainer(ctx, id); err != nil { + t.Error(err) + return + } + + // create new IO for the loaded task + if r, w, err = os.Pipe(); err != nil { + t.Error(err) + return + } + if task, err = container.LoadTask(ctx, WithIO(r, ow, ioutil.Discard, dir)); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(w, expected); err != nil { + t.Error(err) + } + w.Close() + + if err := task.CloseStdin(ctx); err != nil { + t.Error(err) + } + + <-statusC + + originalIO.Close() + if _, err := task.Delete(ctx); err != nil { + t.Error(err) + } + ow.Close() + + wg.Wait() + output := stdout.String() + + // we wrote the same thing after attach + expected = expected + expected + if output != expected { + t.Errorf("expected output %q but received %q", expected, output) + } +} diff --git a/io.go b/io.go index 016053142..e8c9f817c 100644 --- a/io.go +++ b/io.go @@ -1,7 +1,6 @@ package containerd import ( - "bytes" "io" "io/ioutil" "os" @@ -27,18 +26,17 @@ func (i *IO) Close() error { type IOCreation func() (*IO, error) -// BufferedIO returns IO that will be logged to an in memory buffer -func BufferedIO(stdin, stdout, stderr *bytes.Buffer) IOCreation { +func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { return func() (*IO, error) { - paths, err := fifoPaths() + paths, err := NewFifos() if err != nil { return nil, err } i := &IO{ Terminal: false, - Stdout: paths.out, - Stderr: paths.err, - Stdin: paths.in, + Stdout: paths.Out, + Stderr: paths.Err, + Stdin: paths.In, } set := &ioSet{ in: stdin, @@ -54,17 +52,17 @@ func BufferedIO(stdin, stdout, stderr *bytes.Buffer) IOCreation { } } -func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { +func WithIO(stdin io.Reader, stdout, stderr io.Writer, dir string) IOCreation { return func() (*IO, error) { - paths, err := fifoPaths() + paths, err := WithFifos(dir) if err != nil { return nil, err } i := &IO{ Terminal: false, - Stdout: paths.out, - Stderr: paths.err, - Stdin: paths.in, + Stdout: paths.Out, + Stderr: paths.Err, + Stdin: paths.In, } set := &ioSet{ in: stdin, @@ -83,7 +81,7 @@ func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { // Stdio returns an IO implementation to be used for a task // that outputs the container's IO as the current processes Stdio func Stdio() (*IO, error) { - paths, err := fifoPaths() + paths, err := NewFifos() if err != nil { return nil, err } @@ -98,14 +96,15 @@ func Stdio() (*IO, error) { } return &IO{ Terminal: false, - Stdin: paths.in, - Stdout: paths.out, - Stderr: paths.err, + Stdin: paths.In, + Stdout: paths.Out, + Stderr: paths.Err, closer: closer, }, nil } -func fifoPaths() (*fifoSet, error) { +// NewFifos returns a new set of fifos for the task +func NewFifos() (*FifoSet, error) { root := filepath.Join(os.TempDir(), "containerd") if err := os.MkdirAll(root, 0700); err != nil { return nil, err @@ -114,18 +113,31 @@ func fifoPaths() (*fifoSet, error) { if err != nil { return nil, err } - return &fifoSet{ - dir: dir, - in: filepath.Join(dir, "stdin"), - out: filepath.Join(dir, "stdout"), - err: filepath.Join(dir, "stderr"), + return &FifoSet{ + Dir: dir, + In: filepath.Join(dir, "stdin"), + Out: filepath.Join(dir, "stdout"), + Err: filepath.Join(dir, "stderr"), }, nil } -type fifoSet struct { - // dir is the directory holding the task fifos - dir string - in, out, err string +// WithFifos returns existing or creates new fifos inside an existing dir +func WithFifos(dir string) (*FifoSet, error) { + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, err + } + return &FifoSet{ + Dir: dir, + In: filepath.Join(dir, "stdin"), + Out: filepath.Join(dir, "stdout"), + Err: filepath.Join(dir, "stderr"), + }, nil +} + +type FifoSet struct { + // Dir is the directory holding the task fifos + Dir string + In, Out, Err string } type ioSet struct { diff --git a/io_unix.go b/io_unix.go index a243c3a52..5c0cf0091 100644 --- a/io_unix.go +++ b/io_unix.go @@ -11,14 +11,14 @@ import ( "github.com/containerd/fifo" ) -func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { +func copyIO(fifos *FifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { var ( f io.ReadWriteCloser ctx = context.Background() wg = &sync.WaitGroup{} ) - if f, err = fifo.OpenFifo(ctx, fifos.in, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } defer func(c io.Closer) { @@ -31,7 +31,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error w.Close() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } defer func(c io.Closer) { @@ -46,7 +46,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error wg.Done() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } defer func(c io.Closer) { @@ -66,6 +66,6 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error return &wgCloser{ wg: wg, - dir: fifos.dir, + dir: fifos.Dir, }, nil } diff --git a/io_windows.go b/io_windows.go index 4aa2ed5aa..2fb97566c 100644 --- a/io_windows.go +++ b/io_windows.go @@ -10,13 +10,13 @@ import ( "github.com/pkg/errors" ) -func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { +func copyIO(fifos *FifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { var wg sync.WaitGroup - if fifos.in != "" { - l, err := winio.ListenPipe(fifos.in, nil) + if fifos.In != "" { + l, err := winio.ListenPipe(fifos.In, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.in) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In) } defer func(l net.Listener) { if err != nil { @@ -27,7 +27,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error go func() { c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.in) + log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In) return } io.Copy(c, ioset.in) @@ -36,10 +36,10 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error }() } - if fifos.out != "" { - l, err := winio.ListenPipe(fifos.out, nil) + if fifos.Out != "" { + l, err := winio.ListenPipe(fifos.Out, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.out) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out) } defer func(l net.Listener) { if err != nil { @@ -52,7 +52,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.out) + log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out) return } io.Copy(ioset.out, c) @@ -61,10 +61,10 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error }() } - if !tty && fifos.err != "" { - l, err := winio.ListenPipe(fifos.err, nil) + if !tty && fifos.Err != "" { + l, err := winio.ListenPipe(fifos.Err, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.err) + return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err) } defer func(l net.Listener) { if err != nil { @@ -77,7 +77,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.err) + log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err) return } io.Copy(ioset.err, c) @@ -88,6 +88,6 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error return &wgCloser{ wg: &wg, - dir: fifos.dir, + dir: fifos.Dir, }, nil } diff --git a/task.go b/task.go index bee50db66..21bf92a3f 100644 --- a/task.go +++ b/task.go @@ -33,6 +33,7 @@ type Task interface { Exec(context.Context, *specs.Process, IOCreation) (Process, error) Processes(context.Context) ([]uint32, error) CloseStdin(context.Context) error + IO() *IO } type Process interface { @@ -168,3 +169,7 @@ func (t *task) CloseStdin(ctx context.Context) error { }) return err } + +func (t *task) IO() *IO { + return t.io +}