Merge pull request #110529 from wojtek-t/fix_leaking_goroutines_7
Fix leaking goroutines in multiple integration tests
This commit is contained in:
@@ -77,6 +77,9 @@ type GarbageCollector struct {
|
||||
// GC caches the owners that do not exist according to the API server.
|
||||
absentOwnerCache *ReferenceCache
|
||||
|
||||
kubeClient clientset.Interface
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
|
||||
workerLock sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -94,8 +97,6 @@ func NewGarbageCollector(
|
||||
) (*GarbageCollector, error) {
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartStructuredLogging(0)
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
|
||||
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"})
|
||||
|
||||
attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
|
||||
@@ -107,6 +108,8 @@ func NewGarbageCollector(
|
||||
attemptToDelete: attemptToDelete,
|
||||
attemptToOrphan: attemptToOrphan,
|
||||
absentOwnerCache: absentOwnerCache,
|
||||
kubeClient: kubeClient,
|
||||
eventBroadcaster: eventBroadcaster,
|
||||
}
|
||||
gc.dependencyGraphBuilder = &GraphBuilder{
|
||||
eventRecorder: eventRecorder,
|
||||
@@ -146,6 +149,11 @@ func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
|
||||
defer gc.attemptToOrphan.ShutDown()
|
||||
defer gc.dependencyGraphBuilder.graphChanges.ShutDown()
|
||||
|
||||
// Start events processing pipeline.
|
||||
gc.eventBroadcaster.StartStructuredLogging(0)
|
||||
gc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: gc.kubeClient.CoreV1().Events("")})
|
||||
defer gc.eventBroadcaster.Shutdown()
|
||||
|
||||
klog.Infof("Starting garbage collector controller")
|
||||
defer klog.Infof("Shutting down garbage collector controller")
|
||||
|
||||
|
@@ -269,6 +269,7 @@ func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingIn
|
||||
func (rq *Controller) Run(ctx context.Context, workers int) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer rq.queue.ShutDown()
|
||||
defer rq.missingUsageQueue.ShutDown()
|
||||
|
||||
klog.Infof("Starting resource quota controller")
|
||||
defer klog.Infof("Shutting down resource quota controller")
|
||||
|
@@ -305,6 +305,8 @@ func (qm *QuotaMonitor) IsSynced() bool {
|
||||
// Run sets the stop channel and starts monitor execution until stopCh is
|
||||
// closed. Any running monitors will be stopped before Run returns.
|
||||
func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
klog.Infof("QuotaMonitor running")
|
||||
defer klog.Infof("QuotaMonitor stopping")
|
||||
|
||||
@@ -317,6 +319,15 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
||||
// Start monitors and begin change processing until the stop channel is
|
||||
// closed.
|
||||
qm.StartMonitors()
|
||||
|
||||
// The following workers are hanging forever until the queue is
|
||||
// shutted down, so we need to shut it down in a separate goroutine.
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer qm.resourceChanges.ShutDown()
|
||||
|
||||
<-stopCh
|
||||
}()
|
||||
wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)
|
||||
|
||||
// Stop any running monitors.
|
||||
|
Reference in New Issue
Block a user