diff --git a/cio/io.go b/cio/io.go index 663799941..003c6632e 100644 --- a/cio/io.go +++ b/cio/io.go @@ -260,6 +260,26 @@ func BinaryIO(binary string, args map[string]string) Creator { } } +// TerminalBinaryIO forwards container STDOUT|STDERR directly to a logging binary +// It also sets the terminal option to true +func TerminalBinaryIO(binary string, args map[string]string) Creator { + return func(_ string) (IO, error) { + uri, err := LogURIGenerator("binary", binary, args) + if err != nil { + return nil, err + } + + res := uri.String() + return &logURI{ + config: Config{ + Stdout: res, + Stderr: res, + Terminal: true, + }, + }, nil + } +} + // LogFile creates a file on disk that logs the task's STDOUT,STDERR. // If the log file already exists, the logs will be appended to the file. func LogFile(path string) Creator { diff --git a/pkg/process/exec.go b/pkg/process/exec.go index e77e79ae3..7790a4979 100644 --- a/pkg/process/exec.go +++ b/pkg/process/exec.go @@ -221,7 +221,7 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { + if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.id, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return errors.Wrap(err, "failed to start console copy") } } else { diff --git a/pkg/process/init.go b/pkg/process/init.go index 5cb22460a..28ca5ac6a 100644 --- a/pkg/process/init.go +++ b/pkg/process/init.go @@ -157,7 +157,7 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg) + console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg) if err != nil { return errors.Wrap(err, "failed to start console copy") } diff --git a/pkg/process/init_state.go b/pkg/process/init_state.go index 81c6488a0..5273a5d73 100644 --- a/pkg/process/init_state.go +++ b/pkg/process/init_state.go @@ -172,7 +172,7 @@ func (s *createdCheckpointState) Start(ctx context.Context) error { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg) + console, err = p.Platform.CopyConsole(ctx, console, p.id, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg) if err != nil { return errors.Wrap(err, "failed to start console copy") } diff --git a/pkg/stdio/platform.go b/pkg/stdio/platform.go index 6e1b27cfa..ca8688fc0 100644 --- a/pkg/stdio/platform.go +++ b/pkg/stdio/platform.go @@ -26,7 +26,7 @@ import ( // Platform handles platform-specific behavior that may differs across // platform implementations type Platform interface { - CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, + CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) ShutdownConsole(ctx context.Context, console console.Console) error Close() error diff --git a/runtime/io.go b/runtime/io.go new file mode 100644 index 000000000..19489f1ee --- /dev/null +++ b/runtime/io.go @@ -0,0 +1,58 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package runtime + +import ( + "net/url" + "os" + "os/exec" +) + +type Pipe struct { + R *os.File + W *os.File +} + +func NewPipe() (*Pipe, error) { + R, W, err := os.Pipe() + if err != nil { + return nil, err + } + return &Pipe{ + R: R, + W: W, + }, nil +} + +func NewBinaryCmd(binaryURI *url.URL, id, ns string) *exec.Cmd { + var args []string + for k, vs := range binaryURI.Query() { + args = append(args, k) + if len(vs) > 0 { + args = append(args, vs[0]) + } + } + + cmd := exec.Command(binaryURI.Path, args...) + + cmd.Env = append(cmd.Env, + "CONTAINER_ID="+id, + "CONTAINER_NAMESPACE="+ns, + ) + + return cmd +} diff --git a/runtime/v1/shim/service_linux.go b/runtime/v1/shim/service_linux.go index 65a8666e4..ef084374f 100644 --- a/runtime/v1/shim/service_linux.go +++ b/runtime/v1/shim/service_linux.go @@ -19,10 +19,14 @@ package shim import ( "context" "io" + "net/url" + "os" "sync" "syscall" "github.com/containerd/console" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/runtime" "github.com/containerd/fifo" "github.com/pkg/errors" ) @@ -31,7 +35,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -59,26 +63,85 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console }() } - outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + uri, err := url.Parse(stdout) if err != nil { - return nil, err + return nil, errors.Wrap(err, "unable to parse stdout uri") } - outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) - if err != nil { - return nil, err + + switch uri.Scheme { + case "binary": + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + cmd := runtime.NewBinaryCmd(uri, id, ns) + + // Create pipe to be used by logging binary for Stdout + out, err := runtime.NewPipe() + if err != nil { + return nil, errors.Wrap(err, "failed to create stdout pipes") + } + + // Stderr is created for logging binary but unused when terminal is true + serr, err := runtime.NewPipe() + if err != nil { + return nil, errors.Wrap(err, "failed to create stderr pipes") + } + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + + cmd.ExtraFiles = append(cmd.ExtraFiles, out.R, serr.R, w) + + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + io.Copy(out.W, epollConsole) + out.W.Close() + wg.Done() + }() + + if err := cmd.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start logging binary process") + } + + // Close our side of the pipe after start + if err := w.Close(); err != nil { + return nil, errors.Wrap(err, "failed to close write pipe after start") + } + + // Wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, errors.Wrap(err, "failed to read from logging binary") + } + + default: + outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(outw, epollConsole, *p) + outw.Close() + outr.Close() + wg.Done() + }() + cwg.Wait() } - wg.Add(1) - cwg.Add(1) - go func() { - cwg.Done() - p := bufPool.Get().(*[]byte) - defer bufPool.Put(p) - io.CopyBuffer(outw, epollConsole, *p) - outw.Close() - outr.Close() - wg.Done() - }() - cwg.Wait() return epollConsole, nil } diff --git a/runtime/v1/shim/service_unix.go b/runtime/v1/shim/service_unix.go index 3c614531e..782d200eb 100644 --- a/runtime/v1/shim/service_unix.go +++ b/runtime/v1/shim/service_unix.go @@ -21,17 +21,22 @@ package shim import ( "context" "io" + "net/url" + "os" "sync" "syscall" "github.com/containerd/console" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/runtime" "github.com/containerd/fifo" + "github.com/pkg/errors" ) type unixPlatform struct { } -func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { +func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { var cwg sync.WaitGroup if stdin != "" { in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) @@ -47,28 +52,85 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, io.CopyBuffer(console, in, *p) }() } - outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + uri, err := url.Parse(stdout) if err != nil { - return nil, err + return nil, errors.Wrap(err, "unable to parse stdout uri") } - outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) - if err != nil { - return nil, err - } - wg.Add(1) - cwg.Add(1) - go func() { - cwg.Done() - p := bufPool.Get().(*[]byte) - defer bufPool.Put(p) - io.CopyBuffer(outw, console, *p) - console.Close() - outr.Close() - outw.Close() - wg.Done() - }() - cwg.Wait() + switch uri.Scheme { + case "binary": + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + cmd := runtime.NewBinaryCmd(uri, id, ns) + + // Create pipe to be used by logging binary for Stdout + out, err := runtime.NewPipe() + if err != nil { + return nil, errors.Wrap(err, "failed to create stdout pipes") + } + + // Stderr is created for logging binary but unused when terminal is true + serr, err := runtime.NewPipe() + if err != nil { + return nil, errors.Wrap(err, "failed to create stderr pipes") + } + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + + cmd.ExtraFiles = append(cmd.ExtraFiles, out.R, serr.R, w) + + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + io.Copy(out.W, console) + out.W.Close() + wg.Done() + }() + + if err := cmd.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start logging binary process") + } + + // Close our side of the pipe after start + if err := w.Close(); err != nil { + return nil, errors.Wrap(err, "failed to close write pipe after start") + } + + // Wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, errors.Wrap(err, "failed to read from logging binary") + } + + default: + outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(outw, console, *p) + outw.Close() + outr.Close() + wg.Done() + }() + cwg.Wait() + } return console, nil } diff --git a/runtime/v2/runc/platform.go b/runtime/v2/runc/platform.go index aa5402397..d518dd6b4 100644 --- a/runtime/v2/runc/platform.go +++ b/runtime/v2/runc/platform.go @@ -21,11 +21,15 @@ package runc import ( "context" "io" + "net/url" + "os" "sync" "syscall" "github.com/containerd/console" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/runtime" "github.com/containerd/fifo" "github.com/pkg/errors" ) @@ -55,7 +59,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -83,27 +87,87 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console }() } - outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + uri, err := url.Parse(stdout) if err != nil { - return nil, err + return nil, errors.Wrap(err, "unable to parse stdout uri") + } + + switch uri.Scheme { + case "binary": + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + cmd := runtime.NewBinaryCmd(uri, id, ns) + + // Create pipe to be used by logging binary for Stdout + out, err := runtime.NewPipe() + if err != nil { + return nil, errors.Wrap(err, "failed to create stdout pipes") + } + + // Stderr is created for logging binary but unused when terminal is true + serr, err := runtime.NewPipe() + if err != nil { + return nil, errors.Wrap(err, "failed to create stderr pipes") + } + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + + cmd.ExtraFiles = append(cmd.ExtraFiles, out.R, serr.R, w) + + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + io.Copy(out.W, epollConsole) + out.W.Close() + wg.Done() + }() + + if err := cmd.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start logging binary process") + } + + // Close our side of the pipe after start + if err := w.Close(); err != nil { + return nil, errors.Wrap(err, "failed to close write pipe after start") + } + + // Wait for the logging binary to be ready + b := make([]byte, 1) + if _, err := r.Read(b); err != nil && err != io.EOF { + return nil, errors.Wrap(err, "failed to read from logging binary") + } + + default: + outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + wg.Add(1) + cwg.Add(1) + go func() { + cwg.Done() + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + io.CopyBuffer(outw, epollConsole, *buf) + + outw.Close() + outr.Close() + wg.Done() + }() + cwg.Wait() } - outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) - if err != nil { - return nil, err - } - wg.Add(1) - cwg.Add(1) - go func() { - cwg.Done() - buf := bufPool.Get().(*[]byte) - defer bufPool.Put(buf) - io.CopyBuffer(outw, epollConsole, *buf) - outw.Close() - outr.Close() - wg.Done() - }() - cwg.Wait() return epollConsole, nil }