notify readiness when registered plugins are ready
Signed-off-by: Henry Wang <henwang@amazon.com>
This commit is contained in:
parent
ed7c0ebe28
commit
4bfcac85fa
@ -270,12 +270,21 @@ can be used and modified as necessary as a custom configuration.`
|
||||
}
|
||||
serve(ctx, l, server.ServeGRPC)
|
||||
|
||||
if err := notifyReady(ctx); err != nil {
|
||||
log.G(ctx).WithError(err).Warn("notify ready failed")
|
||||
}
|
||||
readyC := make(chan struct{})
|
||||
go func() {
|
||||
server.Wait()
|
||||
close(readyC)
|
||||
}()
|
||||
|
||||
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
|
||||
<-done
|
||||
select {
|
||||
case <-readyC:
|
||||
if err := notifyReady(ctx); err != nil {
|
||||
log.G(ctx).WithError(err).Warn("notify ready failed")
|
||||
}
|
||||
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
|
||||
<-done
|
||||
case <-done:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return app
|
||||
|
@ -54,6 +54,7 @@ func init() {
|
||||
}
|
||||
|
||||
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
|
||||
ready := ic.RegisterReadiness()
|
||||
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
|
||||
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
|
||||
ctx := ic.Context
|
||||
@ -99,7 +100,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := s.Run(); err != nil {
|
||||
if err := s.Run(ready); err != nil {
|
||||
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
|
||||
}
|
||||
// TODO(random-liu): Whether and how we can stop containerd.
|
||||
|
@ -63,7 +63,7 @@ type CRIService interface {
|
||||
// Closer is used by containerd to gracefully stop cri service.
|
||||
io.Closer
|
||||
|
||||
Run() error
|
||||
Run(ready func()) error
|
||||
|
||||
Register(*grpc.Server) error
|
||||
}
|
||||
@ -237,7 +237,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
|
||||
}
|
||||
|
||||
// Run starts the CRI service.
|
||||
func (c *criService) Run() error {
|
||||
func (c *criService) Run(ready func()) error {
|
||||
log.L.Info("Start subscribing containerd event")
|
||||
c.eventMonitor.subscribe(c.client)
|
||||
|
||||
@ -291,6 +291,7 @@ func (c *criService) Run() error {
|
||||
|
||||
// Set the server as initialized. GRPC services could start serving traffic.
|
||||
c.initialized.Store(true)
|
||||
ready()
|
||||
|
||||
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
|
||||
// Stop the whole CRI service if any of the critical service exits.
|
||||
|
@ -62,7 +62,7 @@ type CRIService interface {
|
||||
// Closer is used by containerd to gracefully stop cri service.
|
||||
io.Closer
|
||||
|
||||
Run() error
|
||||
Run(ready func()) error
|
||||
|
||||
Register(*grpc.Server) error
|
||||
}
|
||||
@ -203,7 +203,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
|
||||
}
|
||||
|
||||
// Run starts the CRI service.
|
||||
func (c *criService) Run() error {
|
||||
func (c *criService) Run(ready func()) error {
|
||||
log.L.Info("Start subscribing containerd event")
|
||||
c.eventMonitor.subscribe(c.client)
|
||||
|
||||
@ -266,6 +266,7 @@ func (c *criService) Run() error {
|
||||
|
||||
// Set the server as initialized. GRPC services could start serving traffic.
|
||||
c.initialized.Store(true)
|
||||
ready()
|
||||
|
||||
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
|
||||
// Stop the whole CRI service if any of the critical service exits.
|
||||
|
@ -28,12 +28,13 @@ import (
|
||||
|
||||
// InitContext is used for plugin initialization
|
||||
type InitContext struct {
|
||||
Context context.Context
|
||||
Root string
|
||||
State string
|
||||
Config interface{}
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
Context context.Context
|
||||
Root string
|
||||
State string
|
||||
Config interface{}
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
RegisterReadiness func() func()
|
||||
|
||||
// deprecated: will be removed in 2.0, use plugin.EventType
|
||||
Events *exchange.Exchange
|
||||
|
@ -218,6 +218,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
initContext.Events = events
|
||||
initContext.Address = config.GRPC.Address
|
||||
initContext.TTRPCAddress = config.TTRPC.Address
|
||||
initContext.RegisterReadiness = s.RegisterReadiness
|
||||
|
||||
// load the plugin specific configuration if it is provided
|
||||
if p.Config != nil {
|
||||
@ -293,6 +294,7 @@ type Server struct {
|
||||
tcpServer *grpc.Server
|
||||
config *srvconfig.Config
|
||||
plugins []*plugin.Plugin
|
||||
ready sync.WaitGroup
|
||||
}
|
||||
|
||||
// ServeGRPC provides the containerd grpc APIs on the provided listener
|
||||
@ -370,6 +372,17 @@ func (s *Server) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) RegisterReadiness() func() {
|
||||
s.ready.Add(1)
|
||||
return func() {
|
||||
s.ready.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Wait() {
|
||||
s.ready.Wait()
|
||||
}
|
||||
|
||||
// LoadPlugins loads all plugins into containerd and generates an ordered graph
|
||||
// of all plugins.
|
||||
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {
|
||||
|
Loading…
Reference in New Issue
Block a user