Merge pull request #3085 from crosbymichael/shim-logs

Shim pluggable logging
This commit is contained in:
Phil Estes 2019-03-13 12:23:06 -07:00 committed by GitHub
commit 9ed2c0aa02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 542 additions and 109 deletions

View File

@ -20,8 +20,8 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net/url"
"os" "os"
"path/filepath"
"sync" "sync"
"github.com/containerd/containerd/defaults" "github.com/containerd/containerd/defaults"
@ -222,46 +222,76 @@ type DirectIO struct {
cio cio
} }
var _ IO = &DirectIO{} var (
_ IO = &DirectIO{}
_ IO = &logURI{}
)
// LogFile creates a file on disk that logs the task's STDOUT,STDERR. // LogURI provides the raw logging URI
// If the log file already exists, the logs will be appended to the file. func LogURI(uri *url.URL) Creator {
func LogFile(path string) Creator {
return func(_ string) (IO, error) { return func(_ string) (IO, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { return &logURI{
return nil, err
}
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
f.Close()
return &logIO{
config: Config{ config: Config{
Stdout: path, Stdout: uri.String(),
Stderr: path, Stderr: uri.String(),
}, },
}, nil }, nil
} }
} }
type logIO struct { // BinaryIO forwards container STDOUT|STDERR directly to a logging binary
func BinaryIO(binary string, args map[string]string) Creator {
return func(_ string) (IO, error) {
uri := &url.URL{
Scheme: "binary",
Host: binary,
}
for k, v := range args {
uri.Query().Set(k, v)
}
return &logURI{
config: Config{
Stdout: uri.String(),
Stderr: uri.String(),
},
}, nil
}
}
// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
// If the log file already exists, the logs will be appended to the file.
func LogFile(path string) Creator {
return func(_ string) (IO, error) {
uri := &url.URL{
Scheme: "file",
Host: path,
}
return &logURI{
config: Config{
Stdout: uri.String(),
Stderr: uri.String(),
},
}, nil
}
}
type logURI struct {
config Config config Config
} }
func (l *logIO) Config() Config { func (l *logURI) Config() Config {
return l.config return l.config
} }
func (l *logIO) Cancel() { func (l *logURI) Cancel() {
} }
func (l *logIO) Wait() { func (l *logURI) Wait() {
} }
func (l *logIO) Close() error { func (l *logURI) Close() error {
return nil return nil
} }

View File

@ -99,6 +99,10 @@ var Command = cli.Command{
Name: "null-io", Name: "null-io",
Usage: "send all IO to /dev/null", Usage: "send all IO to /dev/null",
}, },
cli.StringFlag{
Name: "log-uri",
Usage: "log uri",
},
cli.BoolFlag{ cli.BoolFlag{
Name: "detach,d", Name: "detach,d",
Usage: "detach from the task after it has started execution", Usage: "detach from the task after it has started execution",
@ -161,7 +165,7 @@ var Command = cli.Command{
} }
opts := getNewTaskOpts(context) opts := getNewTaskOpts(context)
ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))} ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))}
task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), ioOpts, opts...) task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -35,6 +35,10 @@ var startCommand = cli.Command{
Name: "null-io", Name: "null-io",
Usage: "send all IO to /dev/null", Usage: "send all IO to /dev/null",
}, },
cli.StringFlag{
Name: "log-uri",
Usage: "log uri",
},
cli.StringFlag{ cli.StringFlag{
Name: "fifo-dir", Name: "fifo-dir",
Usage: "directory used for storing IO FIFOs", Usage: "directory used for storing IO FIFOs",
@ -85,7 +89,7 @@ var startCommand = cli.Command{
} }
} }
task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), ioOpts, opts...) task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -20,6 +20,7 @@ package tasks
import ( import (
gocontext "context" gocontext "context"
"net/url"
"os" "os"
"os/signal" "os/signal"
@ -67,7 +68,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
} }
// NewTask creates a new task // NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
stdio := cio.NewCreator(append([]cio.Opt{cio.WithStdio}, ioOpts...)...) stdio := cio.NewCreator(append([]cio.Opt{cio.WithStdio}, ioOpts...)...)
if checkpoint != "" { if checkpoint != "" {
im, err := client.GetImage(ctx, checkpoint) im, err := client.GetImage(ctx, checkpoint)
@ -86,6 +87,13 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container contain
} }
ioCreator = cio.NullIO ioCreator = cio.NullIO
} }
if logURI != "" {
u, err := url.Parse(logURI)
if err != nil {
return nil, err
}
ioCreator = cio.LogURI(u)
}
return container.NewTask(ctx, ioCreator, opts...) return container.NewTask(ctx, ioCreator, opts...)
} }

