From bee4c1a8a287e2469aed2968141199aba1304912 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 15 Aug 2019 18:53:59 +0000 Subject: [PATCH] Add retry and non-blocking send for exit events Signed-off-by: Michael Crosby --- cmd/containerd-shim/shim_darwin.go | 4 +-- sys/reaper/reaper_unix.go | 47 +++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/cmd/containerd-shim/shim_darwin.go b/cmd/containerd-shim/shim_darwin.go index 85593d6c8..d6dc23072 100644 --- a/cmd/containerd-shim/shim_darwin.go +++ b/cmd/containerd-shim/shim_darwin.go @@ -22,7 +22,7 @@ import ( "os" "os/signal" - "github.com/containerd/containerd/runtime/v1/shim" + "github.com/containerd/containerd/sys/reaper" runc "github.com/containerd/go-runc" "github.com/containerd/ttrpc" ) @@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) { signal.Notify(signals) // make sure runc is setup to use the monitor // for waiting on processes - runc.Monitor = shim.Default + runc.Monitor = reaper.Default return signals, nil } diff --git a/sys/reaper/reaper_unix.go b/sys/reaper/reaper_unix.go index 4a150a8f9..85b7034ef 100644 --- a/sys/reaper/reaper_unix.go +++ b/sys/reaper/reaper_unix.go @@ -36,19 +36,25 @@ const bufferSize = 2048 // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels func Reap() error { - now := time.Now() + var ( + now = time.Now() + current []chan runc.Exit + ) exits, err := sys.Reap(false) + Default.Lock() for c := range Default.subscribers { - for _, e := range exits { - c <- runc.Exit{ - Timestamp: now, - Pid: e.Pid, - Status: e.Status, - } - } + current = append(current, c) } Default.Unlock() + + for _, e := range exits { + go notify(runc.Exit{ + Timestamp: now, + Pid: e.Pid, + Status: e.Status, + }, current) + } return err } @@ -107,3 +113,28 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) { close(c) m.Unlock() } + +func notify(e runc.Exit, subscribers []chan runc.Exit) { + const timeout = 10 * time.Millisecond + timer := time.NewTimer(timeout) + timer.Stop() + + for i := 0; i < 50; i++ { + var failed []chan runc.Exit + for _, s := range subscribers { + timer.Reset(timeout) + + select { + case s <- e: + case <-timer.C: + failed = append(failed, s) + } + timer.Stop() + } + // all subscribers received the message + if len(failed) == 0 { + return + } + subscribers = failed + } +}