Merge pull request #1452 from crosbymichael/reaper2

Update reaper for multiple subscribers
This commit is contained in:
Kenfe-Mickaël Laventure 2017-08-31 11:52:23 -07:00 committed by GitHub
commit 22df20b35f
8 changed files with 153 additions and 156 deletions

View File

@ -144,6 +144,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.Wrapf(err, "invalid task id")
}
ec := reaper.Default.Subscribe()
defer reaper.Default.Unsubscribe(ec)
bundle, err := newBundle(
namespace, id,
filepath.Join(r.state, namespace),
@ -177,7 +180,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
"id": id,
"namespace": namespace,
}).Warn("cleaning up after killed shim")
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true)
err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, ec)
if err == nil {
r.tasks.Delete(ctx, lc)
} else {
@ -320,7 +323,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
"namespace": ns,
}).Error("connecting to shim")
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile))
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false)
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, nil)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
@ -336,18 +339,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return o, nil
}
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error {
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, ec chan runc.Exit) error {
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
return errors.New("failed to terminate task, leaving bundle for debugging")
}
if reap {
if ec != nil {
// if sub-reaper is set, reap our new child
if v, err := sys.GetSubreaper(); err == nil && v == 1 {
reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})})
reaper.Default.WaitPid(pid)
reaper.Default.Delete(pid)
for e := range ec {
if e.Pid == pid {
break
}
}
}
}

View File

