diff --git a/cio/io.go b/cio/io.go index 1f8abf5f0..133bfcdbe 100644 --- a/cio/io.go +++ b/cio/io.go @@ -20,8 +20,8 @@ import ( "context" "fmt" "io" + "net/url" "os" - "path/filepath" "sync" "github.com/containerd/containerd/defaults" @@ -222,46 +222,76 @@ type DirectIO struct { cio } -var _ IO = &DirectIO{} +var ( + _ IO = &DirectIO{} + _ IO = &logURI{} +) -// LogFile creates a file on disk that logs the task's STDOUT,STDERR. -// If the log file already exists, the logs will be appended to the file. -func LogFile(path string) Creator { +// LogURI provides the raw logging URI +func LogURI(uri *url.URL) Creator { return func(_ string) (IO, error) { - if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { - return nil, err - } - f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nil, err - } - f.Close() - return &logIO{ + return &logURI{ config: Config{ - Stdout: path, - Stderr: path, + Stdout: uri.String(), + Stderr: uri.String(), }, }, nil } } -type logIO struct { +// BinaryIO forwards container STDOUT|STDERR directly to a logging binary +func BinaryIO(binary string, args map[string]string) Creator { + return func(_ string) (IO, error) { + uri := &url.URL{ + Scheme: "binary", + Host: binary, + } + for k, v := range args { + uri.Query().Set(k, v) + } + return &logURI{ + config: Config{ + Stdout: uri.String(), + Stderr: uri.String(), + }, + }, nil + } +} + +// LogFile creates a file on disk that logs the task's STDOUT,STDERR. +// If the log file already exists, the logs will be appended to the file. +func LogFile(path string) Creator { + return func(_ string) (IO, error) { + uri := &url.URL{ + Scheme: "file", + Host: path, + } + return &logURI{ + config: Config{ + Stdout: uri.String(), + Stderr: uri.String(), + }, + }, nil + } +} + +type logURI struct { config Config } -func (l *logIO) Config() Config { +func (l *logURI) Config() Config { return l.config } -func (l *logIO) Cancel() { +func (l *logURI) Cancel() { } -func (l *logIO) Wait() { +func (l *logURI) Wait() { } -func (l *logIO) Close() error { +func (l *logURI) Close() error { return nil } diff --git a/cmd/ctr/commands/run/run.go b/cmd/ctr/commands/run/run.go index 2f01247ac..41f11a1c2 100644 --- a/cmd/ctr/commands/run/run.go +++ b/cmd/ctr/commands/run/run.go @@ -99,6 +99,10 @@ var Command = cli.Command{ Name: "null-io", Usage: "send all IO to /dev/null", }, + cli.StringFlag{ + Name: "log-uri", + Usage: "log uri", + }, cli.BoolFlag{ Name: "detach,d", Usage: "detach from the task after it has started execution", @@ -161,7 +165,7 @@ var Command = cli.Command{ } opts := getNewTaskOpts(context) ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))} - task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), ioOpts, opts...) + task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/start.go b/cmd/ctr/commands/tasks/start.go index 0774d5784..0255cb999 100644 --- a/cmd/ctr/commands/tasks/start.go +++ b/cmd/ctr/commands/tasks/start.go @@ -35,6 +35,10 @@ var startCommand = cli.Command{ Name: "null-io", Usage: "send all IO to /dev/null", }, + cli.StringFlag{ + Name: "log-uri", + Usage: "log uri", + }, cli.StringFlag{ Name: "fifo-dir", Usage: "directory used for storing IO FIFOs", @@ -85,7 +89,7 @@ var startCommand = cli.Command{ } } - task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), ioOpts, opts...) + task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...) if err != nil { return err } diff --git a/cmd/ctr/commands/tasks/tasks_unix.go b/cmd/ctr/commands/tasks/tasks_unix.go index e10fe798d..103466482 100644 --- a/cmd/ctr/commands/tasks/tasks_unix.go +++ b/cmd/ctr/commands/tasks/tasks_unix.go @@ -20,6 +20,7 @@ package tasks import ( gocontext "context" + "net/url" "os" "os/signal" @@ -67,7 +68,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol } // NewTask creates a new task -func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { +func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { stdio := cio.NewCreator(append([]cio.Opt{cio.WithStdio}, ioOpts...)...) if checkpoint != "" { im, err := client.GetImage(ctx, checkpoint) @@ -86,6 +87,13 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container contain } ioCreator = cio.NullIO } + if logURI != "" { + u, err := url.Parse(logURI) + if err != nil { + return nil, err + } + ioCreator = cio.LogURI(u) + } return container.NewTask(ctx, ioCreator, opts...) } diff --git a/cmd/ctr/commands/tasks/tasks_windows.go b/cmd/ctr/commands/tasks/tasks_windows.go index f3831ea3e..b16e8d8c8 100644 --- a/cmd/ctr/commands/tasks/tasks_windows.go +++ b/cmd/ctr/commands/tasks/tasks_windows.go @@ -58,7 +58,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol } // NewTask creates a new task -func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { +func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { var ioCreator cio.Creator if con != nil { if nullIO { diff --git a/runtime/proc/proc.go b/runtime/proc/proc.go index 91ca59bb1..0e8d21b74 100644 --- a/runtime/proc/proc.go +++ b/runtime/proc/proc.go @@ -72,7 +72,7 @@ type Process interface { // platform implementations type Platform interface { CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, - wg, cwg *sync.WaitGroup) (console.Console, error) + wg *sync.WaitGroup) (console.Console, error) ShutdownConsole(ctx context.Context, console console.Console) error Close() error } diff --git a/runtime/v1/linux/proc/exec.go b/runtime/v1/linux/proc/exec.go index 5dbfbaf62..5ab232ae7 100644 --- a/runtime/v1/linux/proc/exec.go +++ b/runtime/v1/linux/proc/exec.go @@ -46,7 +46,7 @@ type execProcess struct { mu sync.Mutex id string console console.Console - io runc.IO + io *processIO status int exited time.Time pid *safePid @@ -172,29 +172,30 @@ func (e *execProcess) start(ctx context.Context) (err error) { // access e.pid until it is updated. e.pid.Lock() defer e.pid.Unlock() + var ( socket *runc.Socket - pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) + pio *processIO + pidFile = newExecPidFile(e.path, e.id) ) if e.stdio.Terminal { if socket, err = runc.NewTempConsoleSocket(); err != nil { return errors.Wrap(err, "failed to create runc console socket") } defer socket.Close() - } else if e.stdio.IsNull() { - if e.io, err = runc.NewNullIO(); err != nil { - return errors.Wrap(err, "creating new NULL IO") - } } else { - if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID, withConditionalIO(e.stdio)); err != nil { - return errors.Wrap(err, "failed to create runc io pipes") + if pio, err = createIO(ctx, e.id, e.parent.IoUID, e.parent.IoGID, e.stdio); err != nil { + return errors.Wrap(err, "failed to create init process I/O") } + e.io = pio } opts := &runc.ExecOpts{ - PidFile: pidfile, - IO: e.io, + PidFile: pidFile.Path(), Detach: true, } + if pio != nil { + opts.IO = pio.IO() + } if socket != nil { opts.ConsoleSocket = socket } @@ -203,14 +204,10 @@ func (e *execProcess) start(ctx context.Context) (err error) { return e.parent.runtimeError(err, "OCI runtime exec failed") } if e.stdio.Stdin != "" { - sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin) + if err := e.openStdin(e.stdio.Stdin); err != nil { + return err } - e.closers = append(e.closers, sc) - e.stdin = sc } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -218,16 +215,15 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return errors.Wrap(err, "failed to start console copy") } - } else if !e.stdio.IsNull() { - if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + } else { + if err := pio.Copy(ctx, &e.wg); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } - copyWaitGroup.Wait() - pid, err := runc.ReadPidFile(opts.PidFile) + pid, err := pidFile.Read() if err != nil { return errors.Wrap(err, "failed to retrieve OCI runtime exec pid") } @@ -235,6 +231,16 @@ func (e *execProcess) start(ctx context.Context) (err error) { return nil } +func (e *execProcess) openStdin(path string) error { + sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return errors.Wrapf(err, "failed to open stdin fifo %s", path) + } + e.stdin = sc + e.closers = append(e.closers, sc) + return nil +} + func (e *execProcess) Status(ctx context.Context) (string, error) { s, err := e.parent.Status(ctx) if err != nil { diff --git a/runtime/v1/linux/proc/init.go b/runtime/v1/linux/proc/init.go index 81f6d9514..10787ed87 100644 --- a/runtime/v1/linux/proc/init.go +++ b/runtime/v1/linux/proc/init.go @@ -41,9 +41,6 @@ import ( "github.com/pkg/errors" ) -// InitPidFile name of the file that contains the init pid -const InitPidFile = "init.pid" - // Init represents an initial process for a container type Init struct { wg sync.WaitGroup @@ -63,7 +60,7 @@ type Init struct { Bundle string console console.Console Platform proc.Platform - io runc.IO + io *processIO runtime *runc.Runc status int exited time.Time @@ -111,49 +108,33 @@ func New(id string, runtime *runc.Runc, stdio proc.Stdio) *Init { // Create the process with the provided config func (p *Init) Create(ctx context.Context, r *CreateConfig) error { var ( - err error - socket *runc.Socket + err error + socket *runc.Socket + pio *processIO + pidFile = newPidFile(p.Bundle) ) if r.Terminal { if socket, err = runc.NewTempConsoleSocket(); err != nil { return errors.Wrap(err, "failed to create OCI runtime console socket") } defer socket.Close() - } else if hasNoIO(r) { - if p.io, err = runc.NewNullIO(); err != nil { - return errors.Wrap(err, "creating new NULL IO") - } } else { - if p.io, err = runc.NewPipeIO(p.IoUID, p.IoGID, withConditionalIO(p.stdio)); err != nil { - return errors.Wrap(err, "failed to create OCI runtime io pipes") + if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil { + return errors.Wrap(err, "failed to create init process I/O") } + p.io = pio } - pidFile := filepath.Join(p.Bundle, InitPidFile) if r.Checkpoint != "" { - opts := &runc.RestoreOpts{ - CheckpointOpts: runc.CheckpointOpts{ - ImagePath: r.Checkpoint, - WorkDir: p.CriuWorkPath, - ParentPath: r.ParentCheckpoint, - }, - PidFile: pidFile, - IO: p.io, - NoPivot: p.NoPivotRoot, - Detach: true, - NoSubreaper: true, - } - p.initState = &createdCheckpointState{ - p: p, - opts: opts, - } - return nil + return p.createCheckpointedState(r, pidFile) } opts := &runc.CreateOpts{ - PidFile: pidFile, - IO: p.io, + PidFile: pidFile.Path(), NoPivot: p.NoPivotRoot, NoNewKeyring: p.NoNewKeyring, } + if p.io != nil { + opts.IO = p.io.IO() + } if socket != nil { opts.ConsoleSocket = socket } @@ -161,14 +142,10 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { return p.runtimeError(err, "OCI runtime create failed") } if r.Stdin != "" { - sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { - return errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin) + if err := p.openStdin(r.Stdin); err != nil { + return err } - p.stdin = sc - p.closers = append(p.closers, sc) } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -176,19 +153,17 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup) + console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg) if err != nil { return errors.Wrap(err, "failed to start console copy") } p.console = console - } else if !hasNoIO(r) { - if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup); err != nil { + } else { + if err := pio.Copy(ctx, &p.wg); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } - - copyWaitGroup.Wait() - pid, err := runc.ReadPidFile(pidFile) + pid, err := pidFile.Read() if err != nil { return errors.Wrap(err, "failed to retrieve OCI runtime container pid") } @@ -196,6 +171,36 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { return nil } +func (p *Init) openStdin(path string) error { + sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return errors.Wrapf(err, "failed to open stdin fifo %s", path) + } + p.stdin = sc + p.closers = append(p.closers, sc) + return nil +} + +func (p *Init) createCheckpointedState(r *CreateConfig, pidFile *pidFile) error { + opts := &runc.RestoreOpts{ + CheckpointOpts: runc.CheckpointOpts{ + ImagePath: r.Checkpoint, + WorkDir: p.CriuWorkPath, + ParentPath: r.ParentCheckpoint, + }, + PidFile: pidFile.Path(), + IO: p.io.IO(), + NoPivot: p.NoPivotRoot, + Detach: true, + NoSubreaper: true, + } + p.initState = &createdCheckpointState{ + p: p, + opts: opts, + } + return nil +} + // Wait for the process to exit func (p *Init) Wait() { <-p.waitBlock diff --git a/runtime/v1/linux/proc/init_state.go b/runtime/v1/linux/proc/init_state.go index 62cbe38cd..51849c62b 100644 --- a/runtime/v1/linux/proc/init_state.go +++ b/runtime/v1/linux/proc/init_state.go @@ -20,12 +20,9 @@ package proc import ( "context" - "sync" - "syscall" "github.com/containerd/console" "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/fifo" runc "github.com/containerd/go-runc" google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" @@ -172,31 +169,25 @@ func (s *createdCheckpointState) Start(ctx context.Context) error { return p.runtimeError(err, "OCI runtime restore failed") } if sio.Stdin != "" { - sc, err := fifo.OpenFifo(context.Background(), sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) - if err != nil { + if err := p.openStdin(sio.Stdin); err != nil { return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin) } - p.stdin = sc - p.closers = append(p.closers, sc) } - var copyWaitGroup sync.WaitGroup if socket != nil { console, err := socket.ReceiveMaster() if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, ©WaitGroup) + console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg) if err != nil { return errors.Wrap(err, "failed to start console copy") } p.console = console - } else if !sio.IsNull() { - if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, ©WaitGroup); err != nil { + } else { + if err := p.io.Copy(ctx, &p.wg); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } - - copyWaitGroup.Wait() pid, err := runc.ReadPidFile(s.opts.PidFile) if err != nil { return errors.Wrap(err, "failed to retrieve OCI runtime container pid") diff --git a/runtime/v1/linux/proc/io.go b/runtime/v1/linux/proc/io.go index aed349a86..b37c3c6aa 100644 --- a/runtime/v1/linux/proc/io.go +++ b/runtime/v1/linux/proc/io.go @@ -22,12 +22,18 @@ import ( "context" "fmt" "io" + "net/url" "os" + "os/exec" + "path/filepath" "sync" "syscall" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/runtime/proc" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" + "github.com/pkg/errors" ) var bufPool = sync.Pool{ @@ -37,6 +43,84 @@ var bufPool = sync.Pool{ }, } +type processIO struct { + io runc.IO + + uri *url.URL + copy bool + stdio proc.Stdio +} + +func (p *processIO) Close() error { + if p.io != nil { + return p.io.Close() + } + return nil +} + +func (p *processIO) IO() runc.IO { + return p.io +} + +func (p *processIO) Copy(ctx context.Context, wg *sync.WaitGroup) error { + if !p.copy { + return nil + } + var cwg sync.WaitGroup + if err := copyPipes(ctx, p.IO(), p.stdio.Stdin, p.stdio.Stdout, p.stdio.Stderr, wg, &cwg); err != nil { + return errors.Wrap(err, "unable to copy pipes") + } + cwg.Wait() + return nil +} + +func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio) (*processIO, error) { + pio := &processIO{ + stdio: stdio, + } + if stdio.IsNull() { + i, err := runc.NewNullIO() + if err != nil { + return nil, err + } + pio.io = i + return pio, nil + } + u, err := url.Parse(stdio.Stdout) + if err != nil { + return nil, errors.Wrap(err, "unable to parse stdout uri") + } + if u.Scheme == "" { + u.Scheme = "fifo" + } + pio.uri = u + switch u.Scheme { + case "fifo": + pio.copy = true + pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) + case "binary": + pio.io, err = newBinaryIO(ctx, id, u) + case "file": + if err := os.MkdirAll(filepath.Dir(u.Host), 0755); err != nil { + return nil, err + } + var f *os.File + f, err = os.OpenFile(u.Host, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + f.Close() + pio.copy = true + pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio)) + default: + return nil, errors.Errorf("unknown STDIO scheme %s", u.Scheme) + } + if err != nil { + return nil, err + } + return pio, nil +} + func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { var sameFile io.WriteCloser for _, i := range []struct { @@ -143,3 +227,134 @@ func isFifo(path string) (bool, error) { } return false, nil } + +func newBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + var args []string + for k, vs := range uri.Query() { + args = append(args, k) + if len(vs) > 0 { + args = append(args, vs[0]) + } + } + ctx, cancel := context.WithCancel(ctx) + cmd := exec.CommandContext(ctx, uri.Host, args...) + cmd.Env = append(cmd.Env, + "CONTAINER_ID="+id, + "CONTAINER_NAMESPACE="+ns, + ) + out, err := newPipe() + if err != nil { + return nil, err + } + serr, err := newPipe() + if err != nil { + return nil, err + } + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w) + // don't need to register this with the reaper or wait when + // running inside a shim + if err := cmd.Start(); err != nil { + return nil, err + } + // close our side of the pipe after start + w.Close() + // wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, err + } + return &binaryIO{ + cmd: cmd, + cancel: cancel, + out: out, + err: serr, + }, nil +} + +type binaryIO struct { + cmd *exec.Cmd + cancel func() + out, err *pipe +} + +func (b *binaryIO) CloseAfterStart() (err error) { + for _, v := range []*pipe{ + b.out, + b.err, + } { + if v != nil { + if cerr := v.r.Close(); err == nil { + err = cerr + } + } + } + return err +} + +func (b *binaryIO) Close() (err error) { + b.cancel() + for _, v := range []*pipe{ + b.out, + b.err, + } { + if v != nil { + if cerr := v.Close(); err == nil { + err = cerr + } + } + } + return err +} + +func (b *binaryIO) Stdin() io.WriteCloser { + return nil +} + +func (b *binaryIO) Stdout() io.ReadCloser { + return nil +} + +func (b *binaryIO) Stderr() io.ReadCloser { + return nil +} + +func (b *binaryIO) Set(cmd *exec.Cmd) { + if b.out != nil { + cmd.Stdout = b.out.w + } + if b.err != nil { + cmd.Stderr = b.err.w + } +} + +func newPipe() (*pipe, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + return &pipe{ + r: r, + w: w, + }, nil +} + +type pipe struct { + r *os.File + w *os.File +} + +func (p *pipe) Close() error { + err := p.w.Close() + if rerr := p.r.Close(); err == nil { + err = rerr + } + return err +} diff --git a/runtime/v1/linux/proc/utils.go b/runtime/v1/linux/proc/utils.go index 075ef2617..75927a4ef 100644 --- a/runtime/v1/linux/proc/utils.go +++ b/runtime/v1/linux/proc/utils.go @@ -20,8 +20,10 @@ package proc import ( "encoding/json" + "fmt" "io" "os" + "path/filepath" "strings" "sync" "time" @@ -114,6 +116,29 @@ func checkKillError(err error) error { return errors.Wrapf(err, "unknown error after kill") } -func hasNoIO(r *CreateConfig) bool { - return r.Stdin == "" && r.Stdout == "" && r.Stderr == "" +// InitPidFile name of the file that contains the init pid +const InitPidFile = "init.pid" + +func newPidFile(bundle string) *pidFile { + return &pidFile{ + path: filepath.Join(bundle, InitPidFile), + } +} + +func newExecPidFile(bundle, id string) *pidFile { + return &pidFile{ + path: filepath.Join(bundle, fmt.Sprintf("%s.pid", id)), + } +} + +type pidFile struct { + path string +} + +func (p *pidFile) Path() string { + return p.path +} + +func (p *pidFile) Read() (int, error) { + return runc.ReadPidFile(p.path) } diff --git a/runtime/v1/shim/service_linux.go b/runtime/v1/shim/service_linux.go index 307e20dab..a4a4b90a3 100644 --- a/runtime/v1/shim/service_linux.go +++ b/runtime/v1/shim/service_linux.go @@ -31,7 +31,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -40,6 +40,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } + var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) @@ -77,6 +78,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console outw.Close() wg.Done() }() + cwg.Wait() return epollConsole, nil } diff --git a/runtime/v1/shim/service_unix.go b/runtime/v1/shim/service_unix.go index 708e45c29..3c614531e 100644 --- a/runtime/v1/shim/service_unix.go +++ b/runtime/v1/shim/service_unix.go @@ -31,7 +31,8 @@ import ( type unixPlatform struct { } -func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { + var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) if err != nil { @@ -67,6 +68,7 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, outw.Close() wg.Done() }() + cwg.Wait() return console, nil } diff --git a/runtime/v2/README.md b/runtime/v2/README.md index 61ac6721a..51dcafafa 100644 --- a/runtime/v2/README.md +++ b/runtime/v2/README.md @@ -173,6 +173,68 @@ The Runtime v2 supports an async event model. In order for the an upstream calle | `runtime.TaskExitEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec (other than the init exec) exits expected or unexpected | | `runtime.TaskDeleteEventTopic` | SHOULD (follow `TaskExitEventTopic` or `TaskExecAddedEventTopic` if never started) | When an exec is removed from a shim | +#### Logging + +Shims may support pluggable logging via STDIO URIs. +Current supported schemes for logging are: + +* fifo - Linux +* binary - Linux & Windows +* file - Linux & Windows +* npipe - Windows + +Binary logging has the abilty to forward a container's STDIO to an external binary for consumption. +A sample logging driver that forwards the container's STDOUT and STDERR to `journald` is: + +```go +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "sync" + + "github.com/containerd/containerd/runtime/v2/logging" + "github.com/coreos/go-systemd/journal" +) + +func main() { + logging.Run(log) +} + +func log(ctx context.Context, config *logging.Config, ready func() error) error { + // construct any log metadata for the container + vars := map[string]string{ + "SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID), + } + var wg sync.WaitGroup + wg.Add(2) + // forward both stdout and stderr to the journal + go copy(&wg, config.Stdout, journal.PriInfo, vars) + go copy(&wg, config.Stderr, journal.PriErr, vars) + + // signal that we are ready and setup for the container to be started + if err := ready(); err != nil { + return err + } + wg.Wait() + return nil +} + +func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) { + defer wg.Done() + s := bufio.NewScanner(r) + for s.Scan() { + if s.Err() != nil { + return + } + journal.Send(s.Text(), pri, vars) + } +} +``` + ### Other #### Unsupported rpcs diff --git a/runtime/v2/logging/logging.go b/runtime/v2/logging/logging.go new file mode 100644 index 000000000..614d6aef9 --- /dev/null +++ b/runtime/v2/logging/logging.go @@ -0,0 +1,77 @@ +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package logging + +import ( + "context" + "fmt" + "io" + "os" + "os/signal" + + "golang.org/x/sys/unix" +) + +// Config of the container logs +type Config struct { + ID string + Namespace string + Stdout io.Reader + Stderr io.Reader +} + +// LoggerFunc is implemented by custom v2 logging binaries +type LoggerFunc func(context.Context, *Config, func() error) error + +// Run the logging driver +func Run(fn LoggerFunc) { + ctx, cancel := context.WithCancel(context.Background()) + config := &Config{ + ID: os.Getenv("CONTAINER_ID"), + Namespace: os.Getenv("CONTAINER_NAMESPACE"), + Stdout: os.NewFile(3, "CONTAINER_STDOUT"), + Stderr: os.NewFile(4, "CONTAINER_STDERR"), + } + var ( + s = make(chan os.Signal, 32) + errCh = make(chan error, 1) + wait = os.NewFile(5, "CONTAINER_WAIT") + ) + signal.Notify(s, unix.SIGTERM) + + go func() { + if err := fn(ctx, config, wait.Close); err != nil { + errCh <- err + } + errCh <- nil + }() + + for { + select { + case <-s: + cancel() + case err := <-errCh: + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + os.Exit(0) + } + } +} diff --git a/runtime/v2/runc/platform.go b/runtime/v2/runc/platform.go index d38aa5469..970754dac 100644 --- a/runtime/v2/runc/platform.go +++ b/runtime/v2/runc/platform.go @@ -53,7 +53,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -63,6 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } + var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) if err != nil { @@ -99,6 +100,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console outw.Close() wg.Done() }() + cwg.Wait() return epollConsole, nil }