Merge pull request #1405 from crosbymichael/fifos

Add FifoIO to expose fifos directly to client
This commit is contained in:
Derek McGowan 2017-08-22 15:52:44 -07:00 committed by GitHub
commit 4028add553
4 changed files with 206 additions and 139 deletions

View File

@ -3,7 +3,12 @@
package containerd package containerd
import ( import (
"bytes"
"context" "context"
"fmt"
"io"
"runtime"
"sync"
"syscall" "syscall"
"testing" "testing"
"time" "time"
@ -256,3 +261,122 @@ func TestDaemonRestart(t *testing.T) {
<-statusC <-statusC
} }
func TestContainerAttach(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
// On windows, closing the write side of the pipe closes the read
// side, sending an EOF to it and preventing reopening it.
// Hence this test will always fails on windows
t.Skip("invalid logic on windows")
}
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
if runtime.GOOS != "windows" {
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Error(err)
return
}
}
spec, err := generateSpec(withImageConfig(ctx, image), withCat())
if err != nil {
t.Error(err)
return
}
container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image))
if err != nil {
t.Error(err)
return
}
defer container.Delete(ctx, WithSnapshotCleanup)
expected := "hello" + newLine
direct, err := NewDirectIO(ctx, false)
if err != nil {
t.Error(err)
return
}
defer direct.Delete()
var (
wg sync.WaitGroup
buf = bytes.NewBuffer(nil)
)
wg.Add(1)
go func() {
defer wg.Done()
io.Copy(buf, direct.Stdout)
}()
task, err := container.NewTask(ctx, direct.IOCreate)
if err != nil {
t.Error(err)
return
}
defer task.Delete(ctx)
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
t.Error(err)
}
// load the container and re-load the task
if container, err = client.LoadContainer(ctx, id); err != nil {
t.Error(err)
return
}
if task, err = container.Task(ctx, direct.IOAttach); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(direct.Stdin, expected); err != nil {
t.Error(err)
}
direct.Stdin.Close()
if err := task.CloseIO(ctx, WithStdinCloser); err != nil {
t.Error(err)
}
<-status
wg.Wait()
if _, err := task.Delete(ctx); err != nil {
t.Error(err)
}
output := buf.String()
// we wrote the same thing after attach
expected = expected + expected
if output != expected {
t.Errorf("expected output %q but received %q", expected, output)
}
}

View File

@ -3,12 +3,10 @@ package containerd
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"runtime" "runtime"
"strings" "strings"
"sync"
"syscall" "syscall"
"testing" "testing"
"time" "time"
@ -484,142 +482,6 @@ func TestContainerCloseIO(t *testing.T) {
<-statusC <-statusC
} }
func TestContainerAttach(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
// On windows, closing the write side of the pipe closes the read
// side, sending an EOF to it and preventing reopening it.
// Hence this test will always fails on windows
t.Skip("invalid logic on windows")
}
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
if runtime.GOOS != "windows" {
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Error(err)
return
}
}
spec, err := generateSpec(withImageConfig(ctx, image), withCat())
if err != nil {
t.Error(err)
return
}
container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image))
if err != nil {
t.Error(err)
return
}
defer container.Delete(ctx, WithSnapshotCleanup)
expected := "hello" + newLine
stdout := bytes.NewBuffer(nil)
r, w, err := os.Pipe()
if err != nil {
t.Error(err)
return
}
or, ow, err := os.Pipe()
if err != nil {
t.Error(err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
io.Copy(stdout, or)
wg.Done()
}()
task, err := container.NewTask(ctx, NewIO(r, ow, ioutil.Discard))
if err != nil {
t.Error(err)
return
}
defer task.Delete(ctx)
originalIO := task.IO()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(w, expected); err != nil {
t.Error(err)
}
w.Close()
// load the container and re-load the task
if container, err = client.LoadContainer(ctx, id); err != nil {
t.Error(err)
return
}
// create new IO for the loaded task
if r, w, err = os.Pipe(); err != nil {
t.Error(err)
return
}
if task, err = container.Task(ctx, WithAttach(r, ow, ioutil.Discard)); err != nil {
t.Error(err)
return
}
if _, err := fmt.Fprint(w, expected); err != nil {
t.Error(err)
}
w.Close()
if err := task.CloseIO(ctx, WithStdinCloser); err != nil {
t.Error(err)
}
status := <-statusC
_, _, err = status.Result()
if err != nil {
t.Error(err)
return
}
originalIO.Close()
if _, err := task.Delete(ctx); err != nil {
t.Error(err)
}
ow.Close()
wg.Wait()
output := stdout.String()
// we wrote the same thing after attach
expected = expected + expected
if output != expected {
t.Errorf("expected output %q but received %q", expected, output)
}
}
func TestDeleteRunningContainer(t *testing.T) { func TestDeleteRunningContainer(t *testing.T) {
t.Parallel() t.Parallel()

1
io.go
View File

@ -24,7 +24,6 @@ type IOConfig struct {
type IO interface { type IO interface {
// Config returns the IO configuration. // Config returns the IO configuration.
Config() IOConfig Config() IOConfig
// Cancel aborts all current io operations // Cancel aborts all current io operations
Cancel() Cancel()
// Wait blocks until all io copy operations have completed // Wait blocks until all io copy operations have completed

View File

@ -88,3 +88,85 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
cancel: cancel, cancel: cancel,
}, nil }, nil
} }
// NewDirectIO returns an IO implementation that exposes the pipes directly
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
set, err := NewFifos("")
if err != nil {
return nil, err
}
f := &DirectIO{
set: set,
terminal: terminal,
}
defer func() {
if err != nil {
f.Delete()
}
}()
if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
return nil, err
}
if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
f.Stdout.Close()
return nil, err
}
return f, nil
}
type DirectIO struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
set *FIFOSet
terminal bool
}
func (f *DirectIO) IOCreate(id string) (IO, error) {
return f, nil
}
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}
func (f *DirectIO) Config() IOConfig {
return IOConfig{
Terminal: f.terminal,
Stdin: f.set.In,
Stdout: f.set.Out,
Stderr: f.set.Err,
}
}
func (f *DirectIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
func (f *DirectIO) Wait() {
// nothing to wait on as all operations are handled externally
}
func (f *DirectIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
err = err2
}
if err2 := f.Stderr.Close(); err == nil {
err = err2
}
return err
}
func (f *DirectIO) Delete() error {
if f.set.Dir == "" {
return nil
}
return os.RemoveAll(f.set.Dir)
}