View File

@ -58,7 +58,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
} }
// NewTask creates a new task // NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) { func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
var ioCreator cio.Creator var ioCreator cio.Creator
if con != nil { if con != nil {
if nullIO { if nullIO {

View File

@ -72,7 +72,7 @@ type Process interface {
// platform implementations // platform implementations
type Platform interface { type Platform interface {
CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string,
wg, cwg *sync.WaitGroup) (console.Console, error) wg *sync.WaitGroup) (console.Console, error)
ShutdownConsole(ctx context.Context, console console.Console) error ShutdownConsole(ctx context.Context, console console.Console) error
Close() error Close() error
} }

View File

@ -46,7 +46,7 @@ type execProcess struct {
mu sync.Mutex mu sync.Mutex
id string id string
console console.Console console console.Console
io runc.IO io *processIO
status int status int
exited time.Time exited time.Time
pid *safePid pid *safePid
@ -172,29 +172,30 @@ func (e *execProcess) start(ctx context.Context) (err error) {
// access e.pid until it is updated. // access e.pid until it is updated.
e.pid.Lock() e.pid.Lock()
defer e.pid.Unlock() defer e.pid.Unlock()
var ( var (
socket *runc.Socket socket *runc.Socket
pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) pio *processIO
pidFile = newExecPidFile(e.path, e.id)
) )
if e.stdio.Terminal { if e.stdio.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil { if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create runc console socket") return errors.Wrap(err, "failed to create runc console socket")
} }
defer socket.Close() defer socket.Close()
} else if e.stdio.IsNull() {
if e.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO")
}
} else { } else {
if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID, withConditionalIO(e.stdio)); err != nil { if pio, err = createIO(ctx, e.id, e.parent.IoUID, e.parent.IoGID, e.stdio); err != nil {
return errors.Wrap(err, "failed to create runc io pipes") return errors.Wrap(err, "failed to create init process I/O")
} }
e.io = pio
} }
opts := &runc.ExecOpts{ opts := &runc.ExecOpts{
PidFile: pidfile, PidFile: pidFile.Path(),
IO: e.io,
Detach: true, Detach: true,
} }
if pio != nil {
opts.IO = pio.IO()
}
if socket != nil { if socket != nil {
opts.ConsoleSocket = socket opts.ConsoleSocket = socket
} }
@ -203,14 +204,10 @@ func (e *execProcess) start(ctx context.Context) (err error) {
return e.parent.runtimeError(err, "OCI runtime exec failed") return e.parent.runtimeError(err, "OCI runtime exec failed")
} }
if e.stdio.Stdin != "" { if e.stdio.Stdin != "" {
sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err := e.openStdin(e.stdio.Stdin); err != nil {
if err != nil { return err
return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin)
} }
e.closers = append(e.closers, sc)
e.stdin = sc
} }
var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel() defer cancel()
if socket != nil { if socket != nil {
@ -218,16 +215,15 @@ func (e *execProcess) start(ctx context.Context) (err error) {
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve console master") return errors.Wrap(err, "failed to retrieve console master")
} }
if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil { if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
return errors.Wrap(err, "failed to start console copy") return errors.Wrap(err, "failed to start console copy")
} }
} else if !e.stdio.IsNull() { } else {
if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil { if err := pio.Copy(ctx, &e.wg); err != nil {
return errors.Wrap(err, "failed to start io pipe copy") return errors.Wrap(err, "failed to start io pipe copy")
} }
} }
copyWaitGroup.Wait() pid, err := pidFile.Read()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime exec pid") return errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
} }
@ -235,6 +231,16 @@ func (e *execProcess) start(ctx context.Context) (err error) {
return nil return nil
} }
func (e *execProcess) openStdin(path string) error {
sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", path)
}
e.stdin = sc
e.closers = append(e.closers, sc)
return nil
}
func (e *execProcess) Status(ctx context.Context) (string, error) { func (e *execProcess) Status(ctx context.Context) (string, error) {
s, err := e.parent.Status(ctx) s, err := e.parent.Status(ctx)
if err != nil { if err != nil {

View File

@ -41,9 +41,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// InitPidFile name of the file that contains the init pid
const InitPidFile = "init.pid"
// Init represents an initial process for a container // Init represents an initial process for a container
type Init struct { type Init struct {
wg sync.WaitGroup wg sync.WaitGroup
@ -63,7 +60,7 @@ type Init struct {
Bundle string Bundle string
console console.Console console console.Console
Platform proc.Platform Platform proc.Platform
io runc.IO io *processIO
runtime *runc.Runc runtime *runc.Runc
status int status int
exited time.Time exited time.Time
@ -111,49 +108,33 @@ func New(id string, runtime *runc.Runc, stdio proc.Stdio) *Init {
// Create the process with the provided config // Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error { func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
var ( var (
err error err error
socket *runc.Socket socket *runc.Socket
pio *processIO
pidFile = newPidFile(p.Bundle)
) )
if r.Terminal { if r.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil { if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create OCI runtime console socket") return errors.Wrap(err, "failed to create OCI runtime console socket")
} }
defer socket.Close() defer socket.Close()
} else if hasNoIO(r) {
if p.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO")
}
} else { } else {
if p.io, err = runc.NewPipeIO(p.IoUID, p.IoGID, withConditionalIO(p.stdio)); err != nil { if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
return errors.Wrap(err, "failed to create OCI runtime io pipes") return errors.Wrap(err, "failed to create init process I/O")
} }
p.io = pio
} }
pidFile := filepath.Join(p.Bundle, InitPidFile)
if r.Checkpoint != "" { if r.Checkpoint != "" {
opts := &runc.RestoreOpts{ return p.createCheckpointedState(r, pidFile)
CheckpointOpts: runc.CheckpointOpts{
ImagePath: r.Checkpoint,
WorkDir: p.CriuWorkPath,
ParentPath: r.ParentCheckpoint,
},
PidFile: pidFile,
IO: p.io,
NoPivot: p.NoPivotRoot,
Detach: true,
NoSubreaper: true,
}
p.initState = &createdCheckpointState{
p: p,
opts: opts,
}
return nil
} }
opts := &runc.CreateOpts{ opts := &runc.CreateOpts{
PidFile: pidFile, PidFile: pidFile.Path(),
IO: p.io,
NoPivot: p.NoPivotRoot, NoPivot: p.NoPivotRoot,
NoNewKeyring: p.NoNewKeyring, NoNewKeyring: p.NoNewKeyring,
} }
if p.io != nil {
opts.IO = p.io.IO()
}
if socket != nil { if socket != nil {
opts.ConsoleSocket = socket opts.ConsoleSocket = socket
} }
@ -161,14 +142,10 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
return p.runtimeError(err, "OCI runtime create failed") return p.runtimeError(err, "OCI runtime create failed")
} }
if r.Stdin != "" { if r.Stdin != "" {
sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err := p.openStdin(r.Stdin); err != nil {
if err != nil { return err
return errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
} }
p.stdin = sc
p.closers = append(p.closers, sc)
} }
var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second) ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel() defer cancel()
if socket != nil { if socket != nil {
@ -176,19 +153,17 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve console master") return errors.Wrap(err, "failed to retrieve console master")
} }
console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup) console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to start console copy") return errors.Wrap(err, "failed to start console copy")
} }
p.console = console p.console = console
} else if !hasNoIO(r) { } else {
if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil { if err := pio.Copy(ctx, &p.wg); err != nil {
return errors.Wrap(err, "failed to start io pipe copy") return errors.Wrap(err, "failed to start io pipe copy")
} }
} }
pid, err := pidFile.Read()
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime container pid") return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
} }
@ -196,6 +171,36 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
return nil return nil
} }
func (p *Init) openStdin(path string) error {
sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", path)
}
p.stdin = sc
p.closers = append(p.closers, sc)
return nil
}
func (p *Init) createCheckpointedState(r *CreateConfig, pidFile *pidFile) error {
opts := &runc.RestoreOpts{
CheckpointOpts: runc.CheckpointOpts{
ImagePath: r.Checkpoint,
WorkDir: p.CriuWorkPath,
ParentPath: r.ParentCheckpoint,
},
PidFile: pidFile.Path(),
IO: p.io.IO(),
NoPivot: p.NoPivotRoot,
Detach: true,
NoSubreaper: true,
}
p.initState = &createdCheckpointState{
p: p,
opts: opts,
}
return nil
}
// Wait for the process to exit // Wait for the process to exit
func (p *Init) Wait() { func (p *Init) Wait() {
<-p.waitBlock <-p.waitBlock

View File

@ -20,12 +20,9 @@ package proc
import ( import (
"context" "context"
"sync"
"syscall"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/containerd/runtime/proc" "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
google_protobuf "github.com/gogo/protobuf/types" google_protobuf "github.com/gogo/protobuf/types"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -172,31 +169,25 @@ func (s *createdCheckpointState) Start(ctx context.Context) error {
return p.runtimeError(err, "OCI runtime restore failed") return p.runtimeError(err, "OCI runtime restore failed")
} }
if sio.Stdin != "" { if sio.Stdin != "" {
sc, err := fifo.OpenFifo(context.Background(), sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err := p.openStdin(sio.Stdin); err != nil {
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin) return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin)
} }
p.stdin = sc
p.closers = append(p.closers, sc)
} }
var copyWaitGroup sync.WaitGroup
if socket != nil { if socket != nil {
console, err := socket.ReceiveMaster() console, err := socket.ReceiveMaster()
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve console master") return errors.Wrap(err, "failed to retrieve console master")
} }
console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup) console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to start console copy") return errors.Wrap(err, "failed to start console copy")
} }
p.console = console p.console = console
} else if !sio.IsNull() { } else {
if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup); err != nil { if err := p.io.Copy(ctx, &p.wg); err != nil {
return errors.Wrap(err, "failed to start io pipe copy") return errors.Wrap(err, "failed to start io pipe copy")
} }
} }
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(s.opts.PidFile) pid, err := runc.ReadPidFile(s.opts.PidFile)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime container pid") return errors.Wrap(err, "failed to retrieve OCI runtime container pid")

