Shim pluggable logging

Closes #603

This adds logging facilities at the shim level to provide pluggable logging
with minimal I/O overhead. Log handling is done within the shim so that all
I/O, CPU, and memory usage can be charged to the container.

A sample logging driver that sets up logging for a container in the systemd
journal looks like this:

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/containerd/containerd/runtime/v2/logging"
	"github.com/coreos/go-systemd/journal"
)

func main() {
	logging.Run(log)
}

func log(ctx context.Context, config *logging.Config, ready func() error) error {
	// construct any log metadata for the container
	vars := map[string]string{
		"SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID),
	}
	var wg sync.WaitGroup
	wg.Add(2)
	// forward both stdout and stderr to the journal
	go copy(&wg, config.Stdout, journal.PriInfo, vars)
	go copy(&wg, config.Stderr, journal.PriErr, vars)

	// signal that we are ready and setup for the container to be started
	if err := ready(); err != nil {
		return err
	}
	wg.Wait()
	return nil
}

func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) {
	defer wg.Done()
	s := bufio.NewScanner(r)
	for s.Scan() {
		if s.Err() != nil {
			return
		}
		journal.Send(s.Text(), pri, vars)
	}
}
```
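
Under the hood, `logging.Run` wraps a small file-descriptor contract between the shim and the logging binary (visible in `newBinaryIO` and the `logging` package in the diff below): the shim passes the container's stdout on fd 3 and stderr on fd 4, and the driver signals readiness by closing fd 5. A minimal sketch of that contract without the helper package (echoing to the driver's own stdio is just for illustration):

```go
package main

import (
	"io"
	"os"
	"sync"
)

func main() {
	// fd 3: container stdout, fd 4: container stderr, fd 5: wait pipe
	stdout := os.NewFile(3, "CONTAINER_STDOUT")
	stderr := os.NewFile(4, "CONTAINER_STDERR")
	wait := os.NewFile(5, "CONTAINER_WAIT")

	// closing the wait pipe tells the shim the driver is set up
	wait.Close()

	// drain both streams until the shim closes its write ends
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); io.Copy(os.Stdout, stdout) }()
	go func() { defer wg.Done(); io.Copy(os.Stderr, stderr) }()
	wg.Wait()
}
```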

A `logging` package has been created to assist developers in creating
logging plugins for containerd.

This uses a URI-based approach for logging drivers that can be expanded
in the future.

Supported URI schemes are:

* binary
* fifo
* file
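
These are ordinary `net/url` URIs. As a sketch of the binary form that the `ctr` example below uses (the `tag` argument is a made-up placeholder; `BinaryIO` in the diff below is the real constructor):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Host names the logging binary; query args become command-line
	// arguments for it when the shim execs the binary
	u := url.URL{
		Scheme:   "binary",
		Host:     "shim-journald",
		RawQuery: url.Values{"tag": {"redis"}}.Encode(),
	}
	fmt.Println(u.String()) // binary://shim-journald?tag=redis
}
```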

You can pass the log URI via ctr on the command line:

```bash
> ctr run --rm --runtime io.containerd.runc.v2 --log-uri binary://shim-journald docker.io/library/redis:alpine redis
```

```bash
> journalctl -f -t default:redis

-- Logs begin at Tue 2018-12-11 16:29:51 EST. --
Mar 08 16:08:22 deathstar default:redis[120760]: 1:C 08 Mar 2019 21:08:22.703 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # You requested maxclients of 10000 requiring at least 10032 max file descriptors.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # Server can't set maximum open files to 10032 because of OS error: Operation not permitted.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # Current maximum open files is 1024. maxclients has been reduced to 992 to compensate for low ulimit. If you need higher maxclients increase 'ulimit -n'.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 * Running mode=standalone, port=6379.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # Server initialized
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 * Ready to accept connections
Mar 08 16:08:50 deathstar default:redis[120760]: 1:signal-handler (1552079330) Received SIGINT scheduling shutdown...
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.405 # User requested shutdown...
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.406 * Saving the final RDB snapshot before exiting.
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.452 * DB saved on disk
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.453 # Redis is now ready to exit, bye bye...
```

The following client-side Opts are added:

```go
// LogURI provides the raw logging URI
func LogURI(uri *url.URL) Creator { }
// BinaryIO forwards container STDOUT|STDERR directly to a logging binary
func BinaryIO(binary string, args map[string]string) Creator {}
```
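
As a usage sketch (error handling trimmed; `shim-journald` and the `tag` argument are the placeholders from the examples above), either Opt plugs into task creation as the `cio.Creator`:

```go
package main

import (
	"context"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
)

// startWithBinaryLogger creates the task with its STDOUT/STDERR handed
// to a logging binary; cio.LogFile("/var/log/redis.log") would append
// both streams to a file instead
func startWithBinaryLogger(ctx context.Context, container containerd.Container) (containerd.Task, error) {
	return container.NewTask(ctx, cio.BinaryIO("shim-journald", map[string]string{
		"tag": "redis",
	}))
}
```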

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
Michael Crosby, 2019-03-07 13:38:56 -05:00
commit e6ae9cc64f (parent aa328dfc55)
16 changed files with 542 additions and 109 deletions

```diff
@@ -20,8 +20,8 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"net/url"
 	"os"
 	"path/filepath"
 	"sync"

 	"github.com/containerd/containerd/defaults"
@@ -222,46 +222,76 @@ type DirectIO struct {
 	cio
 }

-var _ IO = &DirectIO{}
+var (
+	_ IO = &DirectIO{}
+	_ IO = &logURI{}
+)

-// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
-// If the log file already exists, the logs will be appended to the file.
-func LogFile(path string) Creator {
+// LogURI provides the raw logging URI
+func LogURI(uri *url.URL) Creator {
 	return func(_ string) (IO, error) {
-		if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
-			return nil, err
-		}
-		f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-		if err != nil {
-			return nil, err
-		}
-		f.Close()
-		return &logIO{
+		return &logURI{
 			config: Config{
-				Stdout: path,
-				Stderr: path,
+				Stdout: uri.String(),
+				Stderr: uri.String(),
 			},
 		}, nil
 	}
 }

-type logIO struct {
+// BinaryIO forwards container STDOUT|STDERR directly to a logging binary
+func BinaryIO(binary string, args map[string]string) Creator {
+	return func(_ string) (IO, error) {
+		uri := &url.URL{
+			Scheme: "binary",
+			Host:   binary,
+		}
+		for k, v := range args {
+			uri.Query().Set(k, v)
+		}
+		return &logURI{
+			config: Config{
+				Stdout: uri.String(),
+				Stderr: uri.String(),
+			},
+		}, nil
+	}
+}
+
+// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
+// If the log file already exists, the logs will be appended to the file.
+func LogFile(path string) Creator {
+	return func(_ string) (IO, error) {
+		uri := &url.URL{
+			Scheme: "file",
+			Host:   path,
+		}
+		return &logURI{
+			config: Config{
+				Stdout: uri.String(),
+				Stderr: uri.String(),
+			},
+		}, nil
+	}
+}
+
+type logURI struct {
 	config Config
 }

-func (l *logIO) Config() Config {
+func (l *logURI) Config() Config {
 	return l.config
 }

-func (l *logIO) Cancel() {
+func (l *logURI) Cancel() {
 }

-func (l *logIO) Wait() {
+func (l *logURI) Wait() {
 }

-func (l *logIO) Close() error {
+func (l *logURI) Close() error {
 	return nil
 }
```
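
A subtlety in `BinaryIO` above: `uri.Query()` parses `RawQuery` into a fresh `url.Values` copy on each call, so `Set` on that copy never reaches the URI that gets stringified. A hedged sketch of the pattern that would persist the args (not part of this commit):

```go
package main

import "net/url"

// buildBinaryURI re-encodes args into RawQuery so they survive into
// uri.String(); mutating the copy returned by uri.Query() does not.
func buildBinaryURI(binary string, args map[string]string) *url.URL {
	q := url.Values{}
	for k, v := range args {
		q.Set(k, v)
	}
	return &url.URL{
		Scheme:   "binary",
		Host:     binary,
		RawQuery: q.Encode(),
	}
}
```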

```diff
@@ -99,6 +99,10 @@ var Command = cli.Command{
 			Name:  "null-io",
 			Usage: "send all IO to /dev/null",
 		},
+		cli.StringFlag{
+			Name:  "log-uri",
+			Usage: "log uri",
+		},
 		cli.BoolFlag{
 			Name:  "detach,d",
 			Usage: "detach from the task after it has started execution",
@@ -161,7 +165,7 @@ var Command = cli.Command{
 		}
 		opts := getNewTaskOpts(context)
 		ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))}
-		task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), ioOpts, opts...)
+		task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
 		if err != nil {
 			return err
 		}
```

```diff
@@ -35,6 +35,10 @@ var startCommand = cli.Command{
 			Name:  "null-io",
 			Usage: "send all IO to /dev/null",
 		},
+		cli.StringFlag{
+			Name:  "log-uri",
+			Usage: "log uri",
+		},
 		cli.StringFlag{
 			Name:  "fifo-dir",
 			Usage: "directory used for storing IO FIFOs",
@@ -85,7 +89,7 @@ var startCommand = cli.Command{
 			}
 		}
-		task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), ioOpts, opts...)
+		task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
 		if err != nil {
 			return err
 		}
```

```diff
@@ -20,6 +20,7 @@ package tasks

 import (
 	gocontext "context"
+	"net/url"
 	"os"
 	"os/signal"
@@ -67,7 +68,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Console) error {
 }

 // NewTask creates a new task
-func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
+func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
 	stdio := cio.NewCreator(append([]cio.Opt{cio.WithStdio}, ioOpts...)...)
 	if checkpoint != "" {
 		im, err := client.GetImage(ctx, checkpoint)
@@ -86,6 +87,13 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container,
 		}
 		ioCreator = cio.NullIO
 	}
+	if logURI != "" {
+		u, err := url.Parse(logURI)
+		if err != nil {
+			return nil, err
+		}
+		ioCreator = cio.LogURI(u)
+	}
 	return container.NewTask(ctx, ioCreator, opts...)
 }
```

```diff
@@ -58,7 +58,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Console) error {
 }

 // NewTask creates a new task
-func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
+func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
 	var ioCreator cio.Creator
 	if con != nil {
 		if nullIO {
```

```diff
@@ -72,7 +72,7 @@ type Process interface {
 // platform implementations
 type Platform interface {
 	CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string,
-		wg, cwg *sync.WaitGroup) (console.Console, error)
+		wg *sync.WaitGroup) (console.Console, error)
 	ShutdownConsole(ctx context.Context, console console.Console) error
 	Close() error
 }
```

```diff
@@ -46,7 +46,7 @@ type execProcess struct {
 	mu      sync.Mutex
 	id      string
 	console console.Console
-	io      runc.IO
+	io      *processIO
 	status  int
 	exited  time.Time
 	pid     *safePid
@@ -172,29 +172,30 @@ func (e *execProcess) start(ctx context.Context) (err error) {
 	// access e.pid until it is updated.
 	e.pid.Lock()
 	defer e.pid.Unlock()
 	var (
 		socket  *runc.Socket
-		pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
+		pio     *processIO
+		pidFile = newExecPidFile(e.path, e.id)
 	)
 	if e.stdio.Terminal {
 		if socket, err = runc.NewTempConsoleSocket(); err != nil {
 			return errors.Wrap(err, "failed to create runc console socket")
 		}
 		defer socket.Close()
-	} else if e.stdio.IsNull() {
-		if e.io, err = runc.NewNullIO(); err != nil {
-			return errors.Wrap(err, "creating new NULL IO")
-		}
 	} else {
-		if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID, withConditionalIO(e.stdio)); err != nil {
-			return errors.Wrap(err, "failed to create runc io pipes")
+		if pio, err = createIO(ctx, e.id, e.parent.IoUID, e.parent.IoGID, e.stdio); err != nil {
+			return errors.Wrap(err, "failed to create init process I/O")
 		}
+		e.io = pio
 	}
 	opts := &runc.ExecOpts{
-		PidFile: pidfile,
-		IO:      e.io,
+		PidFile: pidFile.Path(),
 		Detach:  true,
 	}
+	if pio != nil {
+		opts.IO = pio.IO()
+	}
 	if socket != nil {
 		opts.ConsoleSocket = socket
 	}
@@ -203,14 +204,10 @@ func (e *execProcess) start(ctx context.Context) (err error) {
 		return e.parent.runtimeError(err, "OCI runtime exec failed")
 	}
 	if e.stdio.Stdin != "" {
-		sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
-		if err != nil {
-			return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin)
+		if err := e.openStdin(e.stdio.Stdin); err != nil {
+			return err
 		}
-		e.closers = append(e.closers, sc)
-		e.stdin = sc
 	}
-	var copyWaitGroup sync.WaitGroup
 	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	defer cancel()
 	if socket != nil {
@@ -218,16 +215,15 @@ func (e *execProcess) start(ctx context.Context) (err error) {
 		if err != nil {
 			return errors.Wrap(err, "failed to retrieve console master")
 		}
-		if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
+		if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
 			return errors.Wrap(err, "failed to start console copy")
 		}
-	} else if !e.stdio.IsNull() {
-		if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
+	} else {
+		if err := pio.Copy(ctx, &e.wg); err != nil {
 			return errors.Wrap(err, "failed to start io pipe copy")
 		}
 	}
-	copyWaitGroup.Wait()
-	pid, err := runc.ReadPidFile(opts.PidFile)
+	pid, err := pidFile.Read()
 	if err != nil {
 		return errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
 	}
@@ -235,6 +231,16 @@ func (e *execProcess) start(ctx context.Context) (err error) {
 	return nil
 }

+func (e *execProcess) openStdin(path string) error {
+	sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
+	if err != nil {
+		return errors.Wrapf(err, "failed to open stdin fifo %s", path)
+	}
+	e.stdin = sc
+	e.closers = append(e.closers, sc)
+	return nil
+}
+
 func (e *execProcess) Status(ctx context.Context) (string, error) {
 	s, err := e.parent.Status(ctx)
 	if err != nil {
```

```diff
@@ -41,9 +41,6 @@ import (
 	"github.com/pkg/errors"
 )

-// InitPidFile name of the file that contains the init pid
-const InitPidFile = "init.pid"
-
 // Init represents an initial process for a container
 type Init struct {
 	wg sync.WaitGroup
@@ -63,7 +60,7 @@ type Init struct {
 	Bundle   string
 	console  console.Console
 	Platform proc.Platform
-	io       runc.IO
+	io       *processIO
 	runtime  *runc.Runc
 	status   int
 	exited   time.Time
@@ -113,47 +110,31 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
 	var (
 		err     error
 		socket  *runc.Socket
+		pio     *processIO
+		pidFile = newPidFile(p.Bundle)
 	)
 	if r.Terminal {
 		if socket, err = runc.NewTempConsoleSocket(); err != nil {
 			return errors.Wrap(err, "failed to create OCI runtime console socket")
 		}
 		defer socket.Close()
-	} else if hasNoIO(r) {
-		if p.io, err = runc.NewNullIO(); err != nil {
-			return errors.Wrap(err, "creating new NULL IO")
-		}
 	} else {
-		if p.io, err = runc.NewPipeIO(p.IoUID, p.IoGID, withConditionalIO(p.stdio)); err != nil {
-			return errors.Wrap(err, "failed to create OCI runtime io pipes")
+		if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
+			return errors.Wrap(err, "failed to create init process I/O")
 		}
+		p.io = pio
 	}
-	pidFile := filepath.Join(p.Bundle, InitPidFile)
 	if r.Checkpoint != "" {
-		opts := &runc.RestoreOpts{
-			CheckpointOpts: runc.CheckpointOpts{
-				ImagePath:  r.Checkpoint,
-				WorkDir:    p.CriuWorkPath,
-				ParentPath: r.ParentCheckpoint,
-			},
-			PidFile:     pidFile,
-			IO:          p.io,
-			NoPivot:     p.NoPivotRoot,
-			Detach:      true,
-			NoSubreaper: true,
-		}
-		p.initState = &createdCheckpointState{
-			p:    p,
-			opts: opts,
-		}
-		return nil
+		return p.createCheckpointedState(r, pidFile)
 	}
 	opts := &runc.CreateOpts{
-		PidFile:      pidFile,
-		IO:           p.io,
+		PidFile:      pidFile.Path(),
 		NoPivot:      p.NoPivotRoot,
 		NoNewKeyring: p.NoNewKeyring,
 	}
+	if p.io != nil {
+		opts.IO = p.io.IO()
+	}
 	if socket != nil {
 		opts.ConsoleSocket = socket
 	}
@@ -161,14 +142,10 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
 		return p.runtimeError(err, "OCI runtime create failed")
 	}
 	if r.Stdin != "" {
-		sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
-		if err != nil {
-			return errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
+		if err := p.openStdin(r.Stdin); err != nil {
+			return err
 		}
-		p.stdin = sc
-		p.closers = append(p.closers, sc)
 	}
-	var copyWaitGroup sync.WaitGroup
 	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	defer cancel()
 	if socket != nil {
@@ -176,19 +153,17 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
 		if err != nil {
 			return errors.Wrap(err, "failed to retrieve console master")
 		}
-		console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
+		console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg)
 		if err != nil {
 			return errors.Wrap(err, "failed to start console copy")
 		}
 		p.console = console
-	} else if !hasNoIO(r) {
-		if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil {
+	} else {
+		if err := pio.Copy(ctx, &p.wg); err != nil {
 			return errors.Wrap(err, "failed to start io pipe copy")
 		}
 	}
-	copyWaitGroup.Wait()
-	pid, err := runc.ReadPidFile(pidFile)
+	pid, err := pidFile.Read()
 	if err != nil {
 		return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
 	}
@@ -196,6 +171,36 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
 	return nil
 }

+func (p *Init) openStdin(path string) error {
+	sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
+	if err != nil {
+		return errors.Wrapf(err, "failed to open stdin fifo %s", path)
+	}
+	p.stdin = sc
+	p.closers = append(p.closers, sc)
+	return nil
+}
+
+func (p *Init) createCheckpointedState(r *CreateConfig, pidFile *pidFile) error {
+	opts := &runc.RestoreOpts{
+		CheckpointOpts: runc.CheckpointOpts{
+			ImagePath:  r.Checkpoint,
+			WorkDir:    p.CriuWorkPath,
+			ParentPath: r.ParentCheckpoint,
+		},
+		PidFile:     pidFile.Path(),
+		IO:          p.io.IO(),
+		NoPivot:     p.NoPivotRoot,
+		Detach:      true,
+		NoSubreaper: true,
+	}
+	p.initState = &createdCheckpointState{
+		p:    p,
+		opts: opts,
+	}
+	return nil
+}
+
 // Wait for the process to exit
 func (p *Init) Wait() {
 	<-p.waitBlock
```

```diff
@@ -20,12 +20,9 @@ package proc

 import (
 	"context"
 	"sync"
-	"syscall"

 	"github.com/containerd/console"
 	"github.com/containerd/containerd/runtime/proc"
-	"github.com/containerd/fifo"
 	runc "github.com/containerd/go-runc"
 	google_protobuf "github.com/gogo/protobuf/types"
 	"github.com/pkg/errors"
@@ -172,31 +169,25 @@ func (s *createdCheckpointState) Start(ctx context.Context) error {
 		return p.runtimeError(err, "OCI runtime restore failed")
 	}
 	if sio.Stdin != "" {
-		sc, err := fifo.OpenFifo(context.Background(), sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
-		if err != nil {
+		if err := p.openStdin(sio.Stdin); err != nil {
 			return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin)
 		}
-		p.stdin = sc
-		p.closers = append(p.closers, sc)
 	}
-	var copyWaitGroup sync.WaitGroup
 	if socket != nil {
 		console, err := socket.ReceiveMaster()
 		if err != nil {
 			return errors.Wrap(err, "failed to retrieve console master")
 		}
-		console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup)
+		console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg)
 		if err != nil {
 			return errors.Wrap(err, "failed to start console copy")
 		}
 		p.console = console
-	} else if !sio.IsNull() {
-		if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup); err != nil {
+	} else {
+		if err := p.io.Copy(ctx, &p.wg); err != nil {
 			return errors.Wrap(err, "failed to start io pipe copy")
 		}
 	}
-	copyWaitGroup.Wait()
 	pid, err := runc.ReadPidFile(s.opts.PidFile)
 	if err != nil {
 		return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
```

```diff
@@ -22,12 +22,18 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"net/url"
 	"os"
+	"os/exec"
+	"path/filepath"
 	"sync"
 	"syscall"

+	"github.com/containerd/containerd/namespaces"
+	"github.com/containerd/containerd/runtime/proc"
 	"github.com/containerd/fifo"
 	runc "github.com/containerd/go-runc"
+	"github.com/pkg/errors"
 )

 var bufPool = sync.Pool{
@@ -37,6 +43,84 @@ var bufPool = sync.Pool{
 	},
 }

+type processIO struct {
+	io runc.IO
+
+	uri   *url.URL
+	copy  bool
+	stdio proc.Stdio
+}
+
+func (p *processIO) Close() error {
+	if p.io != nil {
+		return p.io.Close()
+	}
+	return nil
+}
+
+func (p *processIO) IO() runc.IO {
+	return p.io
+}
+
+func (p *processIO) Copy(ctx context.Context, wg *sync.WaitGroup) error {
+	if !p.copy {
+		return nil
+	}
+	var cwg sync.WaitGroup
+	if err := copyPipes(ctx, p.IO(), p.stdio.Stdin, p.stdio.Stdout, p.stdio.Stderr, wg, &cwg); err != nil {
+		return errors.Wrap(err, "unable to copy pipes")
+	}
+	cwg.Wait()
+	return nil
+}
+
+func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio) (*processIO, error) {
+	pio := &processIO{
+		stdio: stdio,
+	}
+	if stdio.IsNull() {
+		i, err := runc.NewNullIO()
+		if err != nil {
+			return nil, err
+		}
+		pio.io = i
+		return pio, nil
+	}
+	u, err := url.Parse(stdio.Stdout)
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to parse stdout uri")
+	}
+	if u.Scheme == "" {
+		u.Scheme = "fifo"
+	}
+	pio.uri = u
+	switch u.Scheme {
+	case "fifo":
+		pio.copy = true
+		pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
+	case "binary":
+		pio.io, err = newBinaryIO(ctx, id, u)
+	case "file":
+		if err := os.MkdirAll(filepath.Dir(u.Host), 0755); err != nil {
+			return nil, err
+		}
+		var f *os.File
+		f, err = os.OpenFile(u.Host, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+		if err != nil {
+			return nil, err
+		}
+		f.Close()
+		pio.copy = true
+		pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
+	default:
+		return nil, errors.Errorf("unknown STDIO scheme %s", u.Scheme)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return pio, nil
+}
+
 func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
 	var sameFile io.WriteCloser
 	for _, i := range []struct {
@@ -143,3 +227,134 @@ func isFifo(path string) (bool, error) {
 	}
 	return false, nil
 }
+
+func newBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) {
+	ns, err := namespaces.NamespaceRequired(ctx)
+	if err != nil {
+		return nil, err
+	}
+	var args []string
+	for k, vs := range uri.Query() {
+		args = append(args, k)
+		if len(vs) > 0 {
+			args = append(args, vs[0])
+		}
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	cmd := exec.CommandContext(ctx, uri.Host, args...)
+	cmd.Env = append(cmd.Env,
+		"CONTAINER_ID="+id,
+		"CONTAINER_NAMESPACE="+ns,
+	)
+	out, err := newPipe()
+	if err != nil {
+		return nil, err
+	}
+	serr, err := newPipe()
+	if err != nil {
+		return nil, err
+	}
+	r, w, err := os.Pipe()
+	if err != nil {
+		return nil, err
+	}
+	cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
+	// don't need to register this with the reaper or wait when
+	// running inside a shim
+	if err := cmd.Start(); err != nil {
+		return nil, err
+	}
+	// close our side of the pipe after start
+	w.Close()
+	// wait for the logging binary to be ready
+	b := make([]byte, 1)
+	if _, err := r.Read(b); err != nil && err != io.EOF {
+		return nil, err
+	}
+	return &binaryIO{
+		cmd:    cmd,
+		cancel: cancel,
+		out:    out,
+		err:    serr,
+	}, nil
+}
+
+type binaryIO struct {
+	cmd      *exec.Cmd
+	cancel   func()
+	out, err *pipe
+}
+
+func (b *binaryIO) CloseAfterStart() (err error) {
+	for _, v := range []*pipe{
+		b.out,
+		b.err,
+	} {
+		if v != nil {
+			if cerr := v.r.Close(); err == nil {
+				err = cerr
+			}
+		}
+	}
+	return err
+}
+
+func (b *binaryIO) Close() (err error) {
+	b.cancel()
+	for _, v := range []*pipe{
+		b.out,
+		b.err,
+	} {
+		if v != nil {
+			if cerr := v.Close(); err == nil {
+				err = cerr
+			}
+		}
+	}
+	return err
+}
+
+func (b *binaryIO) Stdin() io.WriteCloser {
+	return nil
+}
+
+func (b *binaryIO) Stdout() io.ReadCloser {
+	return nil
+}
+
+func (b *binaryIO) Stderr() io.ReadCloser {
+	return nil
+}
+
+func (b *binaryIO) Set(cmd *exec.Cmd) {
+	if b.out != nil {
+		cmd.Stdout = b.out.w
+	}
+	if b.err != nil {
+		cmd.Stderr = b.err.w
+	}
+}
+
+func newPipe() (*pipe, error) {
+	r, w, err := os.Pipe()
+	if err != nil {
+		return nil, err
+	}
+	return &pipe{
+		r: r,
+		w: w,
+	}, nil
+}
+
+type pipe struct {
+	r *os.File
+	w *os.File
+}
+
+func (p *pipe) Close() error {
+	err := p.w.Close()
+	if rerr := p.r.Close(); err == nil {
+		err = rerr
+	}
+	return err
+}
```
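
One detail of `createIO` above worth calling out: the stdout string is parsed as a URL, and a bare fifo path (what the shim received before this change) carries no scheme, so it falls back to `fifo` and keeps the old pipe-copy behavior; only the `binary` scheme skips the shim-side copy. A quick sketch of that defaulting, with placeholder paths:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	for _, s := range []string{
		"/run/containerd/fifo/task-stdout", // classic fifo path
		"binary://shim-journald?tag=redis", // spawn a logging binary
	} {
		u, err := url.Parse(s)
		if err != nil {
			panic(err)
		}
		if u.Scheme == "" {
			u.Scheme = "fifo" // same default createIO applies
		}
		fmt.Printf("%-34s -> scheme=%s copy-in-shim=%v\n", s, u.Scheme, u.Scheme != "binary")
	}
}
```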

```diff
@@ -20,8 +20,10 @@ package proc

 import (
 	"encoding/json"
+	"fmt"
 	"io"
 	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -114,6 +116,29 @@ func checkKillError(err error) error {
 	return errors.Wrapf(err, "unknown error after kill")
 }

-func hasNoIO(r *CreateConfig) bool {
-	return r.Stdin == "" && r.Stdout == "" && r.Stderr == ""
-}
+// InitPidFile name of the file that contains the init pid
+const InitPidFile = "init.pid"
+
+func newPidFile(bundle string) *pidFile {
+	return &pidFile{
+		path: filepath.Join(bundle, InitPidFile),
+	}
+}
+
+func newExecPidFile(bundle, id string) *pidFile {
+	return &pidFile{
+		path: filepath.Join(bundle, fmt.Sprintf("%s.pid", id)),
+	}
+}
+
+type pidFile struct {
+	path string
+}
+
+func (p *pidFile) Path() string {
+	return p.path
+}
+
+func (p *pidFile) Read() (int, error) {
+	return runc.ReadPidFile(p.path)
+}
```

```diff
@@ -31,7 +31,7 @@ type linuxPlatform struct {
 	epoller *console.Epoller
 }

-func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
 	if p.epoller == nil {
 		return nil, errors.New("uninitialized epoller")
 	}
@@ -40,6 +40,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
 	if err != nil {
 		return nil, err
 	}
+	var cwg sync.WaitGroup
 	if stdin != "" {
 		in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
@@ -77,6 +78,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
 		outw.Close()
 		wg.Done()
 	}()
+	cwg.Wait()
 	return epollConsole, nil
 }
```

```diff
@@ -31,7 +31,8 @@ import (
 type unixPlatform struct {
 }

-func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
+	var cwg sync.WaitGroup
 	if stdin != "" {
 		in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
 		if err != nil {
@@ -67,6 +68,7 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console,
 		outw.Close()
 		wg.Done()
 	}()
+	cwg.Wait()
 	return console, nil
 }
```

````diff
@@ -173,6 +173,68 @@ The Runtime v2 supports an async event model. In order for an upstream caller
 | `runtime.TaskExitEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec (other than the init exec) exits expected or unexpected |
 | `runtime.TaskDeleteEventTopic` | SHOULD (follow `TaskExitEventTopic` or `TaskExecAddedEventTopic` if never started) | When an exec is removed from a shim |
+
+#### Logging
+
+Shims may support pluggable logging via STDIO URIs.
+Currently supported schemes for logging are:
+
+* fifo - Linux
+* binary - Linux & Windows
+* file - Linux & Windows
+* npipe - Windows
+
+Binary logging has the ability to forward a container's STDIO to an external binary for consumption.
+A sample logging driver that forwards the container's STDOUT and STDERR to `journald` is:
+
+```go
+package main
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/containerd/containerd/runtime/v2/logging"
+	"github.com/coreos/go-systemd/journal"
+)
+
+func main() {
+	logging.Run(log)
+}
+
+func log(ctx context.Context, config *logging.Config, ready func() error) error {
+	// construct any log metadata for the container
+	vars := map[string]string{
+		"SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID),
+	}
+	var wg sync.WaitGroup
+	wg.Add(2)
+	// forward both stdout and stderr to the journal
+	go copy(&wg, config.Stdout, journal.PriInfo, vars)
+	go copy(&wg, config.Stderr, journal.PriErr, vars)
+
+	// signal that we are ready and setup for the container to be started
+	if err := ready(); err != nil {
+		return err
+	}
+	wg.Wait()
+	return nil
+}
+
+func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) {
+	defer wg.Done()
+	s := bufio.NewScanner(r)
+	for s.Scan() {
+		if s.Err() != nil {
+			return
+		}
+		journal.Send(s.Text(), pri, vars)
+	}
+}
+```
+
 ### Other

 #### Unsupported rpcs
````

```diff
@@ -0,0 +1,77 @@
+// +build !windows
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package logging
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"os/signal"
+
+	"golang.org/x/sys/unix"
+)
+
+// Config of the container logs
+type Config struct {
+	ID        string
+	Namespace string
+	Stdout    io.Reader
+	Stderr    io.Reader
+}
+
+// LoggerFunc is implemented by custom v2 logging binaries
+type LoggerFunc func(context.Context, *Config, func() error) error
+
+// Run the logging driver
+func Run(fn LoggerFunc) {
+	ctx, cancel := context.WithCancel(context.Background())
+	config := &Config{
+		ID:        os.Getenv("CONTAINER_ID"),
+		Namespace: os.Getenv("CONTAINER_NAMESPACE"),
+		Stdout:    os.NewFile(3, "CONTAINER_STDOUT"),
+		Stderr:    os.NewFile(4, "CONTAINER_STDERR"),
+	}
+	var (
+		s     = make(chan os.Signal, 32)
+		errCh = make(chan error, 1)
+		wait  = os.NewFile(5, "CONTAINER_WAIT")
+	)
+	signal.Notify(s, unix.SIGTERM)
+	go func() {
+		if err := fn(ctx, config, wait.Close); err != nil {
+			errCh <- err
+		}
+		errCh <- nil
+	}()
+	for {
+		select {
+		case <-s:
+			cancel()
+		case err := <-errCh:
+			if err != nil {
+				fmt.Fprintln(os.Stderr, err)
+				os.Exit(1)
+			}
+			os.Exit(0)
+		}
+	}
+}
```

```diff
@@ -53,7 +53,7 @@ type linuxPlatform struct {
 	epoller *console.Epoller
 }

-func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
 	if p.epoller == nil {
 		return nil, errors.New("uninitialized epoller")
 	}
@@ -63,6 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
 		return nil, err
 	}
+	var cwg sync.WaitGroup
 	if stdin != "" {
 		in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
 		if err != nil {
@@ -99,6 +100,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
 		outw.Close()
 		wg.Done()
 	}()
+	cwg.Wait()
 	return epollConsole, nil
 }
```