diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 61df9eac35a..3eae02210bb 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -73,8 +73,6 @@ const ( func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller { broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"}) if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -173,6 +171,12 @@ type Controller struct { // endpoints will be handled in parallel. func (e *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + e.eventBroadcaster.StartStructuredLogging(0) + e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")}) + defer e.eventBroadcaster.Shutdown() + defer e.queue.ShutDown() klog.Infof("Starting endpoint controller") diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index f91dc4de11d..15778c97a61 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -86,8 +86,6 @@ func NewController(podInformer coreinformers.PodInformer, endpointUpdatesBatchPeriod time.Duration, ) *Controller { broadcaster := record.NewBroadcaster() - broadcaster.StartStructuredLogging(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"}) if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -252,6 +250,12 @@ type Controller struct { // Run will not return until stopCh is closed. func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + c.eventBroadcaster.StartLogging(klog.Infof) + c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) + defer c.eventBroadcaster.Shutdown() + defer c.queue.ShutDown() klog.Infof("Starting endpoint slice controller") diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index 11664d98614..ecefec98fba 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -75,8 +75,6 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer, endpointUpdatesBatchPeriod time.Duration, ) *Controller { broadcaster := record.NewBroadcaster() - broadcaster.StartLogging(klog.Infof) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"}) if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil { @@ -207,6 +205,12 @@ type Controller struct { // Run will not return until stopCh is closed. func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + + // Start events processing pipeline. + c.eventBroadcaster.StartLogging(klog.Infof) + c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) + defer c.eventBroadcaster.Shutdown() + defer c.queue.ShutDown() klog.Infof("Starting EndpointSliceMirroring controller") diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go index 883db710da2..960e09855e0 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go @@ -28,9 +28,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" ) @@ -58,6 +60,11 @@ func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMi client, batchPeriod) + // The event processing pipeline is normally started via Run() method. + // However, since we don't start it in unit tests, we explicitly start it here. + esController.eventBroadcaster.StartLogging(klog.Infof) + esController.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) + esController.endpointsSynced = alwaysReady esController.endpointSlicesSynced = alwaysReady esController.servicesSynced = alwaysReady