View File

@ -22,12 +22,18 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net/url"
"os" "os"
"os/exec"
"path/filepath"
"sync" "sync"
"syscall" "syscall"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime/proc"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
) )
var bufPool = sync.Pool{ var bufPool = sync.Pool{
@ -37,6 +43,84 @@ var bufPool = sync.Pool{
}, },
} }
type processIO struct {
io runc.IO
uri *url.URL
copy bool
stdio proc.Stdio
}
func (p *processIO) Close() error {
if p.io != nil {
return p.io.Close()
}
return nil
}
func (p *processIO) IO() runc.IO {
return p.io
}
func (p *processIO) Copy(ctx context.Context, wg *sync.WaitGroup) error {
if !p.copy {
return nil
}
var cwg sync.WaitGroup
if err := copyPipes(ctx, p.IO(), p.stdio.Stdin, p.stdio.Stdout, p.stdio.Stderr, wg, &cwg); err != nil {
return errors.Wrap(err, "unable to copy pipes")
}
cwg.Wait()
return nil
}
func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio) (*processIO, error) {
pio := &processIO{
stdio: stdio,
}
if stdio.IsNull() {
i, err := runc.NewNullIO()
if err != nil {
return nil, err
}
pio.io = i
return pio, nil
}
u, err := url.Parse(stdio.Stdout)
if err != nil {
return nil, errors.Wrap(err, "unable to parse stdout uri")
}
if u.Scheme == "" {
u.Scheme = "fifo"
}
pio.uri = u
switch u.Scheme {
case "fifo":
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
case "binary":
pio.io, err = newBinaryIO(ctx, id, u)
case "file":
if err := os.MkdirAll(filepath.Dir(u.Host), 0755); err != nil {
return nil, err
}
var f *os.File
f, err = os.OpenFile(u.Host, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
f.Close()
pio.copy = true
pio.io, err = runc.NewPipeIO(ioUID, ioGID, withConditionalIO(stdio))
default:
return nil, errors.Errorf("unknown STDIO scheme %s", u.Scheme)
}
if err != nil {
return nil, err
}
return pio, nil
}
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
var sameFile io.WriteCloser var sameFile io.WriteCloser
for _, i := range []struct { for _, i := range []struct {
@ -143,3 +227,134 @@ func isFifo(path string) (bool, error) {
} }
return false, nil return false, nil
} }
func newBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var args []string
for k, vs := range uri.Query() {
args = append(args, k)
if len(vs) > 0 {
args = append(args, vs[0])
}
}
ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, uri.Host, args...)
cmd.Env = append(cmd.Env,
"CONTAINER_ID="+id,
"CONTAINER_NAMESPACE="+ns,
)
out, err := newPipe()
if err != nil {
return nil, err
}
serr, err := newPipe()
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
// don't need to register this with the reaper or wait when
// running inside a shim
if err := cmd.Start(); err != nil {
return nil, err
}
// close our side of the pipe after start
w.Close()
// wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, err
}
return &binaryIO{
cmd: cmd,
cancel: cancel,
out: out,
err: serr,
}, nil
}
type binaryIO struct {
cmd *exec.Cmd
cancel func()
out, err *pipe
}
func (b *binaryIO) CloseAfterStart() (err error) {
for _, v := range []*pipe{
b.out,
b.err,
} {
if v != nil {
if cerr := v.r.Close(); err == nil {
err = cerr
}
}
}
return err
}
func (b *binaryIO) Close() (err error) {
b.cancel()
for _, v := range []*pipe{
b.out,
b.err,
} {
if v != nil {
if cerr := v.Close(); err == nil {
err = cerr
}
}
}
return err
}
func (b *binaryIO) Stdin() io.WriteCloser {
return nil
}
func (b *binaryIO) Stdout() io.ReadCloser {
return nil
}
func (b *binaryIO) Stderr() io.ReadCloser {
return nil
}
func (b *binaryIO) Set(cmd *exec.Cmd) {
if b.out != nil {
cmd.Stdout = b.out.w
}
if b.err != nil {
cmd.Stderr = b.err.w
}
}
func newPipe() (*pipe, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &pipe{
r: r,
w: w,
}, nil
}
type pipe struct {
r *os.File
w *os.File
}
func (p *pipe) Close() error {
err := p.w.Close()
if rerr := p.r.Close(); err == nil {
err = rerr
}
return err
}