@ -44,7 +44,8 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
defer f.Close()
cmd := newCommand(binary, address, debug, config, f)
if err := reaper.Default.Start(cmd); err != nil {
ec, err := reaper.Default.Start(cmd)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
}
defer func() {
@ -53,8 +54,7 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt
}
}()
go func() {
reaper.Default.Wait(cmd)
reaper.Default.Delete(cmd.Process.Pid)
reaper.Default.Wait(cmd, ec)
exitHandler()
}()
log.G(ctx).WithFields(logrus.Fields{

View File

@ -6,6 +6,7 @@ import (
"context"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/pkg/errors"
)
@ -345,10 +346,7 @@ func (s *stoppedState) Delete(ctx context.Context) error {
}
func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
}
func (s *stoppedState) SetExited(status int) {

View File

@ -20,6 +20,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -48,7 +49,9 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S
namespace: namespace,
context: context,
workDir: workDir,
ec: reaper.Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
@ -70,31 +73,27 @@ type Service struct {
mu sync.Mutex
processes map[string]process
events chan interface{}
eventsMu sync.Mutex
deferredEvent interface{}
namespace string
context context.Context
ec chan runc.Exit
workDir string
platform platform
}
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, s.workDir, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
s.mu.Unlock()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
@ -108,7 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
@ -129,11 +127,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
}
} else {
pid := p.Pid()
cmd := &reaper.Cmd{
ExitCh: make(chan struct{}),
}
reaper.Default.Register(pid, cmd)
go s.waitExit(p, pid, cmd)
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
@ -392,17 +385,27 @@ func (s *Service) deleteProcess(id string) {
s.mu.Unlock()
}
func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
status, _ := reaper.Default.WaitPid(pid)
p.SetExited(status)
// processExits receives exit events from s.ec and passes each one to
// checkProcesses. It returns once s.ec has been closed.
func (s *Service) processExits() {
	for {
		exit, open := <-s.ec
		if !open {
			return
		}
		s.checkProcesses(exit)
	}
}
reaper.Default.Delete(pid)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
// checkProcesses looks for a tracked process whose pid matches the exit
// event e. The first match is marked as exited with e.Status and a TaskExit
// event is sent on s.events; exits for pids that are not tracked are ignored.
//
// s.mu is held for the whole call, including the send on s.events.
func (s *Service) checkProcesses(e runc.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, proc := range s.processes {
		if proc.Pid() != e.Pid {
			continue
		}
		// record the exit first so ExitedAt below reflects it
		proc.SetExited(e.Status)
		exited := &eventsapi.TaskExit{
			ContainerID: s.id,
			ID:          proc.ID(),
			Pid:         uint32(e.Pid),
			ExitStatus:  uint32(e.Status),
			ExitedAt:    proc.ExitedAt(),
		}
		s.events <- exited
		return
	}
}

View File

@ -6,49 +6,45 @@ import (
"bytes"
"os/exec"
"sync"
"time"
"github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
var (
ErrNoSuchProcess = errors.New("no such process")
)
var ErrNoSuchProcess = errors.New("no such process")
// bufferSize is the capacity of each exit channel created by Subscribe.
const bufferSize = 2048
// Reap should be called when the process receives a SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
for _, e := range exits {
Default.Lock()
c, ok := Default.cmds[e.Pid]
if !ok {
Default.unknown[e.Pid] = e.Status
Default.Unlock()
continue
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
}
Default.Unlock()
if c.c != nil {
// after we get an exit, call wait on the go process to make sure all
// pipes are closed and finalizers are run on the process
c.c.Wait()
}
c.exitStatus = e.Status
close(c.ExitCh)
}
Default.Unlock()
return err
}
var Default = &Monitor{
cmds: make(map[int]*Cmd),
unknown: make(map[int]int),
subscribers: make(map[chan runc.Exit]struct{}),
}
type Monitor struct {
sync.Mutex
cmds map[int]*Cmd
unknown map[int]int
subscribers map[chan runc.Exit]struct{}
}
func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
@ -69,82 +65,50 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) {
}
// Start starts the command and registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) error {
rc := &Cmd{
c: c,
ExitCh: make(chan struct{}),
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
// start the process
m.Lock()
err := c.Start()
if c.Process != nil {
m.RegisterNL(c.Process.Pid, rc)
}
m.Unlock()
return err
return ec, nil
}
// Run runs and waits for the command to finish
func (m *Monitor) Run(c *exec.Cmd) error {
if err := m.Start(c); err != nil {
ec, err := m.Start(c)
if err != nil {
return err
}
_, err := m.Wait(c)
_, err = m.Wait(c, ec)
return err
}
func (m *Monitor) Wait(c *exec.Cmd) (int, error) {
return m.WaitPid(c.Process.Pid)
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}
func (m *Monitor) Register(pid int, c *Cmd) {
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.RegisterNL(pid, c)
m.subscribers[c] = struct{}{}
m.Unlock()
return c
}
// Unsubscribe removes c from the monitor's subscriber set and closes it, so
// that a receiver ranging over the channel terminates.
//
// The call is idempotent: a channel that is not (or is no longer) registered
// is left untouched. Without this guard a repeated Unsubscribe would panic on
// closing an already-closed channel while the monitor lock is still held,
// blocking every later user of the monitor.
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
	m.Lock()
	defer m.Unlock()
	if _, subscribed := m.subscribers[c]; !subscribed {
		return
	}
	delete(m.subscribers, c)
	close(c)
}
// RegisterNL records c as the command for pid without taking the monitor
// lock; the caller must already hold it.
//
// If an exit status for pid was stored in m.unknown before registration, that
// entry is consumed: the status is copied onto c and c.ExitCh is closed
// immediately.
func (m *Monitor) RegisterNL(pid int, c *Cmd) {
	m.cmds[pid] = c
	status, exited := m.unknown[pid]
	if !exited {
		return
	}
	delete(m.unknown, pid)
	c.exitStatus = status
	close(c.ExitCh)
}
// WaitPid blocks until the command registered for pid has its ExitCh closed
// and then returns its exit status.
//
// It returns 255 and a wrapped ErrNoSuchProcess when no command is registered
// for pid. A non-zero exit status is returned together with a non-nil error.
func (m *Monitor) WaitPid(pid int) (int, error) {
	m.Lock()
	registered, found := m.cmds[pid]
	m.Unlock()
	if !found {
		return 255, errors.Wrapf(ErrNoSuchProcess, "pid %d", pid)
	}
	// wait for the exit to be recorded on the command
	<-registered.ExitCh
	if registered.exitStatus == 0 {
		return registered.exitStatus, nil
	}
	return registered.exitStatus, errors.Errorf("exit status %d", registered.exitStatus)
}
// Command looks up the Cmd registered under pid while holding the monitor
// lock. The result is nil when nothing is registered for pid.
func (m *Monitor) Command(pid int) *Cmd {
	m.Lock()
	registered := m.cmds[pid]
	m.Unlock()
	return registered
}
// Delete drops the command registered for pid, if any, while holding the
// monitor lock.
func (m *Monitor) Delete(pid int) {
	m.Lock()
	defer m.Unlock()
	delete(m.cmds, pid)
}
// Cmd is the per-process record kept by the Monitor for a registered pid.
type Cmd struct {
// c is the exec.Cmd behind the process; it may be nil when only a pid was
// registered.
c *exec.Cmd
// ExitCh is closed once exitStatus has been recorded for the process.
ExitCh chan struct{}
// exitStatus is the recorded exit status; it is set before ExitCh is closed.
exitStatus int
}

View File

@ -1,5 +1,5 @@
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/containerd/go-runc b85ac701de5065a66918203dd18f057433290807
github.com/containerd/go-runc e103f453ff3db23ec69d31371cadc1ea0ce87ec0
github.com/containerd/console 76d18fd1d66972718ab2284449591db0b3cdb4de
github.com/containerd/cgroups e6d1aa8c71c6103624b2c6e6f4be0863b67027f1
github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87

View File

