Update reaper for multipe subscribers

Depends on https://github.com/containerd/go-runc/pull/24

The is currently a race with the reaper where you could miss some exit
events from processes.

The problem before and why the reaper was so complex was because
processes could fork, getting a pid, and then fail on an execve before
we would have time to register the process with the reaper.  This could
cause pids to fill up in a map as a way to reduce the race.

This changes makes the reaper handle multiple subscribers so that the
caller can handle locking, for when they want to wait for a specific
pid, without affecting other callers using the reaper code.

Exit events are broadcast to multiple subscribers, in the case, the runc
commands and container pids that we get from a pid-file.  Locking while
the entire container stats no longs affects runc commands where you want
to call `runc create` and wait until that has been completed.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2017-08-31 13:36:40 -04:00
parent c2e894c33a
commit 6b4c4a2937
8 changed files with 153 additions and 156 deletions

View File

@@ -144,6 +144,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.Wrapf(err, "invalid task id")
}
ec := reaper.Default.Subscribe()
defer reaper.Default.Unsubscribe(ec)
bundle, err := newBundle(
namespace, id,
filepath.Join(r.state, namespace),
@@ -177,7 +180,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
"id": id,
"namespace": namespace,
}).Warn("cleaning up after killed shim")
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true)
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, ec)
if err == nil {
r.tasks.Delete(ctx, lc)
} else {
@@ -320,7 +323,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
"namespace": ns,
}).Error("connecting to shim")
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile))
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false)
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, nil)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
@@ -336,18 +339,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return o, nil
}
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error {
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, ec chan runc.Exit) error {
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
return errors.New("failed to terminate task, leaving bundle for debugging")
}
if reap {
if ec != nil {
// if sub-reaper is set, reap our new child
if v, err := sys.GetSubreaper(); err == nil && v == 1 {
reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})})
reaper.Default.WaitPid(pid)
reaper.Default.Delete(pid)
for e := range ec {
if e.Pid == pid {
break
}
}
}
}

View File

@@ -44,7 +44,8 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
defer f.Close()
cmd := newCommand(binary, address, debug, config, f)
if err := reaper.Default.Start(cmd); err != nil {
ec, err := reaper.Default.Start(cmd)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
}
defer func() {
@@ -53,8 +54,7 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
}
}()
go func() {
reaper.Default.Wait(cmd)
reaper.Default.Delete(cmd.Process.Pid)
reaper.Default.Wait(cmd, ec)
exitHandler()
}()
log.G(ctx).WithFields(logrus.Fields{

View File

@@ -6,6 +6,7 @@ import (
"context"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/pkg/errors"
)
@@ -345,10 +346,7 @@ func (s *stoppedState) Delete(ctx context.Context) error {
}
func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
}
func (s *stoppedState) SetExited(status int) {

View File

@@ -20,6 +20,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -48,7 +49,9 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S
namespace: namespace,
context: context,
workDir: workDir,
ec: reaper.Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
@@ -70,31 +73,27 @@ type Service struct {
mu sync.Mutex
processes map[string]process
events chan interface{}
eventsMu sync.Mutex
deferredEvent interface{}
namespace string
context context.Context
ec chan runc.Exit
workDir string
platform platform
}
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, s.workDir, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
s.mu.Unlock()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
@@ -108,7 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
@@ -129,11 +127,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
}
} else {
pid := p.Pid()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
go s.waitExit(p, pid, cmd)
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
@@ -392,17 +385,27 @@ func (s *Service) deleteProcess(id string) {
s.mu.Unlock()
}
func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
status, _ := reaper.Default.WaitPid(pid)
p.SetExited(status)
func (s *Service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}
reaper.Default.Delete(pid)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
func (s *Service) checkProcesses(e runc.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
if p.Pid() == e.Pid {
p.SetExited(e.Status)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
return
}
}
}