diff --git a/sys/reaper/reaper_unix.go b/sys/reaper/reaper_unix.go index 85b7034ef..8f764ec1e 100644 --- a/sys/reaper/reaper_unix.go +++ b/sys/reaper/reaper_unix.go @@ -33,41 +33,59 @@ var ErrNoSuchProcess = errors.New("no such process") const bufferSize = 2048 +type subscriber struct { + sync.Mutex + c chan runc.Exit + closed bool +} + +func (s *subscriber) close() { + s.Lock() + if s.closed { + s.Unlock() + return + } + close(s.c) + s.closed = true + s.Unlock() +} + +func (s *subscriber) do(fn func()) { + s.Lock() + fn() + s.Unlock() +} + // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels func Reap() error { - var ( - now = time.Now() - current []chan runc.Exit - ) + now := time.Now() exits, err := sys.Reap(false) - - Default.Lock() - for c := range Default.subscribers { - current = append(current, c) - } - Default.Unlock() - for _, e := range exits { - go notify(runc.Exit{ + done := Default.notify(runc.Exit{ Timestamp: now, Pid: e.Pid, Status: e.Status, - }, current) + }) + + select { + case <-done: + case <-time.After(1 * time.Second): + } } return err } // Default is the default monitor initialized for the package var Default = &Monitor{ - subscribers: make(map[chan runc.Exit]struct{}), + subscribers: make(map[chan runc.Exit]*subscriber), } // Monitor monitors the underlying system for process status changes type Monitor struct { sync.Mutex - subscribers map[chan runc.Exit]struct{} + subscribers map[chan runc.Exit]*subscriber } // Start starts the command a registers the process with the reaper @@ -101,7 +119,9 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { func (m *Monitor) Subscribe() chan runc.Exit { c := make(chan runc.Exit, bufferSize) m.Lock() - m.subscribers[c] = struct{}{} + m.subscribers[c] = &subscriber{ + c: c, + } m.Unlock() return c } @@ -109,32 +129,74 @@ func (m *Monitor) Subscribe() chan runc.Exit { // Unsubscribe to process exit changes func (m *Monitor) Unsubscribe(c chan runc.Exit) { m.Lock() + s, ok := m.subscribers[c] + if !ok { + m.Unlock() + return + } + s.close() delete(m.subscribers, c) - close(c) m.Unlock() } -func notify(e runc.Exit, subscribers []chan runc.Exit) { - const timeout = 10 * time.Millisecond - timer := time.NewTimer(timeout) - timer.Stop() +func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber { + out := make(map[chan runc.Exit]*subscriber) + m.Lock() + for k, v := range m.subscribers { + out[k] = v + } + m.Unlock() + return out +} - for i := 0; i < 50; i++ { - var failed []chan runc.Exit - for _, s := range subscribers { - timer.Reset(timeout) +func (m *Monitor) notify(e runc.Exit) chan struct{} { + const timeout = 1 * time.Millisecond + var ( + done = make(chan struct{}, 1) + timer = time.NewTimer(timeout) + success = make(map[chan runc.Exit]struct{}) + ) + stop(timer, true) - select { - case s <- e: - case <-timer.C: - failed = append(failed, s) + go func() { + defer close(done) + + for { + var ( + failed int + subscribers = m.getSubscribers() + ) + for _, s := range subscribers { + s.do(func() { + if s.closed { + return + } + if _, ok := success[s.c]; ok { + return + } + timer.Reset(timeout) + recv := true + select { + case s.c <- e: + success[s.c] = struct{}{} + case <-timer.C: + recv = false + failed++ + } + stop(timer, recv) + }) + } + // all subscribers received the message + if failed == 0 { + return } - timer.Stop() } - // all subscribers received the message - if len(failed) == 0 { - return - } - subscribers = failed + }() + return done +} + +func stop(timer *time.Timer, recv bool) { + if !timer.Stop() && recv { + <-timer.C } }