Merge pull request #8576 from henry118/ready

This commit is contained in:
Samuel Karp 2023-05-26 14:41:57 -07:00 committed by GitHub
commit d0dba8e163
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 42 additions and 16 deletions

View File

@ -270,12 +270,21 @@ can be used and modified as necessary as a custom configuration.`
} }
serve(ctx, l, server.ServeGRPC) serve(ctx, l, server.ServeGRPC)
if err := notifyReady(ctx); err != nil { readyC := make(chan struct{})
log.G(ctx).WithError(err).Warn("notify ready failed") go func() {
} server.Wait()
close(readyC)
}()
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds()) select {
<-done case <-readyC:
if err := notifyReady(ctx); err != nil {
log.G(ctx).WithError(err).Warn("notify ready failed")
}
log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
<-done
case <-done:
}
return nil return nil
} }
return app return app

View File

@ -54,6 +54,7 @@ func init() {
} }
func initCRIService(ic *plugin.InitContext) (interface{}, error) { func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ready := ic.RegisterReadiness()
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
ctx := ic.Context ctx := ic.Context
@ -99,7 +100,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
} }
go func() { go func() {
if err := s.Run(); err != nil { if err := s.Run(ready); err != nil {
log.G(ctx).WithError(err).Fatal("Failed to run CRI service") log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
} }
// TODO(random-liu): Whether and how we can stop containerd. // TODO(random-liu): Whether and how we can stop containerd.

View File

@ -63,7 +63,7 @@ type CRIService interface {
// Closer is used by containerd to gracefully stop cri service. // Closer is used by containerd to gracefully stop cri service.
io.Closer io.Closer
Run() error Run(ready func()) error
Register(*grpc.Server) error Register(*grpc.Server) error
} }
@ -237,7 +237,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
} }
// Run starts the CRI service. // Run starts the CRI service.
func (c *criService) Run() error { func (c *criService) Run(ready func()) error {
log.L.Info("Start subscribing containerd event") log.L.Info("Start subscribing containerd event")
c.eventMonitor.subscribe(c.client) c.eventMonitor.subscribe(c.client)
@ -291,6 +291,7 @@ func (c *criService) Run() error {
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Store(true) c.initialized.Store(true)
ready()
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
// Stop the whole CRI service if any of the critical service exits. // Stop the whole CRI service if any of the critical service exits.

View File

@ -62,7 +62,7 @@ type CRIService interface {
// Closer is used by containerd to gracefully stop cri service. // Closer is used by containerd to gracefully stop cri service.
io.Closer io.Closer
Run() error Run(ready func()) error
Register(*grpc.Server) error Register(*grpc.Server) error
} }
@ -203,7 +203,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error {
} }
// Run starts the CRI service. // Run starts the CRI service.
func (c *criService) Run() error { func (c *criService) Run(ready func()) error {
log.L.Info("Start subscribing containerd event") log.L.Info("Start subscribing containerd event")
c.eventMonitor.subscribe(c.client) c.eventMonitor.subscribe(c.client)
@ -266,6 +266,7 @@ func (c *criService) Run() error {
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Store(true) c.initialized.Store(true)
ready()
var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error
// Stop the whole CRI service if any of the critical service exits. // Stop the whole CRI service if any of the critical service exits.

View File

@ -28,12 +28,13 @@ import (
// InitContext is used for plugin initialization // InitContext is used for plugin initialization
type InitContext struct { type InitContext struct {
Context context.Context Context context.Context
Root string Root string
State string State string
Config interface{} Config interface{}
Address string Address string
TTRPCAddress string TTRPCAddress string
RegisterReadiness func() func()
// deprecated: will be removed in 2.0, use plugin.EventType // deprecated: will be removed in 2.0, use plugin.EventType
Events *exchange.Exchange Events *exchange.Exchange

View File

@ -218,6 +218,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
initContext.Events = events initContext.Events = events
initContext.Address = config.GRPC.Address initContext.Address = config.GRPC.Address
initContext.TTRPCAddress = config.TTRPC.Address initContext.TTRPCAddress = config.TTRPC.Address
initContext.RegisterReadiness = s.RegisterReadiness
// load the plugin specific configuration if it is provided // load the plugin specific configuration if it is provided
if p.Config != nil { if p.Config != nil {
@ -293,6 +294,7 @@ type Server struct {
tcpServer *grpc.Server tcpServer *grpc.Server
config *srvconfig.Config config *srvconfig.Config
plugins []*plugin.Plugin plugins []*plugin.Plugin
ready sync.WaitGroup
} }
// ServeGRPC provides the containerd grpc APIs on the provided listener // ServeGRPC provides the containerd grpc APIs on the provided listener
@ -370,6 +372,17 @@ func (s *Server) Stop() {
} }
} }
func (s *Server) RegisterReadiness() func() {
s.ready.Add(1)
return func() {
s.ready.Done()
}
}
func (s *Server) Wait() {
s.ready.Wait()
}
// LoadPlugins loads all plugins into containerd and generates an ordered graph // LoadPlugins loads all plugins into containerd and generates an ordered graph
// of all plugins. // of all plugins.
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) { func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {