From bb4c92c773e195b86af92d5db26db6ca67164d7c Mon Sep 17 00:00:00 2001 From: Shukui Yang Date: Wed, 14 Aug 2019 22:14:36 +0800 Subject: [PATCH 1/4] Fix shim hang shim.Reap and shim.Default.Wait may deadlock, use Monitor.Notify to fix this issue. Signed-off-by: Shukui Yang --- runtime/v1/shim/reaper.go | 50 ++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/runtime/v1/shim/reaper.go b/runtime/v1/shim/reaper.go index 45a88db12..4e002f5dc 100644 --- a/runtime/v1/shim/reaper.go +++ b/runtime/v1/shim/reaper.go @@ -24,7 +24,7 @@ import ( "time" "github.com/containerd/containerd/sys" - runc "github.com/containerd/go-runc" + "github.com/containerd/go-runc" "github.com/pkg/errors" ) @@ -38,30 +38,26 @@ const bufferSize = 2048 func Reap() error { now := time.Now() exits, err := sys.Reap(false) - Default.Lock() - for c := range Default.subscribers { - for _, e := range exits { - c <- runc.Exit{ - Timestamp: now, - Pid: e.Pid, - Status: e.Status, - } - } + for _, e := range exits { + Default.Notify(runc.Exit{ + Timestamp: now, + Pid: e.Pid, + Status: e.Status, + }) } - Default.Unlock() return err } // Default is the default monitor initialized for the package var Default = &Monitor{ - subscribers: make(map[chan runc.Exit]struct{}), + subscribers: make(map[chan runc.Exit]bool), } // Monitor monitors the underlying system for process status changes type Monitor struct { sync.Mutex - subscribers map[chan runc.Exit]struct{} + subscribers map[chan runc.Exit]bool } // Start starts the command a registers the process with the reaper @@ -95,7 +91,7 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { func (m *Monitor) Subscribe() chan runc.Exit { c := make(chan runc.Exit, bufferSize) m.Lock() - m.subscribers[c] = struct{}{} + m.subscribers[c] = false m.Unlock() return c } @@ -107,3 +103,29 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) { close(c) m.Unlock() } + +// Notify to subscribers exit changes +func (m 
*Monitor) Notify(e runc.Exit) { +retry: + m.Lock() + for c, v := range m.subscribers { + // subscriber has receive this exit + if v == true { + continue + } + + select { + case c <- e: + m.subscribers[c] = true + case <-time.After(time.Millisecond): + m.Unlock() + goto retry + } + } + + // reset subscriber's state + for c := range m.subscribers { + m.subscribers[c] = false + } + m.Unlock() +} From 0d27d8f4f22909616f433134142c6b85a3e27cb9 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 15 Aug 2019 18:40:56 +0000 Subject: [PATCH 2/4] Unify reaper logic into package Signed-off-by: Michael Crosby --- cmd/containerd-shim/main_unix.go | 7 +- cmd/containerd-shim/shim_linux.go | 4 +- runtime/v1/shim/reaper.go | 131 ------------------ runtime/v1/shim/service.go | 3 +- runtime/v2/runc/v1/service.go | 5 +- runtime/v2/runc/v2/service.go | 5 +- runtime/v2/shim/shim_unix.go | 3 +- .../v2/shim => sys/reaper}/reaper_unix.go | 2 +- 8 files changed, 17 insertions(+), 143 deletions(-) delete mode 100644 runtime/v1/shim/reaper.go rename {runtime/v2/shim => sys/reaper}/reaper_unix.go (99%) diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index c8af6b968..49f16e6ca 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -41,6 +41,7 @@ import ( shimlog "github.com/containerd/containerd/runtime/v1" "github.com/containerd/containerd/runtime/v1/shim" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/ttrpc" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" @@ -233,7 +234,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.S case s := <-signals: switch s { case unix.SIGCHLD: - if err := shim.Reap(); err != nil { + if err := reaper.Reap(); err != nil { logger.WithError(err).Error("reap exit status") } case unix.SIGTERM, unix.SIGINT: @@ -291,11 +292,11 @@ func (l *remoteEventsPublisher) 
Publish(ctx context.Context, topic string, event defer bufPool.Put(b) cmd.Stdout = b cmd.Stderr = b - c, err := shim.Default.Start(cmd) + c, err := reaper.Default.Start(cmd) if err != nil { return err } - status, err := shim.Default.Wait(cmd, c) + status, err := reaper.Default.Wait(cmd, c) if err != nil { return errors.Wrapf(err, "failed to publish event: %s", b.String()) } diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index d55646977..4d81aa6d5 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -20,7 +20,7 @@ import ( "os" "os/signal" - "github.com/containerd/containerd/runtime/v1/shim" + "github.com/containerd/containerd/sys/reaper" runc "github.com/containerd/go-runc" "github.com/containerd/ttrpc" "github.com/opencontainers/runc/libcontainer/system" @@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) { signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE) // make sure runc is setup to use the monitor // for waiting on processes - runc.Monitor = shim.Default + runc.Monitor = reaper.Default // set the shim as the subreaper for all orphaned processes created by the container if err := system.SetSubreaper(1); err != nil { return nil, err diff --git a/runtime/v1/shim/reaper.go b/runtime/v1/shim/reaper.go deleted file mode 100644 index 4e002f5dc..000000000 --- a/runtime/v1/shim/reaper.go +++ /dev/null @@ -1,131 +0,0 @@ -// +build !windows - -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- See the License for the specific language governing permissions and - limitations under the License. -*/ - -package shim - -import ( - "os/exec" - "sync" - "time" - - "github.com/containerd/containerd/sys" - "github.com/containerd/go-runc" - "github.com/pkg/errors" -) - -// ErrNoSuchProcess is returned when the process no longer exists -var ErrNoSuchProcess = errors.New("no such process") - -const bufferSize = 2048 - -// Reap should be called when the process receives an SIGCHLD. Reap will reap -// all exited processes and close their wait channels -func Reap() error { - now := time.Now() - exits, err := sys.Reap(false) - for _, e := range exits { - Default.Notify(runc.Exit{ - Timestamp: now, - Pid: e.Pid, - Status: e.Status, - }) - } - return err -} - -// Default is the default monitor initialized for the package -var Default = &Monitor{ - subscribers: make(map[chan runc.Exit]bool), -} - -// Monitor monitors the underlying system for process status changes -type Monitor struct { - sync.Mutex - - subscribers map[chan runc.Exit]bool -} - -// Start starts the command a registers the process with the reaper -func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) { - ec := m.Subscribe() - if err := c.Start(); err != nil { - m.Unsubscribe(ec) - return nil, err - } - return ec, nil -} - -// Wait blocks until a process is signal as dead. -// User should rely on the value of the exit status to determine if the -// command was successful or not. 
-func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { - for e := range ec { - if e.Pid == c.Process.Pid { - // make sure we flush all IO - c.Wait() - m.Unsubscribe(ec) - return e.Status, nil - } - } - // return no such process if the ec channel is closed and no more exit - // events will be sent - return -1, ErrNoSuchProcess -} - -// Subscribe to process exit changes -func (m *Monitor) Subscribe() chan runc.Exit { - c := make(chan runc.Exit, bufferSize) - m.Lock() - m.subscribers[c] = false - m.Unlock() - return c -} - -// Unsubscribe to process exit changes -func (m *Monitor) Unsubscribe(c chan runc.Exit) { - m.Lock() - delete(m.subscribers, c) - close(c) - m.Unlock() -} - -// Notify to subscribers exit changes -func (m *Monitor) Notify(e runc.Exit) { -retry: - m.Lock() - for c, v := range m.subscribers { - // subscriber has receive this exit - if v == true { - continue - } - - select { - case c <- e: - m.subscribers[c] = true - case <-time.After(time.Millisecond): - m.Unlock() - goto retry - } - } - - // reset subscriber's state - for c := range m.subscribers { - m.subscribers[c] = false - } - m.Unlock() -} diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go index f55700135..37eae7f8b 100644 --- a/runtime/v1/shim/service.go +++ b/runtime/v1/shim/service.go @@ -40,6 +40,7 @@ import ( "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" runc "github.com/containerd/go-runc" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" @@ -86,7 +87,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { context: ctx, processes: make(map[string]process.Process), events: make(chan interface{}, 128), - ec: Default.Subscribe(), + ec: reaper.Default.Subscribe(), } go s.processExits() if err := s.initPlatform(); err != nil { diff --git 
a/runtime/v2/runc/v1/service.go b/runtime/v2/runc/v1/service.go index c4f5328bf..516a25f0d 100644 --- a/runtime/v2/runc/v1/service.go +++ b/runtime/v2/runc/v1/service.go @@ -43,6 +43,7 @@ import ( "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" runcC "github.com/containerd/go-runc" "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" @@ -70,12 +71,12 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func id: id, context: ctx, events: make(chan interface{}, 128), - ec: shim.Default.Subscribe(), + ec: reaper.Default.Subscribe(), ep: ep, cancel: shutdown, } go s.processExits() - runcC.Monitor = shim.Default + runcC.Monitor = reaper.Default if err := s.initPlatform(); err != nil { shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 54e82dba9..9b31b3547 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -44,6 +44,7 @@ import ( "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" runcC "github.com/containerd/go-runc" "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" @@ -83,13 +84,13 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func id: id, context: ctx, events: make(chan interface{}, 128), - ec: shim.Default.Subscribe(), + ec: reaper.Default.Subscribe(), ep: ep, cancel: shutdown, containers: make(map[string]*runc.Container), } go s.processExits() - runcC.Monitor = shim.Default + runcC.Monitor = reaper.Default if err := s.initPlatform(); err != nil { shutdown() return nil, errors.Wrap(err, "failed to initialized 
platform behavior") diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index dc3e6a891..e6dc3e02f 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -26,6 +26,7 @@ import ( "os/signal" "syscall" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/fifo" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -79,7 +80,7 @@ func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Si case s := <-signals: switch s { case unix.SIGCHLD: - if err := Reap(); err != nil { + if err := reaper.Reap(); err != nil { logger.WithError(err).Error("reap exit status") } case unix.SIGPIPE: diff --git a/runtime/v2/shim/reaper_unix.go b/sys/reaper/reaper_unix.go similarity index 99% rename from runtime/v2/shim/reaper_unix.go rename to sys/reaper/reaper_unix.go index 45a88db12..4a150a8f9 100644 --- a/runtime/v2/shim/reaper_unix.go +++ b/sys/reaper/reaper_unix.go @@ -16,7 +16,7 @@ limitations under the License. */ -package shim +package reaper import ( "os/exec" From bee4c1a8a287e2469aed2968141199aba1304912 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 15 Aug 2019 18:53:59 +0000 Subject: [PATCH 3/4] Add retry and non-blocking send for exit events Signed-off-by: Michael Crosby --- cmd/containerd-shim/shim_darwin.go | 4 +-- sys/reaper/reaper_unix.go | 47 +++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/cmd/containerd-shim/shim_darwin.go b/cmd/containerd-shim/shim_darwin.go index 85593d6c8..d6dc23072 100644 --- a/cmd/containerd-shim/shim_darwin.go +++ b/cmd/containerd-shim/shim_darwin.go @@ -22,7 +22,7 @@ import ( "os" "os/signal" - "github.com/containerd/containerd/runtime/v1/shim" + "github.com/containerd/containerd/sys/reaper" runc "github.com/containerd/go-runc" "github.com/containerd/ttrpc" ) @@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) { signal.Notify(signals) // make sure runc is setup to use the monitor // for waiting on 
processes - runc.Monitor = shim.Default + runc.Monitor = reaper.Default return signals, nil } diff --git a/sys/reaper/reaper_unix.go b/sys/reaper/reaper_unix.go index 4a150a8f9..85b7034ef 100644 --- a/sys/reaper/reaper_unix.go +++ b/sys/reaper/reaper_unix.go @@ -36,19 +36,25 @@ const bufferSize = 2048 // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels func Reap() error { - now := time.Now() + var ( + now = time.Now() + current []chan runc.Exit + ) exits, err := sys.Reap(false) + Default.Lock() for c := range Default.subscribers { - for _, e := range exits { - c <- runc.Exit{ - Timestamp: now, - Pid: e.Pid, - Status: e.Status, - } - } + current = append(current, c) } Default.Unlock() + + for _, e := range exits { + go notify(runc.Exit{ + Timestamp: now, + Pid: e.Pid, + Status: e.Status, + }, current) + } return err } @@ -107,3 +113,28 @@ func (m *Monitor) Unsubscribe(c chan runc.Exit) { close(c) m.Unlock() } + +func notify(e runc.Exit, subscribers []chan runc.Exit) { + const timeout = 10 * time.Millisecond + timer := time.NewTimer(timeout) + timer.Stop() + + for i := 0; i < 50; i++ { + var failed []chan runc.Exit + for _, s := range subscribers { + timer.Reset(timeout) + + select { + case s <- e: + case <-timer.C: + failed = append(failed, s) + } + timer.Stop() + } + // all subscribers received the message + if len(failed) == 0 { + return + } + subscribers = failed + } +} From 27636393884fda7e69bf7ce7c0516a5eb5bcda14 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 15 Aug 2019 21:14:25 +0000 Subject: [PATCH 4/4] Try to preserve exit event order Signed-off-by: Michael Crosby --- sys/reaper/reaper_unix.go | 132 ++++++++++++++++++++++++++++---------- 1 file changed, 97 insertions(+), 35 deletions(-) diff --git a/sys/reaper/reaper_unix.go b/sys/reaper/reaper_unix.go index 85b7034ef..8f764ec1e 100644 --- a/sys/reaper/reaper_unix.go +++ b/sys/reaper/reaper_unix.go @@ -33,41 +33,59 @@ 
var ErrNoSuchProcess = errors.New("no such process") const bufferSize = 2048 +type subscriber struct { + sync.Mutex + c chan runc.Exit + closed bool +} + +func (s *subscriber) close() { + s.Lock() + if s.closed { + s.Unlock() + return + } + close(s.c) + s.closed = true + s.Unlock() +} + +func (s *subscriber) do(fn func()) { + s.Lock() + fn() + s.Unlock() +} + // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels func Reap() error { - var ( - now = time.Now() - current []chan runc.Exit - ) + now := time.Now() exits, err := sys.Reap(false) - - Default.Lock() - for c := range Default.subscribers { - current = append(current, c) - } - Default.Unlock() - for _, e := range exits { - go notify(runc.Exit{ + done := Default.notify(runc.Exit{ Timestamp: now, Pid: e.Pid, Status: e.Status, - }, current) + }) + + select { + case <-done: + case <-time.After(1 * time.Second): + } } return err } // Default is the default monitor initialized for the package var Default = &Monitor{ - subscribers: make(map[chan runc.Exit]struct{}), + subscribers: make(map[chan runc.Exit]*subscriber), } // Monitor monitors the underlying system for process status changes type Monitor struct { sync.Mutex - subscribers map[chan runc.Exit]struct{} + subscribers map[chan runc.Exit]*subscriber } // Start starts the command a registers the process with the reaper @@ -101,7 +119,9 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { func (m *Monitor) Subscribe() chan runc.Exit { c := make(chan runc.Exit, bufferSize) m.Lock() - m.subscribers[c] = struct{}{} + m.subscribers[c] = &subscriber{ + c: c, + } m.Unlock() return c } @@ -109,32 +129,74 @@ func (m *Monitor) Subscribe() chan runc.Exit { // Unsubscribe to process exit changes func (m *Monitor) Unsubscribe(c chan runc.Exit) { m.Lock() + s, ok := m.subscribers[c] + if !ok { + m.Unlock() + return + } + s.close() delete(m.subscribers, c) - close(c) m.Unlock() } 
-func notify(e runc.Exit, subscribers []chan runc.Exit) { - const timeout = 10 * time.Millisecond - timer := time.NewTimer(timeout) - timer.Stop() +func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber { + out := make(map[chan runc.Exit]*subscriber) + m.Lock() + for k, v := range m.subscribers { + out[k] = v + } + m.Unlock() + return out +} - for i := 0; i < 50; i++ { - var failed []chan runc.Exit - for _, s := range subscribers { - timer.Reset(timeout) +func (m *Monitor) notify(e runc.Exit) chan struct{} { + const timeout = 1 * time.Millisecond + var ( + done = make(chan struct{}, 1) + timer = time.NewTimer(timeout) + success = make(map[chan runc.Exit]struct{}) + ) + stop(timer, true) - select { - case s <- e: - case <-timer.C: - failed = append(failed, s) + go func() { + defer close(done) + + for { + var ( + failed int + subscribers = m.getSubscribers() + ) + for _, s := range subscribers { + s.do(func() { + if s.closed { + return + } + if _, ok := success[s.c]; ok { + return + } + timer.Reset(timeout) + recv := true + select { + case s.c <- e: + success[s.c] = struct{}{} + case <-timer.C: + recv = false + failed++ + } + stop(timer, recv) + }) + } + // all subscribers received the message + if failed == 0 { + return } - timer.Stop() } - // all subscribers received the message - if len(failed) == 0 { - return - } - subscribers = failed + }() + return done +} + +func stop(timer *time.Timer, recv bool) { + if !timer.Stop() && recv { + <-timer.C } }