Rework shim logger shutdown process

Signed-off-by: Maksym Pavlenko <makpav@amazon.com>
This commit is contained in:
Maksym Pavlenko 2020-04-07 12:26:42 -07:00
parent 23fc8597db
commit 0caa233158
2 changed files with 99 additions and 46 deletions

View File

@ -29,16 +29,23 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/sys" "github.com/containerd/containerd/sys"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
const (
shimLoggerTermTimeout = "io.containerd.timeout.shim.logger.shutdown"
)
var bufPool = sync.Pool{ var bufPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
// setting to 4096 to align with PIPE_BUF // setting to 4096 to align with PIPE_BUF
@ -254,86 +261,133 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error)
args = append(args, vs[0]) args = append(args, vs[0])
} }
} }
ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, uri.Path, args...) out, err := newPipe()
if err != nil {
return nil, err
}
serr, err := newPipe()
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd := exec.Command(uri.Path, args...)
cmd.Env = append(cmd.Env, cmd.Env = append(cmd.Env,
"CONTAINER_ID="+id, "CONTAINER_ID="+id,
"CONTAINER_NAMESPACE="+ns, "CONTAINER_NAMESPACE="+ns,
) )
out, err := newPipe()
if err != nil {
cancel()
return nil, err
}
serr, err := newPipe()
if err != nil {
cancel()
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
cancel()
return nil, err
}
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w) cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
// don't need to register this with the reaper or wait when // don't need to register this with the reaper or wait when
// running inside a shim // running inside a shim
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
cancel()
return nil, err return nil, err
} }
// close our side of the pipe after start // close our side of the pipe after start
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
cancel()
return nil, err return nil, err
} }
// wait for the logging binary to be ready // wait for the logging binary to be ready
b := make([]byte, 1) b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF { if _, err := r.Read(b); err != nil && err != io.EOF {
cancel()
return nil, err return nil, err
} }
return &binaryIO{ return &binaryIO{
cmd: cmd, cmd: cmd,
cancel: cancel, out: out,
out: out, err: serr,
err: serr,
}, nil }, nil
} }
type binaryIO struct { type binaryIO struct {
cmd *exec.Cmd cmd *exec.Cmd
cancel func()
out, err *pipe out, err *pipe
} }
func (b *binaryIO) CloseAfterStart() (err error) { func (b *binaryIO) CloseAfterStart() error {
for _, v := range []*pipe{ var (
b.out, result *multierror.Error
b.err, )
} {
for _, v := range []*pipe{b.out, b.err} {
if v != nil { if v != nil {
if cerr := v.r.Close(); err == nil { if err := v.r.Close(); err != nil {
err = cerr result = multierror.Append(result, err)
} }
} }
} }
return err
return result.ErrorOrNil()
} }
func (b *binaryIO) Close() (err error) { func (b *binaryIO) Close() error {
b.cancel() var (
for _, v := range []*pipe{ result *multierror.Error
b.out, )
b.err,
} { for _, v := range []*pipe{b.out, b.err} {
if v != nil { if v != nil {
if cerr := v.Close(); err == nil { if err := v.Close(); err != nil {
err = cerr result = multierror.Append(result, err)
} }
} }
} }
return err
if err := b.cancel(); err != nil {
result = multierror.Append(result, err)
}
return result.ErrorOrNil()
}
func (b *binaryIO) cancel() error {
if b.cmd == nil || b.cmd.Process == nil {
return nil
}
// Send SIGTERM first, so logger process has a chance to flush and exit properly
if err := b.cmd.Process.Signal(syscall.SIGTERM); err != nil {
result := multierror.Append(errors.Wrap(err, "failed to send SIGTERM"))
log.L.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim")
if err := b.cmd.Process.Kill(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to kill process after faulty SIGTERM"))
}
return result.ErrorOrNil()
}
done := make(chan error)
go func() {
err := b.cmd.Wait()
if err != nil {
err = errors.Wrap(err, "failed to wait for shim logger process after SIGTERM")
}
done <- err
}()
termTimeout := timeout.Get(shimLoggerTermTimeout)
select {
case err := <-done:
return err
case <-time.After(termTimeout):
log.L.Warn("failed to wait for shim logger process to exit, killing")
err := b.cmd.Process.Kill()
if err != nil {
return errors.Wrap(err, "failed to kill shim logger process")
}
return nil
}
} }
func (b *binaryIO) Stdin() io.WriteCloser { func (b *binaryIO) Stdin() io.WriteCloser {

View File

@ -42,6 +42,8 @@ type LoggerFunc func(context.Context, *Config, func() error) error
// Run the logging driver // Run the logging driver
func Run(fn LoggerFunc) { func Run(fn LoggerFunc) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := &Config{ config := &Config{
ID: os.Getenv("CONTAINER_ID"), ID: os.Getenv("CONTAINER_ID"),
Namespace: os.Getenv("CONTAINER_NAMESPACE"), Namespace: os.Getenv("CONTAINER_NAMESPACE"),
@ -56,10 +58,7 @@ func Run(fn LoggerFunc) {
signal.Notify(s, unix.SIGTERM) signal.Notify(s, unix.SIGTERM)
go func() { go func() {
if err := fn(ctx, config, wait.Close); err != nil { errCh <- fn(ctx, config, wait.Close)
errCh <- err
}
errCh <- nil
}() }()
for { for {