diff --git a/io.go b/cio/io.go similarity index 84% rename from io.go rename to cio/io.go index 48c06f12e..25e398147 100644 --- a/io.go +++ b/cio/io.go @@ -1,4 +1,4 @@ -package containerd +package cio import ( "context" @@ -8,8 +8,8 @@ import ( "sync" ) -// IOConfig holds the io configurations. -type IOConfig struct { +// Config holds the io configurations. +type Config struct { // Terminal is true if one has been allocated Terminal bool // Stdin path @@ -23,7 +23,7 @@ type IOConfig struct { // IO holds the io information for a task or process type IO interface { // Config returns the IO configuration. - Config() IOConfig + Config() Config // Cancel aborts all current io operations Cancel() // Wait blocks until all io copy operations have completed @@ -34,12 +34,12 @@ type IO interface { // cio is a basic container IO implementation. type cio struct { - config IOConfig + config Config closer *wgCloser } -func (c *cio) Config() IOConfig { +func (c *cio) Config() Config { return c.config } @@ -64,23 +64,23 @@ func (c *cio) Close() error { return c.closer.Close() } -// IOCreation creates new IO sets for a task -type IOCreation func(id string) (IO, error) +// Creation creates new IO sets for a task +type Creation func(id string) (IO, error) -// IOAttach allows callers to reattach to running tasks +// Attach allows callers to reattach to running tasks // // There should only be one reader for a task's IO set // because fifo's can only be read from one reader or the output // will be sent only to the first reads -type IOAttach func(*FIFOSet) (IO, error) +type Attach func(*FIFOSet) (IO, error) -// NewIO returns an IOCreation that will provide IO sets without a terminal -func NewIO(stdin io.Reader, stdout, stderr io.Writer) IOCreation { +// NewIO returns an Creation that will provide IO sets without a terminal +func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation { return NewIOWithTerminal(stdin, stdout, stderr, false) } // NewIOWithTerminal creates a new io set with the provied io.Reader/Writers for use with a terminal -func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) IOCreation { +func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation { return func(id string) (_ IO, err error) { paths, err := NewFifos(id) if err != nil { @@ -91,7 +91,7 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) os.RemoveAll(paths.Dir) } }() - cfg := IOConfig{ + cfg := Config{ Terminal: terminal, Stdout: paths.Out, Stderr: paths.Err, @@ -113,12 +113,12 @@ func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) } // WithAttach attaches the existing io for a task to the provided io.Reader/Writers -func WithAttach(stdin io.Reader, stdout, stderr io.Writer) IOAttach { +func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach { return func(paths *FIFOSet) (IO, error) { if paths == nil { return nil, fmt.Errorf("cannot attach to existing fifos") } - cfg := IOConfig{ + cfg := Config{ Terminal: paths.Terminal, Stdout: paths.Out, Stderr: paths.Err, diff --git a/io_unix.go b/cio/io_unix.go similarity index 97% rename from io_unix.go rename to cio/io_unix.go index 08aba14ba..c18f7ecf9 100644 --- a/io_unix.go +++ b/cio/io_unix.go @@ -1,6 +1,6 @@ // +build !windows -package containerd +package cio import ( "context" @@ -139,9 +139,9 @@ func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) { return f, nil } -// Config returns the IOConfig -func (f *DirectIO) Config() IOConfig { - return IOConfig{ +// Config returns the Config +func (f *DirectIO) Config() Config { + return Config{ Terminal: f.terminal, Stdin: f.set.In, Stdout: f.set.Out, diff --git a/io_windows.go b/cio/io_windows.go similarity index 99% rename from io_windows.go rename to cio/io_windows.go index e37568c26..1458c3173 100644 --- a/io_windows.go +++ b/cio/io_windows.go @@ -1,4 +1,4 @@ -package containerd +package cio import ( "fmt" diff --git a/cmd/containerd-stress/main.go b/cmd/containerd-stress/main.go index b76dbc79b..17eb70674 100644 --- a/cmd/containerd-stress/main.go +++ b/cmd/containerd-stress/main.go @@ -13,6 +13,7 @@ import ( "time" "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/namespaces" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -206,7 +207,7 @@ func (w *worker) runContainer(ctx context.Context, id string) error { } defer c.Delete(ctx, containerd.WithSnapshotCleanup) - task, err := c.NewTask(ctx, containerd.NullIO) + task, err := c.NewTask(ctx, cio.NullIO) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/attach.go b/cmd/ctr/commands/tasks/attach.go index 22e6fb90e..ec4da1005 100644 --- a/cmd/ctr/commands/tasks/attach.go +++ b/cmd/ctr/commands/tasks/attach.go @@ -4,7 +4,7 @@ import ( "os" "github.com/containerd/console" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/cmd/ctr/commands" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -39,7 +39,7 @@ var attachCommand = cli.Command{ return err } } - task, err := container.Task(ctx, containerd.WithAttach(os.Stdin, os.Stdout, os.Stderr)) + task, err := container.Task(ctx, cio.WithAttach(os.Stdin, os.Stdout, os.Stderr)) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/exec.go b/cmd/ctr/commands/tasks/exec.go index f3303ade2..a7301136b 100644 --- a/cmd/ctr/commands/tasks/exec.go +++ b/cmd/ctr/commands/tasks/exec.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/containerd/console" - "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/cmd/ctr/commands" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -60,11 +60,11 @@ var execCommand = cli.Command{ pspec.Terminal = tty pspec.Args = args - io := containerd.Stdio + ioCreator := cio.Stdio if tty { - io = containerd.StdioTerminal + ioCreator = cio.StdioTerminal } - process, err := task.Exec(ctx, context.String("exec-id"), pspec, io) + process, err := task.Exec(ctx, context.String("exec-id"), pspec, ioCreator) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/tasks_unix.go b/cmd/ctr/commands/tasks/tasks_unix.go index edda13b31..0621f9566 100644 --- a/cmd/ctr/commands/tasks/tasks_unix.go +++ b/cmd/ctr/commands/tasks/tasks_unix.go @@ -9,6 +9,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/log" "github.com/pkg/errors" "golang.org/x/sys/unix" @@ -44,21 +45,21 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol // NewTask creates a new task func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, tty, nullIO bool) (containerd.Task, error) { if checkpoint == "" { - io := containerd.Stdio + ioCreator := cio.Stdio if tty { - io = containerd.StdioTerminal + ioCreator = cio.StdioTerminal } if nullIO { if tty { return nil, errors.New("tty and null-io cannot be used together") } - io = containerd.NullIO + ioCreator = cio.NullIO } - return container.NewTask(ctx, io) + return container.NewTask(ctx, ioCreator) } im, err := client.GetImage(ctx, checkpoint) if err != nil { return nil, err } - return container.NewTask(ctx, containerd.Stdio, containerd.WithTaskCheckpoint(im)) + return container.NewTask(ctx, cio.Stdio, containerd.WithTaskCheckpoint(im)) } diff --git a/cmd/ctr/commands/tasks/tasks_windows.go b/cmd/ctr/commands/tasks/tasks_windows.go index dda2fb1d6..083ad49e6 100644 --- a/cmd/ctr/commands/tasks/tasks_windows.go +++ b/cmd/ctr/commands/tasks/tasks_windows.go @@ -6,6 +6,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/log" "github.com/pkg/errors" ) @@ -41,15 +42,15 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol // NewTask creates a new task func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, tty, nullIO bool) (containerd.Task, error) { - io := containerd.Stdio + ioCreator := cio.Stdio if tty { - io = containerd.StdioTerminal + ioCreator = cio.StdioTerminal } if nullIO { if tty { return nil, errors.New("tty and null-io cannot be used together") } - io = containerd.NullIO + ioCreator = cio.NullIO } - return container.NewTask(ctx, io) + return container.NewTask(ctx, ioCreator) } diff --git a/container.go b/container.go index a9750eca0..2d5c9aedb 100644 --- a/container.go +++ b/container.go @@ -8,6 +8,7 @@ import ( "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/typeurl" @@ -25,17 +26,17 @@ type Container interface { // Delete removes the container Delete(context.Context, ...DeleteOpts) error // NewTask creates a new task based on the container metadata - NewTask(context.Context, IOCreation, ...NewTaskOpts) (Task, error) + NewTask(context.Context, cio.Creation, ...NewTaskOpts) (Task, error) // Spec returns the OCI runtime specification Spec(context.Context) (*specs.Spec, error) // Task returns the current task for the container // - // If IOAttach options are passed the client will reattach to the IO for the running + // If cio.Attach options are passed the client will reattach to the IO for the running // task. If no task exists for the container a NotFound error is returned // // Clients must make sure that only one reader is attached to the task and consuming // the output from the task's fifos - Task(context.Context, IOAttach) (Task, error) + Task(context.Context, cio.Attach) (Task, error) // Image returns the image that the container is based on Image(context.Context) (Image, error) // Labels returns the labels set on the container @@ -138,7 +139,7 @@ func (c *container) Delete(ctx context.Context, opts ...DeleteOpts) error { return c.client.ContainerService().Delete(ctx, c.id) } -func (c *container) Task(ctx context.Context, attach IOAttach) (Task, error) { +func (c *container) Task(ctx context.Context, attach cio.Attach) (Task, error) { return c.loadTask(ctx, attach) } @@ -161,7 +162,7 @@ func (c *container) Image(ctx context.Context) (Image, error) { }, nil } -func (c *container) NewTask(ctx context.Context, ioCreate IOCreation, opts ...NewTaskOpts) (Task, error) { +func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (Task, error) { i, err := ioCreate(c.id) if err != nil { return nil, err @@ -251,7 +252,7 @@ func (c *container) Update(ctx context.Context, opts ...UpdateContainerOpts) err return nil } -func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, error) { +func (c *container) loadTask(ctx context.Context, ioAttach cio.Attach) (Task, error) { response, err := c.client.TaskService().Get(ctx, &tasks.GetRequest{ ContainerID: c.id, }) @@ -262,7 +263,7 @@ func (c *container) loadTask(ctx context.Context, ioAttach IOAttach) (Task, erro } return nil, err } - var i IO + var i cio.IO if ioAttach != nil { if i, err = attachExistingIO(response, ioAttach); err != nil { return nil, err @@ -281,9 +282,9 @@ func (c *container) get(ctx context.Context) (containers.Container, error) { return c.client.ContainerService().Get(ctx, c.id) } -func attachExistingIO(response *tasks.GetResponse, ioAttach IOAttach) (IO, error) { +func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) { // get the existing fifo paths from the task information stored by the daemon - paths := &FIFOSet{ + paths := &cio.FIFOSet{ Dir: getFifoDir([]string{ response.Process.Stdin, response.Process.Stdout, diff --git a/container_linux_test.go b/container_linux_test.go index 34509b412..06a35ae07 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/containerd/cgroups" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/linux/runctypes" @@ -294,7 +295,7 @@ func TestContainerAttach(t *testing.T) { expected := "hello" + newLine - direct, err := NewDirectIO(ctx, false) + direct, err := cio.NewDirectIO(ctx, false) if err != nil { t.Error(err) return @@ -389,7 +390,7 @@ func TestContainerUsername(t *testing.T) { t.Error(err) return } - direct, err := NewDirectIO(ctx, false) + direct, err := cio.NewDirectIO(ctx, false) if err != nil { t.Error(err) return @@ -482,7 +483,7 @@ func TestContainerAttachProcess(t *testing.T) { expected := "hello" + newLine // creating IO early for easy resource cleanup - direct, err := NewDirectIO(ctx, false) + direct, err := cio.NewDirectIO(ctx, false) if err != nil { t.Error(err) return @@ -598,7 +599,7 @@ func TestContainerUserID(t *testing.T) { t.Error(err) return } - direct, err := NewDirectIO(ctx, false) + direct, err := cio.NewDirectIO(ctx, false) if err != nil { t.Error(err) return @@ -688,7 +689,7 @@ func TestContainerKillAll(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) if err != nil { t.Error(err) return @@ -940,7 +941,7 @@ func TestContainerKillInitPidHost(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) if err != nil { t.Error(err) return @@ -1039,7 +1040,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) { } defer container.Delete(ctx, WithSnapshotCleanup) - task, err := container.NewTask(ctx, Stdio, func(_ context.Context, client *Client, r *TaskInfo) error { + task, err := container.NewTask(ctx, cio.Stdio, func(_ context.Context, client *Client, r *TaskInfo) error { r.Options = &runctypes.CreateOptions{ IoUid: 1000, IoGid: 1000, diff --git a/container_test.go b/container_test.go index b0ce42def..db6e63910 100644 --- a/container_test.go +++ b/container_test.go @@ -12,6 +12,7 @@ import ( "time" // Register the typeurl + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/containers" _ "github.com/containerd/containerd/runtime" "github.com/containerd/typeurl" @@ -21,13 +22,13 @@ import ( gogotypes "github.com/gogo/protobuf/types" ) -func empty() IOCreation { +func empty() cio.Creation { // TODO (@mlaventure) windows searches for pipes // when none are provided if runtime.GOOS == "windows" { - return Stdio + return cio.Stdio } - return NullIO + return cio.NullIO } func TestContainerList(t *testing.T) { @@ -185,7 +186,7 @@ func TestContainerOutput(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) if err != nil { t.Error(err) return @@ -445,7 +446,7 @@ func TestContainerCloseIO(t *testing.T) { return } - task, err := container.NewTask(ctx, NewIO(r, stdout, ioutil.Discard)) + task, err := container.NewTask(ctx, cio.NewIO(r, stdout, ioutil.Discard)) if err != nil { t.Error(err) return @@ -1054,7 +1055,7 @@ func TestContainerHostname(t *testing.T) { defer container.Delete(ctx, WithSnapshotCleanup) stdout := bytes.NewBuffer(nil) - task, err := container.NewTask(ctx, NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) + task, err := container.NewTask(ctx, cio.NewIO(bytes.NewBuffer(nil), stdout, bytes.NewBuffer(nil))) if err != nil { t.Error(err) return diff --git a/process.go b/process.go index ebefb6973..ad1a2a1f4 100644 --- a/process.go +++ b/process.go @@ -7,6 +7,7 @@ import ( "time" "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/errdefs" "github.com/pkg/errors" ) @@ -28,7 +29,7 @@ type Process interface { // Resize changes the width and heigh of the process's terminal Resize(ctx context.Context, w, h uint32) error // IO returns the io set for the process - IO() IO + IO() cio.IO // Status returns the executing status of the process Status(context.Context) (Status, error) } @@ -72,7 +73,7 @@ type process struct { id string task *task pid uint32 - io IO + io cio.IO } func (p *process) ID() string { @@ -154,7 +155,7 @@ func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { return errdefs.FromGRPC(err) } -func (p *process) IO() IO { +func (p *process) IO() cio.IO { return p.io } diff --git a/task.go b/task.go index a4e5fe4d1..8d447b32f 100644 --- a/task.go +++ b/task.go @@ -14,6 +14,7 @@ import ( "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/cio" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" @@ -123,7 +124,7 @@ type Task interface { // Resume the execution of the task Resume(context.Context) error // Exec creates a new process inside the task - Exec(context.Context, string, *specs.Process, IOCreation) (Process, error) + Exec(context.Context, string, *specs.Process, cio.Creation) (Process, error) // Pids returns a list of system specific process ids inside the task Pids(context.Context) ([]ProcessInfo, error) // Checkpoint serializes the runtime and memory information of a task into an @@ -134,7 +135,7 @@ type Task interface { // Update modifies executing tasks with updated settings Update(context.Context, ...UpdateTaskOpts) error // LoadProcess loads a previously created exec'd process - LoadProcess(context.Context, string, IOAttach) (Process, error) + LoadProcess(context.Context, string, cio.Attach) (Process, error) // Metrics returns task metrics for runtime specific metrics // // The metric types are generic to containerd and change depending on the runtime @@ -148,7 +149,7 @@ var _ = (Task)(&task{}) type task struct { client *Client - io IO + io cio.IO id string pid uint32 @@ -279,7 +280,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat return &ExitStatus{code: r.ExitStatus, exitedAt: r.ExitedAt}, nil } -func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate IOCreation) (Process, error) { +func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creation) (Process, error) { if id == "" { return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "exec id must not be empty") } @@ -344,7 +345,7 @@ func (t *task) CloseIO(ctx context.Context, opts ...IOCloserOpts) error { return errdefs.FromGRPC(err) } -func (t *task) IO() IO { +func (t *task) IO() cio.IO { return t.io } @@ -461,7 +462,7 @@ func (t *task) Update(ctx context.Context, opts ...UpdateTaskOpts) error { return errdefs.FromGRPC(err) } -func (t *task) LoadProcess(ctx context.Context, id string, ioAttach IOAttach) (Process, error) { +func (t *task) LoadProcess(ctx context.Context, id string, ioAttach cio.Attach) (Process, error) { response, err := t.client.TaskService().Get(ctx, &tasks.GetRequest{ ContainerID: t.id, ExecID: id, @@ -473,7 +474,7 @@ func (t *task) LoadProcess(ctx context.Context, id string, ioAttach IOAttach) (P } return nil, err } - var i IO + var i cio.IO if ioAttach != nil { if i, err = attachExistingIO(response, ioAttach); err != nil { return nil, err