diff --git a/container_linux_test.go b/container_linux_test.go index 9b0345c96..ab6ff9fa5 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -3,7 +3,12 @@ package containerd import ( + "bytes" "context" + "fmt" + "io" + "runtime" + "sync" "syscall" "testing" "time" @@ -256,3 +261,122 @@ func TestDaemonRestart(t *testing.T) { <-statusC } + +func TestContainerAttach(t *testing.T) { + t.Parallel() + + if runtime.GOOS == "windows" { + // On windows, closing the write side of the pipe closes the read + // side, sending an EOF to it and preventing reopening it. + // Hence this test will always fails on windows + t.Skip("invalid logic on windows") + } + + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + if runtime.GOOS != "windows" { + image, err = client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + } + + spec, err := generateSpec(withImageConfig(ctx, image), withCat()) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx, WithSnapshotCleanup) + + expected := "hello" + newLine + + direct, err := NewDirectIO(ctx, false) + if err != nil { + t.Error(err) + return + } + defer direct.Delete() + var ( + wg sync.WaitGroup + buf = bytes.NewBuffer(nil) + ) + wg.Add(1) + go func() { + defer wg.Done() + io.Copy(buf, direct.Stdout) + }() + + task, err := container.NewTask(ctx, direct.IOCreate) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + + status, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + if err := task.Start(ctx); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(direct.Stdin, expected); err != nil { + t.Error(err) + } + + // load the container and re-load the task + if container, err = client.LoadContainer(ctx, id); err != nil { + t.Error(err) + return + } + + if task, err = container.Task(ctx, direct.IOAttach); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(direct.Stdin, expected); err != nil { + t.Error(err) + } + + direct.Stdin.Close() + + if err := task.CloseIO(ctx, WithStdinCloser); err != nil { + t.Error(err) + } + + <-status + + wg.Wait() + if _, err := task.Delete(ctx); err != nil { + t.Error(err) + } + + output := buf.String() + + // we wrote the same thing after attach + expected = expected + expected + if output != expected { + t.Errorf("expected output %q but received %q", expected, output) + } +} diff --git a/container_test.go b/container_test.go index 3d665a330..a2627114c 100644 --- a/container_test.go +++ b/container_test.go @@ -3,12 +3,10 @@ package containerd import ( "bytes" "fmt" - "io" "io/ioutil" "os" "runtime" "strings" - "sync" "syscall" "testing" "time" @@ -484,142 +482,6 @@ func TestContainerCloseIO(t *testing.T) { <-statusC } -func TestContainerAttach(t *testing.T) { - t.Parallel() - - if runtime.GOOS == "windows" { - // On windows, closing the write side of the pipe closes the read - // side, sending an EOF to it and preventing reopening it. - // Hence this test will always fails on windows - t.Skip("invalid logic on windows") - } - - client, err := newClient(t, address) - if err != nil { - t.Fatal(err) - } - defer client.Close() - - var ( - image Image - ctx, cancel = testContext() - id = t.Name() - ) - defer cancel() - - if runtime.GOOS != "windows" { - image, err = client.GetImage(ctx, testImage) - if err != nil { - t.Error(err) - return - } - } - - spec, err := generateSpec(withImageConfig(ctx, image), withCat()) - if err != nil { - t.Error(err) - return - } - container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image)) - if err != nil { - t.Error(err) - return - } - defer container.Delete(ctx, WithSnapshotCleanup) - - expected := "hello" + newLine - stdout := bytes.NewBuffer(nil) - - r, w, err := os.Pipe() - if err != nil { - t.Error(err) - return - } - or, ow, err := os.Pipe() - if err != nil { - t.Error(err) - return - } - - wg := &sync.WaitGroup{} - - wg.Add(1) - go func() { - io.Copy(stdout, or) - wg.Done() - }() - - task, err := container.NewTask(ctx, NewIO(r, ow, ioutil.Discard)) - if err != nil { - t.Error(err) - return - } - defer task.Delete(ctx) - originalIO := task.IO() - - statusC, err := task.Wait(ctx) - if err != nil { - t.Error(err) - } - - if err := task.Start(ctx); err != nil { - t.Error(err) - return - } - - if _, err := fmt.Fprint(w, expected); err != nil { - t.Error(err) - } - w.Close() - - // load the container and re-load the task - if container, err = client.LoadContainer(ctx, id); err != nil { - t.Error(err) - return - } - - // create new IO for the loaded task - if r, w, err = os.Pipe(); err != nil { - t.Error(err) - return - } - if task, err = container.Task(ctx, WithAttach(r, ow, ioutil.Discard)); err != nil { - t.Error(err) - return - } - - if _, err := fmt.Fprint(w, expected); err != nil { - t.Error(err) - } - w.Close() - - if err := task.CloseIO(ctx, WithStdinCloser); err != nil { - t.Error(err) - } - - status := <-statusC - _, _, err = status.Result() - if err != nil { - t.Error(err) - return - } - - originalIO.Close() - if _, err := task.Delete(ctx); err != nil { - t.Error(err) - } - ow.Close() - - wg.Wait() - output := stdout.String() - - // we wrote the same thing after attach - expected = expected + expected - if output != expected { - t.Errorf("expected output %q but received %q", expected, output) - } -} - func TestDeleteRunningContainer(t *testing.T) { t.Parallel() diff --git a/io.go b/io.go index 27c3482c1..108af6bb2 100644 --- a/io.go +++ b/io.go @@ -24,7 +24,6 @@ type IOConfig struct { type IO interface { // Config returns the IO configuration. Config() IOConfig - // Cancel aborts all current io operations Cancel() // Wait blocks until all io copy operations have completed diff --git a/io_unix.go b/io_unix.go index cde30d452..3da6de2ca 100644 --- a/io_unix.go +++ b/io_unix.go @@ -88,3 +88,85 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { cancel: cancel, }, nil } + +// NewDirectIO returns an IO implementation that exposes the pipes directly +func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { + set, err := NewFifos("") + if err != nil { + return nil, err + } + f := &DirectIO{ + set: set, + terminal: terminal, + } + defer func() { + if err != nil { + f.Delete() + } + }() + if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return nil, err + } + if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + f.Stdin.Close() + return nil, err + } + if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + f.Stdin.Close() + f.Stdout.Close() + return nil, err + } + return f, nil +} + +type DirectIO struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser + + set *FIFOSet + terminal bool +} + +func (f *DirectIO) IOCreate(id string) (IO, error) { + return f, nil +} + +func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { + return f, nil +} + +func (f *DirectIO) Config() IOConfig { + return IOConfig{ + Terminal: f.terminal, + Stdin: f.set.In, + Stdout: f.set.Out, + Stderr: f.set.Err, + } +} + +func (f *DirectIO) Cancel() { + // nothing to cancel as all operations are handled externally +} + +func (f *DirectIO) Wait() { + // nothing to wait on as all operations are handled externally +} + +func (f *DirectIO) Close() error { + err := f.Stdin.Close() + if err2 := f.Stdout.Close(); err == nil { + err = err2 + } + if err2 := f.Stderr.Close(); err == nil { + err = err2 + } + return err +} + +func (f *DirectIO) Delete() error { + if f.set.Dir == "" { + return nil + } + return os.RemoveAll(f.set.Dir) +}