
Currently the shims only support starting the logging binary process if the io.Creator Config does not specify Terminal: true. This means that the program using containerd will only be able to specify FIFO io when Terminal: true, rather than allowing the shim to fork the logging binary process. Hence, containerd consumers face an inconsistent behavior regarding logging binary management depending on the Terminal option. Allowing the shim to fork the logging binary process will introduce consistency between the running container and the logging process. Otherwise, the logging process may die if its parent process dies whereas the container will keep running, resulting in the loss of container logs. Signed-off-by: Akshat Kumar <kshtku@amazon.com>
360 lines
7.3 KiB
Go
360 lines
7.3 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package cio
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/containerd/containerd/defaults"
|
|
)
|
|
|
|
var bufPool = sync.Pool{
|
|
New: func() interface{} {
|
|
buffer := make([]byte, 32<<10)
|
|
return &buffer
|
|
},
|
|
}
|
|
|
|
// Config holds the IO configurations.
|
|
type Config struct {
|
|
// Terminal is true if one has been allocated
|
|
Terminal bool
|
|
// Stdin path
|
|
Stdin string
|
|
// Stdout path
|
|
Stdout string
|
|
// Stderr path
|
|
Stderr string
|
|
}
|
|
|
|
// IO holds the io information for a task or process
|
|
type IO interface {
|
|
// Config returns the IO configuration.
|
|
Config() Config
|
|
// Cancel aborts all current io operations.
|
|
Cancel()
|
|
// Wait blocks until all io copy operations have completed.
|
|
Wait()
|
|
// Close cleans up all open io resources. Cancel() is always called before
|
|
// Close()
|
|
Close() error
|
|
}
|
|
|
|
// Creator creates new IO sets for a task
|
|
type Creator func(id string) (IO, error)
|
|
|
|
// Attach allows callers to reattach to running tasks
|
|
//
|
|
// There should only be one reader for a task's IO set
|
|
// because fifo's can only be read from one reader or the output
|
|
// will be sent only to the first reads
|
|
type Attach func(*FIFOSet) (IO, error)
|
|
|
|
// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
|
|
type FIFOSet struct {
|
|
Config
|
|
close func() error
|
|
}
|
|
|
|
// Close the FIFOSet
|
|
func (f *FIFOSet) Close() error {
|
|
if f.close != nil {
|
|
return f.close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewFIFOSet returns a new FIFOSet from a Config and a close function
|
|
func NewFIFOSet(config Config, close func() error) *FIFOSet {
|
|
return &FIFOSet{Config: config, close: close}
|
|
}
|
|
|
|
// Streams used to configure a Creator or Attach
|
|
type Streams struct {
|
|
Stdin io.Reader
|
|
Stdout io.Writer
|
|
Stderr io.Writer
|
|
Terminal bool
|
|
FIFODir string
|
|
}
|
|
|
|
// Opt customize options for creating a Creator or Attach
|
|
type Opt func(*Streams)
|
|
|
|
// WithStdio sets stream options to the standard input/output streams
|
|
func WithStdio(opt *Streams) {
|
|
WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
|
|
}
|
|
|
|
// WithTerminal sets the terminal option
|
|
func WithTerminal(opt *Streams) {
|
|
opt.Terminal = true
|
|
}
|
|
|
|
// WithStreams sets the stream options to the specified Reader and Writers
|
|
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
|
|
return func(opt *Streams) {
|
|
opt.Stdin = stdin
|
|
opt.Stdout = stdout
|
|
opt.Stderr = stderr
|
|
}
|
|
}
|
|
|
|
// WithFIFODir sets the fifo directory.
|
|
// e.g. "/run/containerd/fifo", "/run/users/1001/containerd/fifo"
|
|
func WithFIFODir(dir string) Opt {
|
|
return func(opt *Streams) {
|
|
opt.FIFODir = dir
|
|
}
|
|
}
|
|
|
|
// NewCreator returns an IO creator from the options
|
|
func NewCreator(opts ...Opt) Creator {
|
|
streams := &Streams{}
|
|
for _, opt := range opts {
|
|
opt(streams)
|
|
}
|
|
if streams.FIFODir == "" {
|
|
streams.FIFODir = defaults.DefaultFIFODir
|
|
}
|
|
return func(id string) (IO, error) {
|
|
fifos, err := NewFIFOSetInDir(streams.FIFODir, id, streams.Terminal)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if streams.Stdin == nil {
|
|
fifos.Stdin = ""
|
|
}
|
|
if streams.Stdout == nil {
|
|
fifos.Stdout = ""
|
|
}
|
|
if streams.Stderr == nil {
|
|
fifos.Stderr = ""
|
|
}
|
|
return copyIO(fifos, streams)
|
|
}
|
|
}
|
|
|
|
// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
|
|
func NewAttach(opts ...Opt) Attach {
|
|
streams := &Streams{}
|
|
for _, opt := range opts {
|
|
opt(streams)
|
|
}
|
|
return func(fifos *FIFOSet) (IO, error) {
|
|
if fifos == nil {
|
|
return nil, fmt.Errorf("cannot attach, missing fifos")
|
|
}
|
|
return copyIO(fifos, streams)
|
|
}
|
|
}
|
|
|
|
// NullIO redirects the container's IO into /dev/null
|
|
func NullIO(_ string) (IO, error) {
|
|
return &cio{}, nil
|
|
}
|
|
|
|
// cio is a basic container IO implementation.
|
|
type cio struct {
|
|
config Config
|
|
wg *sync.WaitGroup
|
|
closers []io.Closer
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func (c *cio) Config() Config {
|
|
return c.config
|
|
}
|
|
|
|
func (c *cio) Wait() {
|
|
if c.wg != nil {
|
|
c.wg.Wait()
|
|
}
|
|
}
|
|
|
|
func (c *cio) Close() error {
|
|
var lastErr error
|
|
for _, closer := range c.closers {
|
|
if closer == nil {
|
|
continue
|
|
}
|
|
if err := closer.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
func (c *cio) Cancel() {
|
|
if c.cancel != nil {
|
|
c.cancel()
|
|
}
|
|
}
|
|
|
|
type pipes struct {
|
|
Stdin io.WriteCloser
|
|
Stdout io.ReadCloser
|
|
Stderr io.ReadCloser
|
|
}
|
|
|
|
// DirectIO allows task IO to be handled externally by the caller
|
|
type DirectIO struct {
|
|
pipes
|
|
cio
|
|
}
|
|
|
|
var (
|
|
_ IO = &DirectIO{}
|
|
_ IO = &logURI{}
|
|
)
|
|
|
|
// LogURI provides the raw logging URI
|
|
func LogURI(uri *url.URL) Creator {
|
|
return func(_ string) (IO, error) {
|
|
return &logURI{
|
|
config: Config{
|
|
Stdout: uri.String(),
|
|
Stderr: uri.String(),
|
|
},
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// BinaryIO forwards container STDOUT|STDERR directly to a logging binary
|
|
func BinaryIO(binary string, args map[string]string) Creator {
|
|
return func(_ string) (IO, error) {
|
|
uri, err := LogURIGenerator("binary", binary, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := uri.String()
|
|
return &logURI{
|
|
config: Config{
|
|
Stdout: res,
|
|
Stderr: res,
|
|
},
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// TerminalBinaryIO forwards container STDOUT|STDERR directly to a logging binary
|
|
// It also sets the terminal option to true
|
|
func TerminalBinaryIO(binary string, args map[string]string) Creator {
|
|
return func(_ string) (IO, error) {
|
|
uri, err := LogURIGenerator("binary", binary, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := uri.String()
|
|
return &logURI{
|
|
config: Config{
|
|
Stdout: res,
|
|
Stderr: res,
|
|
Terminal: true,
|
|
},
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
|
|
// If the log file already exists, the logs will be appended to the file.
|
|
func LogFile(path string) Creator {
|
|
return func(_ string) (IO, error) {
|
|
uri, err := LogURIGenerator("file", path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := uri.String()
|
|
return &logURI{
|
|
config: Config{
|
|
Stdout: res,
|
|
Stderr: res,
|
|
},
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// LogURIGenerator is the helper to generate log uri with specific scheme.
|
|
func LogURIGenerator(scheme string, path string, args map[string]string) (*url.URL, error) {
|
|
path = filepath.Clean(path)
|
|
if !strings.HasPrefix(path, "/") {
|
|
return nil, errors.New("absolute path needed")
|
|
}
|
|
|
|
uri := &url.URL{
|
|
Scheme: scheme,
|
|
Path: path,
|
|
}
|
|
|
|
if len(args) == 0 {
|
|
return uri, nil
|
|
}
|
|
|
|
q := uri.Query()
|
|
for k, v := range args {
|
|
q.Set(k, v)
|
|
}
|
|
uri.RawQuery = q.Encode()
|
|
return uri, nil
|
|
}
|
|
|
|
type logURI struct {
|
|
config Config
|
|
}
|
|
|
|
func (l *logURI) Config() Config {
|
|
return l.config
|
|
}
|
|
|
|
func (l *logURI) Cancel() {
|
|
|
|
}
|
|
|
|
func (l *logURI) Wait() {
|
|
|
|
}
|
|
|
|
func (l *logURI) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// Load the io for a container but do not attach
|
|
//
|
|
// Allows io to be loaded on the task for deletion without
|
|
// starting copy routines
|
|
func Load(set *FIFOSet) (IO, error) {
|
|
return &cio{
|
|
config: set.Config,
|
|
closers: []io.Closer{set},
|
|
}, nil
|
|
}
|
|
|
|
func (p *pipes) closers() []io.Closer {
|
|
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
|
|
}
|