diff --git a/client.go b/client.go index 2047c373e..eacb1024b 100644 --- a/client.go +++ b/client.go @@ -203,6 +203,16 @@ func (c *Client) NewContainer(ctx context.Context, id string, spec *specs.Spec, return containerFromProto(c, r.Container), nil } +func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) { + response, err := c.ContainerService().Get(ctx, &containers.GetContainerRequest{ + ID: id, + }) + if err != nil { + return nil, err + } + return containerFromProto(c, response.Container), nil +} + type RemoteOpts func(*Client, *RemoteContext) error // RemoteContext is used to configure object resolutions and transfers with diff --git a/container.go b/container.go index 7b9bffd8d..1deec2602 100644 --- a/container.go +++ b/container.go @@ -16,6 +16,7 @@ type Container interface { NewTask(context.Context, IOCreation) (Task, error) Spec() (*specs.Spec, error) Task() Task + LoadTask(context.Context, IOCreation) (Task, error) } func containerFromProto(client *Client, c containers.Container) *container { @@ -108,3 +109,24 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation) (Task, err c.task = t return t, nil } + +func (c *container) LoadTask(ctx context.Context, ioCreate IOCreation) (Task, error) { + i, err := ioCreate() + if err != nil { + return nil, err + } + response, err := c.client.TaskService().Info(ctx, &execution.InfoRequest{ + ContainerID: c.c.ID, + }) + if err != nil { + return nil, err + } + t := &task{ + client: c.client, + io: i, + containerID: response.Task.ContainerID, + pid: response.Task.Pid, + } + c.task = t + return t, nil +} diff --git a/container_test.go b/container_test.go index 44ac5cf5b..f2e791072 100644 --- a/container_test.go +++ b/container_test.go @@ -4,14 +4,17 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "os" + "sync" "syscall" "testing" ) func empty() IOCreation { - return BufferedIO(bytes.NewBuffer(nil), bytes.NewBuffer(nil), bytes.NewBuffer(nil)) + null := ioutil.Discard + return NewIO(bytes.NewBuffer(nil), null, null) } func TestContainerList(t *testing.T) { @@ -170,7 +173,7 @@ func TestContainerOutput(t *testing.T) { defer container.Delete(ctx) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, BufferedIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) if err != nil { t.Error(err) return @@ -445,3 +448,132 @@ func TestContainerCloseStdin(t *testing.T) { t.Errorf("expected output %q but received %q", expected, output) } } + +func TestContainerAttach(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + ctx = context.Background() + id = "ContainerAttach" + ) + image, err := client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("cat")) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx) + + expected := "hello\n" + stdout := bytes.NewBuffer(nil) + + r, w, err := os.Pipe() + if err != nil { + t.Error(err) + return + } + or, ow, err := os.Pipe() + if err != nil { + t.Error(err) + return + } + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + io.Copy(stdout, or) + wg.Done() + }() + + // TODO: return fifo information from shim based on task/process + dir, err := ioutil.TempDir("", "attach") + if err != nil { + t.Error(err) + return + } + task, err := container.NewTask(ctx, WithIO(r, ow, ioutil.Discard, dir)) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + originalIO := task.IO() + + statusC := make(chan uint32, 1) + go func() { + status, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + statusC <- status + }() + + if err := task.Start(ctx); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(w, expected); err != nil { + t.Error(err) + } + w.Close() + + // load the container and re-load the task + if container, err = client.LoadContainer(ctx, id); err != nil { + t.Error(err) + return + } + + // create new IO for the loaded task + if r, w, err = os.Pipe(); err != nil { + t.Error(err) + return + } + if task, err = container.LoadTask(ctx, WithIO(r, ow, ioutil.Discard, dir)); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(w, expected); err != nil { + t.Error(err) + } + w.Close() + + if err := task.CloseStdin(ctx); err != nil { + t.Error(err) + } + + <-statusC + + originalIO.Close() + if _, err := task.Delete(ctx); err != nil { + t.Error(err) + } + ow.Close() + + wg.Wait() + output := stdout.String() + + // we wrote the same thing after attach + expected = expected + expected + if output != expected { + t.Errorf("expected output %q but received %q", expected, output) + } +} diff --git a/io.go b/io.go index 016053142..e8c9f817c 100644 --- a/io.go +++ b/io.go @@ -1,7 +1,6 @@ package containerd import ( - "bytes" "io" "io/ioutil" "os" @@ -27,18 +26,17 @@ func (i *IO) Close() error { type IOCreation func() (*IO, error) -// BufferedIO returns IO that will be logged to an in memory buffer -func BufferedIO(stdin, stdout, stderr *bytes.Buffer) IOCreation { +func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { return func() (*IO, error) { - paths, err := fifoPaths() + paths, err := NewFifos() if err != nil { return nil, err } i := &IO{ Terminal: false, - Stdout: paths.out, - Stderr: paths.err, - Stdin: paths.in, + Stdout: paths.Out, + Stderr: paths.Err, + Stdin: paths.In, } set := &ioSet{ in: stdin, @@ -54,17 +52,17 @@ func BufferedIO(stdin, stdout, stderr *bytes.Buffer) IOCreation { } } -func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { +func WithIO(stdin io.Reader, stdout, stderr io.Writer, dir string) IOCreation { return func() (*IO, error) { - paths, err := fifoPaths() + paths, err := WithFifos(dir) if err != nil { return nil, err } i := &IO{ Terminal: false, - Stdout: paths.out, - Stderr: paths.err, - Stdin: paths.in, + Stdout: paths.Out, + Stderr: paths.Err, + Stdin: paths.In, } set := &ioSet{ in: stdin, @@ -83,7 +81,7 @@ func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { // Stdio returns an IO implementation to be used for a task // that outputs the container's IO as the current processes Stdio func Stdio() (*IO, error) { - paths, err := fifoPaths() + paths, err := NewFifos() if err != nil { return nil, err } @@ -98,14 +96,15 @@ func Stdio() (*IO, error) { } return &IO{ Terminal: false, - Stdin: paths.in, - Stdout: paths.out, - Stderr: paths.err, + Stdin: paths.In, + Stdout: paths.Out, + Stderr: paths.Err, closer: closer, }, nil } -func fifoPaths() (*fifoSet, error) { +// NewFifos returns a new set of fifos for the task +func NewFifos() (*FifoSet, error) { root := filepath.Join(os.TempDir(), "containerd") if err := os.MkdirAll(root, 0700); err != nil { return nil, err @@ -114,18 +113,31 @@ func fifoPaths() (*fifoSet, error) { if err != nil { return nil, err } - return &fifoSet{ - dir: dir, - in: filepath.Join(dir, "stdin"), - out: filepath.Join(dir, "stdout"), - err: filepath.Join(dir, "stderr"), + return &FifoSet{ + Dir: dir, + In: filepath.Join(dir, "stdin"), + Out: filepath.Join(dir, "stdout"), + Err: filepath.Join(dir, "stderr"), }, nil } -type fifoSet struct { - // dir is the directory holding the task fifos - dir string - in, out, err string +// WithFifos returns existing or creates new fifos inside an existing dir +func WithFifos(dir string) (*FifoSet, error) { + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, err + } + return &FifoSet{ + Dir: dir, + In: filepath.Join(dir, "stdin"), + Out: filepath.Join(dir, "stdout"), + Err: filepath.Join(dir, "stderr"), + }, nil +} + +type FifoSet struct { + // Dir is the directory holding the task fifos + Dir string + In, Out, Err string } type ioSet struct { diff --git a/io_unix.go b/io_unix.go index a243c3a52..5c0cf0091 100644 --- a/io_unix.go +++ b/io_unix.go @@ -11,14 +11,14 @@ import ( "github.com/containerd/fifo" ) -func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { +func copyIO(fifos *FifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { var ( f io.ReadWriteCloser ctx = context.Background() wg = &sync.WaitGroup{} ) - if f, err = fifo.OpenFifo(ctx, fifos.in, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } defer func(c io.Closer) { @@ -31,7 +31,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error w.Close() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } defer func(c io.Closer) { @@ -46,7 +46,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error wg.Done() }(f) - if f, err = fifo.OpenFifo(ctx, fifos.err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return nil, err } defer func(c io.Closer) { @@ -66,6 +66,6 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error return &wgCloser{ wg: wg, - dir: fifos.dir, + dir: fifos.Dir, }, nil } diff --git a/io_windows.go b/io_windows.go index 4aa2ed5aa..2fb97566c 100644 --- a/io_windows.go +++ b/io_windows.go @@ -10,13 +10,13 @@ import ( "github.com/pkg/errors" ) -func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { +func copyIO(fifos *FifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error) { var wg sync.WaitGroup - if fifos.in != "" { - l, err := winio.ListenPipe(fifos.in, nil) + if fifos.In != "" { + l, err := winio.ListenPipe(fifos.In, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.in) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In) } defer func(l net.Listener) { if err != nil { @@ -27,7 +27,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error go func() { c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.in) + log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In) return } io.Copy(c, ioset.in) @@ -36,10 +36,10 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error }() } - if fifos.out != "" { - l, err := winio.ListenPipe(fifos.out, nil) + if fifos.Out != "" { + l, err := winio.ListenPipe(fifos.Out, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.out) + return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Out) } defer func(l net.Listener) { if err != nil { @@ -52,7 +52,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.out) + log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out) return } io.Copy(ioset.out, c) @@ -61,10 +61,10 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error }() } - if !tty && fifos.err != "" { - l, err := winio.ListenPipe(fifos.err, nil) + if !tty && fifos.Err != "" { + l, err := winio.ListenPipe(fifos.Err, nil) if err != nil { - return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.err) + return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err) } defer func(l net.Listener) { if err != nil { @@ -77,7 +77,7 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error defer wg.Done() c, err := l.Accept() if err != nil { - log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.err) + log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err) return } io.Copy(ioset.err, c) @@ -88,6 +88,6 @@ func copyIO(fifos *fifoSet, ioset *ioSet, tty bool) (closer io.Closer, err error return &wgCloser{ wg: &wg, - dir: fifos.dir, + dir: fifos.Dir, }, nil } diff --git a/task.go b/task.go index bee50db66..21bf92a3f 100644 --- a/task.go +++ b/task.go @@ -33,6 +33,7 @@ type Task interface { Exec(context.Context, *specs.Process, IOCreation) (Process, error) Processes(context.Context) ([]uint32, error) CloseStdin(context.Context) error + IO() *IO } type Process interface { @@ -168,3 +169,7 @@ func (t *task) CloseStdin(ctx context.Context) error { }) return err } + +func (t *task) IO() *IO { + return t.io +}