@ -3,10 +3,17 @@ package runc
import (
"os/exec"
"syscall"
"time"
)
var Monitor ProcessMonitor = &defaultMonitor{}
// Exit describes the exit of a single process as delivered on an exit
// channel returned by a ProcessMonitor.
type Exit struct {
// Timestamp is the time the monitor recorded for the exit.
Timestamp time.Time
// Pid is the process id of the process that exited.
Pid int
// Status is the exit status of the process.
Status int
}
// ProcessMonitor is an interface for process monitoring
//
// It allows daemons using go-runc to have a SIGCHLD handler
@ -18,8 +25,8 @@ type ProcessMonitor interface {
Output(*exec.Cmd) ([]byte, error)
CombinedOutput(*exec.Cmd) ([]byte, error)
Run(*exec.Cmd) error
Start(*exec.Cmd) error
Wait(*exec.Cmd) (int, error)
Start(*exec.Cmd) (chan Exit, error)
Wait(*exec.Cmd, chan Exit) (int, error)
}
type defaultMonitor struct {
@ -37,18 +44,32 @@ func (m *defaultMonitor) Run(c *exec.Cmd) error {
return c.Run()
}
func (m *defaultMonitor) Start(c *exec.Cmd) error {
return c.Start()
}
func (m *defaultMonitor) Wait(c *exec.Cmd) (int, error) {
if err := c.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
return status.ExitStatus(), nil
func (m *defaultMonitor) Start(c *exec.Cmd) (chan Exit, error) {
if err := c.Start(); err != nil {
return nil, err
}
ec := make(chan Exit, 1)
go func() {
var status int
if err := c.Wait(); err != nil {
status = 255
if exitErr, ok := err.(*exec.ExitError); ok {
if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok {
status = ws.ExitStatus()
}
}
}
return -1, err
}
return 0, nil
ec <- Exit{
Timestamp: time.Now(),
Pid: c.Process.Pid,
Status: status,
}
close(ec)
}()
return ec, nil
}
// Wait blocks until a value can be received from ec and returns the Status
// of that Exit. The returned error is always nil and c is not consulted.
func (m *defaultMonitor) Wait(c *exec.Cmd, ec chan Exit) (int, error) {
	return (<-ec).Status, nil
}

View File

@ -41,7 +41,7 @@ type Runc struct {
PdeathSignal syscall.Signal
Setpgid bool
Criu string
SystemdCgroup string
SystemdCgroup bool
}
// List returns all containers created inside the provided runc root directory
@ -134,7 +134,8 @@ func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOp
}
return nil
}
if err := Monitor.Start(cmd); err != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
return err
}
if opts != nil && opts.IO != nil {
@ -144,7 +145,7 @@ func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOp
}
}
}
_, err := Monitor.Wait(cmd)
_, err = Monitor.Wait(cmd, ec)
return err
}
@ -209,7 +210,8 @@ func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts
}
return nil
}
if err := Monitor.Start(cmd); err != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
return err
}
if opts != nil && opts.IO != nil {
@ -219,7 +221,7 @@ func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts
}
}
}
_, err = Monitor.Wait(cmd)
_, err = Monitor.Wait(cmd, ec)
return err
}
@ -238,10 +240,11 @@ func (r *Runc) Run(context context.Context, id, bundle string, opts *CreateOpts)
if opts != nil {
opts.Set(cmd)
}
if err := Monitor.Start(cmd); err != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
return -1, err
}
return Monitor.Wait(cmd)
return Monitor.Wait(cmd, ec)
}
type DeleteOpts struct {
@ -294,13 +297,14 @@ func (r *Runc) Stats(context context.Context, id string) (*Stats, error) {
if err != nil {
return nil, err
}
defer func() {
rd.Close()
Monitor.Wait(cmd)
}()
if err := Monitor.Start(cmd); err != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
return nil, err
}
defer func() {
rd.Close()
Monitor.Wait(cmd, ec)
}()
var e Event
if err := json.NewDecoder(rd).Decode(&e); err != nil {
return nil, err
@ -315,7 +319,8 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration
if err != nil {
return nil, err
}
if err := Monitor.Start(cmd); err != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
rd.Close()
return nil, err
}
@ -327,7 +332,7 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration
defer func() {
close(c)
rd.Close()
Monitor.Wait(cmd)
Monitor.Wait(cmd, ec)
}()
for {
var e Event
@ -505,7 +510,8 @@ func (r *Runc) Restore(context context.Context, id, bundle string, opts *Restore
if opts != nil {
opts.Set(cmd)
}
if err := Monitor.Start(cmd); err != nil {
ec, err := Monitor.Start(cmd)
if err != nil {
return -1, err
}
if opts != nil && opts.IO != nil {
@ -515,7 +521,7 @@ func (r *Runc) Restore(context context.Context, id, bundle string, opts *Restore
}
}
}
return Monitor.Wait(cmd)
return Monitor.Wait(cmd, ec)
}
// Update updates the current container with the provided resource spec
@ -596,8 +602,8 @@ func (r *Runc) args() (out []string) {
if r.Criu != "" {
out = append(out, "--criu", r.Criu)
}
if r.SystemdCgroup != "" {
out = append(out, "--systemd-cgroup", r.SystemdCgroup)
if r.SystemdCgroup {
out = append(out, "--systemd-cgroup")
}
return out
}