Merge pull request #1424 from crosbymichael/load-exec
Add LoadProcess api to Task
This commit is contained in:
commit
fb427da12f
30
container.go
30
container.go
@ -245,19 +245,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
|
|||||||
}
|
}
|
||||||
var i IO
|
var i IO
|
||||||
if ioAttach != nil {
|
if ioAttach != nil {
|
||||||
// get the existing fifo paths from the task information stored by the daemon
|
if i, err = attachExistingIO(response, ioAttach); err != nil {
|
||||||
paths := &FIFOSet{
|
|
||||||
Dir: getFifoDir([]string{
|
|
||||||
response.Process.Stdin,
|
|
||||||
response.Process.Stdout,
|
|
||||||
response.Process.Stderr,
|
|
||||||
}),
|
|
||||||
In: response.Process.Stdin,
|
|
||||||
Out: response.Process.Stdout,
|
|
||||||
Err: response.Process.Stderr,
|
|
||||||
Terminal: response.Process.Terminal,
|
|
||||||
}
|
|
||||||
if i, err = ioAttach(paths); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -270,6 +258,22 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
|
|||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func attachExistingIO(response *tasks.GetResponse, ioAttach IOAttach) (IO, error) {
|
||||||
|
// get the existing fifo paths from the task information stored by the daemon
|
||||||
|
paths := &FIFOSet{
|
||||||
|
Dir: getFifoDir([]string{
|
||||||
|
response.Process.Stdin,
|
||||||
|
response.Process.Stdout,
|
||||||
|
response.Process.Stderr,
|
||||||
|
}),
|
||||||
|
In: response.Process.Stdin,
|
||||||
|
Out: response.Process.Stdout,
|
||||||
|
Err: response.Process.Stderr,
|
||||||
|
Terminal: response.Process.Terminal,
|
||||||
|
}
|
||||||
|
return ioAttach(paths)
|
||||||
|
}
|
||||||
|
|
||||||
// getFifoDir looks for any non-empty path for a stdio fifo
|
// getFifoDir looks for any non-empty path for a stdio fifo
|
||||||
// and returns the dir for where it is located
|
// and returns the dir for where it is located
|
||||||
func getFifoDir(paths []string) string {
|
func getFifoDir(paths []string) string {
|
||||||
|
@ -442,3 +442,139 @@ func TestContainerUsername(t *testing.T) {
|
|||||||
t.Errorf("expected squid uid to be 31 but received %q", output)
|
t.Errorf("expected squid uid to be 31 but received %q", output)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestContainerAttachProcess(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
// On windows, closing the write side of the pipe closes the read
|
||||||
|
// side, sending an EOF to it and preventing reopening it.
|
||||||
|
// Hence this test will always fails on windows
|
||||||
|
t.Skip("invalid logic on windows")
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := newClient(t, address)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var (
|
||||||
|
image Image
|
||||||
|
ctx, cancel = testContext()
|
||||||
|
id = t.Name()
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if runtime.GOOS != "windows" {
|
||||||
|
image, err = client.GetImage(ctx, testImage)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
container, err := client.NewContainer(ctx, id, WithNewSpec(withImageConfig(image), withProcessArgs("sleep", "100")), withNewSnapshot(id, image))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||||
|
|
||||||
|
expected := "hello" + newLine
|
||||||
|
|
||||||
|
// creating IO early for easy resource cleanup
|
||||||
|
direct, err := NewDirectIO(ctx, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer direct.Delete()
|
||||||
|
var (
|
||||||
|
wg sync.WaitGroup
|
||||||
|
buf = bytes.NewBuffer(nil)
|
||||||
|
)
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
io.Copy(buf, direct.Stdout)
|
||||||
|
}()
|
||||||
|
|
||||||
|
task, err := container.NewTask(ctx, empty())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer task.Delete(ctx)
|
||||||
|
|
||||||
|
status, err := task.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := task.Start(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
spec, err := container.Spec()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
processSpec := spec.Process
|
||||||
|
processSpec.Args = []string{"cat"}
|
||||||
|
execID := t.Name() + "_exec"
|
||||||
|
process, err := task.Exec(ctx, execID, processSpec, direct.IOCreate)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processStatusC, err := process.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := process.Start(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if process, err = task.LoadProcess(ctx, execID, direct.IOAttach); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
direct.Stdin.Close()
|
||||||
|
|
||||||
|
if err := process.CloseIO(ctx, WithStdinCloser); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-processStatusC
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
output := buf.String()
|
||||||
|
|
||||||
|
// we wrote the same thing after attach
|
||||||
|
expected = expected + expected
|
||||||
|
if output != expected {
|
||||||
|
t.Errorf("expected output %q but received %q", expected, output)
|
||||||
|
}
|
||||||
|
<-status
|
||||||
|
}
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/runtime"
|
"github.com/containerd/containerd/runtime"
|
||||||
"github.com/containerd/containerd/typeurl"
|
"github.com/containerd/containerd/typeurl"
|
||||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -77,7 +76,6 @@ type process struct {
|
|||||||
task *task
|
task *task
|
||||||
pid uint32
|
pid uint32
|
||||||
io IO
|
io IO
|
||||||
spec *specs.Process
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *process) ID() string {
|
func (p *process) ID() string {
|
||||||
|
31
task.go
31
task.go
@ -116,13 +116,14 @@ type Task interface {
|
|||||||
Checkpoint(context.Context, ...CheckpointTaskOpts) (v1.Descriptor, error)
|
Checkpoint(context.Context, ...CheckpointTaskOpts) (v1.Descriptor, error)
|
||||||
// Update modifies executing tasks with updated settings
|
// Update modifies executing tasks with updated settings
|
||||||
Update(context.Context, ...UpdateTaskOpts) error
|
Update(context.Context, ...UpdateTaskOpts) error
|
||||||
|
// LoadProcess loads a previously created exec'd process
|
||||||
|
LoadProcess(context.Context, string, IOAttach) (Process, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = (Task)(&task{})
|
var _ = (Task)(&task{})
|
||||||
|
|
||||||
type task struct {
|
type task struct {
|
||||||
client *Client
|
client *Client
|
||||||
container Container
|
|
||||||
|
|
||||||
io IO
|
io IO
|
||||||
id string
|
id string
|
||||||
@ -328,7 +329,6 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat
|
|||||||
id: id,
|
id: id,
|
||||||
task: t,
|
task: t,
|
||||||
io: i,
|
io: i,
|
||||||
spec: spec,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -440,6 +440,31 @@ func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error {
|
|||||||
return errdefs.FromGRPC(err)
|
return errdefs.FromGRPC(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *task) LoadProcess(ctx context.Context, id string, ioAttach IOAttach) (Process, error) {
|
||||||
|
response, err := t.client.TaskService().Get(ctx, &tasks.GetRequest{
|
||||||
|
ContainerID: t.id,
|
||||||
|
ExecID: id,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
err = errdefs.FromGRPC(err)
|
||||||
|
if errdefs.IsNotFound(err) {
|
||||||
|
return nil, errors.Wrapf(err, "no running process found")
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var i IO
|
||||||
|
if ioAttach != nil {
|
||||||
|
if i, err = attachExistingIO(response, ioAttach); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &process{
|
||||||
|
id: id,
|
||||||
|
task: t,
|
||||||
|
io: i,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tasks.CheckpointTaskRequest) error {
|
func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tasks.CheckpointTaskRequest) error {
|
||||||
response, err := t.client.TaskService().Checkpoint(ctx, request)
|
response, err := t.client.TaskService().Checkpoint(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user