From 6b4c4a29371fa3dea9c005ad24e2bea340c763ab Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 31 Aug 2017 13:36:40 -0400 Subject: [PATCH] Update reaper for multipe subscribers Depends on https://github.com/containerd/go-runc/pull/24 The is currently a race with the reaper where you could miss some exit events from processes. The problem before and why the reaper was so complex was because processes could fork, getting a pid, and then fail on an execve before we would have time to register the process with the reaper. This could cause pids to fill up in a map as a way to reduce the race. This changes makes the reaper handle multiple subscribers so that the caller can handle locking, for when they want to wait for a specific pid, without affecting other callers using the reaper code. Exit events are broadcast to multiple subscribers, in the case, the runc commands and container pids that we get from a pid-file. Locking while the entire container stats no longs affects runc commands where you want to call `runc create` and wait until that has been completed. Signed-off-by: Michael Crosby --- linux/runtime.go | 19 ++- linux/shim/client.go | 6 +- linux/shim/init_state.go | 6 +- linux/shim/service.go | 49 ++++--- reaper/reaper.go | 136 +++++++----------- vendor.conf | 2 +- .../github.com/containerd/go-runc/monitor.go | 49 +++++-- vendor/github.com/containerd/go-runc/runc.go | 42 +++--- 8 files changed, 153 insertions(+), 156 deletions(-) diff --git a/linux/runtime.go b/linux/runtime.go index a2e09d527..f3638a149 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -144,6 +144,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, errors.Wrapf(err, "invalid task id") } + ec := reaper.Default.Subscribe() + defer reaper.Default.Unsubscribe(ec) + bundle, err := newBundle( namespace, id, filepath.Join(r.state, namespace), @@ -177,7 +180,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts "id": id, "namespace": namespace, }).Warn("cleaning up after killed shim") - err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true) + err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, ec) if err == nil { r.tasks.Delete(ctx, lc) } else { @@ -320,7 +323,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { "namespace": ns, }).Error("connecting to shim") pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile)) - err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false) + err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, nil) if err != nil { log.G(ctx).WithError(err).WithField("bundle", bundle.path). Error("cleaning up after dead shim") @@ -336,18 +339,20 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { return o, nil } -func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error { +func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, ec chan runc.Exit) error { ctx = namespaces.WithNamespace(ctx, ns) if err := r.terminate(ctx, bundle, ns, id); err != nil { return errors.New("failed to terminate task, leaving bundle for debugging") } - if reap { + if ec != nil { // if sub-reaper is set, reap our new child if v, err := sys.GetSubreaper(); err == nil && v == 1 { - reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})}) - reaper.Default.WaitPid(pid) - reaper.Default.Delete(pid) + for e := range ec { + if e.Pid == pid { + break + } + } } } diff --git a/linux/shim/client.go b/linux/shim/client.go index 17b1c4e05..5ff0461b5 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -44,7 +44,8 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt defer f.Close() cmd := newCommand(binary, address, debug, config, f) - if err := reaper.Default.Start(cmd); err != nil { + ec, err := reaper.Default.Start(cmd) + if err != nil { return nil, nil, errors.Wrapf(err, "failed to start shim") } defer func() { @@ -53,8 +54,7 @@ func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt } }() go func() { - reaper.Default.Wait(cmd) - reaper.Default.Delete(cmd.Process.Pid) + reaper.Default.Wait(cmd, ec) exitHandler() }() log.G(ctx).WithFields(logrus.Fields{ diff --git a/linux/shim/init_state.go b/linux/shim/init_state.go index aa0456a16..39846d859 100644 --- a/linux/shim/init_state.go +++ b/linux/shim/init_state.go @@ -6,6 +6,7 @@ import ( "context" "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/pkg/errors" ) @@ -345,10 +346,7 @@ func (s *stoppedState) Delete(ctx context.Context) error { } func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error { - s.p.mu.Lock() - defer s.p.mu.Unlock() - - return s.p.kill(ctx, sig, all) + return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id) } func (s *stoppedState) SetExited(status int) { diff --git a/linux/shim/service.go b/linux/shim/service.go index a47e596e8..0168c8fc6 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -20,6 +20,7 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" + runc "github.com/containerd/go-runc" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -48,7 +49,9 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S namespace: namespace, context: context, workDir: workDir, + ec: reaper.Default.Subscribe(), } + go s.processExits() if err := s.initPlatform(); err != nil { return nil, errors.Wrap(err, "failed to initialized platform behavior") } @@ -70,31 +73,27 @@ type Service struct { mu sync.Mutex processes map[string]process events chan interface{} - eventsMu sync.Mutex deferredEvent interface{} namespace string context context.Context + ec chan runc.Exit workDir string platform platform } func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() process, err := newInitProcess(ctx, s.platform, s.path, s.namespace, s.workDir, r) if err != nil { return nil, errdefs.ToGRPC(err) } - s.mu.Lock() // save the main task id and bundle to the shim for additional requests s.id = r.ID s.bundle = r.Bundle pid := process.Pid() s.processes[r.ID] = process - s.mu.Unlock() - cmd := &reaper.Cmd{ - ExitCh: make(chan struct{}), - } - reaper.Default.Register(pid, cmd) s.events <- &eventsapi.TaskCreate{ ContainerID: r.ID, Bundle: r.Bundle, @@ -108,7 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh Checkpoint: r.Checkpoint, Pid: uint32(pid), } - go s.waitExit(process, pid, cmd) return &shimapi.CreateTaskResponse{ Pid: uint32(pid), }, nil @@ -129,11 +127,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. } } else { pid := p.Pid() - cmd := &reaper.Cmd{ - ExitCh: make(chan struct{}), - } - reaper.Default.Register(pid, cmd) - go s.waitExit(p, pid, cmd) s.events <- &eventsapi.TaskExecStarted{ ContainerID: s.id, ExecID: r.ID, @@ -392,17 +385,27 @@ func (s *Service) deleteProcess(id string) { s.mu.Unlock() } -func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) { - status, _ := reaper.Default.WaitPid(pid) - p.SetExited(status) +func (s *Service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} - reaper.Default.Delete(pid) - s.events <- &eventsapi.TaskExit{ - ContainerID: s.id, - ID: p.ID(), - Pid: uint32(pid), - ExitStatus: uint32(status), - ExitedAt: p.ExitedAt(), +func (s *Service) checkProcesses(e runc.Exit) { + s.mu.Lock() + defer s.mu.Unlock() + for _, p := range s.processes { + if p.Pid() == e.Pid { + p.SetExited(e.Status) + s.events <- &eventsapi.TaskExit{ + ContainerID: s.id, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + } + return + } } } diff --git a/reaper/reaper.go b/reaper/reaper.go index 4c8a58c75..97df901dc 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -6,49 +6,45 @@ import ( "bytes" "os/exec" "sync" + "time" "github.com/containerd/containerd/sys" + runc "github.com/containerd/go-runc" "github.com/pkg/errors" ) -var ( - ErrNoSuchProcess = errors.New("no such process") -) +var ErrNoSuchProcess = errors.New("no such process") + +const bufferSize = 2048 // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels func Reap() error { + now := time.Now() exits, err := sys.Reap(false) - for _, e := range exits { - Default.Lock() - c, ok := Default.cmds[e.Pid] - if !ok { - Default.unknown[e.Pid] = e.Status - Default.Unlock() - continue + Default.Lock() + for c := range Default.subscribers { + for _, e := range exits { + c <- runc.Exit{ + Timestamp: now, + Pid: e.Pid, + Status: e.Status, + } } - Default.Unlock() - if c.c != nil { - // after we get an exit, call wait on the go process to make sure all - // pipes are closed and finalizers are run on the process - c.c.Wait() - } - c.exitStatus = e.Status - close(c.ExitCh) + } + Default.Unlock() return err } var Default = &Monitor{ - cmds: make(map[int]*Cmd), - unknown: make(map[int]int), + subscribers: make(map[chan runc.Exit]struct{}), } type Monitor struct { sync.Mutex - cmds map[int]*Cmd - unknown map[int]int + subscribers map[chan runc.Exit]struct{} } func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) { @@ -69,82 +65,50 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) { } // Start starts the command a registers the process with the reaper -func (m *Monitor) Start(c *exec.Cmd) error { - rc := &Cmd{ - c: c, - ExitCh: make(chan struct{}), +func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) { + ec := m.Subscribe() + if err := c.Start(); err != nil { + m.Unsubscribe(ec) + return nil, err } - // start the process - m.Lock() - err := c.Start() - if c.Process != nil { - m.RegisterNL(c.Process.Pid, rc) - } - m.Unlock() - return err + return ec, nil } // Run runs and waits for the command to finish func (m *Monitor) Run(c *exec.Cmd) error { - if err := m.Start(c); err != nil { + ec, err := m.Start(c) + if err != nil { return err } - _, err := m.Wait(c) + _, err = m.Wait(c, ec) return err } -func (m *Monitor) Wait(c *exec.Cmd) (int, error) { - return m.WaitPid(c.Process.Pid) +func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { + for e := range ec { + if e.Pid == c.Process.Pid { + // make sure we flush all IO + c.Wait() + m.Unsubscribe(ec) + return e.Status, nil + } + } + // return no such process if the ec channel is closed and no more exit + // events will be sent + return -1, ErrNoSuchProcess } -func (m *Monitor) Register(pid int, c *Cmd) { +func (m *Monitor) Subscribe() chan runc.Exit { + c := make(chan runc.Exit, bufferSize) m.Lock() - m.RegisterNL(pid, c) + m.subscribers[c] = struct{}{} + m.Unlock() + return c +} + +func (m *Monitor) Unsubscribe(c chan runc.Exit) { + m.Lock() + delete(m.subscribers, c) + close(c) m.Unlock() } - -// RegisterNL does not grab the lock internally -// the caller is responsible for locking the monitor -func (m *Monitor) RegisterNL(pid int, c *Cmd) { - if status, ok := m.unknown[pid]; ok { - delete(m.unknown, pid) - m.cmds[pid] = c - c.exitStatus = status - close(c.ExitCh) - return - } - m.cmds[pid] = c -} - -func (m *Monitor) WaitPid(pid int) (int, error) { - m.Lock() - rc, ok := m.cmds[pid] - m.Unlock() - if !ok { - return 255, errors.Wrapf(ErrNoSuchProcess, "pid %d", pid) - } - <-rc.ExitCh - if rc.exitStatus != 0 { - return rc.exitStatus, errors.Errorf("exit status %d", rc.exitStatus) - } - return rc.exitStatus, nil -} - -// Command returns the registered pid for the command created -func (m *Monitor) Command(pid int) *Cmd { - m.Lock() - defer m.Unlock() - return m.cmds[pid] -} - -func (m *Monitor) Delete(pid int) { - m.Lock() - delete(m.cmds, pid) - m.Unlock() -} - -type Cmd struct { - c *exec.Cmd - ExitCh chan struct{} - exitStatus int -} diff --git a/vendor.conf b/vendor.conf index b93e27e1c..7c58b0334 100644 --- a/vendor.conf +++ b/vendor.conf @@ -1,5 +1,5 @@ github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6 -github.com/containerd/go-runc b85ac701de5065a66918203dd18f057433290807 +github.com/containerd/go-runc e103f453ff3db23ec69d31371cadc1ea0ce87ec0 github.com/containerd/console 76d18fd1d66972718ab2284449591db0b3cdb4de github.com/containerd/cgroups e6d1aa8c71c6103624b2c6e6f4be0863b67027f1 github.com/docker/go-metrics 8fd5772bf1584597834c6f7961a530f06cbfbb87 diff --git a/vendor/github.com/containerd/go-runc/monitor.go b/vendor/github.com/containerd/go-runc/monitor.go index f6aed52c8..4ad539927 100644 --- a/vendor/github.com/containerd/go-runc/monitor.go +++ b/vendor/github.com/containerd/go-runc/monitor.go @@ -3,10 +3,17 @@ package runc import ( "os/exec" "syscall" + "time" ) var Monitor ProcessMonitor = &defaultMonitor{} +type Exit struct { + Timestamp time.Time + Pid int + Status int +} + // ProcessMonitor is an interface for process monitoring // // It allows daemons using go-runc to have a SIGCHLD handler @@ -18,8 +25,8 @@ type ProcessMonitor interface { Output(*exec.Cmd) ([]byte, error) CombinedOutput(*exec.Cmd) ([]byte, error) Run(*exec.Cmd) error - Start(*exec.Cmd) error - Wait(*exec.Cmd) (int, error) + Start(*exec.Cmd) (chan Exit, error) + Wait(*exec.Cmd, chan Exit) (int, error) } type defaultMonitor struct { @@ -37,18 +44,32 @@ func (m *defaultMonitor) Run(c *exec.Cmd) error { return c.Run() } -func (m *defaultMonitor) Start(c *exec.Cmd) error { - return c.Start() -} - -func (m *defaultMonitor) Wait(c *exec.Cmd) (int, error) { - if err := c.Wait(); err != nil { - if exitErr, ok := err.(*exec.ExitError); ok { - if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { - return status.ExitStatus(), nil +func (m *defaultMonitor) Start(c *exec.Cmd) (chan Exit, error) { + if err := c.Start(); err != nil { + return nil, err + } + ec := make(chan Exit, 1) + go func() { + var status int + if err := c.Wait(); err != nil { + status = 255 + if exitErr, ok := err.(*exec.ExitError); ok { + if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok { + status = ws.ExitStatus() + } } } - return -1, err - } - return 0, nil + ec <- Exit{ + Timestamp: time.Now(), + Pid: c.Process.Pid, + Status: status, + } + close(ec) + }() + return ec, nil +} + +func (m *defaultMonitor) Wait(c *exec.Cmd, ec chan Exit) (int, error) { + e := <-ec + return e.Status, nil } diff --git a/vendor/github.com/containerd/go-runc/runc.go b/vendor/github.com/containerd/go-runc/runc.go index 474306ede..92218db42 100644 --- a/vendor/github.com/containerd/go-runc/runc.go +++ b/vendor/github.com/containerd/go-runc/runc.go @@ -41,7 +41,7 @@ type Runc struct { PdeathSignal syscall.Signal Setpgid bool Criu string - SystemdCgroup string + SystemdCgroup bool } // List returns all containers created inside the provided runc root directory @@ -134,7 +134,8 @@ func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOp } return nil } - if err := Monitor.Start(cmd); err != nil { + ec, err := Monitor.Start(cmd) + if err != nil { return err } if opts != nil && opts.IO != nil { @@ -144,7 +145,7 @@ func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOp } } } - _, err := Monitor.Wait(cmd) + _, err = Monitor.Wait(cmd, ec) return err } @@ -209,7 +210,8 @@ func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts } return nil } - if err := Monitor.Start(cmd); err != nil { + ec, err := Monitor.Start(cmd) + if err != nil { return err } if opts != nil && opts.IO != nil { @@ -219,7 +221,7 @@ func (r *Runc) Exec(context context.Context, id string, spec specs.Process, opts } } } - _, err = Monitor.Wait(cmd) + _, err = Monitor.Wait(cmd, ec) return err } @@ -238,10 +240,11 @@ func (r *Runc) Run(context context.Context, id, bundle string, opts *CreateOpts) if opts != nil { opts.Set(cmd) } - if err := Monitor.Start(cmd); err != nil { + ec, err := Monitor.Start(cmd) + if err != nil { return -1, err } - return Monitor.Wait(cmd) + return Monitor.Wait(cmd, ec) } type DeleteOpts struct { @@ -294,13 +297,14 @@ func (r *Runc) Stats(context context.Context, id string) (*Stats, error) { if err != nil { return nil, err } - defer func() { - rd.Close() - Monitor.Wait(cmd) - }() - if err := Monitor.Start(cmd); err != nil { + ec, err := Monitor.Start(cmd) + if err != nil { return nil, err } + defer func() { + rd.Close() + Monitor.Wait(cmd, ec) + }() var e Event if err := json.NewDecoder(rd).Decode(&e); err != nil { return nil, err @@ -315,7 +319,8 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration if err != nil { return nil, err } - if err := Monitor.Start(cmd); err != nil { + ec, err := Monitor.Start(cmd) + if err != nil { rd.Close() return nil, err } @@ -327,7 +332,7 @@ func (r *Runc) Events(context context.Context, id string, interval time.Duration defer func() { close(c) rd.Close() - Monitor.Wait(cmd) + Monitor.Wait(cmd, ec) }() for { var e Event @@ -505,7 +510,8 @@ func (r *Runc) Restore(context context.Context, id, bundle string, opts *Restore if opts != nil { opts.Set(cmd) } - if err := Monitor.Start(cmd); err != nil { + ec, err := Monitor.Start(cmd) + if err != nil { return -1, err } if opts != nil && opts.IO != nil { @@ -515,7 +521,7 @@ func (r *Runc) Restore(context context.Context, id, bundle string, opts *Restore } } } - return Monitor.Wait(cmd) + return Monitor.Wait(cmd, ec) } // Update updates the current container with the provided resource spec @@ -596,8 +602,8 @@ func (r *Runc) args() (out []string) { if r.Criu != "" { out = append(out, "--criu", r.Criu) } - if r.SystemdCgroup != "" { - out = append(out, "--systemd-cgroup", r.SystemdCgroup) + if r.SystemdCgroup { + out = append(out, "--systemd-cgroup") } return out }