Merge pull request #115291 from sarveshr7/multicidr-tests

Add integration tests for MultiCIDRRangeAllocator
Authored by Kubernetes Prow Robot on 2023-01-31 01:47:00 -08:00, committed by GitHub
2 changed files with 514 additions and 39 deletions


@@ -168,8 +168,26 @@ func NewMultiCIDRRangeAllocator(
}
clusterCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-AddFunc:    createClusterCIDRHandler(ra.reconcileCreate),
-DeleteFunc: createClusterCIDRHandler(ra.reconcileDelete),
+AddFunc: func(obj interface{}) {
+    key, err := cache.MetaNamespaceKeyFunc(obj)
+    if err == nil {
+        ra.queue.Add(key)
+    }
+},
+UpdateFunc: func(old, new interface{}) {
+    key, err := cache.MetaNamespaceKeyFunc(new)
+    if err == nil {
+        ra.queue.Add(key)
+    }
+},
+DeleteFunc: func(obj interface{}) {
+    // IndexerInformer uses a delta queue, therefore for deletes we have to use this
+    // key function.
+    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+    if err == nil {
+        ra.queue.Add(key)
+    }
+},
})
if allocatorParams.ServiceCIDR != nil {
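The rewritten handlers above decouple event delivery from reconciliation: instead of invoking reconcileCreate/reconcileDelete inline, each handler derives a cache key and enqueues it for a worker to pick up. A minimal, self-contained sketch of this standard client-go wiring (the `exampleController` type and names are illustrative, not part of this PR):

```go
package example

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// exampleController is a hypothetical stand-in for multiCIDRRangeAllocator.
type exampleController struct {
	queue workqueue.RateLimitingInterface
}

func newExampleController(informer cache.SharedIndexInformer) *exampleController {
	c := &exampleController{
		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// Handlers never mutate state directly; they only enqueue keys.
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				c.queue.Add(key)
			}
		},
		UpdateFunc: func(old, new interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
				c.queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// Tolerates DeletedFinalStateUnknown tombstones from the delta FIFO.
			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
				c.queue.Add(key)
			}
		},
	})
	return c
}
```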
@@ -262,6 +280,7 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
defer raWaitGroup.Done()
r.worker(stopCh)
}()
+go wait.Until(r.runWorker, time.Second, stopCh)
}
raWaitGroup.Wait()
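The added `go wait.Until(...)` line starts the new queue worker alongside the pre-existing node workers. A small sketch of the `wait.Until` contract this relies on: the function is invoked immediately, then re-invoked one period after each return, until the stop channel closes — so here it mostly serves as a crash-restart guard, since the worker itself only returns on queue shutdown.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go wait.Until(func() {
		fmt.Println("worker loop tick") // runs now, then again 1s after each return
	}, time.Second, stopCh)

	time.Sleep(3 * time.Second)
	close(stopCh) // no further re-invocations after this
	time.Sleep(100 * time.Millisecond)
}
```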
@@ -269,6 +288,69 @@ func (r *multiCIDRRangeAllocator) Run(stopCh <-chan struct{}) {
<-stopCh
}
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (r *multiCIDRRangeAllocator) runWorker() {
+    for r.processNextWorkItem() {
+    }
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it by calling the syncHandler.
+func (r *multiCIDRRangeAllocator) processNextWorkItem() bool {
+    obj, shutdown := r.queue.Get()
+    if shutdown {
+        return false
+    }
+
+    // We wrap this block in a func so we can defer r.queue.Done.
+    err := func(obj interface{}) error {
+        // We call Done here so the queue knows we have finished
+        // processing this item. We also must remember to call Forget if we
+        // do not want this work item being re-queued. For example, we do
+        // not call Forget if a transient error occurs, instead the item is
+        // put back on the queue and attempted again after a back-off
+        // period.
+        defer r.queue.Done(obj)
+        var key string
+        var ok bool
+        // We expect strings to come off the workqueue. These are of the
+        // form namespace/name. We do this as the delayed nature of the
+        // workqueue means the items in the informer cache may actually be
+        // more up to date than when the item was initially put onto the
+        // workqueue.
+        if key, ok = obj.(string); !ok {
+            // As the item in the workqueue is actually invalid, we call
+            // Forget here else we'd go into a loop of attempting to
+            // process a work item that is invalid.
+            r.queue.Forget(obj)
+            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+            return nil
+        }
+        // Run the syncHandler, passing it the namespace/name string of the
+        // ClusterCIDR resource to be synced.
+        if err := r.syncClusterCIDR(key); err != nil {
+            // Put the item back on the workqueue to handle any transient errors.
+            r.queue.AddRateLimited(key)
+            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+        }
+        // Finally, if no error occurs we Forget this item so it does not
+        // get queued again until another change happens.
+        r.queue.Forget(obj)
+        klog.Infof("Successfully synced '%s'", key)
+        return nil
+    }(obj)
+
+    if err != nil {
+        utilruntime.HandleError(err)
+        return true
+    }
+
+    return true
+}
func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) {
for {
select {
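The added processNextWorkItem follows the standard workqueue contract: every Get is paired with a deferred Done; Forget is called on success or on malformed items to reset the back-off counter; AddRateLimited re-queues on transient failure. A compact sketch of that contract against the real k8s.io/client-go/util/workqueue API, where `processItem` is a stand-in for syncClusterCIDR:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func processItem(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func drain(queue workqueue.RateLimitingInterface) {
	for {
		obj, shutdown := queue.Get()
		if shutdown {
			return
		}
		func() {
			defer queue.Done(obj) // always pair Get with Done
			key, ok := obj.(string)
			if !ok {
				queue.Forget(obj) // never retry malformed items
				return
			}
			if err := processItem(key); err != nil {
				queue.AddRateLimited(key) // retry with back-off
				return
			}
			queue.Forget(key) // success: reset the back-off counter
		}()
	}
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("clustercidr-example")
	q.ShutDown() // Get keeps draining queued items, then reports shutdown
	drain(q)
}
```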
@@ -301,16 +383,6 @@ func (r *multiCIDRRangeAllocator) worker(stopChan <-chan struct{}) {
}
}
-// createClusterCIDRHandler creates clusterCIDR handler.
-func createClusterCIDRHandler(f func(ccc *networkingv1alpha1.ClusterCIDR) error) func(obj interface{}) {
-    return func(originalObj interface{}) {
-        ccc := originalObj.(*networkingv1alpha1.ClusterCIDR)
-        if err := f(ccc); err != nil {
-            utilruntime.HandleError(fmt.Errorf("error while processing ClusterCIDR Add/Delete: %w", err))
-        }
-    }
-}
// needToAddFinalizer checks if a finalizer should be added to the object.
func needToAddFinalizer(obj metav1.Object, finalizer string) bool {
return obj.GetDeletionTimestamp() == nil && !slice.ContainsString(obj.GetFinalizers(),
@@ -392,7 +464,7 @@ func (r *multiCIDRRangeAllocator) occupyCIDRs(node *v1.Node) error {
return nil
}
-clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node)
+clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true)
if err != nil {
return err
}
@@ -687,7 +759,7 @@ func defaultNodeSelector() ([]byte, error) {
// Returns 1 CIDR if single stack.
// Returns 2 CIDRs, 1 from each IP family, if dual stack.
func (r *multiCIDRRangeAllocator) prioritizedCIDRs(node *v1.Node) ([]*net.IPNet, *cidrset.ClusterCIDR, error) {
-clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node)
+clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true)
if err != nil {
return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
}
@@ -782,7 +854,7 @@ func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(cidr *net.IPNet)
// allocatedClusterCIDR returns the ClusterCIDR from which the node CIDRs were allocated.
func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *v1.Node) (*cidrset.ClusterCIDR, error) {
-clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node)
+clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, false)
if err != nil {
return nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
}
@@ -802,7 +874,11 @@ func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *v1.Node) (*cidrset.
// P2: ClusterCIDR with a PerNodeMaskSize having fewer IPs has higher priority.
// P3: ClusterCIDR having label with lower alphanumeric value has higher priority.
// P4: ClusterCIDR with a cidrSet having a smaller IP address value has a higher priority.
-func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node) ([]*cidrset.ClusterCIDR, error) {
+//
+// orderedMatchingClusterCIDRs takes `occupy` as an argument; it determines whether the function
+// is called during an occupy or a release operation. For a release operation, a ClusterCIDR must
+// be added to the matching ClusterCIDRs list irrespective of whether the ClusterCIDR is terminating.
+func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node, occupy bool) ([]*cidrset.ClusterCIDR, error) {
matchingCIDRs := make([]*cidrset.ClusterCIDR, 0)
pq := make(PriorityQueue, 0)
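The PriorityQueue above orders matching ClusterCIDRs by the P1–P4 tie-breakers. A hedged illustration of the container/heap mechanics behind it — the real comparator implements all four rules, while this sketch reproduces only P1 (more matching node-selector labels wins) with hypothetical field names:

```go
package main

import (
	"container/heap"
	"fmt"
)

type pqItem struct {
	name       string
	matchCount int // P1: number of matching node-selector labels (illustrative field)
}

type PriorityQueue []*pqItem

func (pq PriorityQueue) Len() int            { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool  { return pq[i].matchCount > pq[j].matchCount }
func (pq PriorityQueue) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *PriorityQueue) Push(x interface{}) { *pq = append(*pq, x.(*pqItem)) }
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[:n-1]
	return item
}

func main() {
	pq := make(PriorityQueue, 0)
	heap.Push(&pq, &pqItem{name: "cidr-a", matchCount: 1})
	heap.Push(&pq, &pqItem{name: "cidr-b", matchCount: 3})
	heap.Push(&pq, &pqItem{name: "cidr-c", matchCount: 2})
	for pq.Len() > 0 {
		fmt.Println(heap.Pop(&pq).(*pqItem).name) // cidr-b, cidr-c, cidr-a
	}
}
```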
@@ -824,7 +900,8 @@ func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *v1.Node) ([]
}
// Only push the CIDRsets which are not marked for termination.
-if !clusterCIDR.Terminating {
+// Always push the CIDRsets when marked for release.
+if !occupy || !clusterCIDR.Terminating {
heap.Push(&pq, pqItem)
}
}
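The new push condition, spelled out: terminating ClusterCIDRs are skipped only when occupying new CIDRs; a release must still find them so that nodes allocated from a terminating range can be cleaned up. A tiny sketch of the predicate (shouldPush is an illustrative name, not from the PR):

```go
package main

import "fmt"

func shouldPush(occupy, terminating bool) bool {
	return !occupy || !terminating
}

func main() {
	fmt.Println(shouldPush(true, true))   // false: never allocate from a terminating range
	fmt.Println(shouldPush(true, false))  // true
	fmt.Println(shouldPush(false, true))  // true: release always matches
	fmt.Println(shouldPush(false, false)) // true
}
```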
@@ -1090,19 +1167,22 @@ func (r *multiCIDRRangeAllocator) mapClusterCIDRSet(cidrMap map[string][]*cidrse
return nil
}
-// reconcileDelete deletes the ClusterCIDR object and removes the finalizer.
+// reconcileDelete releases the assigned ClusterCIDR and removes the finalizer
+// if the deletion timestamp is set.
func (r *multiCIDRRangeAllocator) reconcileDelete(clusterCIDR *networkingv1alpha1.ClusterCIDR) error {
r.lock.Lock()
defer r.lock.Unlock()
if slice.ContainsString(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer, nil) {
klog.V(2).Infof("Releasing ClusterCIDR: %s", clusterCIDR.Name)
if err := r.deleteClusterCIDR(clusterCIDR); err != nil {
klog.V(2).Infof("Error while deleting ClusterCIDR: %+v", err)
return err
}
// Remove the finalizer as delete is successful.
+cccCopy := clusterCIDR.DeepCopy()
+cccCopy.ObjectMeta.Finalizers = slice.RemoveString(cccCopy.ObjectMeta.Finalizers, clusterCIDRFinalizer, nil)
-if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(context.TODO(), clusterCIDR, metav1.UpdateOptions{}); err != nil {
+if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(context.TODO(), cccCopy, metav1.UpdateOptions{}); err != nil {
klog.V(2).Infof("Error removing finalizer for ClusterCIDR %s: %v", clusterCIDR.Name, err)
return err
}
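The fix here updates `cccCopy` rather than the informer-cached `clusterCIDR`: objects handed out by informer listers are shared with the cache and must not be mutated in place. A hedged sketch of that copy-before-mutate rule — `removeString`, `stripFinalizer`, and the finalizer value are illustrative stand-ins for the PR's `slice.RemoveString` and `clusterCIDRFinalizer`:

```go
package main

import (
	"fmt"

	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
)

func removeString(items []string, s string) []string {
	out := make([]string, 0, len(items))
	for _, it := range items {
		if it != s {
			out = append(out, it)
		}
	}
	return out
}

// stripFinalizer returns a copy of cc with the finalizer removed, leaving
// the (possibly cache-owned) input untouched.
func stripFinalizer(cc *networkingv1alpha1.ClusterCIDR, finalizer string) *networkingv1alpha1.ClusterCIDR {
	cccCopy := cc.DeepCopy()
	cccCopy.ObjectMeta.Finalizers = removeString(cccCopy.ObjectMeta.Finalizers, finalizer)
	return cccCopy
}

func main() {
	cc := &networkingv1alpha1.ClusterCIDR{}
	cc.ObjectMeta.Finalizers = []string{"example.io/cluster-cidr-finalizer"}
	out := stripFinalizer(cc, "example.io/cluster-cidr-finalizer")
	// The original keeps its finalizer; only the copy is modified.
	fmt.Println(len(cc.ObjectMeta.Finalizers), len(out.ObjectMeta.Finalizers)) // 1 0
}
```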