Use the generic/typed workqueue throughout

This change makes us use the generic workqueue throughout the project in
order to improve type safety and readability of the code.
This commit is contained in:
Alvaro Aleman
2024-04-28 18:26:18 +02:00
parent d387c0c903
commit 6d0ac8c561
94 changed files with 830 additions and 603 deletions

View File

@@ -57,7 +57,7 @@ type NamespaceController struct {
// returns true when the namespace cache is ready
listerSynced cache.InformerSynced
// namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[string]
// helper to delete all resources in the namespace when the namespace is deleted.
namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}
@@ -74,7 +74,12 @@ func NewNamespaceController(
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
nsControllerRateLimiter(),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "namespace",
},
),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
}
@@ -101,12 +106,12 @@ func NewNamespaceController(
// nsControllerRateLimiter is tuned for a faster than normal recycle time with default backoff speed and default overall
// requeing speed. We do this so that namespace cleanup is reliably faster and we know that the number of namespaces being
// deleted is smaller than total number of other namespace scoped resources in a cluster.
func nsControllerRateLimiter() workqueue.RateLimiter {
return workqueue.NewMaxOfRateLimiter(
func nsControllerRateLimiter() workqueue.TypedRateLimiter[string] {
return workqueue.NewTypedMaxOfRateLimiter(
// this ensures that we retry namespace deletion at least every minute, never longer.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 60*time.Second),
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 60*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
@@ -142,7 +147,7 @@ func (nm *NamespaceController) worker(ctx context.Context) {
}
defer nm.queue.Done(key)
err := nm.syncNamespaceFromKey(ctx, key.(string))
err := nm.syncNamespaceFromKey(ctx, key)
if err == nil {
// no error, forget this entry and return
nm.queue.Forget(key)