Generate fatal error when cri plugin fails to start.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-05-31 00:54:25 -07:00
parent 8bb978e3d6
commit b870ee7942
2 changed files with 38 additions and 22 deletions

View File

@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
} }
// start starts the event monitor which monitors and handles all container events. It returns // start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after // an error channel for the caller to wait for stop errors from the event monitor.
// subscribe. // start must be called after subscribe.
func (em *eventMonitor) start() (<-chan struct{}, error) { func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil { if em.ch == nil || em.errCh == nil {
return nil, errors.New("event channel is nil") panic("event channel is nil")
} }
closeCh := make(chan struct{})
backOffCheckCh := em.backOff.start() backOffCheckCh := em.backOff.start()
go func() { go func() {
defer close(errCh)
for { for {
select { select {
case e := <-em.ch: case e := <-em.ch:
@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
em.backOff.enBackOff(cID, evt) em.backOff.enBackOff(cID, evt)
} }
case err := <-em.errCh: case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream") // Close errCh in defer directly if there is no error.
close(closeCh) if err != nil {
logrus.WithError(err).Errorf("Failed to handle event stream")
errCh <- err
}
return return
case <-backOffCheckCh: case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers() cIDs := em.backOff.getExpiredContainers()
@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
} }
} }
}() }()
return closeCh, nil return errCh
} }
// stop stops the event monitor. It will close the event channel. // stop stops the event monitor. It will close the event channel.

View File

@ -19,6 +19,7 @@ package server
import ( import (
"fmt" "fmt"
"io" "io"
"net/http"
"path/filepath" "path/filepath"
"time" "time"
@ -179,10 +180,7 @@ func (c *criService) Run() error {
// Start event handler. // Start event handler.
logrus.Info("Start event monitor") logrus.Info("Start event monitor")
eventMonitorCloseCh, err := c.eventMonitor.start() eventMonitorErrCh := c.eventMonitor.start()
if err != nil {
return errors.Wrap(err, "failed to start event monitor")
}
// Start snapshot stats syncer, it doesn't need to be stopped. // Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer") logrus.Info("Start snapshots syncer")
@ -195,27 +193,32 @@ func (c *criService) Run() error {
// Start streaming server. // Start streaming server.
logrus.Info("Start streaming server") logrus.Info("Start streaming server")
streamServerCloseCh := make(chan struct{}) streamServerErrCh := make(chan error)
go func() { go func() {
if err := c.streamServer.Start(true); err != nil { defer close(streamServerErrCh)
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("Failed to start streaming server") logrus.WithError(err).Error("Failed to start streaming server")
streamServerErrCh <- err
} }
close(streamServerCloseCh)
}() }()
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set() c.initialized.Set()
var eventMonitorErr, streamServerErr error
// Stop the whole CRI service if any of the critical service exits. // Stop the whole CRI service if any of the critical service exits.
select { select {
case <-eventMonitorCloseCh: case eventMonitorErr = <-eventMonitorErrCh:
case <-streamServerCloseCh: case streamServerErr = <-streamServerErrCh:
} }
if err := c.Close(); err != nil { if err := c.Close(); err != nil {
return errors.Wrap(err, "failed to stop cri service") return errors.Wrap(err, "failed to stop cri service")
} }
// If the error is set above, err from channel must be nil here, because
<-eventMonitorCloseCh // the channel is supposed to be closed. Or else, we wait and set it.
if err := <-eventMonitorErrCh; err != nil {
eventMonitorErr = err
}
logrus.Info("Event monitor stopped") logrus.Info("Event monitor stopped")
// There is a race condition with http.Server.Serve. // There is a race condition with http.Server.Serve.
// When `Close` is called at the same time with `Serve`, `Close` // When `Close` is called at the same time with `Serve`, `Close`
@ -227,18 +230,27 @@ func (c *criService) Run() error {
// is fixed. // is fixed.
const streamServerStopTimeout = 2 * time.Second const streamServerStopTimeout = 2 * time.Second
select { select {
case <-streamServerCloseCh: case err := <-streamServerErrCh:
if err != nil {
streamServerErr = err
}
logrus.Info("Stream server stopped") logrus.Info("Stream server stopped")
case <-time.After(streamServerStopTimeout): case <-time.After(streamServerStopTimeout):
logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout) logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
} }
if eventMonitorErr != nil {
return errors.Wrap(eventMonitorErr, "event monitor error")
}
if streamServerErr != nil {
return errors.Wrap(streamServerErr, "stream server error")
}
return nil return nil
} }
// Stop stops the CRI service. // Close stops the CRI service.
// TODO(random-liu): Make close synchronous.
func (c *criService) Close() error { func (c *criService) Close() error {
logrus.Info("Stop CRI service") logrus.Info("Stop CRI service")
// TODO(random-liu): Make event monitor stop synchronous.
c.eventMonitor.stop() c.eventMonitor.stop()
if err := c.streamServer.Stop(); err != nil { if err := c.streamServer.Stop(); err != nil {
return errors.Wrap(err, "failed to stop stream server") return errors.Wrap(err, "failed to stop stream server")