360 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			360 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package cio
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/containerd/containerd/defaults"
 | |
| )
 | |
| 
 | |
| var bufPool = sync.Pool{
 | |
| 	New: func() interface{} {
 | |
| 		buffer := make([]byte, 32<<10)
 | |
| 		return &buffer
 | |
| 	},
 | |
| }
 | |
| 
 | |
| // Config holds the IO configurations.
 | |
| type Config struct {
 | |
| 	// Terminal is true if one has been allocated
 | |
| 	Terminal bool
 | |
| 	// Stdin path
 | |
| 	Stdin string
 | |
| 	// Stdout path
 | |
| 	Stdout string
 | |
| 	// Stderr path
 | |
| 	Stderr string
 | |
| }
 | |
| 
 | |
| // IO holds the io information for a task or process
 | |
| type IO interface {
 | |
| 	// Config returns the IO configuration.
 | |
| 	Config() Config
 | |
| 	// Cancel aborts all current io operations.
 | |
| 	Cancel()
 | |
| 	// Wait blocks until all io copy operations have completed.
 | |
| 	Wait()
 | |
| 	// Close cleans up all open io resources. Cancel() is always called before
 | |
| 	// Close()
 | |
| 	Close() error
 | |
| }
 | |
| 
 | |
| // Creator creates new IO sets for a task
 | |
| type Creator func(id string) (IO, error)
 | |
| 
 | |
| // Attach allows callers to reattach to running tasks
 | |
| //
 | |
| // There should only be one reader for a task's IO set
 | |
| // because fifo's can only be read from one reader or the output
 | |
| // will be sent only to the first reads
 | |
| type Attach func(*FIFOSet) (IO, error)
 | |
| 
 | |
| // FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
 | |
| type FIFOSet struct {
 | |
| 	Config
 | |
| 	close func() error
 | |
| }
 | |
| 
 | |
| // Close the FIFOSet
 | |
