diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index dd0e05259..5922d5230 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -175,10 +175,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C } containerIO, err := cio.NewContainerIO(id, - cio.WithStdinOpen(config.GetStdin()), - cio.WithTerminal(config.GetTty()), - cio.WithRootDir(containerRootDir), - ) + cio.WithNewFIFOs(containerRootDir, config.GetTty(), config.GetStdin())) if err != nil { return nil, fmt.Errorf("failed to create container io: %v", err) } diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index b5fec799f..594147bb1 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -20,7 +20,6 @@ import ( "bytes" "fmt" "io" - "io/ioutil" "time" "github.com/containerd/containerd" @@ -32,6 +31,8 @@ import ( "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) @@ -41,8 +42,8 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync var stdout, stderr bytes.Buffer exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ cmd: r.GetCmd(), - stdout: &stdout, - stderr: &stderr, + stdout: cioutil.NewNopWriteCloser(&stdout), + stderr: cioutil.NewNopWriteCloser(&stderr), timeout: time.Duration(r.GetTimeout()) * time.Second, }) if err != nil { @@ -60,8 +61,8 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync type execOptions struct { cmd []string stdin io.Reader - stdout io.Writer - stderr io.Writer + stdout io.WriteCloser + stderr io.WriteCloser tty bool resize <-chan remotecommand.TerminalSize timeout time.Duration @@ -106,22 +107,23 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o pspec.Args = opts.cmd pspec.Terminal = opts.tty - if opts.stdin == nil { - opts.stdin = new(bytes.Buffer) - } if opts.stdout == nil { - opts.stdout = ioutil.Discard + opts.stdout = cio.NewDiscardLogger() } if opts.stderr == nil { - opts.stderr = ioutil.Discard + opts.stderr = cio.NewDiscardLogger() } execID := util.GenerateID() - process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( - opts.stdin, - opts.stdout, - opts.stderr, - opts.tty, - )) + glog.V(4).Infof("Generated exec id %q for container %q", execID, id) + rootDir := getContainerRootDir(c.config.RootDir, id) + var execIO *cio.ExecIO + process, err := task.Exec(ctx, execID, pspec, + func(id string) (containerd.IO, error) { + var err error + execIO, err = cio.NewExecIO(id, rootDir, opts.tty, opts.stdin != nil) + return execIO, err + }, + ) if err != nil { return nil, fmt.Errorf("failed to create exec %q: %v", execID, err) } @@ -145,6 +147,17 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o } }) + attachDone := execIO.Attach(cio.AttachOptions{ + Stdin: opts.stdin, + Stdout: opts.stdout, + Stderr: opts.stderr, + Tty: opts.tty, + StdinOnce: true, + CloseStdin: func() error { + return process.CloseIO(ctx, containerd.WithStdinCloser) + }, + }) + var timeoutCh <-chan time.Time if opts.timeout == 0 { // Do not set timeout if it's 0. @@ -163,6 +176,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o exitRes := <-exitCh glog.V(2).Infof("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) + <-attachDone + glog.V(4).Infof("Stream pipe for exec process %q done", execID) return nil, fmt.Errorf("timeout %v exceeded", opts.timeout) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -170,6 +185,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %v", execID, err) } + <-attachDone + glog.V(4).Infof("Stream pipe for exec process %q done", execID) return &code, nil } } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 9febe006f..f60e37f65 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -121,9 +121,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil { return nil, fmt.Errorf("failed to add container log: %v", err) } - if err := cntr.IO.Pipe(); err != nil { - return nil, fmt.Errorf("failed to pipe container io: %v", err) - } + cntr.IO.Pipe() return cntr.IO, nil } diff --git a/pkg/server/io/container_io.go b/pkg/server/io/container_io.go new file mode 100644 index 000000000..8ccead92a --- /dev/null +++ b/pkg/server/io/container_io.go @@ -0,0 +1,245 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io + +import ( + "errors" + "io" + "os" + "strings" + "sync" + + "github.com/containerd/containerd" + "github.com/golang/glog" + + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" + "github.com/kubernetes-incubator/cri-containerd/pkg/util" +) + +// streamKey generates a key for the stream. +func streamKey(id, name string, stream StreamType) string { + return strings.Join([]string{id, name, string(stream)}, "-") +} + +// ContainerIO holds the container io. +type ContainerIO struct { + id string + + fifos *containerd.FIFOSet + *stdioPipes + + stdoutGroup *cioutil.WriterGroup + stderrGroup *cioutil.WriterGroup + + closer *wgCloser +} + +var _ containerd.IO = &ContainerIO{} + +// ContainerIOOpts sets specific information to newly created ContainerIO. +type ContainerIOOpts func(*ContainerIO) error + +// WithOutput adds output stream to the container io. +func WithOutput(name string, stdout, stderr io.WriteCloser) ContainerIOOpts { + return func(c *ContainerIO) error { + if stdout != nil { + if err := c.stdoutGroup.Add(streamKey(c.id, name, Stdout), stdout); err != nil { + return err + } + } + if stderr != nil { + if err := c.stderrGroup.Add(streamKey(c.id, name, Stderr), stderr); err != nil { + return err + } + } + return nil + } +} + +// WithFIFOs specifies existing fifos for the container io. +func WithFIFOs(fifos *containerd.FIFOSet) ContainerIOOpts { + return func(c *ContainerIO) error { + c.fifos = fifos + return nil + } +} + +// WithNewFIFOs creates new fifos for the container io. +func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts { + return func(c *ContainerIO) error { + fifos, err := newFifos(root, c.id, tty, stdin) + if err != nil { + return err + } + return WithFIFOs(fifos)(c) + } +} + +// NewContainerIO creates container io. +func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) { + c := &ContainerIO{ + id: id, + stdoutGroup: cioutil.NewWriterGroup(), + stderrGroup: cioutil.NewWriterGroup(), + } + for _, opt := range opts { + if err := opt(c); err != nil { + return nil, err + } + } + if c.fifos == nil { + return nil, errors.New("fifos are not set") + } + // Create actual fifos. + stdio, closer, err := newStdioPipes(c.fifos) + if err != nil { + return nil, err + } + c.stdioPipes = stdio + c.closer = closer + return c, nil +} + +// Config returns io config. +func (c *ContainerIO) Config() containerd.IOConfig { + return containerd.IOConfig{ + Terminal: c.fifos.Terminal, + Stdin: c.fifos.In, + Stdout: c.fifos.Out, + Stderr: c.fifos.Err, + } +} + +// Pipe creates container fifos and pipe container output +// to output stream. +func (c *ContainerIO) Pipe() { + wg := c.closer.wg + wg.Add(1) + go func() { + if _, err := io.Copy(c.stdoutGroup, c.stdout); err != nil { + glog.Errorf("Failed to pipe stdout of container %q: %v", c.id, err) + } + c.stdout.Close() + c.stdoutGroup.Close() + wg.Done() + glog.V(2).Infof("Finish piping stdout of container %q", c.id) + }() + + if !c.fifos.Terminal { + wg.Add(1) + go func() { + if _, err := io.Copy(c.stderrGroup, c.stderr); err != nil { + glog.Errorf("Failed to pipe stderr of container %q: %v", c.id, err) + } + c.stderr.Close() + c.stderrGroup.Close() + wg.Done() + glog.V(2).Infof("Finish piping stderr of container %q", c.id) + }() + } +} + +// Attach attaches container stdio. +// TODO(random-liu): Use pools.Copy in docker to reduce memory usage? +func (c *ContainerIO) Attach(opts AttachOptions) error { + var wg sync.WaitGroup + key := util.GenerateID() + stdinKey := streamKey(c.id, "attach-"+key, Stdin) + stdoutKey := streamKey(c.id, "attach-"+key, Stdout) + stderrKey := streamKey(c.id, "attach-"+key, Stderr) + + var stdinStreamRC io.ReadCloser + if c.stdin != nil && opts.Stdin != nil { + // Create a wrapper of stdin which could be closed. Note that the + // wrapper doesn't close the actual stdin, it only stops io.Copy. + // The actual stdin will be closed by stream server. + stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin) + wg.Add(1) + go func() { + if _, err := io.Copy(c.stdin, stdinStreamRC); err != nil { + glog.Errorf("Failed to pipe stdin for container attach %q: %v", c.id, err) + } + glog.V(2).Infof("Attach stream %q closed", stdinKey) + if opts.StdinOnce && !opts.Tty { + // Due to kubectl requirements and current docker behavior, when (opts.StdinOnce && + // opts.Tty) we have to close container stdin and keep stdout and stderr open until + // container stops. + c.stdin.Close() + // Also closes the containerd side. + if err := opts.CloseStdin(); err != nil { + glog.Errorf("Failed to close stdin for container %q: %v", c.id, err) + } + } else { + if opts.Stdout != nil { + c.stdoutGroup.Remove(stdoutKey) + } + if opts.Stderr != nil { + c.stderrGroup.Remove(stderrKey) + } + } + wg.Done() + }() + } + + attachStream := func(key string, close <-chan struct{}) { + <-close + glog.V(2).Infof("Attach stream %q closed", key) + // Make sure stdin gets closed. + if stdinStreamRC != nil { + stdinStreamRC.Close() + } + wg.Done() + } + + if opts.Stdout != nil { + wg.Add(1) + wc, close := cioutil.NewWriteCloseInformer(opts.Stdout) + if err := c.stdoutGroup.Add(stdoutKey, wc); err != nil { + return err + } + go attachStream(stdoutKey, close) + } + if !opts.Tty && opts.Stderr != nil { + wg.Add(1) + wc, close := cioutil.NewWriteCloseInformer(opts.Stderr) + if err := c.stderrGroup.Add(stderrKey, wc); err != nil { + return err + } + go attachStream(stderrKey, close) + } + wg.Wait() + return nil +} + +// Cancel cancels container io. +func (c *ContainerIO) Cancel() { + c.closer.Cancel() +} + +// Wait waits container io to finish. +func (c *ContainerIO) Wait() { + c.closer.Wait() +} + +// Close closes all FIFOs. +func (c *ContainerIO) Close() error { + c.closer.Close() + if c.fifos != nil { + return os.RemoveAll(c.fifos.Dir) + } + return nil +} diff --git a/pkg/server/io/exec_io.go b/pkg/server/io/exec_io.go new file mode 100644 index 000000000..35e2dde8a --- /dev/null +++ b/pkg/server/io/exec_io.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io + +import ( + "io" + "os" + "sync" + + "github.com/containerd/containerd" + "github.com/golang/glog" + + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" +) + +// ExecIO holds the exec io. +type ExecIO struct { + id string + fifos *containerd.FIFOSet + *stdioPipes + closer *wgCloser +} + +var _ containerd.IO = &ExecIO{} + +// NewExecIO creates exec io. +func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) { + fifos, err := newFifos(root, id, tty, stdin) + if err != nil { + return nil, err + } + stdio, closer, err := newStdioPipes(fifos) + if err != nil { + return nil, err + } + return &ExecIO{ + id: id, + fifos: fifos, + stdioPipes: stdio, + closer: closer, + }, nil +} + +// Config returns io config. +func (e *ExecIO) Config() containerd.IOConfig { + return containerd.IOConfig{ + Terminal: e.fifos.Terminal, + Stdin: e.fifos.In, + Stdout: e.fifos.Out, + Stderr: e.fifos.Err, + } +} + +// Attach attaches exec stdio. The logic is similar with container io attach. +func (e *ExecIO) Attach(opts AttachOptions) <-chan struct{} { + var wg sync.WaitGroup + var stdinStreamRC io.ReadCloser + if e.stdin != nil && opts.Stdin != nil { + stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin) + wg.Add(1) + go func() { + if _, err := io.Copy(e.stdin, stdinStreamRC); err != nil { + glog.Errorf("Failed to redirect stdin for container exec %q: %v", e.id, err) + } + glog.V(2).Infof("Container exec %q stdin closed", e.id) + if opts.StdinOnce && !opts.Tty { + e.stdin.Close() + if err := opts.CloseStdin(); err != nil { + glog.Errorf("Failed to close stdin for container exec %q: %v", e.id, err) + } + } else { + if e.stdout != nil { + e.stdout.Close() + } + if e.stderr != nil { + e.stderr.Close() + } + } + wg.Done() + }() + } + + attachOutput := func(t StreamType, stream io.WriteCloser, out io.ReadCloser) { + if _, err := io.Copy(stream, out); err != nil { + glog.Errorf("Failed to pipe %q for container exec %q: %v", t, e.id, err) + } + out.Close() + stream.Close() + if stdinStreamRC != nil { + stdinStreamRC.Close() + } + e.closer.wg.Done() + wg.Done() + glog.V(2).Infof("Finish piping %q of container exec %q", t, e.id) + } + + if opts.Stdout != nil { + wg.Add(1) + // Closer should wait for this routine to be over. + e.closer.wg.Add(1) + go attachOutput(Stdout, opts.Stdout, e.stdout) + } + + if !opts.Tty && opts.Stderr != nil { + wg.Add(1) + // Closer should wait for this routine to be over. + e.closer.wg.Add(1) + go attachOutput(Stderr, opts.Stderr, e.stderr) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + return done +} + +// Cancel cancels exec io. +func (e *ExecIO) Cancel() { + e.closer.Cancel() +} + +// Wait waits exec io to finish. +func (e *ExecIO) Wait() { + e.closer.Wait() +} + +// Close closes all FIFOs. +func (e *ExecIO) Close() error { + if e.closer != nil { + e.closer.Close() + } + if e.fifos != nil { + return os.RemoveAll(e.fifos.Dir) + } + return nil +} diff --git a/pkg/server/io/helpers.go b/pkg/server/io/helpers.go new file mode 100644 index 000000000..88b5c076d --- /dev/null +++ b/pkg/server/io/helpers.go @@ -0,0 +1,148 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/containerd/containerd" + "github.com/containerd/fifo" + "golang.org/x/net/context" +) + +// AttachOptions specifies how to attach to a container. +type AttachOptions struct { + Stdin io.Reader + Stdout io.WriteCloser + Stderr io.WriteCloser + Tty bool + StdinOnce bool + // CloseStdin is the function to close container stdin. + CloseStdin func() error +} + +// StreamType is the type of the stream, stdout/stderr. +type StreamType string + +const ( + // Stdin stream type. + Stdin StreamType = "stdin" + // Stdout stream type. + Stdout StreamType = "stdout" + // Stderr stream type. + Stderr StreamType = "stderr" +) + +type wgCloser struct { + ctx context.Context + wg *sync.WaitGroup + set []io.Closer + cancel context.CancelFunc +} + +func (g *wgCloser) Wait() { + g.wg.Wait() +} + +func (g *wgCloser) Close() { + for _, f := range g.set { + f.Close() + } +} + +func (g *wgCloser) Cancel() { + g.cancel() +} + +// newFifos creates fifos directory for a container. +func newFifos(root, id string, tty, stdin bool) (*containerd.FIFOSet, error) { + root = filepath.Join(root, "io") + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } + dir, err := ioutil.TempDir(root, "") + if err != nil { + return nil, err + } + fifos := &containerd.FIFOSet{ + Dir: dir, + In: filepath.Join(dir, id+"-stdin"), + Out: filepath.Join(dir, id+"-stdout"), + Err: filepath.Join(dir, id+"-stderr"), + Terminal: tty, + } + if !stdin { + fifos.In = "" + } + return fifos, nil +} + +type stdioPipes struct { + stdin io.WriteCloser + stdout io.ReadCloser + stderr io.ReadCloser +} + +// newStdioPipes creates actual fifos for stdio. +func newStdioPipes(fifos *containerd.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) { + var ( + f io.ReadWriteCloser + set []io.Closer + ctx, cancel = context.WithCancel(context.Background()) + p = &stdioPipes{} + ) + defer func() { + if err != nil { + for _, f := range set { + f.Close() + } + cancel() + } + }() + + if fifos.In != "" { + if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return nil, nil, err + } + p.stdin = f + set = append(set, f) + } + + if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return nil, nil, err + } + p.stdout = f + set = append(set, f) + + if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return nil, nil, err + } + p.stderr = f + set = append(set, f) + + return p, &wgCloser{ + wg: &sync.WaitGroup{}, + set: set, + ctx: ctx, + cancel: cancel, + }, nil +} diff --git a/pkg/server/io/io.go b/pkg/server/io/io.go deleted file mode 100644 index 0fab2aca0..000000000 --- a/pkg/server/io/io.go +++ /dev/null @@ -1,384 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package agents - -import ( - "errors" - "io" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "syscall" - - "github.com/containerd/containerd" - "github.com/containerd/fifo" - "github.com/golang/glog" - "golang.org/x/net/context" - - cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" - "github.com/kubernetes-incubator/cri-containerd/pkg/util" -) - -// StreamType is the type of the stream, stdout/stderr. -type StreamType string - -const ( - // Stdin stream type. - Stdin StreamType = "stdin" - // Stdout stream type. - Stdout StreamType = "stdout" - // Stderr stream type. - Stderr StreamType = "stderr" -) - -type wgCloser struct { - ctx context.Context - wg *sync.WaitGroup - set []io.Closer - cancel context.CancelFunc -} - -func (g *wgCloser) Wait() { - g.wg.Wait() -} - -func (g *wgCloser) Close() { - for _, f := range g.set { - f.Close() - } -} - -func (g *wgCloser) Cancel() { - g.cancel() -} - -// streamKey generates a key for the stream. -func streamKey(id, name string, stream StreamType) string { - return strings.Join([]string{id, name, string(stream)}, "-") -} - -// ContainerIO holds the container io. -type ContainerIO struct { - id string - - dir string - stdinPath string - stdoutPath string - stderrPath string - - // Configs for the io. - tty bool - openStdin bool - root string - - stdin io.WriteCloser - stdout *cioutil.WriterGroup - stderr *cioutil.WriterGroup - - closer *wgCloser -} - -var _ containerd.IO = &ContainerIO{} - -// Opts sets specific information to newly created ContainerIO. -type Opts func(*ContainerIO) error - -// WithStdinOpen enables stdin of the container io. -func WithStdinOpen(open bool) Opts { - return func(c *ContainerIO) error { - c.openStdin = open - return nil - } -} - -// WithOutput adds output stream to the container io. -func WithOutput(name string, stdout, stderr io.WriteCloser) Opts { - return func(c *ContainerIO) error { - if stdout != nil { - if err := c.stdout.Add(streamKey(c.id, name, Stdout), stdout); err != nil { - return err - } - } - if stderr != nil { - if err := c.stderr.Add(streamKey(c.id, name, Stderr), stderr); err != nil { - return err - } - } - return nil - } -} - -// WithTerminal enables tty of the container io. -func WithTerminal(tty bool) Opts { - return func(c *ContainerIO) error { - c.tty = tty - return nil - } -} - -// WithRootDir sets the root directory to create container streams. -func WithRootDir(root string) Opts { - return func(c *ContainerIO) error { - c.root = root - return nil - } -} - -// WithFIFOs specifies existing fifos for the container io. -func WithFIFOs(dir, stdin, stdout, stderr string) Opts { - return func(c *ContainerIO) error { - c.dir = dir - c.stdinPath = stdin - c.stdoutPath = stdout - c.stderrPath = stderr - return nil - } -} - -// NewContainerIO creates container io. -func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) { - c := &ContainerIO{ - id: id, - stdout: cioutil.NewWriterGroup(), - stderr: cioutil.NewWriterGroup(), - root: os.TempDir(), - } - for _, opt := range opts { - if err := opt(c); err != nil { - return nil, err - } - } - if c.dir != "" { - // Return if fifos are already set. - return c, nil - } - fifos, err := newFifos(c.root, id) - if err != nil { - return nil, err - } - c.dir = fifos.Dir - c.stdoutPath = fifos.Out - c.stderrPath = fifos.Err - if c.openStdin { - c.stdinPath = fifos.In - } - return c, nil -} - -// Config returns io config. -func (c *ContainerIO) Config() containerd.IOConfig { - return containerd.IOConfig{ - Terminal: c.tty, - Stdin: c.stdinPath, - Stdout: c.stdoutPath, - Stderr: c.stderrPath, - } -} - -// Pipe creates container fifos and pipe container output -// to output stream. -func (c *ContainerIO) Pipe() (err error) { - var ( - f io.ReadWriteCloser - set []io.Closer - ctx, cancel = context.WithCancel(context.Background()) - wg = &sync.WaitGroup{} - ) - defer func() { - if err != nil { - for _, f := range set { - f.Close() - } - cancel() - } - }() - if c.stdinPath != "" { - if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return err - } - c.stdin = f - set = append(set, f) - } - - if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return err - } - set = append(set, f) - wg.Add(1) - go func(r io.ReadCloser) { - if _, err := io.Copy(c.stdout, r); err != nil { - glog.Errorf("Failed to redirect stdout of container %q: %v", c.id, err) - } - r.Close() - c.stdout.Close() - wg.Done() - glog.V(2).Infof("Finish piping stdout of container %q", c.id) - }(f) - - if f, err = fifo.OpenFifo(ctx, c.stderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { - return err - } - set = append(set, f) - if !c.tty { - wg.Add(1) - go func(r io.ReadCloser) { - if _, err := io.Copy(c.stderr, r); err != nil { - glog.Errorf("Failed to redirect stderr of container %q: %v", c.id, err) - } - r.Close() - c.stderr.Close() - wg.Done() - glog.V(2).Infof("Finish piping stderr of container %q", c.id) - }(f) - } - c.closer = &wgCloser{ - wg: wg, - set: set, - ctx: ctx, - cancel: cancel, - } - return nil -} - -// AttachOptions specifies how to attach to a container. -type AttachOptions struct { - Stdin io.Reader - Stdout io.WriteCloser - Stderr io.WriteCloser - Tty bool - StdinOnce bool - // CloseStdin is the function to close container stdin. - CloseStdin func() error -} - -// Attach attaches container stdio. -// TODO(random-liu): Use pools.Copy in docker to reduce memory usage? -func (c *ContainerIO) Attach(opts AttachOptions) error { - if c.closer == nil { - return errors.New("container io is not initialized") - } - var wg sync.WaitGroup - key := util.GenerateID() - stdinKey := streamKey(c.id, "attach-"+key, Stdin) - stdoutKey := streamKey(c.id, "attach-"+key, Stdout) - stderrKey := streamKey(c.id, "attach-"+key, Stderr) - - var stdinRC io.ReadCloser - if c.stdin != nil && opts.Stdin != nil { - // Create a wrapper of stdin which could be closed. Note that the - // wrapper doesn't close the actual stdin, it only stops io.Copy. - // The actual stdin will be closed by stream server. - stdinRC = cioutil.NewWrapReadCloser(opts.Stdin) - // Also increase wait group here, so that `closer.Wait` will - // also wait for this fifo to be closed. - wg.Add(1) - go func() { - if _, err := io.Copy(c.stdin, stdinRC); err != nil { - glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err) - } - glog.V(2).Infof("Attach stream %q closed", stdinKey) - if opts.StdinOnce && !opts.Tty { - // Due to kubectl requirements and current docker behavior, when (opts.StdinOnce && - // opts.Tty) we have to close container stdin and keep stdout and stderr open until - // container stops. - c.stdin.Close() - // Also closes the containerd side. - if err := opts.CloseStdin(); err != nil { - glog.Errorf("Failed to close stdin for container %q: %v", c.id, err) - } - } else { - if opts.Stdout != nil { - c.stdout.Remove(stdoutKey) - } - if opts.Stderr != nil { - c.stderr.Remove(stderrKey) - } - } - wg.Done() - }() - } - - attachStream := func(key string, close <-chan struct{}) { - <-close - glog.V(2).Infof("Attach stream %q closed", key) - // Make sure stdin gets closed. - if stdinRC != nil { - stdinRC.Close() - } - wg.Done() - } - - if opts.Stdout != nil { - wg.Add(1) - wc, close := cioutil.NewWriteCloseInformer(opts.Stdout) - if err := c.stdout.Add(stdoutKey, wc); err != nil { - return err - } - go attachStream(stdoutKey, close) - } - if !opts.Tty && opts.Stderr != nil { - wg.Add(1) - wc, close := cioutil.NewWriteCloseInformer(opts.Stderr) - if err := c.stderr.Add(stderrKey, wc); err != nil { - return err - } - go attachStream(stderrKey, close) - } - wg.Wait() - return nil -} - -// Cancel cancels container io. -func (c *ContainerIO) Cancel() { - c.closer.Cancel() -} - -// Wait waits container io to finish. -func (c *ContainerIO) Wait() { - c.closer.Wait() -} - -// Close closes all FIFOs. -func (c *ContainerIO) Close() error { - if c.closer != nil { - c.closer.Close() - } - if c.dir != "" { - return os.RemoveAll(c.dir) - } - return nil -} - -// newFifos creates fifos directory for a container. -func newFifos(root, id string) (*containerd.FIFOSet, error) { - root = filepath.Join(root, "io") - if err := os.MkdirAll(root, 0700); err != nil { - return nil, err - } - dir, err := ioutil.TempDir(root, "") - if err != nil { - return nil, err - } - return &containerd.FIFOSet{ - Dir: dir, - In: filepath.Join(dir, id+"-stdin"), - Out: filepath.Join(dir, id+"-stdout"), - Err: filepath.Join(dir, id+"-stderr"), - }, nil -} diff --git a/pkg/server/io/logger.go b/pkg/server/io/logger.go index 4b88d3e52..da2c4fc0e 100644 --- a/pkg/server/io/logger.go +++ b/pkg/server/io/logger.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package agents +package io import ( "bufio" diff --git a/pkg/server/io/logger_test.go b/pkg/server/io/logger_test.go index 746768ba6..9b788c8b2 100644 --- a/pkg/server/io/logger_test.go +++ b/pkg/server/io/logger_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package agents +package io import ( "bytes" diff --git a/pkg/server/restart.go b/pkg/server/restart.go index 80f56f31d..b180a3fd3 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -160,16 +160,13 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir return nil, err } containerIO, err = cio.NewContainerIO(id, - cio.WithTerminal(fifos.Terminal), - cio.WithFIFOs(fifos.Dir, fifos.In, fifos.Out, fifos.Err), + cio.WithFIFOs(fifos), cio.WithOutput("log", stdoutWC, stderrWC), ) if err != nil { return nil, err } - if err := containerIO.Pipe(); err != nil { - return nil, err - } + containerIO.Pipe() return containerIO, nil }) if err != nil && !errdefs.IsNotFound(err) { @@ -196,14 +193,11 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir // to generate container status. switch status.State() { case runtime.ContainerState_CONTAINER_CREATED: - // TODO(random-liu): Do not create fifos directory in NewContainerIO. - // container is in created state, create container io for it. // NOTE: Another possibility is that we've tried to start the container, but // cri-containerd got restarted just during that. In that case, we still // treat the container as `CREATED`. containerIO, err = cio.NewContainerIO(id, - cio.WithStdinOpen(meta.Config.GetStdin()), - cio.WithTerminal(meta.Config.GetTty()), + cio.WithNewFIFOs(containerDir, meta.Config.GetTty(), meta.Config.GetStdin()), ) if err != nil { return container, fmt.Errorf("failed to create container io: %v", err)