From d9881ab912e778933c0cca9f403602b537d63b01 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 10 Dec 2015 13:56:49 -0800 Subject: [PATCH] Add locks around subscribers channel Signed-off-by: Michael Crosby --- buffer.go | 4 ++++ supervisor.go | 19 ++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 buffer.go diff --git a/buffer.go b/buffer.go new file mode 100644 index 000000000..96fadaf75 --- /dev/null +++ b/buffer.go @@ -0,0 +1,4 @@ +package containerd + +// DefaultBufferSize is the default size for a channel's buffer +const DefaultBufferSize = 2048 diff --git a/supervisor.go b/supervisor.go index 1af293be9..428356c93 100644 --- a/supervisor.go +++ b/supervisor.go @@ -33,7 +33,7 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err processes: make(map[int]runtime.Container), runtime: r, tasks: tasks, - events: make(chan *Event, 2048), + events: make(chan *Event, DefaultBufferSize), machine: machine, subscribers: make(map[chan *Event]struct{}), } @@ -63,6 +63,7 @@ type Supervisor struct { runtime runtime.Runtime events chan *Event tasks chan *StartTask + subscriberLock sync.RWMutex subscribers map[chan *Event]struct{} machine Machine containerGroup sync.WaitGroup @@ -109,20 +110,32 @@ func (s *Supervisor) Close() error { } func (s *Supervisor) Events() chan *Event { - c := make(chan *Event, 2048) + s.subscriberLock.Lock() + defer s.subscriberLock.Unlock() + c := make(chan *Event, DefaultBufferSize) EventSubscriberCounter.Inc(1) s.subscribers[c] = struct{}{} return c } func (s *Supervisor) Unsubscribe(sub chan *Event) { + s.subscriberLock.Lock() + defer s.subscriberLock.Unlock() delete(s.subscribers, sub) + close(sub) EventSubscriberCounter.Dec(1) } func (s *Supervisor) NotifySubscribers(e *Event) { + s.subscriberLock.RLock() + defer s.subscriberLock.RUnlock() for sub := range s.subscribers { - sub <- e + // do a non-blocking send for the channel + select { + case sub <- e: + default: + logrus.WithField("event", e.Type).Warn("event not sent to subscriber") + } } }