| func (f *FIFOSet) Close() error {
 | |
| 	if f != nil && f.close != nil {
 | |
| 		return f.close()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewFIFOSet returns a new FIFOSet from a Config and a close function
 | |
| func NewFIFOSet(config Config, close func() error) *FIFOSet {
 | |
| 	return &FIFOSet{Config: config, close: close}
 | |
| }
 | |
| 
 | |
| // Streams used to configure a Creator or Attach
 | |
| type Streams struct {
 | |
| 	Stdin    io.Reader
 | |
| 	Stdout   io.Writer
 | |
| 	Stderr   io.Writer
 | |
| 	Terminal bool
 | |
| 	FIFODir  string
 | |
| }
 | |
| 
 | |
| // Opt customize options for creating a Creator or Attach
 | |
| type Opt func(*Streams)
 | |
| 
 | |
| // WithStdio sets stream options to the standard input/output streams
 | |
| func WithStdio(opt *Streams) {
 | |
| 	WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
 | |
| }
 | |
| 
 | |
| // WithTerminal sets the terminal option
 | |
| func WithTerminal(opt *Streams) {
 | |
| 	opt.Terminal = true
 | |
| }
 | |
| 
 | |
| // WithStreams sets the stream options to the specified Reader and Writers
 | |
| func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
 | |
| 	return func(opt *Streams) {
 | |
| 		opt.Stdin = stdin
 | |
| 		opt.Stdout = stdout
 | |
| 		opt.Stderr = stderr
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // WithFIFODir sets the fifo directory.
 | |
| // e.g. "/run/containerd/fifo", "/run/users/1001/containerd/fifo"
 | |
| func WithFIFODir(dir string) Opt {
 | |
| 	return func(opt *Streams) {
 | |
| 		opt.FIFODir = dir
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewCreator returns an IO creator from the options
 | |
| func NewCreator(opts ...Opt) Creator {
 | |
| 	streams := &Streams{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(streams)
 | |
| 	}
 | |
| 	if streams.FIFODir == "" {
 | |
| 		streams.FIFODir = defaults.DefaultFIFODir
 | |
| 	}
 | |
| 	return func(id string) (IO, error) {
 | |
| 		fifos, err := NewFIFOSetInDir(streams.FIFODir, id, streams.Terminal)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if streams.Stdin == nil {
 | |
| 			fifos.Stdin = ""
 | |
| 		}
 | |
| 		if streams.Stdout == nil {
 | |
| 			fifos.Stdout = ""
 | |
| 		}
 | |
| 		if streams.Stderr == nil {
 | |
| 			fifos.Stderr = ""
 | |
| 		}
 | |
| 		return copyIO(fifos, streams)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewAttach attaches the existing io for a task to the provided io.Reader/Writers
 | |
| func NewAttach(opts ...Opt) Attach {
 | |
| 	streams := &Streams{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(streams)
 | |
| 	}
 | |
| 	return func(fifos *FIFOSet) (IO, error) {
 | |
| 		if fifos == nil {
 | |
| 			return nil, fmt.Errorf("cannot attach, missing fifos")
 | |
| 		}
 | |
| 		return copyIO(fifos, streams)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NullIO redirects the container's IO into /dev/null
 | |
| func NullIO(_ string) (IO, error) {
 | |
| 	return &cio{}, nil
 | |
| }
 | |
| 
 | |
| // cio is a basic container IO implementation.
 | |
| type cio struct {
 | |
| 	config  Config
 | |
| 	wg      *sync.WaitGroup
 | |
| 	closers []io.Closer
 | |
| 	cancel  context.CancelFunc
 | |
| }
 | |
| 
 | |
| func (c *cio) Config() Config {
 | |
| 	return c.config
 | |
| }
 | |
| 
 | |
| func (c *cio) Wait() {
 | |
| 	if c.wg != nil {
 | |
| 		c.wg.Wait()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *cio) Close() error {
 | |
| 	var lastErr error
 | |
| 	for _, closer := range c.closers {
 | |
| 		if closer == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		if err := closer.Close(); err != nil {
 | |
| 			lastErr = err
 | |
| 		}
 | |
| 	}
 | |
| 	return lastErr
 | |
| }
 | |
| 
 | |
| func (c *cio) Cancel() {
 | |
| 	if c.cancel != nil {
 | |
| 		c.cancel()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type pipes struct {
 | |
| 	Stdin  io.WriteCloser
 | |
| 	Stdout io.ReadCloser
 | |
| 	Stderr io.ReadCloser
 | |
| }
 | |
| 
 | |
| // DirectIO allows task IO to be handled externally by the caller
 | |
| type DirectIO struct {
 | |
| 	pipes
 | |
| 	cio
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	_ IO = &DirectIO{}
 | |
| 	_ IO = &logURI{}
 | |
| )
 | |
| 
 | |
| // LogURI provides the raw logging URI
 | |
| func LogURI(uri *url.URL) Creator {
 | |
| 	return func(_ string) (IO, error) {
 | |
| 		return &logURI{
 | |
| 			config: Config{
 | |
| 				Stdout: uri.String(),
 | |
| 				Stderr: uri.String(),
 | |
| 			},
 | |
| 		}, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // BinaryIO forwards container STDOUT|STDERR directly to a logging binary
 | |
| func BinaryIO(binary string, args map[string]string) Creator {
 | |
| 	return func(_ string) (IO, error) {
 | |
| 		uri, err := LogURIGenerator("binary", binary, args)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		res := uri.String()
 | |
| 		return &logURI{
 | |
| 			config: Config{
 | |
| 				Stdout: res,
 | |
| 				Stderr: res,
 | |
| 			},
 | |
| 		}, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TerminalBinaryIO forwards container STDOUT|STDERR directly to a logging binary
 | |
| // It also sets the terminal option to true
 | |
| func TerminalBinaryIO(binary string, args map[string]string) Creator {
 | |
| 	return func(_ string) (IO, error) {
 | |
| 		uri, err := LogURIGenerator("binary", binary, args)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		res := uri.String()
 | |
| 		return &logURI{
 | |
| 			config: Config{
 | |
| 				Stdout:   res,
 | |
| 				Stderr:   res,
 | |
| 				Terminal: true,
 | |
| 			},
 | |
| 		}, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LogFile creates a file on disk that logs the task's STDOUT,STDERR.
 | |
| // If the log file already exists, the logs will be appended to the file.
 | |
| func LogFile(path string) Creator {
 | |
| 	return func(_ string) (IO, error) {
 | |
| 		uri, err := LogURIGenerator("file", path, nil)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		res := uri.String()
 | |
| 		return &logURI{
 | |
| 			config: Config{
 | |
| 				Stdout: res,
 | |
| 				Stderr: res,
 | |
| 			},
 | |
| 		}, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // LogURIGenerator is the helper to generate log uri with specific scheme.
 | |
| func LogURIGenerator(scheme string, path string, args map[string]string) (*url.URL, error) {
 | |
| 	path = filepath.Clean(path)
 | |
| 	if !strings.HasPrefix(path, "/") {
 | |
| 		return nil, errors.New("absolute path needed")
 | |
| 	}
 | |
| 
 | |
| 	uri := &url.URL{
 | |
| 		Scheme: scheme,
 | |
| 		Path:   path,
 | |
| 	}
 | |
| 
 | |
| 	if len(args) == 0 {
 | |
| 		return uri, nil
 | |
| 	}
 | |
| 
 | |
| 	q := uri.Query()
 | |
| 	for k, v := range args {
 | |
| 		q.Set(k, v)
 | |
| 	}
 | |
| 	uri.RawQuery = q.Encode()
 | |
| 	return uri, nil
 | |
| }
 | |
| 
 | |
| type logURI struct {
 | |
| 	config Config
 | |
| }
 | |
| 
 | |
| func (l *logURI) Config() Config {
 | |
| 	return l.config
 | |
| }
 | |
| 
 | |
| func (l *logURI) Cancel() {
 | |
| 
 | |
| }
 | |
| 
 | |
| func (l *logURI) Wait() {
 | |
| 
 | |
| }
 | |
| 
 | |
| func (l *logURI) Close() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Load the io for a container but do not attach
 | |
| //
 | |
| // Allows io to be loaded on the task for deletion without
 | |
| // starting copy routines
 | |
| func Load(set *FIFOSet) (IO, error) {
 | |
| 	return &cio{
 | |
| 		config:  set.Config,
 | |
| 		closers: []io.Closer{set},
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (p *pipes) closers() []io.Closer {
 | |
| 	return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
 | |
| }
 | 
