From e6ae9cc64f61fc5f65bdb5a8efeeca23ac1d28ea Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 7 Mar 2019 13:38:56 -0500 Subject: [PATCH] Shim pluggable logging Closes #603 This adds logging facilities at the shim level to provide minimal I/O overhead and pluggable logging options. Log handling is done within the shim so that all I/O, cpu, and memory can be charged to the container. A sample logging driver setting up logging for a container the systemd journal looks like this: ```go package main import ( "bufio" "context" "fmt" "io" "sync" "github.com/containerd/containerd/runtime/v2/logging" "github.com/coreos/go-systemd/journal" ) func main() { logging.Run(log) } func log(ctx context.Context, config *logging.Config, ready func() error) error { // construct any log metadata for the container vars := map[string]string{ "SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID), } var wg sync.WaitGroup wg.Add(2) // forward both stdout and stderr to the journal go copy(&wg, config.Stdout, journal.PriInfo, vars) go copy(&wg, config.Stderr, journal.PriErr, vars) // signal that we are ready and setup for the container to be started if err := ready(); err != nil { return err } wg.Wait() return nil } func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) { defer wg.Done() s := bufio.NewScanner(r) for s.Scan() { if s.Err() != nil { return } journal.Send(s.Text(), pri, vars) } } ``` A `logging` package has been created to assist log developers create logging plugins for containerd. This uses a URI based approach for logging drivers that can be expanded in the future. Supported URI scheme's are: * binary * fifo * file You can pass the log url via ctr on the command line: ```bash > ctr run --rm --runtime io.containerd.runc.v2 --log-uri binary://shim-journald docker.io/library/redis:alpine redis ``` ```bash > journalctl -f -t default:redis -- Logs begin at Tue 2018-12-11 16:29:51 EST. -- Mar 08 16:08:22 deathstar default:redis[120760]: 1:C 08 Mar 2019 21:08:22.703 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # You requested maxclients of 10000 requiring at least 10032 max file descriptors. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # Server can't set maximum open files to 10032 because of OS error: Operation not permitted. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # Current maximum open files is 1024. maxclients has been reduced to 992 to compensate for low ulimit. If you need higher maxclients increase 'ulimit -n'. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 * Running mode=standalone, port=6379. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # Server initialized Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled. Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 * Ready to accept connections Mar 08 16:08:50 deathstar default:redis[120760]: 1:signal-handler (1552079330) Received SIGINT scheduling shutdown... Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.405 # User requested shutdown... Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.406 * Saving the final RDB snapshot before exiting. Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.452 * DB saved on disk Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.453 # Redis is now ready to exit, bye bye... ``` The following client side Opts are added: ```go // LogURI provides the raw logging URI func LogURI(uri *url.URL) Creator { } // BinaryIO forwards contianer STDOUT|STDERR directly to a logging binary func BinaryIO(binary string, args map[string]string) Creator {} ``` Signed-off-by: Michael Crosby --- cio/io.go | 72 +++++--- cmd/ctr/commands/run/run.go | 6 +- cmd/ctr/commands/tasks/start.go | 6 +- cmd/ctr/commands/tasks/tasks_unix.go | 10 +- cmd/ctr/commands/tasks/tasks_windows.go | 2 +- runtime/proc/proc.go | 2 +- runtime/v1/linux/proc/exec.go | 48 +++--- runtime/v1/linux/proc/init.go | 93 +++++----- runtime/v1/linux/proc/init_state.go | 17 +- runtime/v1/linux/proc/io.go | 215 ++++++++++++++++++++++++ runtime/v1/linux/proc/utils.go | 29 +++- runtime/v1/shim/service_linux.go | 4 +- runtime/v1/shim/service_unix.go | 4 +- runtime/v2/README.md | 62 +++++++ runtime/v2/logging/logging.go | 77 +++++++++ runtime/v2/runc/platform.go | 4 +- 16 files changed, 542 insertions(+), 109 deletions(-) create mode 100644 runtime/v2/logging/logging.go diff --git a/cio/io.go b/cio/io.go index 1f8abf5f0..133bfcdbe 100644 --- a/cio/io.go +++ b/cio/io.go @@ -20,8 +20,8 @@ import ( "context" "fmt" "io" + "net/url" "os" - "path/filepath" "sync" "github.com/containerd/containerd/defaults" @@ -222,46 +222,76 @@ type DirectIO struct { cio } -var _ IO = &DirectIO{} +var ( + _ IO = &DirectIO{} + _ IO = &logURI{} +) -// LogFile creates a file on disk that logs the task's STDOUT,STDERR. -// If the log file already exists, the logs will be appended to the file. -func LogFile(path string) Creator { +// LogURI provides the raw logging URI +func LogURI(uri *url.URL) Creator { return func(_ string) (IO, error) { - if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { - return nil, err - } - f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nil, err - } - f.Close() - return &logIO{ + return &logURI{ config: Config{ - Stdout: path, - Stderr: path, + Stdout: uri.String(), + Stderr: uri.String(), }, }, nil } } -type logIO struct { +// BinaryIO forwards container STDOUT|STDERR directly to a logging binary +func BinaryIO(binary string, args map[string]string) Creator { + return func(_ string) (IO, error) { + uri := &url.URL{ + Scheme: "binary", + Host: binary, + } + for k, v := range args { + uri.Query().Set(k, v) + } + return &logURI{ + config: Config{ + Stdout: uri.String(), + Stderr: uri.String(), + }, + }, nil + } +} + +// LogFile creates a file on disk that logs the task's STDOUT,STDERR. +// If the log file already exists, the logs will be appended to the file. +func LogFile(path string) Creator { + return func(_ string) (IO, error) { + uri := &url.URL{ + Scheme: "file", + Host: path, + } + return &logURI{ + config: Config{ + Stdout: uri.String(), + Stderr: uri.String(), + }, + }, nil + } +} + +type logURI struct { config Config } -func (l *logIO) Config() Config { +func (l *logURI) Config() Config { return l.config } -func (l *logIO) Cancel() { +func (l *logURI) Cancel() { } -func (l *logIO) Wait() { +func (l *logURI) Wait() { } -func (l *logIO) Close() error { +func (l *logURI) Close() error { return nil } diff --git a/cmd/ctr/commands/run/run.go b/cmd/ctr/commands/run/run.go index 2f01247ac..41f11a1c2 100644 --- a/cmd/ctr/commands/run/run.go +++ b/cmd/ctr/commands/run/run.go @@ -99,6 +99,10 @@ var Command = cli.Command{ Name: "null-io", Usage: "send all IO to /dev/null", }, + cli.StringFlag{ + Name: "log-uri", + Usage: "log uri", + }, cli.BoolFlag{ Name: "detach,d", Usage: "detach from the task after it has started execution", @@ -161,7 +165,7 @@ var Command = cli.Command{ } opts := getNewTaskOpts(context) ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))} - task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), ioOpts, opts...) + task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/start.go b/cmd/ctr/commands/tasks/start.go index 0774d5784..0255cb999 100644 --- a/cmd/ctr/commands/tasks/start.go +++ b/cmd/ctr/commands/tasks/start.go @@ -35,6 +35,10 @@ var startCommand = cli.Command{ Name: "null-io", Usage: "send all IO to /dev/null", }, + cli.StringFlag{ + Name: "log-uri", + Usage: "log uri", + }, cli.StringFlag{ Name: "fifo-dir", Usage: "directory used for storing IO FIFOs", @@ -85,7 +89,7 @@ var startCommand = cli.Command{ } } - task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), ioOpts, opts...) + task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/tasks_unix.go b/cmd/ctr/commands/tasks/tasks_unix.go index e10fe798d..103466482 100644 --- a/cmd/ctr/commands/tasks/tasks_unix.go +++ b/cmd/ctr/commands/tasks/tasks_unix.go @@ -20,6 +20,7 @@ package tasks import ( gocontext "context" + "net/url" "os" "os/signal" @@ -67,7 +68,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol } // NewTask creates a new task -func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { +func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { stdio := cio.NewCreator(append([]cio.Opt{cio.WithStdio}, ioOpts...)...) if checkpoint != "" { im, err := client.GetImage(ctx, checkpoint) @@ -86,6 +87,13 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container contain } ioCreator = cio.NullIO } + if logURI != "" { + u, err := url.Parse(logURI) + if err != nil { + return nil, err + } + ioCreator = cio.LogURI(u) + } return container.NewTask(ctx, ioCreator, opts...) } diff --git a/cmd/ctr/commands/tasks/tasks_windows.go b/cmd/ctr/commands/tasks/tasks_windows.go index f3831ea3e..b16e8d8c8 100644 --- a/cmd/ctr/commands/tasks/tasks_windows.go +++ b/cmd/ctr/commands/tasks/tasks_windows.go @@ -58,7 +58,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol } // NewTask creates a new task -func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { +func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { var ioCreator cio.Creator if con != nil { if nullIO { diff --git a/runtime/proc/proc.go b/runtime/proc/proc.go index 91ca59bb1..0e8d21b74 100644 --- a/runtime/proc/proc.go +++ b/runtime/proc/proc.go @@ -72,7 +72,7 @@ type Process interface { // platform implementations type Platform interface { CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, - wg, cwg *sync.WaitGroup) (console.Console, error) + wg *sync.WaitGroup) (console.Console, error) ShutdownConsole(ctx context.Context, console console.Console) error Close() error } diff --git a/runtime/v1/linux/proc/exec.go b/runtime/v1/linux/proc/exec.go index 5dbfbaf62..5ab232ae7 100644 --- a/runtime/v1/linux/proc/exec.go +++ b/runtime/v1/linux/proc/exec.go @@ -46,7 +46,7 @@ type execProcess struct { mu sync.Mutex id string console console.Console - io runc.IO + io *processIO status int exited time.Time pid *safePid @@ -172,29 +172,30 @@ func (e *execProcess) start(ctx context.Context) (err error) { // access e.pid until it is updated. e.pid.Lock() defer e.pid.Unlock() + var ( socket *runc.Socket - pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) + pio *processIO + pidFile = newExecPidFile(e.path, e.id) ) if e.stdio.Terminal { if socket, err = runc.NewTempConsoleSocket(); err != nil { return errors.Wrap(err, "failed to create runc console socket") } defer socket.Close() - } else if e.stdio.IsNull() { - if e.io, err = runc.NewNullIO(); err != nil { - return errors.Wrap(err, "creating new NULL IO") - } } else { - if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID, withConditionalIO(e.stdio)); err != nil { - return errors.Wrap(err, "failed to create runc io pipes") + if pio, err = createIO(ctx, e.id, e.parent.IoUID, e.parent.IoGID, e.stdio); err != nil { + return errors.Wrap(err, "failed to create init process I/O") } + e.io = pio } opts := &runc.ExecOpts{ - PidFile: pidfile, - IO: e.io, + PidFile: pidFile.Path(), Detach: true, } + if pio != nil { + opts.IO = pio.IO() + } if socket != nil { opts.ConsoleSocket = socket } @@ -203,14 +204,10 @@ func (e *execProcess) start(ctx context.Context) (err error) { return e.parent.runtimeError(err, "OCI runtime exec failed") } if e.stdio.Stdin != "" { - sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin) + if err := e.openStdin(e.stdio.Stdin); err != nil { + return err } - e.closers = append(e.closers, sc) - e.stdin = sc } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -218,16 +215,15 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return errors.Wrap(err, "failed to start console copy") } - } else if !e.stdio.IsNull() { - if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + } else { + if err := pio.Copy(ctx, &e.wg); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } - copyWaitGroup.Wait() - pid, err := runc.ReadPidFile(opts.PidFile) + pid, err := pidFile.Read() if err != nil { return errors.Wrap(err, "failed to retrieve OCI runtime exec pid") } @@ -235,6 +231,16 @@ func (e *execProcess) start(ctx context.Context) (err error) { return nil } +func (e *execProcess) openStdin(path string) error { + sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return errors.Wrapf(err, "failed to open stdin fifo %s", path) + } + e.stdin = sc + e.closers = append(e.closers, sc) + return nil +} + func (e *execProcess) Status(ctx context.Context) (string, error) { s, err := e.parent.Status(ctx) if err != nil { diff --git a/runtime/v1/linux/proc/init.go b/runtime/v1/linux/proc/init.go index 81f6d9514..10787ed87 100644 --- a/runtime/v1/linux/proc/init.go +++ b/runtime/v1/linux/proc/init.go @@ -41,9 +41,6 @@ import ( "github.com/pkg/errors" ) -// InitPidFile name of the file that contains the init pid -const InitPidFile = "init.pid" - // Init represents an initial process for a container type Init struct { wg sync.WaitGroup @@ -63,7 +60,7 @@ type Init struct { Bundle string console console.Console Platform proc.Platform - io runc.IO + io *processIO runtime *runc.Runc status int exited time.Time @@ -111,49 +108,33 @@ func New(id string, runtime *runc.Runc, stdio proc.Stdio) *Init { // Create the process with the provided config func (p *Init) Create(ctx context.Context, r *CreateConfig) error { var ( - err error - socket *runc.Socket + err error + socket *runc.Socket + pio *processIO + pidFile = newPidFile(p.Bundle) ) if r.Terminal { if socket, err = runc.NewTempConsoleSocket(); err != nil { return errors.Wrap(err, "failed to create OCI runtime console socket") } defer socket.Close() - } else if hasNoIO(r) { - if p.io, err = runc.NewNullIO(); err != nil { - return errors.Wrap(err, "creating new NULL IO") - } } else { - if p.io, err = runc.NewPipeIO(p.IoUID, p.IoGID, withConditionalIO(p.stdio)); err != nil { - return errors.Wrap(err, "failed to create OCI runtime io pipes") + if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil { + return errors.Wrap(err, "failed to create init process I/O") } + p.io = pio } - pidFile := filepath.Join(p.Bundle, InitPidFile) if r.Checkpoint != "" { - opts := &runc.RestoreOpts{ - CheckpointOpts: runc.CheckpointOpts{ - ImagePath: r.Checkpoint, - WorkDir: p.CriuWorkPath, - ParentPath: r.ParentCheckpoint, - }, - PidFile: pidFile, - IO: p.io, - NoPivot: p.NoPivotRoot, - Detach: true, - NoSubreaper: true, - } - p.initState = &createdCheckpointState{ - p: p, - opts: opts, - } - return nil + return p.createCheckpointedState(r, pidFile) } opts := &runc.CreateOpts{ - PidFile: pidFile, - IO: p.io, + PidFile: pidFile.Path(), NoPivot: p.NoPivotRoot, NoNewKeyring: p.NoNewKeyring, } + if p.io != nil { + opts.IO = p.io.IO() + } if socket != nil { opts.ConsoleSocket = socket } @@ -161,14 +142,10 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { return p.runtimeError(err, "OCI runtime create failed") } if r.Stdin != "" { - sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin) + if err := p.openStdin(r.Stdin); err != nil { + return err } - p.stdin = sc - p.closers = append(p.closers, sc) } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -176,19 +153,17 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup) + console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg) if err != nil { return errors.Wrap(err, "failed to start console copy") } p.console = console - } else if !hasNoIO(r) { - if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup); err != nil { + } else { + if err := pio.Copy(ctx, &p.wg); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } - - copyWaitGroup.Wait() - pid, err := runc.ReadPidFile(pidFile) + pid, err := pidFile.Read() if err != nil { return errors.Wrap(err, "failed to retrieve OCI runtime container pid") } @@ -196,6 +171,36 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { return nil } +func (p *Init) openStdin(path string) error { + sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return errors.Wrapf(err, "failed to open stdin fifo %s", path) + } + p.stdin = sc + p.closers = append(p.closers, sc) + return nil +} + +func (p *Init) createCheckpointedState(r *CreateConfig, pidFile *pidFile) error { + opts := &runc.RestoreOpts{ + CheckpointOpts: runc.CheckpointOpts{ + ImagePath: r.Checkpoint, + WorkDir: p.CriuWorkPath, + ParentPath: r.ParentCheckpoint, + }, + PidFile: pidFile.Path(), + IO: p.io.IO(), + NoPivot: p.NoPivotRoot, + Detach: true, + NoSubreaper: true, + } + p.initState = &createdCheckpointState{ + p: p, + opts: opts, + } + return nil +} + // Wait for the process to exit func (p *Init) Wait() { <-p.waitBlock diff --git a/runtime/v1/linux/proc/init_state.go b/runtime/v1/linux/proc/init_state.go index 62cbe38cd..51849c62b 100644 --- a/runtime/v1/linux/proc/init_state.go +++ b/runtime/v1/linux/proc/init_state.go @@ -20,12 +20,9 @@ package proc import ( "context" - "sync" - "syscall" "github.com/containerd/console" "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/fifo" runc "github.com/containerd/go-runc" google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" @@ -172,31 +169,25 @@ func (s *createdCheckpointState) Start(ctx context.Context) error { return p.runtimeError(err, "OCI runtime restore failed") } if sio.Stdin != "" { - sc, err := fifo.OpenFifo(context.Background(), sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { + if err := p.openStdin(sio.Stdin); err != nil { return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin) } - p.stdin = sc - p.closers = append(p.closers, sc) } - var copyWaitGroup sync.WaitGroup if socket != nil { console, err := socket.ReceiveMaster() if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, ©WaitGroup) + console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg) if err != nil { return errors.Wrap(err, "failed to start console copy") } p.console = console - } else if !sio.IsNull() { - if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, ©WaitGroup); err != nil { + } else { + if err := p.io.Copy(ctx, &p.wg); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } - - copyWaitGroup.Wait() pid, err := runc.ReadPidFile(s.opts.PidFile) if err != nil { return errors.Wrap(err, "failed to retrieve OCI runtime container pid") diff --git a/runtime/v1/linux/proc/io.go b/runtime/v1/linux/proc/io.go index aed349a86..b37c3c6aa 100644 --- a/runtime/v1/linux/proc/io.go +++ b/runtime/v1/linux/proc/io.go @@ -22,12 +22,18 @@ import ( "context" "fmt" "io" + "net/url" "os" + "os/exec" + "path/filepath" "sync" "syscall" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/runtime/proc" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" + "github.com/pkg/errors" ) var bufPool = sync.Pool{ @@ -37,6 +43,84 @@ var bufPool = sync.Pool{ }, } +type processIO struct { + io runc.IO + + uri *url.URL + copy bool + stdio proc.Stdio +} + +func (p *processIO) Close() error { + if p.io != nil { + return p.io.Close() + } + return nil +} + +func (p *processIO) IO() runc.IO { + return p.io +} + +func (p *processIO) Copy(ctx context.Context, wg *sync.WaitGroup) error { + if !p.copy { + return nil + } + var cwg sync.WaitGroup + if err := copyPipes(ctx, p.IO(), p.stdio.Stdin, p.stdio.Stdout, p.stdio.Stderr, wg, &cwg); err != nil { + return errors.Wrap(err, "unable to copy pipes") + } + cwg.Wait() + return nil +} + +func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio) (*processIO, error) { + pio := &processIO{ + stdio: stdio, + } + if stdio.IsNull() { + i, err := runc.NewNullIO() + if err != nil { + return nil, err + } + pio.io = i + return pio, nil + } + u, err := url.Parse(stdio.Stdout) + if err != nil { + return nil, errors.Wrap(err, "unable to parse stdout uri") + } + if u.Scheme == "" { + u.Scheme = "fifo" + } + pio.uri = u + switch u.Scheme { + case "fifo": + pio.copy = true + pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) + case "binary": + pio.io, err = newBinaryIO(ctx, id, u) + case "file": + if err := os.MkdirAll(filepath.Dir(u.Host), 0755); err != nil { + return nil, err + } + var f *os.File + f, err = os.OpenFile(u.Host, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + f.Close() + pio.copy = true + pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) + default: + return nil, errors.Errorf("unknown STDIO scheme %s", u.Scheme) + } + if err != nil { + return nil, err + } + return pio, nil +} + func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { var sameFile io.WriteCloser for _, i := range []struct { @@ -143,3 +227,134 @@ func isFifo(path string) (bool, error) { } return false, nil } + +func newBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + var args []string + for k, vs := range uri.Query() { + args = append(args, k) + if len(vs) > 0 { + args = append(args, vs[0]) + } + } + ctx, cancel := context.WithCancel(ctx) + cmd := exec.CommandContext(ctx, uri.Host, args...) + cmd.Env = append(cmd.Env, + "CONTAINER_ID="+id, + "CONTAINER_NAMESPACE="+ns, + ) + out, err := newPipe() + if err != nil { + return nil, err + } + serr, err := newPipe() + if err != nil { + return nil, err + } + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w) + // don't need to register this with the reaper or wait when + // running inside a shim + if err := cmd.Start(); err != nil { + return nil, err + } + // close our side of the pipe after start + w.Close() + // wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, err + } + return &binaryIO{ + cmd: cmd, + cancel: cancel, + out: out, + err: serr, + }, nil +} + +type binaryIO struct { + cmd *exec.Cmd + cancel func() + out, err *pipe +} + +func (b *binaryIO) CloseAfterStart() (err error) { + for _, v := range []*pipe{ + b.out, + b.err, + } { + if v != nil { + if cerr := v.r.Close(); err == nil { + err = cerr + } + } + } + return err +} + +func (b *binaryIO) Close() (err error) { + b.cancel() + for _, v := range []*pipe{ + b.out, + b.err, + } { + if v != nil { + if cerr := v.Close(); err == nil { + err = cerr + } + } + } + return err +} + +func (b *binaryIO) Stdin() io.WriteCloser { + return nil +} + +func (b *binaryIO) Stdout() io.ReadCloser { + return nil +} + +func (b *binaryIO) Stderr() io.ReadCloser { + return nil +} + +func (b *binaryIO) Set(cmd *exec.Cmd) { + if b.out != nil { + cmd.Stdout = b.out.w + } + if b.err != nil { + cmd.Stderr = b.err.w + } +} + +func newPipe() (*pipe, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + return &pipe{ + r: r, + w: w, + }, nil +} + +type pipe struct { + r *os.File + w *os.File +} + +func (p *pipe) Close() error { + err := p.w.Close() + if rerr := p.r.Close(); err == nil { + err = rerr + } + return err +} diff --git a/runtime/v1/linux/proc/utils.go b/runtime/v1/linux/proc/utils.go index 075ef2617..75927a4ef 100644 --- a/runtime/v1/linux/proc/utils.go +++ b/runtime/v1/linux/proc/utils.go @@ -20,8 +20,10 @@ package proc import ( "encoding/json" + "fmt" "io" "os" + "path/filepath" "strings" "sync" "time" @@ -114,6 +116,29 @@ func checkKillError(err error) error { return errors.Wrapf(err, "unknown error after kill") } -func hasNoIO(r *CreateConfig) bool { - return r.Stdin == "" && r.Stdout == "" && r.Stderr == "" +// InitPidFile name of the file that contains the init pid +const InitPidFile = "init.pid" + +func newPidFile(bundle string) *pidFile { + return &pidFile{ + path: filepath.Join(bundle, InitPidFile), + } +} + +func newExecPidFile(bundle, id string) *pidFile { + return &pidFile{ + path: filepath.Join(bundle, fmt.Sprintf("%s.pid", id)), + } +} + +type pidFile struct { + path string +} + +func (p *pidFile) Path() string { + return p.path +} + +func (p *pidFile) Read() (int, error) { + return runc.ReadPidFile(p.path) } diff --git a/runtime/v1/shim/service_linux.go b/runtime/v1/shim/service_linux.go index 307e20dab..a4a4b90a3 100644 --- a/runtime/v1/shim/service_linux.go +++ b/runtime/v1/shim/service_linux.go @@ -31,7 +31,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -40,6 +40,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } + var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) @@ -77,6 +78,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console outw.Close() wg.Done() }() + cwg.Wait() return epollConsole, nil } diff --git a/runtime/v1/shim/service_unix.go b/runtime/v1/shim/service_unix.go index 708e45c29..3c614531e 100644 --- a/runtime/v1/shim/service_unix.go +++ b/runtime/v1/shim/service_unix.go @@ -31,7 +31,8 @@ import ( type unixPlatform struct { } -func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { + var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) if err != nil { @@ -67,6 +68,7 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, outw.Close() wg.Done() }() + cwg.Wait() return console, nil } diff --git a/runtime/v2/README.md b/runtime/v2/README.md index 61ac6721a..51dcafafa 100644 --- a/runtime/v2/README.md +++ b/runtime/v2/README.md @@ -173,6 +173,68 @@ The Runtime v2 supports an async event model. In order for the an upstream calle | `runtime.TaskExitEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec (other than the init exec) exits expected or unexpected | | `runtime.TaskDeleteEventTopic` | SHOULD (follow `TaskExitEventTopic` or `TaskExecAddedEventTopic` if never started) | When an exec is removed from a shim | +#### Logging + +Shims may support pluggable logging via STDIO URIs. +Current supported schemes for logging are: + +* fifo - Linux +* binary - Linux & Windows +* file - Linux & Windows +* npipe - Windows + +Binary logging has the abilty to forward a container's STDIO to an external binary for consumption. +A sample logging driver that forwards the container's STDOUT and STDERR to `journald` is: + +```go +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "sync" + + "github.com/containerd/containerd/runtime/v2/logging" + "github.com/coreos/go-systemd/journal" +) + +func main() { + logging.Run(log) +} + +func log(ctx context.Context, config *logging.Config, ready func() error) error { + // construct any log metadata for the container + vars := map[string]string{ + "SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID), + } + var wg sync.WaitGroup + wg.Add(2) + // forward both stdout and stderr to the journal + go copy(&wg, config.Stdout, journal.PriInfo, vars) + go copy(&wg, config.Stderr, journal.PriErr, vars) + + // signal that we are ready and setup for the container to be started + if err := ready(); err != nil { + return err + } + wg.Wait() + return nil +} + +func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) { + defer wg.Done() + s := bufio.NewScanner(r) + for s.Scan() { + if s.Err() != nil { + return + } + journal.Send(s.Text(), pri, vars) + } +} +``` + ### Other #### Unsupported rpcs diff --git a/runtime/v2/logging/logging.go b/runtime/v2/logging/logging.go new file mode 100644 index 000000000..614d6aef9 --- /dev/null +++ b/runtime/v2/logging/logging.go @@ -0,0 +1,77 @@ +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package logging + +import ( + "context" + "fmt" + "io" + "os" + "os/signal" + + "golang.org/x/sys/unix" +) + +// Config of the container logs +type Config struct { + ID string + Namespace string + Stdout io.Reader + Stderr io.Reader +} + +// LoggerFunc is implemented by custom v2 logging binaries +type LoggerFunc func(context.Context, *Config, func() error) error + +// Run the logging driver +func Run(fn LoggerFunc) { + ctx, cancel := context.WithCancel(context.Background()) + config := &Config{ + ID: os.Getenv("CONTAINER_ID"), + Namespace: os.Getenv("CONTAINER_NAMESPACE"), + Stdout: os.NewFile(3, "CONTAINER_STDOUT"), + Stderr: os.NewFile(4, "CONTAINER_STDERR"), + } + var ( + s = make(chan os.Signal, 32) + errCh = make(chan error, 1) + wait = os.NewFile(5, "CONTAINER_WAIT") + ) + signal.Notify(s, unix.SIGTERM) + + go func() { + if err := fn(ctx, config, wait.Close); err != nil { + errCh <- err + } + errCh <- nil + }() + + for { + select { + case <-s: + cancel() + case err := <-errCh: + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(0) + } + } +} diff --git a/runtime/v2/runc/platform.go b/runtime/v2/runc/platform.go index d38aa5469..970754dac 100644 --- a/runtime/v2/runc/platform.go +++ b/runtime/v2/runc/platform.go @@ -53,7 +53,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -63,6 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } + var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) if err != nil { @@ -99,6 +100,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console outw.Close() wg.Done() }() + cwg.Wait() return epollConsole, nil }