diff --git a/pkg/process/io.go b/pkg/process/io.go index d315f44ea..78f86047e 100644 --- a/pkg/process/io.go +++ b/pkg/process/io.go @@ -29,16 +29,23 @@ import ( "sync" "sync/atomic" "syscall" + "time" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/sys" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" ) +const ( + shimLoggerTermTimeout = "io.containerd.timeout.shim.logger.shutdown" +) + var bufPool = sync.Pool{ New: func() interface{} { // setting to 4096 to align with PIPE_BUF @@ -254,86 +261,133 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) args = append(args, vs[0]) } } - ctx, cancel := context.WithCancel(ctx) - cmd := exec.CommandContext(ctx, uri.Path, args...) + + out, err := newPipe() + if err != nil { + return nil, err + } + + serr, err := newPipe() + if err != nil { + return nil, err + } + + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + + cmd := exec.Command(uri.Path, args...) cmd.Env = append(cmd.Env, "CONTAINER_ID="+id, "CONTAINER_NAMESPACE="+ns, ) - out, err := newPipe() - if err != nil { - cancel() - return nil, err - } - serr, err := newPipe() - if err != nil { - cancel() - return nil, err - } - r, w, err := os.Pipe() - if err != nil { - cancel() - return nil, err - } + cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w) // don't need to register this with the reaper or wait when // running inside a shim if err := cmd.Start(); err != nil { - cancel() return nil, err } // close our side of the pipe after start if err := w.Close(); err != nil { - cancel() return nil, err } // wait for the logging binary to be ready b := make([]byte, 1) if _, err := r.Read(b); err != nil && err != io.EOF { - cancel() return nil, err } return &binaryIO{ - cmd: cmd, - cancel: cancel, - out: out, - err: serr, + cmd: cmd, + out: out, + err: serr, }, nil } type binaryIO struct { cmd *exec.Cmd - cancel func() out, err *pipe } -func (b *binaryIO) CloseAfterStart() (err error) { - for _, v := range []*pipe{ - b.out, - b.err, - } { +func (b *binaryIO) CloseAfterStart() error { + var ( + result *multierror.Error + ) + + for _, v := range []*pipe{b.out, b.err} { if v != nil { - if cerr := v.r.Close(); err == nil { - err = cerr + if err := v.r.Close(); err != nil { + result = multierror.Append(result, err) } } } - return err + + return result.ErrorOrNil() } -func (b *binaryIO) Close() (err error) { - b.cancel() - for _, v := range []*pipe{ - b.out, - b.err, - } { +func (b *binaryIO) Close() error { + var ( + result *multierror.Error + ) + + for _, v := range []*pipe{b.out, b.err} { if v != nil { - if cerr := v.Close(); err == nil { - err = cerr + if err := v.Close(); err != nil { + result = multierror.Append(result, err) } } } - return err + + if err := b.cancel(); err != nil { + result = multierror.Append(result, err) + } + + return result.ErrorOrNil() +} + +func (b *binaryIO) cancel() error { + if b.cmd == nil || b.cmd.Process == nil { + return nil + } + + // Send SIGTERM first, so logger process has a chance to flush and exit properly + if err := b.cmd.Process.Signal(syscall.SIGTERM); err != nil { + result := multierror.Append(errors.Wrap(err, "failed to send SIGTERM")) + + log.L.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim") + + if err := b.cmd.Process.Kill(); err != nil { + result = multierror.Append(result, errors.Wrap(err, "failed to kill process after faulty SIGTERM")) + } + + return result.ErrorOrNil() + } + + done := make(chan error) + go func() { + err := b.cmd.Wait() + if err != nil { + err = errors.Wrap(err, "failed to wait for shim logger process after SIGTERM") + } + done <- err + }() + + termTimeout := timeout.Get(shimLoggerTermTimeout) + + select { + case err := <-done: + return err + case <-time.After(termTimeout): + log.L.Warn("failed to wait for shim logger process to exit, killing") + + err := b.cmd.Process.Kill() + if err != nil { + return errors.Wrap(err, "failed to kill shim logger process") + } + + return nil + } } func (b *binaryIO) Stdin() io.WriteCloser { diff --git a/runtime/v2/logging/logging.go b/runtime/v2/logging/logging.go index 614d6aef9..c394864c0 100644 --- a/runtime/v2/logging/logging.go +++ b/runtime/v2/logging/logging.go @@ -42,6 +42,8 @@ type LoggerFunc func(context.Context, *Config, func() error) error // Run the logging driver func Run(fn LoggerFunc) { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + config := &Config{ ID: os.Getenv("CONTAINER_ID"), Namespace: os.Getenv("CONTAINER_NAMESPACE"), @@ -56,10 +58,7 @@ func Run(fn LoggerFunc) { signal.Notify(s, unix.SIGTERM) go func() { - if err := fn(ctx, config, wait.Close); err != nil { - errCh <- err - } - errCh <- nil + errCh <- fn(ctx, config, wait.Close) }() for {