Add FifoIO to expose fifos directly to client

This allows clients an easier way to interact with the fifos for a
container without having to use the built in copyIO functions when
opening fifos.

It's nothing that clients could not have already coded but since we use
this type of functionality in the tests it makes sense to add an
implementation here.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-08-21 10:55:21 -04:00
parent 10460647f2
commit a8b21da538
4 changed files with 206 additions and 139 deletions

View File

@ -3,7 +3,12 @@
package containerd
import (
"bytes"
"context"
"fmt"
"io"
"runtime"
"sync"
"syscall"
"testing"
"time"
@ -256,3 +261,122 @@ func TestDaemonRestart(t *testing.T) {
<-statusC
}
func TestContainerAttach(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
// On windows, closing the write side of the pipe closes the read
// side, sending an EOF to it and preventing reopening it.
// Hence this test will always fails on windows
t.Skip("invalid logic on windows")
}
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
if runtime.GOOS != "windows" {
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Error(err)
return
}
}
spec, err := generateSpec(withImageConfig(ctx, image), withCat())
if err != nil {
t.Error(err)
return
}
container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image))
if err != nil {
t.Error(err)
return
}
defer container.Delete(ctx, WithSnapshotCleanup)
expected := "hello" + newLine
direct, err := NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
}
defer direct.Delete()
var (
wg sync.WaitGroup
buf = bytes.NewBuffer(nil)
)
wg.Add(1)
go func() {
defer wg.Done()
io.Copy(buf, direct.Stdout)
}()
task, err := container.NewTask(ctx, direct.IOCreate)
if err != nil {
t.Error(err)
return
}
defer task.Delete(ctx)
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
t.Error(err)
}
// load the container and re-load the task
if container, err = client.LoadContainer(ctx, id); err != nil {
t.Error(err)
return
}
if task, err = container.Task(ctx, direct.IOAttach); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
t.Error(err)
}
direct.Stdin.Close()
if err := task.CloseIO(ctx, WithStdinCloser); err != nil {
t.Error(err)
}
<-status
wg.Wait()
if _, err := task.Delete(ctx); err != nil {
t.Error(err)
}
output := buf.String()
// we wrote the same thing after attach
expected = expected + expected
if output != expected {
t.Errorf("expected output %q but received %q", expected, output)
}
}

View File

@ -3,12 +3,10 @@ package containerd
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"strings"
"sync"
"syscall"
"testing"
"time"
@ -499,142 +497,6 @@ func TestContainerCloseIO(t *testing.T) {
}
}
func TestContainerAttach(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
// On windows, closing the write side of the pipe closes the read
// side, sending an EOF to it and preventing reopening it.
// Hence this test will always fails on windows
t.Skip("invalid logic on windows")
}
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
if runtime.GOOS != "windows" {
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Error(err)
return
}
}
spec, err := generateSpec(withImageConfig(ctx, image), withCat())
if err != nil {
t.Error(err)
return
}
container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image))
if err != nil {
t.Error(err)
return
}
defer container.Delete(ctx, WithSnapshotCleanup)
expected := "hello" + newLine
stdout := bytes.NewBuffer(nil)
r, w, err := os.Pipe()
if err != nil {
t.Error(err)
return
}
or, ow, err := os.Pipe()
if err != nil {
t.Error(err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
io.Copy(stdout, or)
wg.Done()
}()
task, err := container.NewTask(ctx, NewIO(r, ow, ioutil.Discard))
if err != nil {
t.Error(err)
return
}
defer task.Delete(ctx)
originalIO := task.IO()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(w, expected); err != nil {
t.Error(err)
}
w.Close()
// load the container and re-load the task
if container, err = client.LoadContainer(ctx, id); err != nil {
t.Error(err)
return
}
// create new IO for the loaded task
if r, w, err = os.Pipe(); err != nil {
t.Error(err)
return
}
if task, err = container.Task(ctx, WithAttach(r, ow, ioutil.Discard)); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(w, expected); err != nil {
t.Error(err)
}
w.Close()
if err := task.CloseIO(ctx, WithStdinCloser); err != nil {
t.Error(err)
}
status := <-statusC
_, _, err = status.Result()
if err != nil {
t.Error(err)
return
}
originalIO.Close()
if _, err := task.Delete(ctx); err != nil {
t.Error(err)
}
ow.Close()
wg.Wait()
output := stdout.String()
// we wrote the same thing after attach
expected = expected + expected
if output != expected {
t.Errorf("expected output %q but received %q", expected, output)
}
}
func TestDeleteRunningContainer(t *testing.T) {
t.Parallel()

1
io.go
View File

@ -24,7 +24,6 @@ type IOConfig struct {
type IO interface {
// Config returns the IO configuration.
Config() IOConfig
// Cancel aborts all current io operations
Cancel()
// Wait blocks until all io copy operations have completed

View File

@ -88,3 +88,85 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
cancel: cancel,
}, nil
}
// NewDirectIO returns an IO implementation that exposes the pipes directly
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
set, err := NewFifos("")
if err != nil {
return nil, err
}
f := &DirectIO{
set: set,
terminal: terminal,
}
defer func() {
if err != nil {
f.Delete()
}
}()
if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
return nil, err
}
if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
f.Stdout.Close()
return nil, err
}
return f, nil
}
type DirectIO struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
set *FIFOSet
terminal bool
}
func (f *DirectIO) IOCreate(id string) (IO, error) {
return f, nil
}
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}
func (f *DirectIO) Config() IOConfig {
return IOConfig{
Terminal: f.terminal,
Stdin: f.set.In,
Stdout: f.set.Out,
Stderr: f.set.Err,
}
}
func (f *DirectIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
func (f *DirectIO) Wait() {
// nothing to wait on as all operations are handled externally
}
func (f *DirectIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
err = err2
}
if err2 := f.Stderr.Close(); err == nil {
err = err2
}
return err
}
func (f *DirectIO) Delete() error {
if f.set.Dir == "" {
return nil
}
return os.RemoveAll(f.set.Dir)
}