Change IO to interface.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2017-08-11 19:17:08 +00:00
parent c288fbe973
commit 8e2c95f9c1
4 changed files with 67 additions and 44 deletions

View File

@ -165,12 +165,13 @@ func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...Ne
if err != nil { if err != nil {
return nil, err return nil, err
} }
cfg := i.Config()
request := &tasks.CreateTaskRequest{ request := &tasks.CreateTaskRequest{
ContainerID: c.c.ID, ContainerID: c.c.ID,
Terminal: i.Terminal, Terminal: cfg.Terminal,
Stdin: i.Stdin, Stdin: cfg.Stdin,
Stdout: i.Stdout, Stdout: cfg.Stdout,
Stderr: i.Stderr, Stderr: cfg.Stderr,
} }
if c.c.RootFS != "" { if c.c.RootFS != "" {
// get the rootfs from the snapshotter and add it to the request // get the rootfs from the snapshotter and add it to the request
@ -238,7 +239,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro
} }
return nil, err return nil, err
} }
var i *IO var i IO
if ioAttach != nil { if ioAttach != nil {
// get the existing fifo paths from the task information stored by the daemon // get the existing fifo paths from the task information stored by the daemon
paths := &FIFOSet{ paths := &FIFOSet{

77
io.go
View File

@ -8,8 +8,8 @@ import (
"sync" "sync"
) )
// IO holds the io information for a task or process // IOConfig holds the io configurations.
type IO struct { type IOConfig struct {
// Terminal is true if one has been allocated // Terminal is true if one has been allocated
Terminal bool Terminal bool
// Stdin path // Stdin path
@ -18,39 +18,58 @@ type IO struct {
Stdout string Stdout string
// Stderr path // Stderr path
Stderr string Stderr string
}
// IO holds the io information for a task or process
type IO interface {
// Config returns the IO configuration.
Config() IOConfig
// Cancel aborts all current io operations
Cancel()
// Wait blocks until all io copy operations have completed
Wait()
// Close cleans up all open io resources
Close() error
}
// cio is a basic container IO implementation.
type cio struct {
config IOConfig
closer *wgCloser closer *wgCloser
} }
// Cancel aborts all current io operations func (c *cio) Config() IOConfig {
func (i *IO) Cancel() { return c.config
if i.closer == nil {
return
}
i.closer.Cancel()
} }
// Wait blocks until all io copy operations have completed func (c *cio) Cancel() {
func (i *IO) Wait() { if c.closer == nil {
if i.closer == nil {
return return
} }
i.closer.Wait() c.closer.Cancel()
} }
// Close cleans up all open io resources func (c *cio) Wait() {
func (i *IO) Close() error { if c.closer == nil {
if i.closer == nil { return
}
c.closer.Wait()
}
func (c *cio) Close() error {
if c.closer == nil {
return nil return nil
} }
return i.closer.Close() return c.closer.Close()
} }
// IOCreation creates new IO sets for a task // IOCreation creates new IO sets for a task
type IOCreation func(id string) (*IO, error) type IOCreation func(id string) (IO, error)
// IOAttach allows callers to reattach to running tasks // IOAttach allows callers to reattach to running tasks
type IOAttach func(*FIFOSet) (*IO, error) type IOAttach func(*FIFOSet) (IO, error)
// NewIO returns an IOCreation that will provide IO sets without a terminal // NewIO returns an IOCreation that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation {
@ -59,7 +78,7 @@ func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation {
// NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal // NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) IOCreation { func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) IOCreation {
return func(id string) (_ *IO, err error) { return func(id string) (_ IO, err error) {
paths, err := NewFifos(id) paths, err := NewFifos(id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -69,18 +88,19 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool)
os.RemoveAll(paths.Dir) os.RemoveAll(paths.Dir)
} }
}() }()
i := &IO{ cfg := IOConfig{
Terminal: terminal, Terminal: terminal,
Stdout: paths.Out, Stdout: paths.Out,
Stderr: paths.Err, Stderr: paths.Err,
Stdin: paths.In, Stdin: paths.In,
} }
i := &cio{config: cfg}
set := &ioSet{ set := &ioSet{
in: stdin, in: stdin,
out: stdout, out: stdout,
err: stderr, err: stderr,
} }
closer, err := copyIO(paths, set, i.Terminal) closer, err := copyIO(paths, set, cfg.Terminal)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -91,22 +111,23 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool)
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers // WithAttach attaches the existing io for a task to the provided io.Reader/Writers
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) IOAttach { func WithAttach(stdin io.Reader, stdout, stderr io.Writer) IOAttach {
return func(paths *FIFOSet) (*IO, error) { return func(paths *FIFOSet) (IO, error) {
if paths == nil { if paths == nil {
return nil, fmt.Errorf("cannot attach to existing fifos") return nil, fmt.Errorf("cannot attach to existing fifos")
} }
i := &IO{ cfg := IOConfig{
Terminal: paths.Terminal, Terminal: paths.Terminal,
Stdout: paths.Out, Stdout: paths.Out,
Stderr: paths.Err, Stderr: paths.Err,
Stdin: paths.In, Stdin: paths.In,
} }
i := &cio{config: cfg}
set := &ioSet{ set := &ioSet{
in: stdin, in: stdin,
out: stdout, out: stdout,
err: stderr, err: stderr,
} }
closer, err := copyIO(paths, set, i.Terminal) closer, err := copyIO(paths, set, cfg.Terminal)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -117,18 +138,18 @@ func WithAttach(stdin io.Reader, stdout, stderr io.Writer) IOAttach {
// Stdio returns an IO set to be used for a task // Stdio returns an IO set to be used for a task
// that outputs the container's IO as the current processes Stdio // that outputs the container's IO as the current processes Stdio
func Stdio(id string) (*IO, error) { func Stdio(id string) (IO, error) {
return NewIO(os.Stdin, os.Stdout, os.Stderr)(id) return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
} }
// StdioTerminal will setup the IO for the task to use a terminal // StdioTerminal will setup the IO for the task to use a terminal
func StdioTerminal(id string) (*IO, error) { func StdioTerminal(id string) (IO, error) {
return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id) return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
} }
// NullIO redirects the container's IO into /dev/null // NullIO redirects the container's IO into /dev/null
func NullIO(id string) (*IO, error) { func NullIO(id string) (IO, error) {
return &IO{}, nil return &cio{}, nil
} }
// FIFOSet is a set of fifos for use with tasks // FIFOSet is a set of fifos for use with tasks

View File

@ -31,7 +31,7 @@ type Process interface {
// Resize changes the width and heigh of the process's terminal // Resize changes the width and heigh of the process's terminal
Resize(ctx context.Context, w, h uint32) error Resize(ctx context.Context, w, h uint32) error
// IO returns the io set for the process // IO returns the io set for the process
IO() *IO IO() IO
// Status returns the executing status of the process // Status returns the executing status of the process
Status(context.Context) (Status, error) Status(context.Context) (Status, error)
} }
@ -40,7 +40,7 @@ type process struct {
id string id string
task *task task *task
pid uint32 pid uint32
io *IO io IO
spec *specs.Process spec *specs.Process
} }
@ -128,7 +128,7 @@ func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {
return err return err
} }
func (p *process) IO() *IO { func (p *process) IO() IO {
return p.io return p.io
} }

17
task.go
View File

@ -119,7 +119,7 @@ var _ = (Task)(&task{})
type task struct { type task struct {
client *Client client *Client
io *IO io IO
id string id string
pid uint32 pid uint32
@ -142,7 +142,7 @@ func (t *task) Start(ctx context.Context) error {
t.deferred = nil t.deferred = nil
t.mu.Unlock() t.mu.Unlock()
if err != nil { if err != nil {
t.io.closer.Close() t.io.Close()
return err return err
} }
t.pid = response.Pid t.pid = response.Pid
@ -152,7 +152,7 @@ func (t *task) Start(ctx context.Context) error {
ContainerID: t.id, ContainerID: t.id,
}) })
if err != nil { if err != nil {
t.io.closer.Close() t.io.Close()
} }
return err return err
} }
@ -279,13 +279,14 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat
if err != nil { if err != nil {
return nil, err return nil, err
} }
cfg := i.Config()
request := &tasks.ExecProcessRequest{ request := &tasks.ExecProcessRequest{
ContainerID: t.id, ContainerID: t.id,
ExecID: id, ExecID: id,
Terminal: i.Terminal, Terminal: cfg.Terminal,
Stdin: i.Stdin, Stdin: cfg.Stdin,
Stdout: i.Stdout, Stdout: cfg.Stdout,
Stderr: i.Stderr, Stderr: cfg.Stderr,
Spec: any, Spec: any,
} }
if _, err := t.client.TaskService().Exec(ctx, request); err != nil { if _, err := t.client.TaskService().Exec(ctx, request); err != nil {
@ -325,7 +326,7 @@ func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {
return err return err
} }
func (t *task) IO() *IO { func (t *task) IO() IO {
return t.io return t.io
} }