From a8b21da5386c009793219c791a562cb5c64abb74 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 21 Aug 2017 10:55:21 -0400 Subject: [PATCH] Add FifoIO to expose fifos directly to client This allows clients an easier way to interact with the fifos for a container without having to use the built in copyIO functions when opening fifos. It's nothing that clients could not have already coded but since we use this type of functionality in the tests it makes sense to add an implementation here. Signed-off-by: Michael Crosby --- container_linux_test.go | 124 ++++++++++++++++++++++++++++++++++++ container_test.go | 138 ---------------------------------------- io.go | 1 - io_unix.go | 82 ++++++++++++++++++++++++ 4 files changed, 206 insertions(+), 139 deletions(-) diff --git a/container_linux_test.go b/container_linux_test.go index 18e66d4a9..4db38b635 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -3,7 +3,12 @@ package containerd import ( + "bytes" "context" + "fmt" + "io" + "runtime" + "sync" "syscall" "testing" "time" @@ -256,3 +261,122 @@ func TestDaemonRestart(t *testing.T) { <-statusC } + +func TestContainerAttach(t *testing.T) { + t.Parallel() + + if runtime.GOOS == "windows" { + // On windows, closing the write side of the pipe closes the read + // side, sending an EOF to it and preventing reopening it. + // Hence this test will always fails on windows + t.Skip("invalid logic on windows") + } + + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + if runtime.GOOS != "windows" { + image, err = client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + } + + spec, err := generateSpec(withImageConfig(ctx, image), withCat()) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx, WithSnapshotCleanup) + + expected := "hello" + newLine + + direct, err := NewDirectIO(ctx, false) + if err != nil { + t.Error(err) + return + } + defer direct.Delete() + var ( + wg sync.WaitGroup + buf = bytes.NewBuffer(nil) + ) + wg.Add(1) + go func() { + defer wg.Done() + io.Copy(buf, direct.Stdout) + }() + + task, err := container.NewTask(ctx, direct.IOCreate) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + + status, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + if err := task.Start(ctx); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(direct.Stdin, expected); err != nil { + t.Error(err) + } + + // load the container and re-load the task + if container, err = client.LoadContainer(ctx, id); err != nil { + t.Error(err) + return + } + + if task, err = container.Task(ctx, direct.IOAttach); err != nil { + t.Error(err) + return + } + + if _, err := fmt.Fprint(direct.Stdin, expected); err != nil { + t.Error(err) + } + + direct.Stdin.Close() + + if err := task.CloseIO(ctx, WithStdinCloser); err != nil { + t.Error(err) + } + + <-status + + wg.Wait() + if _, err := task.Delete(ctx); err != nil { + t.Error(err) + } + + output := buf.String() + + // we wrote the same thing after attach + expected = expected + expected + if output != expected { + t.Errorf("expected output %q but received %q", expected, output) + } +} diff --git a/container_test.go b/container_test.go index e73b8c8b3..3adba1731 100644 --- a/container_test.go +++ b/container_test.go @@ -3,12 +3,10 @@ package containerd import ( "bytes" "fmt" - "io" "io/ioutil" "os" "runtime" "strings" - "sync" "syscall" "testing" "time" @@ -499,142 +497,6 @@ func TestContainerCloseIO(t *testing.T) { } } -func TestContainerAttach(t *testing.T) { - t.Parallel() - - if runtime.GOOS == "windows" { - // On windows, closing the write side of the pipe closes the read - // side, sending an EOF to it and preventing reopening it. - // Hence this test will always fails on windows - t.Skip("invalid logic on windows") - } - - client, err := newClient(t, address) - if err != nil { - t.Fatal(err) - } - defer client.Close() - - var ( - image Image - ctx, cancel = testContext() - id = t.Name() - ) - defer cancel() - - if runtime.GOOS != "windows" { - image, err = client.GetImage(ctx, testImage) - if err != nil { - t.Error(err) - return - } - } - - spec, err := generateSpec(withImageConfig(ctx, image), withCat()) - if err != nil { - t.Error(err) - return - } - container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image)) - if err != nil { - t.Error(err) - return - } - defer container.Delete(ctx, WithSnapshotCleanup) - - expected := "hello" + newLine - stdout := bytes.NewBuffer(nil) - - r, w, err := os.Pipe() - if err != nil { - t.Error(err) - return - } - or, ow, err := os.Pipe() - if err != nil { - t.Error(err) - return - } - - wg := &sync.WaitGroup{} - - wg.Add(1) - go func() { - io.Copy(stdout, or) - wg.Done() - }() - - task, err := container.NewTask(ctx, NewIO(r, ow, ioutil.Discard)) - if err != nil { - t.Error(err) - return - } - defer task.Delete(ctx) - originalIO := task.IO() - - statusC, err := task.Wait(ctx) - if err != nil { - t.Error(err) - } - - if err := task.Start(ctx); err != nil { - t.Error(err) - return - } - - if _, err := fmt.Fprint(w, expected); err != nil { - t.Error(err) - } - w.Close() - - // load the container and re-load the task - if container, err = client.LoadContainer(ctx, id); err != nil { - t.Error(err) - return - } - - // create new IO for the loaded task - if r, w, err = os.Pipe(); err != nil { - t.Error(err) - return - } - if task, err = container.Task(ctx, WithAttach(r, ow, ioutil.Discard)); err != nil { - t.Error(err) - return - } - - if _, err := fmt.Fprint(w, expected); err != nil { - t.Error(err) - } - w.Close() - - if err := task.CloseIO(ctx, WithStdinCloser); err != nil { - t.Error(err) - } - - status := <-statusC - _, _, err = status.Result() - if err != nil { - t.Error(err) - return - } - - originalIO.Close() - if _, err := task.Delete(ctx); err != nil { - t.Error(err) - } - ow.Close() - - wg.Wait() - output := stdout.String() - - // we wrote the same thing after attach - expected = expected + expected - if output != expected { - t.Errorf("expected output %q but received %q", expected, output) - } -} - func TestDeleteRunningContainer(t *testing.T) { t.Parallel() diff --git a/io.go b/io.go index 27c3482c1..108af6bb2 100644 --- a/io.go +++ b/io.go @@ -24,7 +24,6 @@ type IOConfig struct { type IO interface { // Config returns the IO configuration. Config() IOConfig - // Cancel aborts all current io operations Cancel() // Wait blocks until all io copy operations have completed diff --git a/io_unix.go b/io_unix.go index cde30d452..3da6de2ca 100644 --- a/io_unix.go +++ b/io_unix.go @@ -88,3 +88,85 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) { cancel: cancel, }, nil } + +// NewDirectIO returns an IO implementation that exposes the pipes directly +func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) { + set, err := NewFifos("") + if err != nil { + return nil, err + } + f := &DirectIO{ + set: set, + terminal: terminal, + } + defer func() { + if err != nil { + f.Delete() + } + }() + if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return nil, err + } + if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + f.Stdin.Close() + return nil, err + } + if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + f.Stdin.Close() + f.Stdout.Close() + return nil, err + } + return f, nil +} + +type DirectIO struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser + + set *FIFOSet + terminal bool +} + +func (f *DirectIO) IOCreate(id string) (IO, error) { + return f, nil +} + +func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { + return f, nil +} + +func (f *DirectIO) Config() IOConfig { + return IOConfig{ + Terminal: f.terminal, + Stdin: f.set.In, + Stdout: f.set.Out, + Stderr: f.set.Err, + } +} + +func (f *DirectIO) Cancel() { + // nothing to cancel as all operations are handled externally +} + +func (f *DirectIO) Wait() { + // nothing to wait on as all operations are handled externally +} + +func (f *DirectIO) Close() error { + err := f.Stdin.Close() + if err2 := f.Stdout.Close(); err == nil { + err = err2 + } + if err2 := f.Stderr.Close(); err == nil { + err = err2 + } + return err +} + +func (f *DirectIO) Delete() error { + if f.set.Dir == "" { + return nil + } + return os.RemoveAll(f.set.Dir) +}