Merge pull request #9598 from abel-von/sandbox-plugin-0103
sandbox: add event monitor for podsandbox controller
This commit is contained in:
@@ -42,7 +42,7 @@ import (
|
||||
"github.com/containerd/containerd/v2/internal/cri/config"
|
||||
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
|
||||
"github.com/containerd/containerd/v2/internal/cri/nri"
|
||||
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox"
|
||||
"github.com/containerd/containerd/v2/internal/cri/server/events"
|
||||
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
|
||||
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
|
||||
"github.com/containerd/containerd/v2/internal/cri/store/label"
|
||||
@@ -136,7 +136,7 @@ type criService struct {
|
||||
// streamServer is the streaming server serves container streaming request.
|
||||
streamServer streaming.Server
|
||||
// eventMonitor is the monitor monitors containerd events.
|
||||
eventMonitor *eventMonitor
|
||||
eventMonitor *events.EventMonitor
|
||||
// initialized indicates whether the server is initialized. All GRPC services
|
||||
// should return error before the server is initialized.
|
||||
initialized atomic.Bool
|
||||
@@ -217,7 +217,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
|
||||
return nil, nil, fmt.Errorf("failed to create stream server: %w", err)
|
||||
}
|
||||
|
||||
c.eventMonitor = newEventMonitor(c)
|
||||
c.eventMonitor = events.NewEventMonitor(&criEventHandler{c: c})
|
||||
|
||||
c.cniNetConfMonitor = make(map[string]*cniNetConfSyncer)
|
||||
for name, i := range c.netPlugin {
|
||||
@@ -236,10 +236,6 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize pod sandbox controller
|
||||
podSandboxController := options.SandboxControllers[string(criconfig.ModePodSandbox)].(*podsandbox.Controller)
|
||||
podSandboxController.Init(c)
|
||||
|
||||
c.nri = options.NRI
|
||||
|
||||
c.runtimeHandlers, err = c.introspectRuntimeHandlers(ctx)
|
||||
@@ -250,16 +246,12 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
|
||||
return c, c, nil
|
||||
}
|
||||
|
||||
// BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop.
|
||||
// TODO: get rid of this.
|
||||
func (c *criService) BackOffEvent(id string, event interface{}) {
|
||||
c.eventMonitor.backOff.enBackOff(id, event)
|
||||
}
|
||||
|
||||
// Run starts the CRI service.
|
||||
func (c *criService) Run(ready func()) error {
|
||||
log.L.Info("Start subscribing containerd event")
|
||||
c.eventMonitor.subscribe(c.client)
|
||||
// note: filters are any match, if you want any match but not in namespace foo
|
||||
// then you have to manually filter namespace foo
|
||||
c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})
|
||||
|
||||
log.L.Infof("Start recovering state")
|
||||
if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
|
||||
@@ -268,7 +260,7 @@ func (c *criService) Run(ready func()) error {
|
||||
|
||||
// Start event handler.
|
||||
log.L.Info("Start event monitor")
|
||||
eventMonitorErrCh := c.eventMonitor.start()
|
||||
eventMonitorErrCh := c.eventMonitor.Start()
|
||||
|
||||
// Start CNI network conf syncers
|
||||
cniNetConfMonitorErrCh := make(chan error, len(c.cniNetConfMonitor))
|
||||
@@ -354,7 +346,7 @@ func (c *criService) Close() error {
|
||||
log.L.WithError(err).Errorf("failed to stop cni network conf monitor for %s", name)
|
||||
}
|
||||
}
|
||||
c.eventMonitor.stop()
|
||||
c.eventMonitor.Stop()
|
||||
if err := c.streamServer.Stop(); err != nil {
|
||||
return fmt.Errorf("failed to stop stream server: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user