Add a new workqueue to endpointslice controller for updating topology cache and checking node topology distribution.
Author: hbostan
Date:   2024-06-03 12:41:37 +00:00
Parent: 548d50da98
Commit: db827e67fc
2 changed files with 78 additions and 44 deletions


@@ -75,6 +75,9 @@ const (
     // controllerName is a unique value used with LabelManagedBy to indicated
     // the component managing an EndpointSlice.
     controllerName = "endpointslice-controller.k8s.io"
+    // topologyQueueItemKey is the key for all items in the topologyQueue.
+    topologyQueueItemKey = "topologyQueueItemKey"
 )
 // NewController creates and initializes a new Controller
@@ -99,7 +102,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
         // such as an update to a Service or Deployment. A more significant
         // rate limit back off here helps ensure that the Controller does not
         // overwhelm the API Server.
-        queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+        serviceQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
             workqueue.NewTypedMaxOfRateLimiter(
                 workqueue.NewTypedItemExponentialFailureRateLimiter[string](defaultSyncBackOff, maxSyncBackOff),
                 // 10 qps, 100 bucket size. This is only for retry speed and its
@@ -110,6 +113,9 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
Name: "endpoint_slice",
},
),
topologyQueue: workqueue.NewTypedRateLimitingQueue[string](
workqueue.DefaultTypedControllerRateLimiter[string](),
),
workerLoopPeriod: time.Second,
}
@@ -158,14 +164,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
     if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
         nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-            AddFunc: func(obj interface{}) {
-                c.addNode(logger, obj)
+            AddFunc: func(_ interface{}) {
+                c.addNode()
             },
             UpdateFunc: func(oldObj, newObj interface{}) {
-                c.updateNode(logger, oldObj, newObj)
+                c.updateNode(oldObj, newObj)
             },
-            DeleteFunc: func(obj interface{}) {
-                c.deleteNode(logger, obj)
+            DeleteFunc: func(_ interface{}) {
+                c.deleteNode()
             },
         })
@@ -236,7 +242,11 @@ type Controller struct {
     // more often than services with few pods; it also would cause a
     // service that's inserted multiple times to be processed more than
     // necessary.
-    queue workqueue.TypedRateLimitingInterface[string]
+    serviceQueue workqueue.TypedRateLimitingInterface[string]
+    // topologyQueue is used to trigger a topology cache update and checking node
+    // topology distribution.
+    topologyQueue workqueue.TypedRateLimitingInterface[string]
     // maxEndpointsPerSlice references the maximum number of endpoints that
     // should be added to an EndpointSlice
@@ -264,7 +274,8 @@ func (c *Controller) Run(ctx context.Context, workers int) {
     c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
     defer c.eventBroadcaster.Shutdown()
-    defer c.queue.ShutDown()
+    defer c.serviceQueue.ShutDown()
+    defer c.topologyQueue.ShutDown()
     logger := klog.FromContext(ctx)
     logger.Info("Starting endpoint slice controller")
@@ -274,29 +285,31 @@ func (c *Controller) Run(ctx context.Context, workers int) {
         return
     }
-    logger.V(2).Info("Starting worker threads", "total", workers)
+    logger.V(2).Info("Starting service queue worker threads", "total", workers)
     for i := 0; i < workers; i++ {
-        go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
+        go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
     }
+    logger.V(2).Info("Starting topology queue worker threads", "total", 1)
+    go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
     <-ctx.Done()
 }
-// worker runs a worker thread that just dequeues items, processes them, and
-// marks them done. You may run as many of these in parallel as you wish; the
-// workqueue guarantees that they will not end up processing the same service
-// at the same time
-func (c *Controller) worker(logger klog.Logger) {
-    for c.processNextWorkItem(logger) {
+// serviceQueueWorker runs a worker thread that just dequeues items, processes
+// them, and marks them done. You may run as many of these in parallel as you
+// wish; the workqueue guarantees that they will not end up processing the same
+// service at the same time
+func (c *Controller) serviceQueueWorker(logger klog.Logger) {
+    for c.processNextServiceWorkItem(logger) {
     }
 }
-func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
-    cKey, quit := c.queue.Get()
+func (c *Controller) processNextServiceWorkItem(logger klog.Logger) bool {
+    cKey, quit := c.serviceQueue.Get()
     if quit {
         return false
     }
-    defer c.queue.Done(cKey)
+    defer c.serviceQueue.Done(cKey)
     err := c.syncService(logger, cKey)
     c.handleErr(logger, err, cKey)
@@ -304,22 +317,37 @@ func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
     return true
 }
+func (c *Controller) topologyQueueWorker(logger klog.Logger) {
+    for c.processNextTopologyWorkItem(logger) {
+    }
+}
+func (c *Controller) processNextTopologyWorkItem(logger klog.Logger) bool {
+    key, quit := c.topologyQueue.Get()
+    if quit {
+        return false
+    }
+    defer c.topologyQueue.Done(key)
+    c.checkNodeTopologyDistribution(logger)
+    return true
+}
 func (c *Controller) handleErr(logger klog.Logger, err error, key string) {
     trackSync(err)
     if err == nil {
-        c.queue.Forget(key)
+        c.serviceQueue.Forget(key)
         return
     }
-    if c.queue.NumRequeues(key) < maxRetries {
+    if c.serviceQueue.NumRequeues(key) < maxRetries {
         logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
-        c.queue.AddRateLimited(key)
+        c.serviceQueue.AddRateLimited(key)
         return
     }
     logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
-    c.queue.Forget(key)
+    c.serviceQueue.Forget(key)
     utilruntime.HandleError(err)
 }
@@ -416,7 +444,7 @@ func (c *Controller) onServiceUpdate(obj interface{}) {
         return
     }
-    c.queue.Add(key)
+    c.serviceQueue.Add(key)
 }
 // onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
@@ -427,7 +455,7 @@ func (c *Controller) onServiceDelete(obj interface{}) {
         return
     }
-    c.queue.Add(key)
+    c.serviceQueue.Add(key)
 }
 // onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
@@ -500,7 +528,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
     if c.endpointUpdatesBatchPeriod > delay {
         delay = c.endpointUpdatesBatchPeriod
     }
-    c.queue.AddAfter(key, delay)
+    c.serviceQueue.AddAfter(key, delay)
 }
 func (c *Controller) addPod(obj interface{}) {
@@ -511,14 +539,14 @@ func (c *Controller) addPod(obj interface{}) {
         return
     }
     for key := range services {
-        c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+        c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
     }
 }
 func (c *Controller) updatePod(old, cur interface{}) {
     services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
     for key := range services {
-        c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+        c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
     }
 }
@@ -531,11 +559,11 @@ func (c *Controller) deletePod(obj interface{}) {
     }
 }
-func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
-    c.checkNodeTopologyDistribution(logger)
+func (c *Controller) addNode() {
+    c.topologyQueue.Add(topologyQueueItemKey)
 }
-func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
+func (c *Controller) updateNode(old, cur interface{}) {
     oldNode := old.(*v1.Node)
     curNode := cur.(*v1.Node)
@@ -543,12 +571,12 @@ func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
     // The topology cache should be updated in this case.
     if isNodeReady(oldNode) != isNodeReady(curNode) ||
         oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
-        c.checkNodeTopologyDistribution(logger)
+        c.topologyQueue.Add(topologyQueueItemKey)
     }
 }
-func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
-    c.checkNodeTopologyDistribution(logger)
+func (c *Controller) deleteNode() {
+    c.topologyQueue.Add(topologyQueueItemKey)
 }
 // checkNodeTopologyDistribution updates Nodes in the topology cache and then
@@ -566,7 +594,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
     serviceKeys := c.topologyCache.GetOverloadedServices()
     for _, serviceKey := range serviceKeys {
         logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
-        c.queue.Add(serviceKey)
+        c.serviceQueue.Add(serviceKey)
     }
 }
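
Taken together, the change splits the controller's single workqueue in two: serviceQueue keeps one key per Service that needs an EndpointSlice sync, while topologyQueue only ever carries the sentinel topologyQueueItemKey, so bursts of Node add/update/delete events coalesce into at most one pending checkNodeTopologyDistribution call instead of running it inline in every informer handler. The sketch below is a minimal, self-contained illustration of that coalescing pattern, not the controller itself; demoController, onNodeEvent, and topologyWorker are hypothetical names for this example, and only the workqueue calls mirror the ones used in the diff.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

// topologyQueueItemKey mirrors the sentinel constant added by the commit:
// every node event enqueues this same key.
const topologyQueueItemKey = "topologyQueueItemKey"

// demoController is a stand-in for the real Controller, holding only the
// two queues relevant to this change.
type demoController struct {
    serviceQueue  workqueue.TypedRateLimitingInterface[string]
    topologyQueue workqueue.TypedRateLimitingInterface[string]
}

// onNodeEvent is what addNode/updateNode/deleteNode reduce to after the
// change: enqueue the sentinel key rather than rechecking topology inline.
func (c *demoController) onNodeEvent() {
    c.topologyQueue.Add(topologyQueueItemKey)
}

// topologyWorker drains the sentinel queue. Because the workqueue
// deduplicates identical keys, one Get covers every node event that
// arrived since the previous recheck started.
func (c *demoController) topologyWorker() {
    for {
        key, quit := c.topologyQueue.Get()
        if quit {
            return
        }
        fmt.Println("rechecking node topology distribution")
        // A real recheck would hand overloaded Services back to the
        // service queue, e.g. c.serviceQueue.Add(serviceKey).
        c.topologyQueue.Done(key)
    }
}

func main() {
    c := &demoController{
        serviceQueue: workqueue.NewTypedRateLimitingQueue[string](
            workqueue.DefaultTypedControllerRateLimiter[string](),
        ),
        topologyQueue: workqueue.NewTypedRateLimitingQueue[string](
            workqueue.DefaultTypedControllerRateLimiter[string](),
        ),
    }
    go c.topologyWorker()

    // A burst of node events coalesces into one or two rechecks, not five.
    for i := 0; i < 5; i++ {
        c.onNodeEvent()
    }

    time.Sleep(100 * time.Millisecond)
    c.topologyQueue.ShutDown()
    c.serviceQueue.ShutDown()
}

As in the Run change, a single topologyWorker goroutine is enough: the queue serializes the expensive recheck while the per-Service workers continue to scale with the workers parameter.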