/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package agents import ( "errors" "io" "io/ioutil" "os" "path/filepath" "strings" "sync" "syscall" "github.com/containerd/containerd" "github.com/containerd/fifo" "github.com/golang/glog" "golang.org/x/net/context" cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) // StreamType is the type of the stream, stdout/stderr. type StreamType string const ( // Stdin stream type. Stdin StreamType = "stdin" // Stdout stream type. Stdout StreamType = "stdout" // Stderr stream type. Stderr StreamType = "stderr" ) type wgCloser struct { ctx context.Context wg *sync.WaitGroup set []io.Closer cancel context.CancelFunc } func (g *wgCloser) Wait() { g.wg.Wait() } func (g *wgCloser) Close() { for _, f := range g.set { f.Close() } } func (g *wgCloser) Cancel() { g.cancel() } // streamKey generates a key for the stream. func streamKey(id, name string, stream StreamType) string { return strings.Join([]string{id, name, string(stream)}, "-") } // ContainerIO holds the container io. type ContainerIO struct { dir string stdinPath string stdoutPath string stderrPath string id string tty bool stdin bool stdout *cioutil.WriterGroup stderr *cioutil.WriterGroup root string closer *wgCloser } var _ containerd.IO = &ContainerIO{} // Opts sets specific information to newly created ContainerIO. 
type Opts func(*ContainerIO) error

// WithStdin enables stdin of the container io.
func WithStdin(stdin bool) Opts {
	return func(cio *ContainerIO) error {
		cio.stdin = stdin
		return nil
	}
}

// WithOutput adds output stream to the container io. A nil stdout or
// stderr is skipped; each non-nil writer is registered under a key derived
// from the container id, the given name and the stream type.
func WithOutput(name string, stdout, stderr io.WriteCloser) Opts {
	return func(cio *ContainerIO) error {
		if stdout != nil {
			outKey := streamKey(cio.id, name, Stdout)
			if err := cio.stdout.Add(outKey, stdout); err != nil {
				return err
			}
		}
		if stderr == nil {
			return nil
		}
		errKey := streamKey(cio.id, name, Stderr)
		if err := cio.stderr.Add(errKey, stderr); err != nil {
			return err
		}
		return nil
	}
}

// WithTerminal enables tty of the container io.
func WithTerminal(tty bool) Opts {
	return func(cio *ContainerIO) error {
		cio.tty = tty
		return nil
	}
}

// WithRootDir sets the root directory to create container streams.
func WithRootDir(root string) Opts {
	return func(cio *ContainerIO) error {
		cio.root = root
		return nil
	}
}

// WithFIFOs specifies existing fifos for the container io.
func WithFIFOs(dir, stdin, stdout, stderr string) Opts {
	return func(cio *ContainerIO) error {
		cio.dir = dir
		cio.stdinPath, cio.stdoutPath, cio.stderrPath = stdin, stdout, stderr
		return nil
	}
}

// NewContainerIO creates container io. Options are applied in order and the
// first failing option aborts creation. Unless an option already set the
// fifo directory, a fresh set of fifo paths is generated under the root
// directory (os.TempDir() by default); the stdin path is only recorded when
// stdin is enabled.
func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
	cio := &ContainerIO{
		id:     id,
		stdout: cioutil.NewWriterGroup(),
		stderr: cioutil.NewWriterGroup(),
		root:   os.TempDir(),
	}
	for i := range opts {
		if err := opts[i](cio); err != nil {
			return nil, err
		}
	}
	// Return if fifos are already set.
	if cio.dir != "" {
		return cio, nil
	}
	set, err := newFifos(cio.root, id)
	if err != nil {
		return nil, err
	}
	cio.dir, cio.stdoutPath, cio.stderrPath = set.Dir, set.Out, set.Err
	if cio.stdin {
		cio.stdinPath = set.In
	}
	return cio, nil
}

// Config returns io config.
func (c *ContainerIO) Config() containerd.IOConfig {
	cfg := containerd.IOConfig{
		Terminal: c.tty,
		Stdin:    c.stdinPath,
		Stdout:   c.stdoutPath,
		Stderr:   c.stderrPath,
	}
	return cfg
}