View File

@ -20,8 +20,10 @@ package proc
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"os" "os"
"path/filepath"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -114,6 +116,29 @@ func checkKillError(err error) error {
return errors.Wrapf(err, "unknown error after kill") return errors.Wrapf(err, "unknown error after kill")
} }
func hasNoIO(r *CreateConfig) bool { // InitPidFile name of the file that contains the init pid
return r.Stdin == "" && r.Stdout == "" && r.Stderr == "" const InitPidFile = "init.pid"
func newPidFile(bundle string) *pidFile {
return &pidFile{
path: filepath.Join(bundle, InitPidFile),
}
}
func newExecPidFile(bundle, id string) *pidFile {
return &pidFile{
path: filepath.Join(bundle, fmt.Sprintf("%s.pid", id)),
}
}
type pidFile struct {
path string
}
func (p *pidFile) Path() string {
return p.path
}
func (p *pidFile) Read() (int, error) {
return runc.ReadPidFile(p.path)
} }

View File

@ -31,7 +31,7 @@ type linuxPlatform struct {
epoller *console.Epoller epoller *console.Epoller
} }
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil { if p.epoller == nil {
return nil, errors.New("uninitialized epoller") return nil, errors.New("uninitialized epoller")
} }
@ -40,6 +40,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
if err != nil { if err != nil {
return nil, err return nil, err
} }
var cwg sync.WaitGroup
if stdin != "" { if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
@ -77,6 +78,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
outw.Close() outw.Close()
wg.Done() wg.Done()
}() }()
cwg.Wait()
return epollConsole, nil return epollConsole, nil
} }

