From d541567119e2d97109f221409118e4b1f94f7c69 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Tue, 15 Aug 2017 11:37:53 -0700 Subject: [PATCH] Handle SIGKILL'ed shim while daemon is running Signed-off-by: Kenfe-Mickael Laventure --- container_linux_test.go | 147 ++++++++++++++++++++++++++++++++++++++-- daemon_test.go | 11 +-- linux/bundle.go | 6 +- linux/runtime.go | 117 +++++++++++++++++++++++++++----- linux/shim/client.go | 75 +++++++++++++------- linux/shim/init.go | 4 +- linux/task.go | 4 +- runtime/task_list.go | 4 +- 8 files changed, 314 insertions(+), 54 deletions(-) diff --git a/container_linux_test.go b/container_linux_test.go index 06a01e72e..d6d7550fb 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "os/exec" "runtime" "strings" "sync" @@ -222,13 +223,14 @@ func TestDaemonRestart(t *testing.T) { return } - if err := ctrd.Restart(); err != nil { + var exitStatus ExitStatus + if err := ctrd.Restart(func() { + exitStatus = <-statusC + }); err != nil { t.Fatal(err) } - status := <-statusC - _, _, err = status.Result() - if err == nil { + if exitStatus.Error() == nil { t.Errorf(`first task.Wait() should have failed with "transport is closing"`) } @@ -712,3 +714,140 @@ func TestContainerKillAll(t *testing.T) { return } } + +func TestShimSigkilled(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + // redis unset its PDeathSignal making it a good candidate + image, err = client.Pull(ctx, "docker.io/library/redis:alpine", WithPullUnpack) + if err != nil { + t.Fatal(err) + } + container, err := client.NewContainer(ctx, id, WithNewSpec(WithImageConfig(image)), withNewSnapshot(id, image)) + if err != nil { + t.Fatal(err) + } + defer container.Delete(ctx, WithSnapshotCleanup) + + task, err := container.NewTask(ctx, empty()) + if err 
!= nil { + t.Fatal(err) + } + defer task.Delete(ctx) + + statusC, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + pid := task.Pid() + if pid <= 0 { + t.Fatalf("invalid task pid %d", pid) + } + + if err := task.Start(ctx); err != nil { + t.Fatal(err) + } + + // SIGKILL the shim + if err := exec.Command("pkill", "-KILL", "containerd-s").Run(); err != nil { + t.Fatalf("failed to kill shim: %v", err) + } + + <-statusC + + if err := unix.Kill(int(pid), 0); err != unix.ESRCH { + t.Errorf("pid %d still exists", pid) + } +} + +func TestDaemonRestartWithRunningShim(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + image, err = client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, WithNewSpec(WithImageConfig(image), WithProcessArgs("sleep", "100")), withNewSnapshot(id, image)) + if err != nil { + t.Fatal(err) + } + defer container.Delete(ctx, WithSnapshotCleanup) + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Fatal(err) + } + defer task.Delete(ctx) + + statusC, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + pid := task.Pid() + if pid <= 0 { + t.Fatalf("invalid task pid %d", pid) + } + + if err := task.Start(ctx); err != nil { + t.Fatal(err) + } + + var exitStatus ExitStatus + if err := ctrd.Restart(func() { + exitStatus = <-statusC + }); err != nil { + t.Fatal(err) + } + + if exitStatus.Error() == nil { + t.Errorf(`first task.Wait() should have failed with "transport is closing"`) + } + + waitCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + c, err := ctrd.waitForStart(waitCtx) + cancel() + if err != nil { + t.Fatal(err) + } + c.Close() + + statusC, err = task.Wait(ctx) + if err != nil { + t.Error(err) + } + + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + t.Fatal(err) + } + 
+ <-statusC + + if err := unix.Kill(int(pid), 0); err != unix.ESRCH { + t.Errorf("pid %d still exists", pid) + } +} diff --git a/daemon_test.go b/daemon_test.go index 3bc9f6955..fe3a33932 100644 --- a/daemon_test.go +++ b/daemon_test.go @@ -6,7 +6,6 @@ import ( "os/exec" "sync" "syscall" - "time" "github.com/pkg/errors" ) @@ -82,10 +81,12 @@ func (d *daemon) Wait() error { if d.cmd == nil { return errors.New("daemon is not running") } - return d.cmd.Wait() + err := d.cmd.Wait() + d.cmd = nil + return err } -func (d *daemon) Restart() error { +func (d *daemon) Restart(stopCb func()) error { d.Lock() defer d.Unlock() if d.cmd == nil { @@ -99,7 +100,9 @@ func (d *daemon) Restart() error { d.cmd.Wait() - <-time.After(1 * time.Second) + if stopCb != nil { + stopCb() + } cmd := exec.Command(d.cmd.Path, d.cmd.Args[1:]...) cmd.Stdout = d.cmd.Stdout diff --git a/linux/bundle.go b/linux/bundle.go index b03dc8733..f05da157c 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -28,7 +28,7 @@ func loadBundle(path, workdir, namespace, id string, events *events.Exchange) *b } // newBundle creates a new bundle on disk at the provided path for the given id -func newBundle(path, namespace, workDir, id string, spec []byte, events *events.Exchange) (b *bundle, err error) { +func newBundle(namespace, id, path, workDir string, spec []byte, events *events.Exchange) (b *bundle, err error) { if err := os.MkdirAll(path, 0711); err != nil { return nil, err } @@ -78,8 +78,8 @@ type bundle struct { } // NewShim connects to the shim managing the bundle and tasks -func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts) (*client.Client, error) { - opt := client.WithStart(binary, grpcAddress, debug) +func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts, exitHandler func()) (*client.Client, error) { + opt := client.WithStart(binary, grpcAddress, debug, 
exitHandler) if !remote { opt = client.WithLocal(b.events) } diff --git a/linux/runtime.go b/linux/runtime.go index 81358a74e..ecb14e4cb 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -8,8 +8,10 @@ import ( "io/ioutil" "os" "path/filepath" + "time" "github.com/boltdb/bolt" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" @@ -20,10 +22,13 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/sys" runc "github.com/containerd/go-runc" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) @@ -101,6 +106,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + // TODO: need to add the tasks to the monitor for _, t := range tasks { if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil { return nil, err @@ -138,7 +144,11 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, errors.Wrapf(err, "invalid task id") } - bundle, err := newBundle(filepath.Join(r.state, namespace), namespace, filepath.Join(r.root, namespace), id, opts.Spec.Value, r.events) + bundle, err := newBundle( + namespace, id, + filepath.Join(r.state, namespace), + filepath.Join(r.root, namespace), + opts.Spec.Value, r.events) if err != nil { return nil, err } @@ -147,7 +157,36 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts bundle.Delete() } }() - s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote, r.shimDebug, opts) + s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote, r.shimDebug, opts, func() { + t, err := 
r.tasks.Get(ctx, id) + if err != nil { + // Task was never started or was already successfully deleted + return + } + lc := t.(*Task) + + // Stop the monitor + if err := r.monitor.Stop(lc); err != nil { + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + }).Warn("failed to stop monitor") + } + + log.G(ctx).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + }).Warn("cleaning up after killed shim") + err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true) + if err == nil { + r.tasks.Delete(ctx, lc) + } else { + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + }).Warn("failed to clen up after killed shim") + } + }) if err != nil { return nil, err } @@ -176,10 +215,11 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts Options: m.Options, }) } - if _, err = s.Create(ctx, sopts); err != nil { + cr, err := s.Create(ctx, sopts) + if err != nil { return nil, errdefs.FromGRPC(err) } - t := newTask(id, namespace, s) + t := newTask(id, namespace, int(cr.Pid), s) if err := r.tasks.Add(ctx, t); err != nil { return nil, err } @@ -207,10 +247,10 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er if err != nil { return nil, errdefs.FromGRPC(err) } + r.tasks.Delete(ctx, lc) if err := lc.shim.KillShim(ctx); err != nil { log.G(ctx).WithError(err).Error("failed to kill shim") } - r.tasks.Delete(ctx, lc) bundle := loadBundle( filepath.Join(r.state, namespace, lc.id), @@ -269,18 +309,21 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } id := path.Name() - bundle := loadBundle(filepath.Join(r.state, ns, id), - filepath.Join(r.root, ns, id), ns, id, r.events) - + bundle := loadBundle( + filepath.Join(r.state, ns, id), + filepath.Join(r.root, ns, id), + ns, id, r.events) s, err := bundle.Connect(ctx, r.remote) if err != nil { - 
log.G(ctx).WithError(err).Error("connecting to shim") - if err := r.terminate(ctx, bundle, ns, id); err != nil { - log.G(ctx).WithError(err).WithField("bundle", bundle.path).Error("failed to terminate task, leaving bundle for debugging") - continue - } - if err := bundle.Delete(); err != nil { - log.G(ctx).WithError(err).Error("delete bundle") + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": ns, + }).Error("connecting to shim") + pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile)) + err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false) + if err != nil { + log.G(ctx).WithError(err).WithField("bundle", bundle.path). + Error("cleaning up after dead shim") } continue } @@ -293,6 +336,45 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { return o, nil } +func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error { + ctx = namespaces.WithNamespace(ctx, ns) + if err := r.terminate(ctx, bundle, ns, id); err != nil { + return errors.New("failed to terminate task, leaving bundle for debugging") + } + + if reap { + // if sub-reaper is set, reap our new child + if v, err := sys.GetSubreaper(); err == nil && v == 1 { + reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})}) + reaper.Default.WaitPid(pid) + reaper.Default.Delete(pid) + } + } + + // Notify Client + exitedAt := time.Now().UTC() + r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventsapi.TaskExit{ + ContainerID: id, + ID: id, + Pid: uint32(pid), + ExitStatus: 128 + uint32(unix.SIGKILL), + ExitedAt: exitedAt, + }) + + if err := bundle.Delete(); err != nil { + log.G(ctx).WithError(err).Error("delete bundle") + } + + r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{ + ContainerID: id, + Pid: uint32(pid), + ExitStatus: 128 + uint32(unix.SIGKILL), + ExitedAt: exitedAt, + }) + + return nil +} + func (r *Runtime) terminate(ctx 
context.Context, bundle *bundle, ns, id string) error { ctx = namespaces.WithNamespace(ctx, ns) rt, err := r.getRuntime(ctx, ns, id) @@ -305,7 +387,10 @@ func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) log.G(ctx).WithError(err).Warnf("delete runtime state %s", id) } if err := unix.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil { - log.G(ctx).WithError(err).Warnf("unmount task rootfs %s", id) + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "path": bundle.path, + "id": id, + }).Warnf("unmount task rootfs") } return nil } diff --git a/linux/shim/client.go b/linux/shim/client.go index 2b5016fc5..17b1c4e05 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -10,6 +10,8 @@ import ( "os" "os/exec" "strings" + "sync" + "syscall" "time" "golang.org/x/sys/unix" @@ -28,7 +30,7 @@ import ( type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error) // WithStart executes a new shim process -func WithStart(binary, address string, debug bool) ClientOpt { +func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt { return func(ctx context.Context, config Config) (_ shim.ShimClient, _ io.Closer, err error) { socket, err := newSocket(config) if err != nil { @@ -47,9 +49,14 @@ func WithStart(binary, address string, debug bool) ClientOpt { } defer func() { if err != nil { - terminate(cmd) + cmd.Process.Kill() } }() + go func() { + reaper.Default.Wait(cmd) + reaper.Default.Delete(cmd.Process.Pid) + exitHandler() + }() log.G(ctx).WithFields(logrus.Fields{ "pid": cmd.Process.Pid, "address": config.Address, @@ -72,11 +79,6 @@ func WithStart(binary, address string, debug bool) ClientOpt { } } -func terminate(cmd *exec.Cmd) { - cmd.Process.Kill() - reaper.Default.Wait(cmd) -} - func newCommand(binary, address string, debug bool, config Config, socket *os.File) *exec.Cmd { args := []string{ "--namespace", config.Namespace, @@ -178,15 +180,20 @@ func New(ctx context.Context, config 
Config, opt ClientOpt) (*Client, error) { return &Client{ ShimClient: s, c: c, + exitCh: make(chan struct{}), }, nil } type Client struct { shim.ShimClient - c io.Closer + c io.Closer + exitCh chan struct{} + exitOnce sync.Once } +// IsAlive returns true if the shim can be contacted. +// NOTE: a negative answer doesn't mean that the process is gone. func (c *Client) IsAlive(ctx context.Context) (bool, error) { _, err := c.ShimInfo(ctx, empty) if err != nil { @@ -198,8 +205,24 @@ func (c *Client) IsAlive(ctx context.Context) (bool, error) { return true, nil } -// KillShim kills the shim forcefully +// StopShim signals the shim to exit and waits for the process to disappear +func (c *Client) StopShim(ctx context.Context) error { + return c.signalShim(ctx, unix.SIGTERM) +} + +// KillShim kills the shim forcefully and waits for the process to disappear func (c *Client) KillShim(ctx context.Context) error { + return c.signalShim(ctx, unix.SIGKILL) +} + +func (c *Client) Close() error { + if c.c == nil { + return nil + } + return c.c.Close() +} + +func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error { info, err := c.ShimInfo(ctx, empty) if err != nil { return err @@ -209,23 +232,29 @@ func (c *Client) KillShim(ctx context.Context) error { if os.Getpid() == pid { return nil } - if err := unix.Kill(pid, unix.SIGKILL); err != nil { + if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH { return err } - // wait for shim to die after being SIGKILL'd - for { - // use kill(pid, 0) here because the shim could have been reparented - // and we are no longer able to waitpid(pid, ...) 
on the shim - if err := unix.Kill(pid, 0); err != nil && err == unix.ESRCH { - return nil - } - time.Sleep(10 * time.Millisecond) + // wait for shim to die after being signaled + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.waitForExit(pid): + return nil } } -func (c *Client) Close() error { - if c.c == nil { - return nil - } - return c.c.Close() +func (c *Client) waitForExit(pid int) <-chan struct{} { + c.exitOnce.Do(func() { + for { + // use kill(pid, 0) here because the shim could have been reparented + // and we are no longer able to waitpid(pid, ...) on the shim + if err := unix.Kill(pid, 0); err == unix.ESRCH { + close(c.exitCh) + return + } + time.Sleep(10 * time.Millisecond) + } + }) + return c.exitCh } diff --git a/linux/shim/init.go b/linux/shim/init.go index 94f33718b..314375cd4 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -27,6 +27,8 @@ import ( "github.com/pkg/errors" ) +const InitPidFile = "init.pid" + type initProcess struct { sync.WaitGroup initState @@ -131,7 +133,7 @@ func newInitProcess(context context.Context, plat platform, path, namespace, wor return nil, errors.Wrap(err, "failed to create OCI runtime io pipes") } } - pidFile := filepath.Join(path, "init.pid") + pidFile := filepath.Join(path, InitPidFile) if r.Checkpoint != "" { opts := &runc.RestoreOpts{ CheckpointOpts: runc.CheckpointOpts{ diff --git a/linux/task.go b/linux/task.go index 2177262a1..3a5c48bbe 100644 --- a/linux/task.go +++ b/linux/task.go @@ -15,13 +15,15 @@ import ( type Task struct { id string + pid int shim *client.Client namespace string } -func newTask(id, namespace string, shim *client.Client) *Task { +func newTask(id, namespace string, pid int, shim *client.Client) *Task { return &Task{ id: id, + pid: pid, shim: shim, namespace: namespace, } diff --git a/runtime/task_list.go b/runtime/task_list.go index e5d5c0e53..12062cef5 100644 --- a/runtime/task_list.go +++ b/runtime/task_list.go @@ -2,10 +2,10 @@ package runtime import ( "context" - 
"errors" "sync" "github.com/containerd/containerd/namespaces" + "github.com/pkg/errors" ) var ( @@ -75,7 +75,7 @@ func (l *TaskList) AddWithNamespace(namespace string, t Task) error { l.tasks[namespace] = make(map[string]Task) } if _, ok := l.tasks[namespace][id]; ok { - return ErrTaskAlreadyExists + return errors.Wrap(ErrTaskAlreadyExists, id) } l.tasks[namespace][id] = t return nil