Fix shim hung

shim.Reap and shim.Default.Wait may deadlock, use Monitor.Notify
to fix this issue.

Signed-off-by: Shukui Yang <keloyangsk@gmail.com>
This commit is contained in:
Shukui Yang 2019-08-14 22:14:36 +08:00 committed by Michael Crosby
parent e07359b761
commit bb4c92c773

View File

@ -24,7 +24,7 @@ import (
"time" "time"
"github.com/containerd/containerd/sys" "github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc" "github.com/containerd/go-runc"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -38,30 +38,26 @@ const bufferSize = 2048
func Reap() error { func Reap() error {
now := time.Now() now := time.Now()
exits, err := sys.Reap(false) exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits { for _, e := range exits {
c <- runc.Exit{ Default.Notify(runc.Exit{
Timestamp: now, Timestamp: now,
Pid: e.Pid, Pid: e.Pid,
Status: e.Status, Status: e.Status,
})
} }
}
}
Default.Unlock()
return err return err
} }
// Default is the default monitor initialized for the package // Default is the default monitor initialized for the package
var Default = &Monitor{ var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}), subscribers: make(map[chan runc.Exit]bool),
} }
// Monitor monitors the underlying system for process status changes // Monitor monitors the underlying system for process status changes
type Monitor struct { type Monitor struct {
sync.Mutex sync.Mutex
subscribers map[chan runc.Exit]struct{} subscribers map[chan runc.Exit]bool
} }
// Start starts the command a registers the process with the reaper // Start starts the command a registers the process with the reaper
@ -95,7 +91,7 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
func (m *Monitor) Subscribe() chan runc.Exit { func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize) c := make(chan runc.Exit, bufferSize)
m.Lock() m.Lock()
m.subscribers[c] = struct{}{} m.subscribers[c] = false
m.Unlock() m.Unlock()
return c return c
} }
@ -107,3 +103,29 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) {
close(c) close(c)
m.Unlock() m.Unlock()
} }
// Notify to subscribers exit changes
func (m *Monitor) Notify(e runc.Exit) {
retry:
m.Lock()
for c, v := range m.subscribers {
// subscriber has receive this exit
if v == true {
continue
}
select {
case c <- e:
m.subscribers[c] = true
case <-time.After(time.Millisecond):
m.Unlock()
goto retry
}
}
// reset subscriber's state
for c := range m.subscribers {
m.subscribers[c] = false
}
m.Unlock()
}