View File

@ -31,7 +31,8 @@ import (
type unixPlatform struct { type unixPlatform struct {
} }
func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
var cwg sync.WaitGroup
if stdin != "" { if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil { if err != nil {
@ -67,6 +68,7 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console,
outw.Close() outw.Close()
wg.Done() wg.Done()
}() }()
cwg.Wait()
return console, nil return console, nil
} }

View File

@ -173,6 +173,68 @@ The Runtime v2 supports an async event model. In order for the an upstream calle
| `runtime.TaskExitEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec (other than the init exec) exits expected or unexpected | | `runtime.TaskExitEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec (other than the init exec) exits expected or unexpected |
| `runtime.TaskDeleteEventTopic` | SHOULD (follow `TaskExitEventTopic` or `TaskExecAddedEventTopic` if never started) | When an exec is removed from a shim | | `runtime.TaskDeleteEventTopic` | SHOULD (follow `TaskExitEventTopic` or `TaskExecAddedEventTopic` if never started) | When an exec is removed from a shim |
#### Logging
Shims may support pluggable logging via STDIO URIs.
Current supported schemes for logging are:
* fifo - Linux
* binary - Linux & Windows
* file - Linux & Windows
* npipe - Windows
Binary logging has the abilty to forward a container's STDIO to an external binary for consumption.
A sample logging driver that forwards the container's STDOUT and STDERR to `journald` is:
```go
package main
import (
"bufio"
"context"
"fmt"
"io"
"sync"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/coreos/go-systemd/journal"
)
func main() {
logging.Run(log)
}
func log(ctx context.Context, config *logging.Config, ready func() error) error {
// construct any log metadata for the container
vars := map[string]string{
"SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID),
}
var wg sync.WaitGroup
wg.Add(2)
// forward both stdout and stderr to the journal
go copy(&wg, config.Stdout, journal.PriInfo, vars)
go copy(&wg, config.Stderr, journal.PriErr, vars)
// signal that we are ready and setup for the container to be started
if err := ready(); err != nil {
return err
}
wg.Wait()
return nil
}
func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) {
defer wg.Done()
s := bufio.NewScanner(r)
for s.Scan() {
if s.Err() != nil {
return
}
journal.Send(s.Text(), pri, vars)
}
}
```
### Other ### Other
#### Unsupported rpcs #### Unsupported rpcs

