Add LoadProcess api to Task

Fixes #1374

This adds a `LoadProcess` api to load existing exec'd processes from a
task.  It allows reattaching of IO and waiting on the process.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-08-24 11:34:48 -04:00
parent a6ce1ef2a1
commit b3303b55c1
4 changed files with 181 additions and 18 deletions

View File

@ -245,19 +245,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
}
var i IO
if ioAttach != nil {
// get the existing fifo paths from the task information stored by the daemon
paths := &FIFOSet{
Dir: getFifoDir([]string{
response.Process.Stdin,
response.Process.Stdout,
response.Process.Stderr,
}),
In: response.Process.Stdin,
Out: response.Process.Stdout,
Err: response.Process.Stderr,
Terminal: response.Process.Terminal,
}
if i, err = ioAttach(paths); err != nil {
if i, err = attachExistingIO(response, ioAttach); err != nil {
return nil, err
}
}
@ -270,6 +258,22 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
return t, nil
}
func attachExistingIO(response *tasks.GetResponse, ioAttach IOAttach) (IO, error) {
// get the existing fifo paths from the task information stored by the daemon
paths := &FIFOSet{
Dir: getFifoDir([]string{
response.Process.Stdin,
response.Process.Stdout,
response.Process.Stderr,
}),
In: response.Process.Stdin,
Out: response.Process.Stdout,
Err: response.Process.Stderr,
Terminal: response.Process.Terminal,
}
return ioAttach(paths)
}
// getFifoDir looks for any non-empty path for a stdio fifo
// and returns the dir for where it is located
func getFifoDir(paths []string) string {

View File

@ -442,3 +442,139 @@ func TestContainerUsername(t *testing.T) {
t.Errorf("expected squid uid to be 31 but received %q", output)
}
}
func TestContainerAttachProcess(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
// On windows, closing the write side of the pipe closes the read
// side, sending an EOF to it and preventing reopening it.
// Hence this test will always fails on windows
t.Skip("invalid logic on windows")
}
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
if runtime.GOOS != "windows" {
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Error(err)
return
}
}
container, err := client.NewContainer(ctx, id, WithNewSpec(withImageConfig(image), withProcessArgs("sleep", "100")), withNewSnapshot(id, image))
if err != nil {
t.Error(err)
return
}
defer container.Delete(ctx, WithSnapshotCleanup)
expected := "hello" + newLine
// creating IO early for easy resource cleanup
direct, err := NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
}
defer direct.Delete()
var (
wg sync.WaitGroup
buf = bytes.NewBuffer(nil)
)
wg.Add(1)
go func() {
defer wg.Done()
io.Copy(buf, direct.Stdout)
}()
task, err := container.NewTask(ctx, empty())
if err != nil {
t.Error(err)
return
}
defer task.Delete(ctx)
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
spec, err := container.Spec()
if err != nil {
t.Error(err)
return
}
processSpec := spec.Process
processSpec.Args = []string{"cat"}
execID := t.Name() + "_exec"
process, err := task.Exec(ctx, execID, processSpec, direct.IOCreate)
if err != nil {
t.Error(err)
return
}
processStatusC, err := process.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := process.Start(ctx); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
t.Error(err)
}
if process, err = task.LoadProcess(ctx, execID, direct.IOAttach); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
t.Error(err)
}
direct.Stdin.Close()
if err := process.CloseIO(ctx, WithStdinCloser); err != nil {
t.Error(err)
}
<-processStatusC
wg.Wait()
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Error(err)
}
output := buf.String()
// we wrote the same thing after attach
expected = expected + expected
if output != expected {
t.Errorf("expected output %q but received %q", expected, output)
}
<-status
}

View File

@ -11,7 +11,6 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/typeurl"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
@ -77,7 +76,6 @@ type process struct {
task *task
pid uint32
io IO
spec *specs.Process
}
func (p *process) ID() string {

31
task.go
View File

@ -116,13 +116,14 @@ type Task interface {
Checkpoint(context.Context, ...CheckpointTaskOpts) (v1.Descriptor, error)
// Update modifies executing tasks with updated settings
Update(context.Context, ...UpdateTaskOpts) error
// LoadProcess loads a previously created exec'd process
LoadProcess(context.Context, string, IOAttach) (Process, error)
}
var _ = (Task)(&task{})
type task struct {
client *Client
container Container
client *Client
io IO
id string
@ -328,7 +329,6 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat
id: id,
task: t,
io: i,
spec: spec,
}, nil
}
@ -440,6 +440,31 @@ func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error {
return errdefs.FromGRPC(err)
}
func (t *task) LoadProcess(ctx context.Context, id string, ioAttach IOAttach) (Process, error) {
response, err := t.client.TaskService().Get(ctx, &tasks.GetRequest{
ContainerID: t.id,
ExecID: id,
})
if err != nil {
err = errdefs.FromGRPC(err)
if errdefs.IsNotFound(err) {
return nil, errors.Wrapf(err, "no running process found")
}
return nil, err
}
var i IO
if ioAttach != nil {
if i, err = attachExistingIO(response, ioAttach); err != nil {
return nil, err
}
}
return &process{
id: id,
task: t,
io: i,
}, nil
}
func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tasks.CheckpointTaskRequest) error {
response, err := t.client.TaskService().Checkpoint(ctx, request)
if err != nil {