// Pipe creates container fifos and pipe container output
// to output stream.
func (c *ContainerIO) Pipe() (err error) { var ( f io.ReadWriteCloser set []io.Closer ctx, cancel = context.WithCancel(context.Background()) wg = &sync.WaitGroup{} ) defer func() { if err != nil { for _, f := range set { f.Close() } cancel() } }() if c.stdinPath != "" { // Just create the stdin, only open it when used. if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return err } f.Close() } if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return err } set = append(set, f) wg.Add(1) go func(r io.ReadCloser) { if _, err := io.Copy(c.stdout, r); err != nil { glog.Errorf("Failed to redirect stdout of container %q: %v", c.id, err) } r.Close() c.stdout.Close() wg.Done() glog.V(2).Infof("Finish piping stdout of container %q", c.id) }(f) if f, err = fifo.OpenFifo(ctx, c.stderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return err } set = append(set, f) if !c.tty { wg.Add(1) go func(r io.ReadCloser) { if _, err := io.Copy(c.stderr, r); err != nil { glog.Errorf("Failed to redirect stderr of container %q: %v", c.id, err) } r.Close() c.stderr.Close() wg.Done() glog.V(2).Infof("Finish piping stderr of container %q", c.id) }(f) } c.closer = &wgCloser{ wg: wg, set: set, ctx: ctx, cancel: cancel, } return nil } // AttachOptions specifies how to attach to a container. type AttachOptions struct { Stdin io.Reader Stdout io.WriteCloser Stderr io.WriteCloser Tty bool StdinOnce bool // CloseStdin is the function to close container stdin. CloseStdin func() error } // Attach attaches container stdio. // TODO(random-liu): Use pools.Copy in docker to reduce memory usage? 
func (c *ContainerIO) Attach(opts AttachOptions) error { if c.closer == nil { return errors.New("container io is not initialized") } var wg sync.WaitGroup key := util.GenerateID() stdinKey := streamKey(c.id, "attach-"+key, Stdin) stdoutKey := streamKey(c.id, "attach-"+key, Stdout) stderrKey := streamKey(c.id, "attach-"+key, Stderr) var stdinRC io.ReadCloser if c.stdinPath != "" && opts.Stdin != nil { f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700) if err != nil { return err } // Create a wrapper of stdin which could be closed. Note that the // wrapper doesn't close the actual stdin, it only stops io.Copy. // The actual stdin will be closed by stream server. stdinRC = cioutil.NewWrapReadCloser(opts.Stdin) // Also increase wait group here, so that `closer.Wait` will // also wait for this fifo to be closed. c.closer.wg.Add(1) wg.Add(1) go func(w io.WriteCloser) { if _, err := io.Copy(w, stdinRC); err != nil && err != io.ErrClosedPipe { glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err) } w.Close() glog.V(2).Infof("Attach stream %q closed", stdinKey) if opts.StdinOnce && !opts.Tty { // Due to kubectl requirements and current docker behavior, when (opts.StdinOnce && // opts.Tty) we have to close container stdin and keep stdout and stderr open until // container stops. if err := opts.CloseStdin(); err != nil { glog.Errorf("Failed to close stdin for container %q: %v", c.id, err) } } else { if opts.Stdout != nil { c.stdout.Remove(stdoutKey) } if opts.Stderr != nil { c.stderr.Remove(stderrKey) } } wg.Done() c.closer.wg.Done() }(f) } attachStream := func(key string, close <-chan struct{}) { <-close glog.V(2).Infof("Attach stream %q closed", key) // Make sure stdin gets closed. 
if stdinRC != nil { stdinRC.Close() } wg.Done() } if opts.Stdout != nil { wg.Add(1) wc, close := cioutil.NewWriteCloseInformer(opts.Stdout) if err := c.stdout.Add(stdoutKey, wc); err != nil { return err } go attachStream(stdoutKey, close) } if opts.Tty && opts.Stderr != nil { wg.Add(1) wc, close := cioutil.NewWriteCloseInformer(opts.Stderr) if err := c.stderr.Add(stderrKey, wc); err != nil { return err } go attachStream(stderrKey, close) } wg.Wait() return nil } // Cancel cancels container io. func (c *ContainerIO) Cancel() { c.closer.Cancel() } // Wait waits container io to finish. func (c *ContainerIO) Wait() { c.closer.Wait() } // Close closes all FIFOs. func (c *ContainerIO) Close() error { if c.closer != nil { c.closer.Close() } if c.dir != "" { return os.RemoveAll(c.dir) } return nil } // newFifos creates fifos directory for a container. func newFifos(root, id string) (*containerd.FIFOSet, error) { root = filepath.Join(root, "io") if err := os.MkdirAll(root, 0700); err != nil { return nil, err } dir, err := ioutil.TempDir(root, "") if err != nil { return nil, err } return &containerd.FIFOSet{ Dir: dir, In: filepath.Join(dir, id+"-stdin"), Out: filepath.Join(dir, id+"-stdout"), Err: filepath.Join(dir, id+"-stderr"), }, nil }