View File

@ -0,0 +1,77 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"golang.org/x/sys/unix"
)
// Config of the container logs
type Config struct {
ID string
Namespace string
Stdout io.Reader
Stderr io.Reader
}
// LoggerFunc is implemented by custom v2 logging binaries
type LoggerFunc func(context.Context, *Config, func() error) error
// Run the logging driver
func Run(fn LoggerFunc) {
ctx, cancel := context.WithCancel(context.Background())
config := &Config{
ID: os.Getenv("CONTAINER_ID"),
Namespace: os.Getenv("CONTAINER_NAMESPACE"),
Stdout: os.NewFile(3, "CONTAINER_STDOUT"),
Stderr: os.NewFile(4, "CONTAINER_STDERR"),
}
var (
s = make(chan os.Signal, 32)
errCh = make(chan error, 1)
wait = os.NewFile(5, "CONTAINER_WAIT")
)
signal.Notify(s, unix.SIGTERM)
go func() {
if err := fn(ctx, config, wait.Close); err != nil {
errCh <- err
}
errCh <- nil
}()
for {
select {
case <-s:
cancel()
case err := <-errCh:
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
os.Exit(0)
}
}
}

View File

@ -53,7 +53,7 @@ type linuxPlatform struct {
epoller *console.Epoller epoller *console.Epoller
} }
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil { if p.epoller == nil {
return nil, errors.New("uninitialized epoller") return nil, errors.New("uninitialized epoller")
} }
@ -63,6 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
return nil, err return nil, err
} }
var cwg sync.WaitGroup
if stdin != "" { if stdin != "" {
in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil { if err != nil {
@ -99,6 +100,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
outw.Close() outw.Close()
wg.Done() wg.Done()
}() }()
cwg.Wait()
return epollConsole, nil return epollConsole, nil
} }