diff --git a/runtime/v1/shim/reaper.go b/runtime/v1/shim/reaper.go index 45a88db12..4e002f5dc 100644 --- a/runtime/v1/shim/reaper.go +++ b/runtime/v1/shim/reaper.go @@ -24,7 +24,7 @@ import ( "time" "github.com/containerd/containerd/sys" - runc "github.com/containerd/go-runc" + "github.com/containerd/go-runc" "github.com/pkg/errors" ) @@ -38,30 +38,26 @@ const bufferSize = 2048 func Reap() error { now := time.Now() exits, err := sys.Reap(false) - Default.Lock() - for c := range Default.subscribers { - for _, e := range exits { - c <- runc.Exit{ - Timestamp: now, - Pid: e.Pid, - Status: e.Status, - } - } + for _, e := range exits { + Default.Notify(runc.Exit{ + Timestamp: now, + Pid: e.Pid, + Status: e.Status, + }) } - Default.Unlock() return err } // Default is the default monitor initialized for the package var Default = &Monitor{ - subscribers: make(map[chan runc.Exit]struct{}), + subscribers: make(map[chan runc.Exit]bool), } // Monitor monitors the underlying system for process status changes type Monitor struct { sync.Mutex - subscribers map[chan runc.Exit]struct{} + subscribers map[chan runc.Exit]bool } // Start starts the command a registers the process with the reaper @@ -95,7 +91,7 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { func (m *Monitor) Subscribe() chan runc.Exit { c := make(chan runc.Exit, bufferSize) m.Lock() - m.subscribers[c] = struct{}{} + m.subscribers[c] = false m.Unlock() return c } @@ -107,3 +103,29 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) { close(c) m.Unlock() } + +// Notify to subscribers exit changes +func (m *Monitor) Notify(e runc.Exit) { +retry: + m.Lock() + for c, v := range m.subscribers { + // subscriber has receive this exit + if v == true { + continue + } + + select { + case c <- e: + m.subscribers[c] = true + case <-time.After(time.Millisecond): + m.Unlock() + goto retry + } + } + + // reset subscriber's state + for c := range m.subscribers { + m.subscribers[c] = false + } + m.Unlock() +}