From b3303b55c1c1ccee3cb3c0a0b3437e79598b29ef Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 24 Aug 2017 11:34:48 -0400 Subject: [PATCH] Add LoadProcess api to Task Fixes #1374 This adds a `LoadProcess` api to load existing exec'd processes from a task. It allows reattaching of IO and waiting on the process. Signed-off-by: Michael Crosby --- container.go | 30 +++++---- container_linux_test.go | 136 ++++++++++++++++++++++++++++++++++++++++ process.go | 2 - task.go | 31 ++++++++- 4 files changed, 181 insertions(+), 18 deletions(-) diff --git a/container.go b/container.go index 9d74a9932..ee507f669 100644 --- a/container.go +++ b/container.go @@ -245,19 +245,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro } var i IO if ioAttach != nil { - // get the existing fifo paths from the task information stored by the daemon - paths := &FIFOSet{ - Dir: getFifoDir([]string{ - response.Process.Stdin, - response.Process.Stdout, - response.Process.Stderr, - }), - In: response.Process.Stdin, - Out: response.Process.Stdout, - Err: response.Process.Stderr, - Terminal: response.Process.Terminal, - } - if i, err = ioAttach(paths); err != nil { + if i, err = attachExistingIO(response, ioAttach); err != nil { return nil, err } } @@ -270,6 +258,22 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro return t, nil } +func attachExistingIO(response *tasks.GetResponse, ioAttach IOAttach) (IO, error) { + // get the existing fifo paths from the task information stored by the daemon + paths := &FIFOSet{ + Dir: getFifoDir([]string{ + response.Process.Stdin, + response.Process.Stdout, + response.Process.Stderr, + }), + In: response.Process.Stdin, + Out: response.Process.Stdout, + Err: response.Process.Stderr, + Terminal: response.Process.Terminal, + } + return ioAttach(paths) +} + // getFifoDir looks for any non-empty path for a stdio fifo // and returns the dir for where it is located func getFifoDir(paths []string) string { diff --git a/container_linux_test.go b/container_linux_test.go index 10e79f04e..9612410c4 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -442,3 +442,139 @@ func TestContainerUsername(t *testing.T) { t.Errorf("expected squid uid to be 31 but received %q", output) } } + +func TestContainerAttachProcess(t *testing.T) { + t.Parallel() + + if runtime.GOOS == "windows" { + // On windows, closing the write side of the pipe closes the read + // side, sending an EOF to it and preventing reopening it. + // Hence this test will always fails on windows + t.Skip("invalid logic on windows") + } + + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + if runtime.GOOS != "windows" { + image, err = client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + } + + container, err := client.NewContainer(ctx, id, WithNewSpec(withImageConfig(image), withProcessArgs("sleep", "100")), withNewSnapshot(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx, WithSnapshotCleanup) + + expected := "hello" + newLine + + // creating IO early for easy resource cleanup + direct, err := NewDirectIO(ctx, false) + if err != nil { + t.Error(err) + return + } + defer direct.Delete() + var ( + wg sync.WaitGroup + buf = bytes.NewBuffer(nil) + ) + wg.Add(1) + go func() { + defer wg.Done() + io.Copy(buf, direct.Stdout) + }() + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + + status, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + if err := task.Start(ctx); err != nil { + t.Error(err) + return + } + + spec, err := container.Spec() + if err != nil { + t.Error(err) + return + } + + processSpec := spec.Process + processSpec.Args = []string{"cat"} + execID := t.Name() + "_exec" + process, err := task.Exec(ctx, execID, processSpec, direct.IOCreate) + if err != nil { + t.Error(err) + return + } + processStatusC, err := process.Wait(ctx) + if err != nil { + t.Error(err) + return + } + + if err := process.Start(ctx); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(direct.Stdin, expected); err != nil { + t.Error(err) + } + + if process, err = task.LoadProcess(ctx, execID, direct.IOAttach); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(direct.Stdin, expected); err != nil { + t.Error(err) + } + + direct.Stdin.Close() + + if err := process.CloseIO(ctx, WithStdinCloser); err != nil { + t.Error(err) + } + + <-processStatusC + + wg.Wait() + + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + t.Error(err) + } + + output := buf.String() + + // we wrote the same thing after attach + expected = expected + expected + if output != expected { + t.Errorf("expected output %q but received %q", expected, output) + } + <-status +} diff --git a/process.go b/process.go index 276869564..6cea4c47a 100644 --- a/process.go +++ b/process.go @@ -11,7 +11,6 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" - specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" ) @@ -77,7 +76,6 @@ type process struct { task *task pid uint32 io IO - spec *specs.Process } func (p *process) ID() string { diff --git a/task.go b/task.go index ea85d6742..787db09fc 100644 --- a/task.go +++ b/task.go @@ -116,13 +116,14 @@ type Task interface { Checkpoint(context.Context, ...CheckpointTaskOpts) (v1.Descriptor, error) // Update modifies executing tasks with updated settings Update(context.Context, ...UpdateTaskOpts) error + // LoadProcess loads a previously created exec'd process + LoadProcess(context.Context, string, IOAttach) (Process, error) } var _ = (Task)(&task{}) type task struct { - client *Client - container Container + client *Client io IO id string @@ -328,7 +329,6 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat id: id, task: t, io: i, - spec: spec, }, nil } @@ -440,6 +440,31 @@ func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error { return errdefs.FromGRPC(err) } +func (t *task) LoadProcess(ctx context.Context, id string, ioAttach IOAttach) (Process, error) { + response, err := t.client.TaskService().Get(ctx, &tasks.GetRequest{ + ContainerID: t.id, + ExecID: id, + }) + if err != nil { + err = errdefs.FromGRPC(err) + if errdefs.IsNotFound(err) { + return nil, errors.Wrapf(err, "no running process found") + } + return nil, err + } + var i IO + if ioAttach != nil { + if i, err = attachExistingIO(response, ioAttach); err != nil { + return nil, err + } + } + return &process{ + id: id, + task: t, + io: i, + }, nil +} + func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tasks.CheckpointTaskRequest) error { response, err := t.client.TaskService().Checkpoint(ctx, request) if err != nil {