Clean shutdown of event broadcaster in controllers

This commit is contained in:
Wojciech Tyczyński 2022-10-26 14:51:46 +02:00
parent 91dbd82cde
commit c84c27b6ac
2 changed files with 22 additions and 15 deletions

View File

@ -62,8 +62,11 @@ var (
// ControllerV2 is a controller for CronJobs. // ControllerV2 is a controller for CronJobs.
// Refactored Cronjob controller that uses DelayingQueue and informers // Refactored Cronjob controller that uses DelayingQueue and informers
type ControllerV2 struct { type ControllerV2 struct {
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
recorder record.EventRecorder
kubeClient clientset.Interface
recorder record.EventRecorder
broadcaster record.EventBroadcaster
jobControl jobControlInterface jobControl jobControlInterface
cronJobControl cjControlInterface cronJobControl cjControlInterface
@ -81,12 +84,12 @@ type ControllerV2 struct {
// NewControllerV2 creates and initializes a new Controller. // NewControllerV2 creates and initializes a new Controller.
func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
jm := &ControllerV2{ jm := &ControllerV2{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}), kubeClient: kubeClient,
broadcaster: eventBroadcaster,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),
jobControl: realJobControl{KubeClient: kubeClient}, jobControl: realJobControl{KubeClient: kubeClient},
cronJobControl: &realCJControl{KubeClient: kubeClient}, cronJobControl: &realCJControl{KubeClient: kubeClient},
@ -123,6 +126,12 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer
// Run starts the main goroutine responsible for watching and syncing jobs. // Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *ControllerV2) Run(ctx context.Context, workers int) { func (jm *ControllerV2) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Start event processing pipeline.
jm.broadcaster.StartStructuredLogging(0)
jm.broadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
defer jm.broadcaster.Shutdown()
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
klog.InfoS("Starting cronjob controller v2") klog.InfoS("Starting cronjob controller v2")

View File

@ -55,6 +55,7 @@ type Controller struct {
serviceCIDR *net.IPNet serviceCIDR *net.IPNet
secondaryServiceCIDR *net.IPNet secondaryServiceCIDR *net.IPNet
kubeClient clientset.Interface kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
// Method for easy mocking in unittest. // Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error) lookupIP func(host string) ([]net.IP, error)
@ -84,15 +85,6 @@ func NewNodeIpamController(
klog.Fatalf("kubeClient is nil when starting Controller") klog.Fatalf("kubeClient is nil when starting Controller")
} }
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
klog.Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(
&v1core.EventSinkImpl{
Interface: kubeClient.CoreV1().Events(""),
})
// Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation. // Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
if allocatorType != ipam.CloudAllocatorType { if allocatorType != ipam.CloudAllocatorType {
if len(clusterCIDRs) == 0 { if len(clusterCIDRs) == 0 {
@ -110,6 +102,7 @@ func NewNodeIpamController(
ic := &Controller{ ic := &Controller{
cloud: cloud, cloud: cloud,
kubeClient: kubeClient, kubeClient: kubeClient,
eventBroadcaster: record.NewBroadcaster(),
lookupIP: net.LookupIP, lookupIP: net.LookupIP,
clusterCIDRs: clusterCIDRs, clusterCIDRs: clusterCIDRs,
serviceCIDR: serviceCIDR, serviceCIDR: serviceCIDR,
@ -146,6 +139,11 @@ func NewNodeIpamController(
func (nc *Controller) Run(stopCh <-chan struct{}) { func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Start event processing pipeline.
nc.eventBroadcaster.StartStructuredLogging(0)
nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
defer nc.eventBroadcaster.Shutdown()
klog.Infof("Starting ipam controller") klog.Infof("Starting ipam controller")
defer klog.Infof("Shutting down ipam controller") defer klog.Infof("Shutting down ipam controller")