Add retry and non-blocking send for exit events

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-08-15 18:53:59 +00:00
parent 0d27d8f4f2
commit bee4c1a8a2
2 changed files with 41 additions and 10 deletions

View File

@ -22,7 +22,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/containerd/containerd/runtime/v1/shim" "github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
) )
@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) {
signal.Notify(signals) signal.Notify(signals)
// make sure runc is setup to use the monitor // make sure runc is setup to use the monitor
// for waiting on processes // for waiting on processes
runc.Monitor = shim.Default runc.Monitor = reaper.Default
return signals, nil return signals, nil
} }

View File

@ -36,19 +36,25 @@ const bufferSize = 2048
// Reap should be called when the process receives an SIGCHLD. Reap will reap // Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels // all exited processes and close their wait channels
func Reap() error { func Reap() error {
now := time.Now() var (
now = time.Now()
current []chan runc.Exit
)
exits, err := sys.Reap(false) exits, err := sys.Reap(false)
Default.Lock() Default.Lock()
for c := range Default.subscribers { for c := range Default.subscribers {
current = append(current, c)
}
Default.Unlock()
for _, e := range exits { for _, e := range exits {
c <- runc.Exit{ go notify(runc.Exit{
Timestamp: now, Timestamp: now,
Pid: e.Pid, Pid: e.Pid,
Status: e.Status, Status: e.Status,
}, current)
} }
}
}
Default.Unlock()
return err return err
} }
@ -107,3 +113,28 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) {
close(c) close(c)
m.Unlock() m.Unlock()
} }
func notify(e runc.Exit, subscribers []chan runc.Exit) {
const timeout = 10 * time.Millisecond
timer := time.NewTimer(timeout)
timer.Stop()
for i := 0; i < 50; i++ {
var failed []chan runc.Exit
for _, s := range subscribers {
timer.Reset(timeout)
select {
case s <- e:
case <-timer.C:
failed = append(failed, s)
}
timer.Stop()
}
// all subscribers received the message
if len(failed) == 0 {
return
}
subscribers = failed
}
}