237 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			237 lines
		
	
	
		
			6.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package io
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/containerd/containerd/cio"
 | |
| 	"github.com/sirupsen/logrus"
 | |
| 
 | |
| 	"github.com/containerd/containerd/pkg/cri/util"
 | |
| 	cioutil "github.com/containerd/containerd/pkg/ioutil"
 | |
| )
 | |
| 
 | |
| // streamKey generates a key for the stream.
 | |
| func streamKey(id, name string, stream StreamType) string {
 | |
| 	return strings.Join([]string{id, name, string(stream)}, "-")
 | |
| }
 | |
| 
 | |
| // ContainerIO holds the container io.
 | |
| type ContainerIO struct {
 | |
| 	id string
 | |
| 
 | |
| 	fifos *cio.FIFOSet
 | |
| 	*stdioPipes
 | |
| 
 | |
| 	stdoutGroup *cioutil.WriterGroup
 | |
| 	stderrGroup *cioutil.WriterGroup
 | |
| 
 | |
| 	closer *wgCloser
 | |
| }
 | |
| 
 | |
| var _ cio.IO = &ContainerIO{}
 | |
| 
 | |
| // ContainerIOOpts sets specific information to newly created ContainerIO.
 | |
| type ContainerIOOpts func(*ContainerIO) error
 | |
| 
 | |
| // WithFIFOs specifies existing fifos for the container io.
 | |
