diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index a729b3e75..922b54f8f 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -270,12 +270,21 @@ can be used and modified as necessary as a custom configuration.` } serve(ctx, l, server.ServeGRPC) - if err := notifyReady(ctx); err != nil { - log.G(ctx).WithError(err).Warn("notify ready failed") - } + readyC := make(chan struct{}) + go func() { + server.Wait() + close(readyC) + }() - log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds()) - <-done + select { + case <-readyC: + if err := notifyReady(ctx); err != nil { + log.G(ctx).WithError(err).Warn("notify ready failed") + } + log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds()) + <-done + case <-done: + } return nil } return app diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index d15c10429..577905cbe 100644 --- a/pkg/cri/cri.go +++ b/pkg/cri/cri.go @@ -54,6 +54,7 @@ func init() { } func initCRIService(ic *plugin.InitContext) (interface{}, error) { + ready := ic.RegisterReadiness() ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context @@ -99,7 +100,7 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { } go func() { - if err := s.Run(); err != nil { + if err := s.Run(ready); err != nil { log.G(ctx).WithError(err).Fatal("Failed to run CRI service") } // TODO(random-liu): Whether and how we can stop containerd. diff --git a/pkg/cri/sbserver/service.go b/pkg/cri/sbserver/service.go index 6ed414df3..09f7243dc 100644 --- a/pkg/cri/sbserver/service.go +++ b/pkg/cri/sbserver/service.go @@ -63,7 +63,7 @@ type CRIService interface { // Closer is used by containerd to gracefully stop cri service. io.Closer - Run() error + Run(ready func()) error Register(*grpc.Server) error } @@ -237,7 +237,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error { } // Run starts the CRI service. -func (c *criService) Run() error { +func (c *criService) Run(ready func()) error { log.L.Info("Start subscribing containerd event") c.eventMonitor.subscribe(c.client) @@ -291,6 +291,7 @@ func (c *criService) Run() error { // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Store(true) + ready() var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error // Stop the whole CRI service if any of the critical service exits. diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 8ac36fb66..21799ce4a 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -62,7 +62,7 @@ type CRIService interface { // Closer is used by containerd to gracefully stop cri service. io.Closer - Run() error + Run(ready func()) error Register(*grpc.Server) error } @@ -203,7 +203,7 @@ func (c *criService) RegisterTCP(s *grpc.Server) error { } // Run starts the CRI service. -func (c *criService) Run() error { +func (c *criService) Run(ready func()) error { log.L.Info("Start subscribing containerd event") c.eventMonitor.subscribe(c.client) @@ -266,6 +266,7 @@ func (c *criService) Run() error { // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Store(true) + ready() var eventMonitorErr, streamServerErr, cniNetConfMonitorErr error // Stop the whole CRI service if any of the critical service exits. diff --git a/plugin/context.go b/plugin/context.go index 811ad1c3b..1cc5f382a 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -28,12 +28,13 @@ import ( // InitContext is used for plugin initialization type InitContext struct { - Context context.Context - Root string - State string - Config interface{} - Address string - TTRPCAddress string + Context context.Context + Root string + State string + Config interface{} + Address string + TTRPCAddress string + RegisterReadiness func() func() // deprecated: will be removed in 2.0, use plugin.EventType Events *exchange.Exchange diff --git a/services/server/server.go b/services/server/server.go index 442ecd5f6..46b579ce4 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -218,6 +218,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { initContext.Events = events initContext.Address = config.GRPC.Address initContext.TTRPCAddress = config.TTRPC.Address + initContext.RegisterReadiness = s.RegisterReadiness // load the plugin specific configuration if it is provided if p.Config != nil { @@ -293,6 +294,7 @@ type Server struct { tcpServer *grpc.Server config *srvconfig.Config plugins []*plugin.Plugin + ready sync.WaitGroup } // ServeGRPC provides the containerd grpc APIs on the provided listener @@ -370,6 +372,17 @@ func (s *Server) Stop() { } } +func (s *Server) RegisterReadiness() func() { + s.ready.Add(1) + return func() { + s.ready.Done() + } +} + +func (s *Server) Wait() { + s.ready.Wait() +} + // LoadPlugins loads all plugins into containerd and generates an ordered graph // of all plugins. func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {