Try to preserve exit event order

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-08-15 21:14:25 +00:00
parent bee4c1a8a2
commit 2763639388

View File

@ -33,41 +33,59 @@ var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 2048
type subscriber struct {
sync.Mutex
c chan runc.Exit
closed bool
}
func (s *subscriber) close() {
s.Lock()
if s.closed {
s.Unlock()
return
}
close(s.c)
s.closed = true
s.Unlock()
}
func (s *subscriber) do(fn func()) {
s.Lock()
fn()
s.Unlock()
}
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
var (
now = time.Now()
current []chan runc.Exit
)
now := time.Now()
exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
current = append(current, c)
}
Default.Unlock()
for _, e := range exits {
go notify(runc.Exit{
done := Default.notify(runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}, current)
})
select {
case <-done:
case <-time.After(1 * time.Second):
}
}
return err
}
// Default is the default monitor initialized for the package
var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}),
subscribers: make(map[chan runc.Exit]*subscriber),
}
// Monitor monitors the underlying system for process status changes
type Monitor struct {
sync.Mutex
subscribers map[chan runc.Exit]struct{}
subscribers map[chan runc.Exit]*subscriber
}
// Start starts the command a registers the process with the reaper
@ -101,7 +119,9 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.subscribers[c] = struct{}{}
m.subscribers[c] = &subscriber{
c: c,
}
m.Unlock()
return c
}
@ -109,32 +129,74 @@ func (m *Monitor) Subscribe() chan runc.Exit {
// Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
s, ok := m.subscribers[c]
if !ok {
m.Unlock()
return
}
s.close()
delete(m.subscribers, c)
close(c)
m.Unlock()
}
func notify(e runc.Exit, subscribers []chan runc.Exit) {
const timeout = 10 * time.Millisecond
timer := time.NewTimer(timeout)
timer.Stop()
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
out := make(map[chan runc.Exit]*subscriber)
m.Lock()
for k, v := range m.subscribers {
out[k] = v
}
m.Unlock()
return out
}
for i := 0; i < 50; i++ {
var failed []chan runc.Exit
func (m *Monitor) notify(e runc.Exit) chan struct{} {
const timeout = 1 * time.Millisecond
var (
done = make(chan struct{}, 1)
timer = time.NewTimer(timeout)
success = make(map[chan runc.Exit]struct{})
)
stop(timer, true)
go func() {
defer close(done)
for {
var (
failed int
subscribers = m.getSubscribers()
)
for _, s := range subscribers {
timer.Reset(timeout)
select {
case s <- e:
case <-timer.C:
failed = append(failed, s)
}
timer.Stop()
}
// all subscribers received the message
if len(failed) == 0 {
s.do(func() {
if s.closed {
return
}
subscribers = failed
if _, ok := success[s.c]; ok {
return
}
timer.Reset(timeout)
recv := true
select {
case s.c <- e:
success[s.c] = struct{}{}
case <-timer.C:
recv = false
failed++
}
stop(timer, recv)
})
}
// all subscribers received the message
if failed == 0 {
return
}
}
}()
return done
}
func stop(timer *time.Timer, recv bool) {
if !timer.Stop() && recv {
<-timer.C
}
}