diff --git a/event.go b/event.go index c21ef6cbb..540dac213 100644 --- a/event.go +++ b/event.go @@ -63,3 +63,22 @@ type Event struct { type Handler interface { Handle(*Event) error } + +type commonEvent struct { + data *Event + sv *Supervisor +} + +func (e *commonEvent) Handle() { + h, ok := e.sv.handlers[e.data.Type] + if !ok { + e.data.Err <- ErrUnknownEvent + return + } + err := h.Handle(e.data) + if err != errDeferedResponse { + e.data.Err <- err + close(e.data.Err) + return + } +} diff --git a/eventloop/eventloop.go b/eventloop/eventloop.go new file mode 100644 index 000000000..7a80a273e --- /dev/null +++ b/eventloop/eventloop.go @@ -0,0 +1,51 @@ +package eventloop + +import ( + "runtime" + "sync" +) + +// Event is receiving notification from loop with Handle() call. +type Event interface { + Handle() +} + +// EventLoop is interface for event loops. +// Start starting events processing +// Send adding event to loop +type EventLoop interface { + Start() error + Send(Event) error +} + +// ChanLoop is implementation of EventLoop based on channels. +type ChanLoop struct { + events chan Event + once sync.Once +} + +// NewChanLoop returns ChanLoop with internal channel buffer set to q. +func NewChanLoop(q int) EventLoop { + return &ChanLoop{ + events: make(chan Event, q), + } +} + +// Start starting to read events from channel in separate goroutines. +// All calls after first is no-op. +func (el *ChanLoop) Start() error { + go el.once.Do(func() { + // allocate whole OS thread, so nothing can get scheduled over eventloop + runtime.LockOSThread() + for ev := range el.events { + ev.Handle() + } + }) + return nil +} + +// Send sends event to channel. Will block if buffer is full. +func (el *ChanLoop) Send(ev Event) error { + el.events <- ev + return nil +} diff --git a/eventloop/eventloop_test.go b/eventloop/eventloop_test.go new file mode 100644 index 000000000..a4fda4708 --- /dev/null +++ b/eventloop/eventloop_test.go @@ -0,0 +1,73 @@ +package eventloop + +import ( + "sync" + "testing" + "time" +) + +type racyEvent struct { + m map[int]struct{} + wg *sync.WaitGroup +} + +func (e *racyEvent) Handle() { + e.m[0] = struct{}{} + e.wg.Done() +} + +func simulateRacyEvents(el EventLoop) { + wg := &sync.WaitGroup{} + raceMap := make(map[int]struct{}) + var evs []*racyEvent + for i := 0; i < 1024; i++ { + wg.Add(1) + evs = append(evs, &racyEvent{m: raceMap, wg: wg}) + } + for _, ev := range evs { + el.Send(ev) + } + wg.Wait() +} + +// run with -race +func TestChanRace(t *testing.T) { + e := NewChanLoop(1024) + e.Start() + simulateRacyEvents(e) +} + +// run with -race +func TestChanStartTwiceRace(t *testing.T) { + e := NewChanLoop(1024) + e.Start() + e.Start() + simulateRacyEvents(e) +} + +type testEvent struct { + wg *sync.WaitGroup +} + +func (e *testEvent) Handle() { + e.wg.Done() +} + +func TestChanEventSpawn(t *testing.T) { + e := NewChanLoop(1024) + e.Start() + wg := &sync.WaitGroup{} + wg.Add(2) + e.Send(&testEvent{wg: wg}) + e.Send(&testEvent{wg: wg}) + waitCh := make(chan struct{}) + go func() { + wg.Wait() + close(waitCh) + }() + select { + case <-waitCh: + case <-time.After(1 * time.Second): + t.Fatal("Events was not handled in loop") + } +} diff --git a/supervisor.go b/supervisor.go index 2ac9a8c39..9406899eb 100644 --- a/supervisor.go +++ b/supervisor.go @@ -4,12 +4,12 @@ import ( "os" "os/signal" "path/filepath" - goruntime "runtime" "sync" "syscall" "time" "github.com/Sirupsen/logrus" + "github.com/docker/containerd/eventloop" "github.com/docker/containerd/runtime" "github.com/opencontainers/runc/libcontainer" ) @@ -36,10 +36,10 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask, oom bool) (*Super processes: make(map[int]*containerInfo), runtime: r, tasks: tasks, - events: make(chan *Event, DefaultBufferSize), machine: machine, subscribers: make(map[chan *Event]struct{}), statsCollector: newStatsCollector(statsInterval), + el: eventloop.NewChanLoop(DefaultBufferSize), } if oom { s.notifier = newNotifier(s) @@ -86,6 +86,7 @@ type Supervisor struct { containerGroup sync.WaitGroup statsCollector *statsCollector notifier *notifier + el eventloop.EventLoop } // Stop closes all tasks and sends a SIGTERM to each container's pid1 then waits for they to @@ -174,32 +175,11 @@ func (s *Supervisor) notifySubscribers(e *Event) { // therefore it is save to do operations in the handlers that modify state of the system or // state of the Supervisor func (s *Supervisor) Start() error { - go func() { - // allocate an entire thread to this goroutine for the main event loop - // so that nothing else is scheduled over the top of it. - goruntime.LockOSThread() - for e := range s.events { - EventsCounter.Inc(1) - h, ok := s.handlers[e.Type] - if !ok { - e.Err <- ErrUnknownEvent - continue - } - if err := h.Handle(e); err != nil { - if err != errDeferedResponse { - e.Err <- err - close(e.Err) - } - continue - } - close(e.Err) - } - }() logrus.WithFields(logrus.Fields{ "runtime": s.runtime.Type(), "stateDir": s.stateDir, }).Debug("Supervisor started") - return nil + return s.el.Start() } // Machine returns the machine information for which the @@ -231,7 +211,8 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { // SendEvent sends the provided event the the supervisors main event loop func (s *Supervisor) SendEvent(evt *Event) { - s.events <- evt + EventsCounter.Inc(1) + s.el.Send(&commonEvent{data: evt, sv: s}) } func (s *Supervisor) copyIO(stdin, stdout, stderr string, i *runtime.IO) (*copier, error) {