| func WithFIFOs(fifos *cio.FIFOSet) ContainerIOOpts {
 | |
| 	return func(c *ContainerIO) error {
 | |
| 		c.fifos = fifos
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WithNewFIFOs creates new fifos for the container io.
 | |
| func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
 | |
| 	return func(c *ContainerIO) error {
 | |
| 		fifos, err := newFifos(root, c.id, tty, stdin)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return WithFIFOs(fifos)(c)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewContainerIO creates container io.
 | |
| func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
 | |
| 	c := &ContainerIO{
 | |
| 		id:          id,
 | |
| 		stdoutGroup: cioutil.NewWriterGroup(),
 | |
| 		stderrGroup: cioutil.NewWriterGroup(),
 | |
| 	}
 | |
| 	for _, opt := range opts {
 | |
| 		if err := opt(c); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	if c.fifos == nil {
 | |
| 		return nil, errors.New("fifos are not set")
 | |
| 	}
 | |
| 	// Create actual fifos.
 | |
| 	stdio, closer, err := newStdioPipes(c.fifos)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	c.stdioPipes = stdio
 | |
| 	c.closer = closer
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| // Config returns io config.
 | |
| func (c *ContainerIO) Config() cio.Config {
 | |
| 	return c.fifos.Config
 | |
| }
 | |
| 
 | |
| // Pipe creates container fifos and pipe container output
 | |
| // to output stream.
 | |
| func (c *ContainerIO) Pipe() {
 | |
| 	wg := c.closer.wg
 | |
| 	if c.stdout != nil {
 | |
| 		wg.Add(1)
 | |
| 		go func() {
 | |
| 			if _, err := io.Copy(c.stdoutGroup, c.stdout); err != nil {
 | |
| 				logrus.WithError(err).Errorf("Failed to pipe stdout of container %q", c.id)
 | |
| 			}
 | |
| 			c.stdout.Close()
 | |
| 			c.stdoutGroup.Close()
 | |
| 			wg.Done()
 | |
| 			logrus.Debugf("Finish piping stdout of container %q", c.id)
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	if !c.fifos.Terminal && c.stderr != nil {
 | |
| 		wg.Add(1)
 | |
| 		go func() {
 | |
| 			if _, err := io.Copy(c.stderrGroup, c.stderr); err != nil {
 | |
| 				logrus.WithError(err).Errorf("Failed to pipe stderr of container %q", c.id)
 | |
| 			}
 | |
| 			c.stderr.Close()
 | |
| 			c.stderrGroup.Close()
 | |
| 			wg.Done()
 | |
| 			logrus.Debugf("Finish piping stderr of container %q", c.id)
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Attach attaches container stdio.
 | |
| // TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
 | |
| func (c *ContainerIO) Attach(opts AttachOptions) {
 | |
| 	var wg sync.WaitGroup
 | |
| 	key := util.GenerateID()
 | |
| 	stdinKey := streamKey(c.id, "attach-"+key, Stdin)
 | |
| 	stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
 | |
| 	stderrKey := streamKey(c.id, "attach-"+key, Stderr)
 | |
| 
 | |
| 	var stdinStreamRC io.ReadCloser
 | |
| 	if c.stdin != nil && opts.Stdin != nil {
 | |
| 		// Create a wrapper of stdin which could be closed. Note that the
 | |
| 		// wrapper doesn't close the actual stdin, it only stops io.Copy.
 | |
| 		// The actual stdin will be closed by stream server.
 | |
| 		stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin)
 | |
| 		wg.Add(1)
 | |
| 		go func() {
 | |
| 			if _, err := io.Copy(c.stdin, stdinStreamRC); err != nil {
 | |
| 				logrus.WithError(err).Errorf("Failed to pipe stdin for container attach %q", c.id)
 | |
| 			}
 | |
| 			logrus.Infof("Attach stream %q closed", stdinKey)
 | |
| 			if opts.StdinOnce && !opts.Tty {
 | |
| 				// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
 | |
| 				// opts.Tty) we have to close container stdin and keep stdout and stderr open until
 | |
| 				// container stops.
 | |
| 				c.stdin.Close()
 | |
| 				// Also closes the containerd side.
 | |
| 				if err := opts.CloseStdin(); err != nil {
 | |
| 					logrus.WithError(err).Errorf("Failed to close stdin for container %q", c.id)
 | |
| 				}
 | |
| 			} else {
 | |
| 				if opts.Stdout != nil {
 | |
| 					c.stdoutGroup.Remove(stdoutKey)
 | |
| 				}
 | |
| 				if opts.Stderr != nil {
 | |
| 					c.stderrGroup.Remove(stderrKey)
 | |
| 				}
 | |
| 			}
 | |
| 			wg.Done()
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	attachStream := func(key string, close <-chan struct{}) {
 | |
| 		<-close
 | |
| 		logrus.Infof("Attach stream %q closed", key)
 | |
| 		// Make sure stdin gets closed.
 | |
| 		if stdinStreamRC != nil {
 | |
| 			stdinStreamRC.Close()
 | |
| 		}
 | |
| 		wg.Done()
 | |
| 	}
 | |
| 
 | |
| 	if opts.Stdout != nil {
 | |
| 		wg.Add(1)
 | |
| 		wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
 | |
| 		c.stdoutGroup.Add(stdoutKey, wc)
 | |
| 		go attachStream(stdoutKey, close)
 | |
| 	}
 | |
| 	if !opts.Tty && opts.Stderr != nil {
 | |
| 		wg.Add(1)
 | |
| 		wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
 | |
| 		c.stderrGroup.Add(stderrKey, wc)
 | |
| 		go attachStream(stderrKey, close)
 | |
| 	}
 | |
| 	wg.Wait()
 | |
| }
 | |
| 
 | |
| // AddOutput adds new write closers to the container stream, and returns existing
 | |
| // write closers if there are any.
 | |
| func (c *ContainerIO) AddOutput(name string, stdout, stderr io.WriteCloser) (io.WriteCloser, io.WriteCloser) {
 | |
| 	var oldStdout, oldStderr io.WriteCloser
 | |
| 	if stdout != nil {
 | |
| 		key := streamKey(c.id, name, Stdout)
 | |
| 		oldStdout = c.stdoutGroup.Get(key)
 | |
| 		c.stdoutGroup.Add(key, stdout)
 | |
| 	}
 | |
| 	if stderr != nil {
 | |
| 		key := streamKey(c.id, name, Stderr)
 | |
| 		oldStderr = c.stderrGroup.Get(key)
 | |
| 		c.stderrGroup.Add(key, stderr)
 | |
| 	}
 | |
| 	return oldStdout, oldStderr
 | |
| }
 | |
| 
 | |
| // Cancel cancels container io.
 | |
| func (c *ContainerIO) Cancel() {
 | |
| 	c.closer.Cancel()
 | |
| }
 | |
| 
 | |
| // Wait waits container io to finish.
 | |
| func (c *ContainerIO) Wait() {
 | |
| 	c.closer.Wait()
 | |
| }
 | |
| 
 | |
| // Close closes all FIFOs.
 | |
| func (c *ContainerIO) Close() error {
 | |
| 	c.closer.Close()
 | |
| 	if c.fifos != nil {
 | |
| 		return c.fifos.Close()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | 
