Merge pull request #794 from Random-Liu/panic-for-cri-start-failure
Generate fatal error when cri plugin fail to start.
This commit is contained in:
commit
578b34f112
@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
|
||||
}
|
||||
|
||||
// start starts the event monitor which monitors and handles all container events. It returns
|
||||
// a channel for the caller to wait for the event monitor to stop. start must be called after
|
||||
// subscribe.
|
||||
func (em *eventMonitor) start() (<-chan struct{}, error) {
|
||||
// an error channel for the caller to wait for stop errors from the event monitor.
|
||||
// start must be called after subscribe.
|
||||
func (em *eventMonitor) start() <-chan error {
|
||||
errCh := make(chan error)
|
||||
if em.ch == nil || em.errCh == nil {
|
||||
return nil, errors.New("event channel is nil")
|
||||
panic("event channel is nil")
|
||||
}
|
||||
closeCh := make(chan struct{})
|
||||
backOffCheckCh := em.backOff.start()
|
||||
go func() {
|
||||
defer close(errCh)
|
||||
for {
|
||||
select {
|
||||
case e := <-em.ch:
|
||||
@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
|
||||
em.backOff.enBackOff(cID, evt)
|
||||
}
|
||||
case err := <-em.errCh:
|
||||
logrus.WithError(err).Error("Failed to handle event stream")
|
||||
close(closeCh)
|
||||
// Close errCh in defer directly if there is no error.
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to handle event stream")
|
||||
errCh <- err
|
||||
}
|
||||
return
|
||||
case <-backOffCheckCh:
|
||||
cIDs := em.backOff.getExpiredContainers()
|
||||
@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
|
||||
}
|
||||
}
|
||||
}()
|
||||
return closeCh, nil
|
||||
return errCh
|
||||
}
|
||||
|
||||
// stop stops the event monitor. It will close the event channel.
|
||||
|
@ -19,6 +19,7 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
@ -179,10 +180,7 @@ func (c *criService) Run() error {
|
||||
|
||||
// Start event handler.
|
||||
logrus.Info("Start event monitor")
|
||||
eventMonitorCloseCh, err := c.eventMonitor.start()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to start event monitor")
|
||||
}
|
||||
eventMonitorErrCh := c.eventMonitor.start()
|
||||
|
||||
// Start snapshot stats syncer, it doesn't need to be stopped.
|
||||
logrus.Info("Start snapshots syncer")
|
||||
@ -195,27 +193,32 @@ func (c *criService) Run() error {
|
||||
|
||||
// Start streaming server.
|
||||
logrus.Info("Start streaming server")
|
||||
streamServerCloseCh := make(chan struct{})
|
||||
streamServerErrCh := make(chan error)
|
||||
go func() {
|
||||
if err := c.streamServer.Start(true); err != nil {
|
||||
defer close(streamServerErrCh)
|
||||
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
|
||||
logrus.WithError(err).Error("Failed to start streaming server")
|
||||
streamServerErrCh <- err
|
||||
}
|
||||
close(streamServerCloseCh)
|
||||
}()
|
||||
|
||||
// Set the server as initialized. GRPC services could start serving traffic.
|
||||
c.initialized.Set()
|
||||
|
||||
var eventMonitorErr, streamServerErr error
|
||||
// Stop the whole CRI service if any of the critical service exits.
|
||||
select {
|
||||
case <-eventMonitorCloseCh:
|
||||
case <-streamServerCloseCh:
|
||||
case eventMonitorErr = <-eventMonitorErrCh:
|
||||
case streamServerErr = <-streamServerErrCh:
|
||||
}
|
||||
if err := c.Close(); err != nil {
|
||||
return errors.Wrap(err, "failed to stop cri service")
|
||||
}
|
||||
|
||||
<-eventMonitorCloseCh
|
||||
// If the error is set above, err from channel must be nil here, because
|
||||
// the channel is supposed to be closed. Or else, we wait and set it.
|
||||
if err := <-eventMonitorErrCh; err != nil {
|
||||
eventMonitorErr = err
|
||||
}
|
||||
logrus.Info("Event monitor stopped")
|
||||
// There is a race condition with http.Server.Serve.
|
||||
// When `Close` is called at the same time with `Serve`, `Close`
|
||||
@ -227,18 +230,27 @@ func (c *criService) Run() error {
|
||||
// is fixed.
|
||||
const streamServerStopTimeout = 2 * time.Second
|
||||
select {
|
||||
case <-streamServerCloseCh:
|
||||
case err := <-streamServerErrCh:
|
||||
if err != nil {
|
||||
streamServerErr = err
|
||||
}
|
||||
logrus.Info("Stream server stopped")
|
||||
case <-time.After(streamServerStopTimeout):
|
||||
logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
|
||||
}
|
||||
if eventMonitorErr != nil {
|
||||
return errors.Wrap(eventMonitorErr, "event monitor error")
|
||||
}
|
||||
if streamServerErr != nil {
|
||||
return errors.Wrap(streamServerErr, "stream server error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the CRI service.
|
||||
// Close stops the CRI service.
|
||||
// TODO(random-liu): Make close synchronous.
|
||||
func (c *criService) Close() error {
|
||||
logrus.Info("Stop CRI service")
|
||||
// TODO(random-liu): Make event monitor stop synchronous.
|
||||
c.eventMonitor.stop()
|
||||
if err := c.streamServer.Stop(); err != nil {
|
||||
return errors.Wrap(err, "failed to stop stream server")
|
||||
|
Loading…
Reference in New Issue
Block a user