From c84c27b6ac797347cf03c6776c86327c33a36908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 14:51:46 +0200 Subject: [PATCH] Clean shutdown of event broadcaster in controllers --- .../cronjob/cronjob_controllerv2.go | 21 +++++++++++++------ .../nodeipam/node_ipam_controller.go | 16 +++++++------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index 3c0dda0bd67..551861a1825 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -62,8 +62,11 @@ var ( // ControllerV2 is a controller for CronJobs. // Refactored Cronjob controller that uses DelayingQueue and informers type ControllerV2 struct { - queue workqueue.RateLimitingInterface - recorder record.EventRecorder + queue workqueue.RateLimitingInterface + + kubeClient clientset.Interface + recorder record.EventRecorder + broadcaster record.EventBroadcaster jobControl jobControlInterface cronJobControl cjControlInterface @@ -81,12 +84,12 @@ type ControllerV2 struct { // NewControllerV2 creates and initializes a new Controller. func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) jm := &ControllerV2{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"), + kubeClient: kubeClient, + broadcaster: eventBroadcaster, + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}), jobControl: realJobControl{KubeClient: kubeClient}, cronJobControl: &realCJControl{KubeClient: kubeClient}, @@ -123,6 +126,12 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer // Run starts the main goroutine responsible for watching and syncing jobs. func (jm *ControllerV2) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() + + // Start event processing pipeline. + jm.broadcaster.StartStructuredLogging(0) + jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")}) + defer jm.broadcaster.Shutdown() + defer jm.queue.ShutDown() klog.InfoS("Starting cronjob controller v2") diff --git a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go index b8dc32f33de..cb0853ffeab 100644 --- a/pkg/controller/nodeipam/node_ipam_controller.go +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -55,6 +55,7 @@ type Controller struct { serviceCIDR *net.IPNet secondaryServiceCIDR *net.IPNet kubeClient clientset.Interface + eventBroadcaster record.EventBroadcaster // Method for easy mocking in unittest. lookupIP func(host string) ([]net.IP, error) @@ -84,15 +85,6 @@ func NewNodeIpamController( klog.Fatalf("kubeClient is nil when starting Controller") } - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartStructuredLogging(0) - - klog.Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink( - &v1core.EventSinkImpl{ - Interface: kubeClient.CoreV1().Events(""), - }) - // Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation. if allocatorType != ipam.CloudAllocatorType { if len(clusterCIDRs) == 0 { @@ -110,6 +102,7 @@ func NewNodeIpamController( ic := &Controller{ cloud: cloud, kubeClient: kubeClient, + eventBroadcaster: record.NewBroadcaster(), lookupIP: net.LookupIP, clusterCIDRs: clusterCIDRs, serviceCIDR: serviceCIDR, @@ -146,6 +139,11 @@ func NewNodeIpamController( func (nc *Controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + // Start event processing pipeline. + nc.eventBroadcaster.StartStructuredLogging(0) + nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")}) + defer nc.eventBroadcaster.Shutdown() + klog.Infof("Starting ipam controller") defer klog.Infof("Shutting down ipam controller")