kubernetes/pkg/controller/nodeipam/ipam/multi_cidr_range_allocator.go
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"container/heap"
"context"
"errors"
"fmt"
"math"
"net"
"sync"
"time"
v1 "k8s.io/api/core/v1"
networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2"
cidrset "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/multicidrset"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/util/slice"
netutil "k8s.io/utils/net"
)
const (
defaultClusterCIDRKey = "kubernetes.io/clusterCIDR"
defaultClusterCIDRValue = "default"
defaultClusterCIDRName = "default-cluster-cidr"
defaultClusterCIDRAPIVersion = "networking.k8s.io/v1alpha1"
clusterCIDRFinalizer = "networking.k8s.io/cluster-cidr-finalizer"
ipv4MaxCIDRMask = 32
ipv6MaxCIDRMask = 128
minPerNodeHostBits = 4
)
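// An illustrative note on the constants above: PerNodeHostBits is the number of
// host bits left to each node, so minPerNodeHostBits = 4 guarantees at least
// 2^4 = 16 addresses per node and per IP family. For example, an IPv4
// ClusterCIDR of 10.0.0.0/16 with PerNodeHostBits = 8 hands out /24 ranges
// (32 - 8 = 24), i.e. 256 addresses per node.
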
// multiCIDRNodeReservedCIDRs holds the reservation info for a node:
// CIDRs are reserved first, then the node resource is patched with them.
type multiCIDRNodeReservedCIDRs struct {
nodeReservedCIDRs
clusterCIDR *cidrset.ClusterCIDR
}
type multiCIDRRangeAllocator struct {
client clientset.Interface
// nodeLister is able to list/get nodes and is populated by the shared informer passed to controller.
nodeLister corelisters.NodeLister
// nodesSynced returns true if the node shared informer has been synced at least once.
nodesSynced cache.InformerSynced
// clusterCIDRLister is able to list/get clustercidrs and is populated by the shared informer passed to controller.
clusterCIDRLister networkinglisters.ClusterCIDRLister
// clusterCIDRSynced returns true if the clustercidr shared informer has been synced at least once.
clusterCIDRSynced cache.InformerSynced
// Channel used to pass Nodes and their reserved CIDRs to the background worker.
// This increases the throughput of CIDR assignment by not blocking on long operations.
nodeCIDRUpdateChannel chan multiCIDRNodeReservedCIDRs
broadcaster record.EventBroadcaster
recorder record.EventRecorder
// queues are where incoming work is placed to de-dup and to allow "easy"
// rate limited requeues on errors
cidrQueue workqueue.RateLimitingInterface
nodeQueue workqueue.RateLimitingInterface
// lock guards cidrMap to avoid races in CIDR allocation.
lock *sync.Mutex
// cidrMap maps ClusterCIDR labels to internal ClusterCIDR objects.
cidrMap map[string][]*cidrset.ClusterCIDR
}
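// A minimal sketch of the cidrMap layout (the selector keys below are
// hypothetical): the map is keyed by the string form of each ClusterCIDR's node
// selector, so ClusterCIDRs that share a selector land in the same bucket.
//
//	r.cidrMap = map[string][]*cidrset.ClusterCIDR{
//		"kubernetes.io/clusterCIDR in (default)": {defaultCC},
//		"region in (us-east-1)":                  {ccA, ccB},
//	}
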
// NewMultiCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes (one CIDR per IP family).
// Caller must always pass in a list of existing nodes to the new allocator.
// NodeList is only nil in testing.
func NewMultiCIDRRangeAllocator(
ctx context.Context,
client clientset.Interface,
nodeInformer informers.NodeInformer,
clusterCIDRInformer networkinginformers.ClusterCIDRInformer,
allocatorParams CIDRAllocatorParams,
nodeList *v1.NodeList,
testCIDRMap map[string][]*cidrset.ClusterCIDR,
) (CIDRAllocator, error) {
logger := klog.FromContext(ctx)
if client == nil {
logger.Error(nil, "kubeClient is nil when starting multi CIDRRangeAllocator")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
eventBroadcaster := record.NewBroadcaster()
eventSource := v1.EventSource{
Component: "multiCIDRRangeAllocator",
}
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, eventSource)
ra := &multiCIDRRangeAllocator{
client: client,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
clusterCIDRLister: clusterCIDRInformer.Lister(),
clusterCIDRSynced: clusterCIDRInformer.Informer().HasSynced,
nodeCIDRUpdateChannel: make(chan multiCIDRNodeReservedCIDRs, cidrUpdateQueueSize),
broadcaster: eventBroadcaster,
recorder: recorder,
cidrQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator_cidr"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "multi_cidr_range_allocator_node"),
lock: &sync.Mutex{},
cidrMap: make(map[string][]*cidrset.ClusterCIDR, 0),
}
// testCIDRMap is only set for testing purposes.
if len(testCIDRMap) > 0 {
ra.cidrMap = testCIDRMap
logger.Info("TestCIDRMap should only be set for testing purposes, if this is seen in production logs, it might be a misconfiguration or a bug")
}
ccList, err := listClusterCIDRs(ctx, client)
if err != nil {
return nil, err
}
if ccList == nil {
ccList = &networkingv1alpha1.ClusterCIDRList{}
}
createDefaultClusterCIDR(logger, ccList, allocatorParams)
// Regenerate the cidrMap from the existing ClusterCIDRs.
for _, clusterCIDR := range ccList.Items {
logger.Info("Regenerating existing ClusterCIDR", "clusterCIDR", clusterCIDR)
// Create an event for invalid ClusterCIDRs, do not crash on failures.
if err := ra.reconcileBootstrap(ctx, &clusterCIDR); err != nil {
logger.Error(err, "Error while regenerating existing ClusterCIDR")
ra.recorder.Event(&clusterCIDR, "Warning", "InvalidClusterCIDR encountered while regenerating ClusterCIDR during bootstrap.", err.Error())
}
}
clusterCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
ra.cidrQueue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
ra.cidrQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
ra.cidrQueue.Add(key)
}
},
})
if allocatorParams.ServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR)
} else {
logger.Info("No Service CIDR provided. Skipping filtering out service addresses")
}
if allocatorParams.SecondaryServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR)
} else {
logger.Info("No Secondary Service CIDR provided. Skipping filtering out secondary service addresses")
}
if nodeList != nil {
for _, node := range nodeList.Items {
if len(node.Spec.PodCIDRs) == 0 {
logger.V(4).Info("Node has no CIDR, ignoring", "node", klog.KObj(&node))
continue
}
logger.Info("Node has CIDR, occupying it in CIDR map", "node", klog.KObj(&node), "podCIDRs", node.Spec.PodCIDRs)
if err := ra.occupyCIDRs(logger, &node); err != nil {
// This will happen if:
// 1. We find garbage in the podCIDRs field. Retrying is useless.
// 2. CIDR out of range: this means the ClusterCIDR has not been created yet.
// This error will keep crashing the controller-manager until the
// appropriate ClusterCIDR has been created.
return nil, err
}
}
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
ra.nodeQueue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
ra.nodeQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// The informer cache no longer has the object, and since Node doesn't have a finalizer,
// we don't see the Update with DeletionTimestamp != 0.
// TODO: instead of executing the operation directly in the handler, build a small cache
// keyed by node.Name with the PodCIDRs as values, and use ReleaseCIDR in the reconcile
// loop so we can retry on `ReleaseCIDR` failures.
ra.ReleaseCIDR(logger, obj.(*v1.Node))
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
ra.nodeQueue.Add(key)
}
},
})
return ra, nil
}
func (r *multiCIDRRangeAllocator) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
// Start event processing pipeline.
logger := klog.FromContext(ctx)
r.broadcaster.StartStructuredLogging(0)
logger.Info("Started sending events to API Server")
r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
defer r.broadcaster.Shutdown()
defer r.cidrQueue.ShutDown()
defer r.nodeQueue.ShutDown()
logger.Info("Starting Multi CIDR Range allocator")
defer logger.Info("Shutting down Multi CIDR Range allocator")
if !cache.WaitForNamedCacheSync("multi_cidr_range_allocator", ctx.Done(), r.nodesSynced, r.clusterCIDRSynced) {
return
}
for i := 0; i < cidrUpdateWorkers; i++ {
go wait.UntilWithContext(ctx, r.runCIDRWorker, time.Second)
go wait.UntilWithContext(ctx, r.runNodeWorker, time.Second)
}
<-ctx.Done()
}
// runCIDRWorker is a long-running function that will continually call the
// processNextCIDRWorkItem function in order to read and process a message on the
// cidrQueue.
func (r *multiCIDRRangeAllocator) runCIDRWorker(ctx context.Context) {
for r.processNextCIDRWorkItem(ctx) {
}
}
// processNextCIDRWorkItem will read a single work item off the cidrQueue and
// attempt to process it by calling syncClusterCIDR.
func (r *multiCIDRRangeAllocator) processNextCIDRWorkItem(ctx context.Context) bool {
obj, shutdown := r.cidrQueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer r.cidrQueue.Done.
err := func(ctx context.Context, obj interface{}) error {
// We call Done here so the cidrQueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the cidrQueue and attempted again after a back-off
// period.
defer r.cidrQueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the cidrQueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// cidrQueue means the items in the informer cache may actually be
// more up to date than when the item was initially put onto the
// cidrQueue.
if key, ok = obj.(string); !ok {
// As the item in the cidrQueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
r.cidrQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in cidrQueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the key of the
// ClusterCIDR resource to be synced.
if err := r.syncClusterCIDR(ctx, key); err != nil {
// Put the item back on the cidrQueue to handle any transient errors.
r.cidrQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
r.cidrQueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(ctx, obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (r *multiCIDRRangeAllocator) runNodeWorker(ctx context.Context) {
for r.processNextNodeWorkItem(ctx) {
}
}
// processNextNodeWorkItem will read a single work item off the nodeQueue and
// attempt to process it by calling syncNode.
func (r *multiCIDRRangeAllocator) processNextNodeWorkItem(ctx context.Context) bool {
obj, shutdown := r.nodeQueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer r.nodeQueue.Done.
err := func(logger klog.Logger, obj interface{}) error {
// We call Done here so the nodeQueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the nodeQueue and attempted again after a back-off
// period.
defer r.nodeQueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the nodeQueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// nodeQueue means the items in the informer cache may actually be
// more up to date than when the item was initially put onto the
// nodeQueue.
if key, ok = obj.(string); !ok {
// As the item in the nodeQueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
r.nodeQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workNodeQueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the name of the
// Node resource to be synced.
if err := r.syncNode(logger, key); err != nil {
// Put the item back on the nodeQueue to handle any transient errors.
r.nodeQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
r.nodeQueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(klog.FromContext(ctx), obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (r *multiCIDRRangeAllocator) syncNode(logger klog.Logger, key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing Node request %q (%v)", key, time.Since(startTime))
}()
node, err := r.nodeLister.Get(key)
if apierrors.IsNotFound(err) {
klog.V(3).Infof("node has been deleted: %v", key)
// TODO: obtain the node object information to call ReleaseCIDR from here
// and retry if there is an error.
return nil
}
if err != nil {
return err
}
// Check the DeletionTimestamp to determine if object is under deletion.
if !node.DeletionTimestamp.IsZero() {
klog.V(3).Infof("node is being deleted: %v", key)
return r.ReleaseCIDR(logger, node)
}
return r.AllocateOrOccupyCIDR(logger, node)
}
// needToAddFinalizer checks if a finalizer should be added to the object.
func needToAddFinalizer(obj metav1.Object, finalizer string) bool {
return obj.GetDeletionTimestamp() == nil && !slice.ContainsString(obj.GetFinalizers(),
finalizer, nil)
}
func (r *multiCIDRRangeAllocator) syncClusterCIDR(ctx context.Context, key string) error {
startTime := time.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing clusterCIDR request", "key", key, "latency", time.Since(startTime))
}()
clusterCIDR, err := r.clusterCIDRLister.Get(key)
if apierrors.IsNotFound(err) {
logger.V(3).Info("clusterCIDR has been deleted", "key", key)
return nil
}
if err != nil {
return err
}
// Check the DeletionTimestamp to determine if object is under deletion.
if !clusterCIDR.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, clusterCIDR)
}
return r.reconcileCreate(ctx, clusterCIDR)
}
// occupyCIDRs marks node.PodCIDRs[...] as used in the allocator's tracked cidrSets.
func (r *multiCIDRRangeAllocator) occupyCIDRs(logger klog.Logger, node *v1.Node) error {
err := func(node *v1.Node) error {
if len(node.Spec.PodCIDRs) == 0 {
return nil
}
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(logger, node, true)
if err != nil {
return err
}
for _, clusterCIDR := range clusterCIDRList {
occupiedCount := 0
for _, cidr := range node.Spec.PodCIDRs {
_, podCIDR, err := netutil.ParseCIDRSloppy(cidr)
if err != nil {
return fmt.Errorf("failed to parse CIDR %s on Node %v: %w", cidr, node.Name, err)
}
logger.Info("occupy CIDR for node", "CIDR", cidr, "node", klog.KObj(node))
if err := r.Occupy(clusterCIDR, podCIDR); err != nil {
logger.V(3).Info("Could not occupy cidr, trying next range", "podCIDRs", node.Spec.PodCIDRs, "err", err)
break
}
occupiedCount++
}
// Associate the node with this ClusterCIDR only if it was able to occupy all the node's CIDRs.
if occupiedCount == len(node.Spec.PodCIDRs) {
clusterCIDR.AssociatedNodes[node.Name] = true
return nil
}
}
return fmt.Errorf("could not occupy cidrs: %v, No matching ClusterCIDRs found", node.Spec.PodCIDRs)
}(node)
return err
}
// associatedCIDRSet returns the CIDRSet, based on the ip family of the CIDR.
func (r *multiCIDRRangeAllocator) associatedCIDRSet(clusterCIDR *cidrset.ClusterCIDR, cidr *net.IPNet) (*cidrset.MultiCIDRSet, error) {
switch {
case netutil.IsIPv4CIDR(cidr):
return clusterCIDR.IPv4CIDRSet, nil
case netutil.IsIPv6CIDR(cidr):
return clusterCIDR.IPv6CIDRSet, nil
default:
return nil, fmt.Errorf("invalid cidr: %v", cidr)
}
}
// Occupy marks the CIDR as occupied in the allocatedCIDRMap of the cidrSet.
func (r *multiCIDRRangeAllocator) Occupy(clusterCIDR *cidrset.ClusterCIDR, cidr *net.IPNet) error {
currCIDRSet, err := r.associatedCIDRSet(clusterCIDR, cidr)
if err != nil {
return err
}
if err := currCIDRSet.Occupy(cidr); err != nil {
return fmt.Errorf("unable to occupy cidr %v in cidrSet", cidr)
}
return nil
}
// Release marks the CIDR as free in the cidrSet used bitmap,
// Also removes the CIDR from the allocatedCIDRSet.
func (r *multiCIDRRangeAllocator) Release(logger klog.Logger, clusterCIDR *cidrset.ClusterCIDR, cidr *net.IPNet) error {
currCIDRSet, err := r.associatedCIDRSet(clusterCIDR, cidr)
if err != nil {
return err
}
if err := currCIDRSet.Release(cidr); err != nil {
logger.Info("Unable to release cidr in cidrSet", "CIDR", cidr)
return err
}
return nil
}
// AllocateOrOccupyCIDR allocates a CIDR to the node if the node doesn't have a
// CIDR already allocated; if the node already has PodCIDRs assigned, it
// occupies them and marks them as used in the tracked cidrSets.
func (r *multiCIDRRangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
r.lock.Lock()
defer r.lock.Unlock()
if node == nil {
return nil
}
if len(node.Spec.PodCIDRs) > 0 {
return r.occupyCIDRs(logger, node)
}
cidrs, clusterCIDR, err := r.prioritizedCIDRs(logger, node)
if err != nil {
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to get cidrs for node %s", node.Name)
}
if len(cidrs) == 0 {
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("no cidrSets with matching labels found for node %s", node.Name)
}
// Assign the allocated CIDRs to the node.
allocated := multiCIDRNodeReservedCIDRs{
nodeReservedCIDRs: nodeReservedCIDRs{
nodeName: node.Name,
allocatedCIDRs: cidrs,
},
clusterCIDR: clusterCIDR,
}
return r.updateCIDRsAllocation(logger, allocated)
}
// ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets.
func (r *multiCIDRRangeAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
r.lock.Lock()
defer r.lock.Unlock()
if node == nil || len(node.Spec.PodCIDRs) == 0 {
return nil
}
clusterCIDR, err := r.allocatedClusterCIDR(logger, node)
if err != nil {
return err
}
for _, cidr := range node.Spec.PodCIDRs {
_, podCIDR, err := netutil.ParseCIDRSloppy(cidr)
if err != nil {
return fmt.Errorf("failed to parse CIDR %q on Node %q: %w", cidr, node.Name, err)
}
logger.Info("release CIDR for node", "CIDR", cidr, "node", klog.KObj(node))
if err := r.Release(logger, clusterCIDR, podCIDR); err != nil {
return fmt.Errorf("failed to release cidr %q from clusterCIDR %q for node %q: %w", cidr, clusterCIDR.Name, node.Name, err)
}
}
// Remove the node from the ClusterCIDR AssociatedNodes.
delete(clusterCIDR.AssociatedNodes, node.Name)
return nil
}
// filterOutServiceRange marks all CIDRs that belong to the serviceCIDR as used
// across all cidrSets so that they won't be assignable.
func (r *multiCIDRRangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet) {
// Checks if service CIDR has a nonempty intersection with cluster
// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
// clusterCIDR's Mask applied (this means that clusterCIDR contains
// serviceCIDR) or vice versa (which means that serviceCIDR contains
// clusterCIDR).
for _, clusterCIDRList := range r.cidrMap {
for _, clusterCIDR := range clusterCIDRList {
if err := r.occupyServiceCIDR(clusterCIDR, serviceCIDR); err != nil {
logger.Error(err, "Unable to occupy service CIDR")
}
}
}
}
func (r *multiCIDRRangeAllocator) occupyServiceCIDR(clusterCIDR *cidrset.ClusterCIDR, serviceCIDR *net.IPNet) error {
cidrSet, err := r.associatedCIDRSet(clusterCIDR, serviceCIDR)
if err != nil {
return err
}
cidr := cidrSet.ClusterCIDR
// No need to occupy as Service CIDR doesn't intersect with the current ClusterCIDR.
if !cidr.Contains(serviceCIDR.IP.Mask(cidr.Mask)) && !serviceCIDR.Contains(cidr.IP.Mask(serviceCIDR.Mask)) {
return nil
}
if err := r.Occupy(clusterCIDR, serviceCIDR); err != nil {
return fmt.Errorf("error filtering out service cidr %v from cluster cidr %v: %w", cidr, serviceCIDR, err)
}
return nil
}
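// A worked example of the intersection test above (addresses are illustrative):
// with a ClusterCIDR of 10.0.0.0/8 and a Service CIDR of 10.96.0.0/12,
// masking 10.96.0.0 with /8 yields 10.0.0.0, which the ClusterCIDR contains,
// so the Service range is occupied. A ClusterCIDR of 192.168.0.0/16 fails the
// mask test in both directions, so nothing is occupied there.
//
//	_, cluster, _ := netutil.ParseCIDRSloppy("10.0.0.0/8")
//	_, service, _ := netutil.ParseCIDRSloppy("10.96.0.0/12")
//	overlaps := cluster.Contains(service.IP.Mask(cluster.Mask)) ||
//		service.Contains(cluster.IP.Mask(service.Mask)) // true
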
// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *multiCIDRRangeAllocator) updateCIDRsAllocation(logger klog.Logger, data multiCIDRNodeReservedCIDRs) error {
err := func(data multiCIDRNodeReservedCIDRs) error {
cidrsString := ipnetToStringList(data.allocatedCIDRs)
node, err := r.nodeLister.Get(data.nodeName)
if err != nil {
logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", data.nodeName))
return err
}
// if cidr list matches the proposed,
// then we possibly updated this node
// and just failed to ack the success.
if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) {
match := true
for idx, cidr := range cidrsString {
if node.Spec.PodCIDRs[idx] != cidr {
match = false
break
}
}
if match {
logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one.", "node", klog.KObj(node), "CIDRs", data.allocatedCIDRs)
return nil
}
}
// The node already has CIDRs allocated; release the newly reserved ones.
if len(node.Spec.PodCIDRs) != 0 {
logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
for _, cidr := range data.allocatedCIDRs {
if err := r.Release(logger, data.clusterCIDR, cidr); err != nil {
return fmt.Errorf("failed to release cidr %s from clusterCIDR %s for node: %s: %w", cidr, data.clusterCIDR.Name, node.Name, err)
}
}
return nil
}
// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
data.clusterCIDR.AssociatedNodes[node.Name] = true
logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDR", cidrsString)
return nil
}
}
// The patch failed; release the reserved CIDRs back to the pool.
logger.Error(err, "Failed to update node PodCIDR after attempts", "node", klog.KObj(node), "podCIDR", cidrsString, "retries", cidrUpdateRetries)
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leak CIDRs here. This is safer than releasing
// them when we don't know whether the request went through.
// A NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
for _, cidr := range data.allocatedCIDRs {
if err := r.Release(logger, data.clusterCIDR, cidr); err != nil {
return fmt.Errorf("failed to release cidr %q from clusterCIDR %q for node: %q: %w", cidr, data.clusterCIDR.Name, node.Name, err)
}
}
}
return err
}(data)
return err
}
// defaultNodeSelector generates a NodeSelector with defaultClusterCIDRKey as the key
// and defaultClusterCIDRValue as the value; it is an internal nodeSelector that acts
// as the catch-all entry in the cidrMap. It is only used if no other ClusterCIDR
// selects the node.
func defaultNodeSelector() *v1.NodeSelector {
return &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: defaultClusterCIDRKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{defaultClusterCIDRValue},
},
},
},
},
}
}
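// For reference, converting the default selector with nodeSelectorAsSelector
// produces the catch-all cidrMap key (a sketch; the string comes from
// labels.Selector.String()):
//
//	sel, _ := nodeSelectorAsSelector(defaultNodeSelector())
//	_ = sel.String() // "kubernetes.io/clusterCIDR in (default)"
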
// prioritizedCIDRs returns a list of CIDRs to be allocated to the node.
// Returns 1 CIDR if single stack.
// Returns 2 CIDRs, 1 from each IP family, if dual stack.
func (r *multiCIDRRangeAllocator) prioritizedCIDRs(logger klog.Logger, node *v1.Node) ([]*net.IPNet, *cidrset.ClusterCIDR, error) {
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(logger, node, true)
if err != nil {
return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
}
for _, clusterCIDR := range clusterCIDRList {
cidrs := make([]*net.IPNet, 0)
if clusterCIDR.IPv4CIDRSet != nil {
cidr, err := r.allocateCIDR(clusterCIDR, clusterCIDR.IPv4CIDRSet)
if err != nil {
logger.V(3).Info("Unable to allocate IPv4 CIDR, trying next range", "err", err)
continue
}
cidrs = append(cidrs, cidr)
}
if clusterCIDR.IPv6CIDRSet != nil {
cidr, err := r.allocateCIDR(clusterCIDR, clusterCIDR.IPv6CIDRSet)
if err != nil {
logger.V(3).Info("Unable to allocate IPv6 CIDR, trying next range", "err", err)
continue
}
cidrs = append(cidrs, cidr)
}
return cidrs, clusterCIDR, nil
}
return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s, no available CIDRs", node.Name)
}
func (r *multiCIDRRangeAllocator) allocateCIDR(clusterCIDR *cidrset.ClusterCIDR, cidrSet *cidrset.MultiCIDRSet) (*net.IPNet, error) {
for evaluated := 0; evaluated < cidrSet.MaxCIDRs; evaluated++ {
candidate, lastEvaluated, err := cidrSet.NextCandidate()
if err != nil {
return nil, err
}
evaluated += lastEvaluated
if r.cidrInAllocatedList(candidate) {
continue
}
// Deep check: also skip candidates that overlap any already-allocated CIDR.
if r.cidrOverlapWithAllocatedList(candidate) {
continue
}
// Mark the CIDR as occupied in the map.
if err := r.Occupy(clusterCIDR, candidate); err != nil {
return nil, err
}
// Increment the evaluated count metric.
cidrSet.UpdateEvaluatedCount(evaluated)
return candidate, nil
}
return nil, &cidrset.CIDRRangeNoCIDRsRemainingErr{
CIDR: cidrSet.Label,
}
}
func (r *multiCIDRRangeAllocator) cidrInAllocatedList(cidr *net.IPNet) bool {
for _, clusterCIDRList := range r.cidrMap {
for _, clusterCIDR := range clusterCIDRList {
cidrSet, _ := r.associatedCIDRSet(clusterCIDR, cidr)
if cidrSet != nil {
if ok := cidrSet.AllocatedCIDRMap[cidr.String()]; ok {
return true
}
}
}
}
return false
}
func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(cidr *net.IPNet) bool {
for _, clusterCIDRList := range r.cidrMap {
for _, clusterCIDR := range clusterCIDRList {
cidrSet, _ := r.associatedCIDRSet(clusterCIDR, cidr)
if cidrSet != nil {
for allocated := range cidrSet.AllocatedCIDRMap {
_, allocatedCIDR, _ := netutil.ParseCIDRSloppy(allocated)
if cidr.Contains(allocatedCIDR.IP.Mask(cidr.Mask)) || allocatedCIDR.Contains(cidr.IP.Mask(allocatedCIDR.Mask)) {
return true
}
}
}
}
}
return false
}
// allocatedClusterCIDR returns the ClusterCIDR from which the node CIDRs were allocated.
func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(logger klog.Logger, node *v1.Node) (*cidrset.ClusterCIDR, error) {
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(logger, node, false)
if err != nil {
return nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
}
for _, clusterCIDR := range clusterCIDRList {
if ok := clusterCIDR.AssociatedNodes[node.Name]; ok {
return clusterCIDR, nil
}
}
return nil, fmt.Errorf("no clusterCIDR found associated with node: %s", node.Name)
}
// orderedMatchingClusterCIDRs returns a list of all the ClusterCIDRs matching the node labels.
// The list is ordered by the following priorities, which act as tie-breakers
// (see the illustrative example after this function).
// P0: ClusterCIDR with higher number of matching labels has the highest priority.
// P1: ClusterCIDR having cidrSet with fewer allocatable Pod CIDRs has higher priority.
// P2: ClusterCIDR with a PerNodeMaskSize having fewer IPs has higher priority.
// P3: ClusterCIDR having label with lower alphanumeric value has higher priority.
// P4: ClusterCIDR with a cidrSet having a smaller IP address value has a higher priority.
//
// orderedMatchingClusterCIDRs takes `occupy` as an argument, it determines whether the function
// is called during an occupy or a release operation. For a release operation, a ClusterCIDR must
// be added to the matching ClusterCIDRs list, irrespective of whether the ClusterCIDR is terminating.
func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(logger klog.Logger, node *v1.Node, occupy bool) ([]*cidrset.ClusterCIDR, error) {
matchingCIDRs := make([]*cidrset.ClusterCIDR, 0)
pq := make(PriorityQueue, 0)
for label, clusterCIDRList := range r.cidrMap {
labelsMatch, matchCnt, err := r.matchCIDRLabels(logger, node, label)
if err != nil {
return nil, err
}
if !labelsMatch {
continue
}
for _, clusterCIDR := range clusterCIDRList {
pqItem := &PriorityQueueItem{
clusterCIDR: clusterCIDR,
labelMatchCount: matchCnt,
selectorString: label,
}
// When occupying, only push the cidrSets which are not marked for termination.
// For release operations, push the cidrSets regardless.
if !occupy || !clusterCIDR.Terminating {
heap.Push(&pq, pqItem)
}
}
}
// Pop the ClusterCIDRs off the PriorityQueue.
// They arrive in descending order of matchCnt;
// if matchCnt is equal, they are ordered by ascending label value.
for pq.Len() > 0 {
pqItem := heap.Pop(&pq).(*PriorityQueueItem)
matchingCIDRs = append(matchingCIDRs, pqItem.clusterCIDR)
}
// Append the catch all CIDR config.
defaultSelector, err := nodeSelectorAsSelector(defaultNodeSelector())
if err != nil {
return nil, err
}
if clusterCIDRList, ok := r.cidrMap[defaultSelector.String()]; ok {
matchingCIDRs = append(matchingCIDRs, clusterCIDRList...)
}
return matchingCIDRs, nil
}
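// An illustrative ordering example (the ClusterCIDRs below are hypothetical):
// for a node labelled {env: prod, region: us-east}, a ClusterCIDR whose selector
// is "env in (prod),region in (us-east)" (two matching requirements) is tried
// before one whose selector is only "env in (prod)" (one match). Among
// ClusterCIDRs with the same match count, the one with fewer allocatable pod
// CIDRs wins, then the one whose per-node mask provides fewer IPs, then the
// lexically smaller selector label, then the smaller CIDR address. The
// ClusterCIDRs mapped under the default selector are always appended last as
// the catch-all.
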
// matchCIDRLabels matches the Node labels against a CIDR config's label selector.
// It returns true only if all the label requirements match, and also returns the
// count of matching requirements.
func (r *multiCIDRRangeAllocator) matchCIDRLabels(logger klog.Logger, node *v1.Node, label string) (bool, int, error) {
var labelSet labels.Set
var matchCnt int
labelsMatch := false
ls, err := labels.Parse(label)
if err != nil {
logger.Error(err, "Unable to parse label to labels.Selector", "label", label)
return labelsMatch, 0, err
}
reqs, selectable := ls.Requirements()
labelSet = node.ObjectMeta.Labels
if selectable {
matchCnt = 0
for _, req := range reqs {
if req.Matches(labelSet) {
matchCnt += 1
}
}
if matchCnt == len(reqs) {
labelsMatch = true
}
}
return labelsMatch, matchCnt, nil
}
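// A sketch of how the requirement matching above behaves (labels are hypothetical):
//
//	ls, _ := labels.Parse("env in (prod),region in (us-east)")
//	reqs, _ := ls.Requirements()
//	nodeLabels := labels.Set{"env": "prod", "region": "us-west"}
//	// Checking reqs against nodeLabels: "env in (prod)" matches,
//	// "region in (us-east)" does not, so matchCnt would be 1 and
//	// labelsMatch false for this node.
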
// Methods for handling ClusterCIDRs.
// createDefaultClusterCIDR creates a default ClusterCIDR if --cluster-cidr has
// been configured. It converts the --cluster-cidr and --per-node-mask-size* flags
// to appropriate ClusterCIDR fields.
func createDefaultClusterCIDR(logger klog.Logger, existingConfigList *networkingv1alpha1.ClusterCIDRList,
allocatorParams CIDRAllocatorParams) {
// Create default ClusterCIDR only if --cluster-cidr has been configured
if len(allocatorParams.ClusterCIDRs) == 0 {
return
}
for _, clusterCIDR := range existingConfigList.Items {
if clusterCIDR.Name == defaultClusterCIDRName {
// Default ClusterCIDR already exists, no further action required.
logger.V(3).Info("Default ClusterCIDR already exists", "defaultClusterCIDRName", defaultClusterCIDRName)
return
}
}
// Create a default ClusterCIDR as it is not already created.
defaultCIDRConfig := &networkingv1alpha1.ClusterCIDR{
TypeMeta: metav1.TypeMeta{
APIVersion: defaultClusterCIDRAPIVersion,
Kind: "ClusterCIDR",
},
ObjectMeta: metav1.ObjectMeta{
Name: defaultClusterCIDRName,
},
Spec: networkingv1alpha1.ClusterCIDRSpec{
PerNodeHostBits: minPerNodeHostBits,
},
}
ipv4PerNodeHostBits := int32(math.MinInt32)
ipv6PerNodeHostBits := int32(math.MinInt32)
isDualstack := false
if len(allocatorParams.ClusterCIDRs) == 2 {
isDualstack = true
}
for i, cidr := range allocatorParams.ClusterCIDRs {
if netutil.IsIPv4CIDR(cidr) {
defaultCIDRConfig.Spec.IPv4 = cidr.String()
ipv4PerNodeHostBits = ipv4MaxCIDRMask - int32(allocatorParams.NodeCIDRMaskSizes[i])
if !isDualstack && ipv4PerNodeHostBits > minPerNodeHostBits {
defaultCIDRConfig.Spec.PerNodeHostBits = ipv4PerNodeHostBits
}
} else if netutil.IsIPv6CIDR(cidr) {
defaultCIDRConfig.Spec.IPv6 = cidr.String()
ipv6PerNodeHostBits = ipv6MaxCIDRMask - int32(allocatorParams.NodeCIDRMaskSizes[i])
if !isDualstack && ipv6PerNodeHostBits > minPerNodeHostBits {
defaultCIDRConfig.Spec.PerNodeHostBits = ipv6PerNodeHostBits
}
}
}
if isDualstack {
// In case of dual-stack CIDRs, the current default values for PerNodeMaskSize are
// 24 for IPv4 (PerNodeHostBits=8) and 64 for IPv6 (PerNodeHostBits=64). There is no
// requirement for the PerNodeHostBits to be equal for IPv4 and IPv6; however, with
// the introduction of ClusterCIDRs we enforce a single PerNodeHostBits field, so we
// choose the minimum PerNodeHostBits value to avoid overflow for IPv4 CIDRs.
if ipv4PerNodeHostBits >= minPerNodeHostBits && ipv4PerNodeHostBits <= ipv6PerNodeHostBits {
defaultCIDRConfig.Spec.PerNodeHostBits = ipv4PerNodeHostBits
} else if ipv6PerNodeHostBits >= minPerNodeHostBits && ipv6PerNodeHostBits <= ipv4MaxCIDRMask {
defaultCIDRConfig.Spec.PerNodeHostBits = ipv6PerNodeHostBits
}
}
existingConfigList.Items = append(existingConfigList.Items, *defaultCIDRConfig)
return
}
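// A worked example of the flag conversion above (flag values are illustrative):
// with --cluster-cidr=10.0.0.0/16,fd00::/64, --node-cidr-mask-size-ipv4=24 and
// --node-cidr-mask-size-ipv6=120, the per-node host bits are 32-24 = 8 for IPv4
// and 128-120 = 8 for IPv6, so the default ClusterCIDR is created with
// IPv4: "10.0.0.0/16", IPv6: "fd00::/64" and PerNodeHostBits: 8.
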
// reconcileCreate handles create ClusterCIDR events.
func (r *multiCIDRRangeAllocator) reconcileCreate(ctx context.Context, clusterCIDR *networkingv1alpha1.ClusterCIDR) error {
r.lock.Lock()
defer r.lock.Unlock()
logger := klog.FromContext(ctx)
if needToAddFinalizer(clusterCIDR, clusterCIDRFinalizer) {
logger.V(3).Info("Creating ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
if err := r.createClusterCIDR(ctx, clusterCIDR, false); err != nil {
logger.Error(err, "Unable to create ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
return err
}
}
return nil
}
// reconcileBootstrap handles the creation of existing ClusterCIDRs during controller
// startup and adds a finalizer if not already present.
func (r *multiCIDRRangeAllocator) reconcileBootstrap(ctx context.Context, clusterCIDR *networkingv1alpha1.ClusterCIDR) error {
r.lock.Lock()
defer r.lock.Unlock()
logger := klog.FromContext(ctx)
terminating := false
// Mark the ClusterCIDR as terminating if its Spec was modified after creation
// (Generation > 1); its CIDRs will not be used for new allocations.
if clusterCIDR.Generation > 1 {
terminating = true
err := fmt.Errorf("CIDRs from ClusterCIDR %s will not be used for allocation as it was modified", clusterCIDR.Name)
logger.Error(err, "ClusterCIDR Modified")
}
logger.V(2).Info("Creating ClusterCIDR during bootstrap", "clusterCIDR", clusterCIDR.Name)
if err := r.createClusterCIDR(ctx, clusterCIDR, terminating); err != nil {
logger.Error(err, "Unable to create ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
return err
}
return nil
}
// createClusterCIDR creates and maps the cidrSets in the cidrMap.
func (r *multiCIDRRangeAllocator) createClusterCIDR(ctx context.Context, clusterCIDR *networkingv1alpha1.ClusterCIDR, terminating bool) error {
nodeSelector, err := r.nodeSelectorKey(clusterCIDR)
if err != nil {
return fmt.Errorf("unable to get labelSelector key: %w", err)
}
clusterCIDRSet, err := r.createClusterCIDRSet(clusterCIDR, terminating)
if err != nil {
return fmt.Errorf("invalid ClusterCIDR: %w", err)
}
if clusterCIDRSet.IPv4CIDRSet == nil && clusterCIDRSet.IPv6CIDRSet == nil {
return errors.New("invalid ClusterCIDR: must provide IPv4 and/or IPv6 config")
}
if err := r.mapClusterCIDRSet(r.cidrMap, nodeSelector, clusterCIDRSet); err != nil {
return fmt.Errorf("unable to map clusterCIDRSet: %w", err)
}
// Make a copy so we don't mutate the shared informer cache.
updatedClusterCIDR := clusterCIDR.DeepCopy()
if needToAddFinalizer(clusterCIDR, clusterCIDRFinalizer) {
updatedClusterCIDR.ObjectMeta.Finalizers = append(clusterCIDR.ObjectMeta.Finalizers, clusterCIDRFinalizer)
}
logger := klog.FromContext(ctx)
if updatedClusterCIDR.ResourceVersion == "" {
// Create is only used for creating the default ClusterCIDR.
if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Create(ctx, updatedClusterCIDR, metav1.CreateOptions{}); err != nil {
logger.V(2).Info("Error creating ClusterCIDR", "clusterCIDR", klog.KObj(clusterCIDR), "err", err)
return err
}
} else {
// Update the ClusterCIDR object when called from reconcileCreate.
if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(ctx, updatedClusterCIDR, metav1.UpdateOptions{}); err != nil {
logger.V(2).Info("Error creating ClusterCIDR", "clusterCIDR", clusterCIDR.Name, "err", err)
return err
}
}
return nil
}
// createClusterCIDRSet creates and returns new cidrset.ClusterCIDR based on ClusterCIDR API object.
func (r *multiCIDRRangeAllocator) createClusterCIDRSet(clusterCIDR *networkingv1alpha1.ClusterCIDR, terminating bool) (*cidrset.ClusterCIDR, error) {
clusterCIDRSet := &cidrset.ClusterCIDR{
Name: clusterCIDR.Name,
AssociatedNodes: make(map[string]bool, 0),
Terminating: terminating,
}
if clusterCIDR.Spec.IPv4 != "" {
_, ipv4CIDR, err := netutil.ParseCIDRSloppy(clusterCIDR.Spec.IPv4)
if err != nil {
return nil, fmt.Errorf("unable to parse provided IPv4 CIDR: %w", err)
}
clusterCIDRSet.IPv4CIDRSet, err = cidrset.NewMultiCIDRSet(ipv4CIDR, int(clusterCIDR.Spec.PerNodeHostBits))
if err != nil {
return nil, fmt.Errorf("unable to create IPv4 cidrSet: %w", err)
}
}
if clusterCIDR.Spec.IPv6 != "" {
_, ipv6CIDR, err := netutil.ParseCIDRSloppy(clusterCIDR.Spec.IPv6)
if err != nil {
return nil, fmt.Errorf("unable to parse provided IPv6 CIDR: %w", err)
}
clusterCIDRSet.IPv6CIDRSet, err = cidrset.NewMultiCIDRSet(ipv6CIDR, int(clusterCIDR.Spec.PerNodeHostBits))
if err != nil {
return nil, fmt.Errorf("unable to create IPv6 cidrSet: %w", err)
}
}
return clusterCIDRSet, nil
}
// mapClusterCIDRSet maps the ClusterCIDRSet to the provided labelSelector in the cidrMap.
func (r *multiCIDRRangeAllocator) mapClusterCIDRSet(cidrMap map[string][]*cidrset.ClusterCIDR, nodeSelector string, clusterCIDRSet *cidrset.ClusterCIDR) error {
if clusterCIDRSet == nil {
return errors.New("invalid clusterCIDRSet, clusterCIDRSet cannot be nil")
}
if clusterCIDRSetList, ok := cidrMap[nodeSelector]; ok {
cidrMap[nodeSelector] = append(clusterCIDRSetList, clusterCIDRSet)
} else {
cidrMap[nodeSelector] = []*cidrset.ClusterCIDR{clusterCIDRSet}
}
return nil
}
// reconcileDelete releases the assigned ClusterCIDR and removes the finalizer
// if the deletion timestamp is set.
func (r *multiCIDRRangeAllocator) reconcileDelete(ctx context.Context, clusterCIDR *networkingv1alpha1.ClusterCIDR) error {
r.lock.Lock()
defer r.lock.Unlock()
logger := klog.FromContext(ctx)
if slice.ContainsString(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer, nil) {
logger.V(2).Info("Releasing ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
if err := r.deleteClusterCIDR(logger, clusterCIDR); err != nil {
logger.V(2).Info("Error while deleting ClusterCIDR", "err", err)
return err
}
// Remove the finalizer as delete is successful.
cccCopy := clusterCIDR.DeepCopy()
cccCopy.ObjectMeta.Finalizers = slice.RemoveString(cccCopy.ObjectMeta.Finalizers, clusterCIDRFinalizer, nil)
if _, err := r.client.NetworkingV1alpha1().ClusterCIDRs().Update(ctx, cccCopy, metav1.UpdateOptions{}); err != nil {
logger.V(2).Info("Error removing finalizer for ClusterCIDR", "clusterCIDR", clusterCIDR.Name, "err", err)
return err
}
logger.V(2).Info("Removed finalizer for ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
}
return nil
}
// deleteClusterCIDR deletes and unmaps the ClusterCIDR from the cidrMap.
func (r *multiCIDRRangeAllocator) deleteClusterCIDR(logger klog.Logger, clusterCIDR *networkingv1alpha1.ClusterCIDR) error {
labelSelector, err := r.nodeSelectorKey(clusterCIDR)
if err != nil {
return fmt.Errorf("unable to delete cidr: %w", err)
}
clusterCIDRSetList, ok := r.cidrMap[labelSelector]
if !ok {
logger.Info("Label not found in CIDRMap, proceeding with delete", "labelSelector", labelSelector)
return nil
}
for i, clusterCIDRSet := range clusterCIDRSetList {
if clusterCIDRSet.Name != clusterCIDR.Name {
continue
}
// Mark clusterCIDRSet as terminating.
clusterCIDRSet.Terminating = true
// Allow deletion only if no nodes are associated with the ClusterCIDR.
if len(clusterCIDRSet.AssociatedNodes) > 0 {
return fmt.Errorf("ClusterCIDRSet %s marked as terminating, won't be deleted until all associated nodes are deleted", clusterCIDR.Name)
}
// Remove the label from the map if this was the only clusterCIDR associated
// with it.
if len(clusterCIDRSetList) == 1 {
delete(r.cidrMap, labelSelector)
return nil
}
clusterCIDRSetList = append(clusterCIDRSetList[:i], clusterCIDRSetList[i+1:]...)
r.cidrMap[labelSelector] = clusterCIDRSetList
return nil
}
logger.V(2).Info("clusterCIDR not found, proceeding with delete", "clusterCIDR", clusterCIDR.Name, "label", labelSelector)
return nil
}
func (r *multiCIDRRangeAllocator) nodeSelectorKey(clusterCIDR *networkingv1alpha1.ClusterCIDR) (string, error) {
var nodeSelector labels.Selector
var err error
if clusterCIDR.Spec.NodeSelector != nil {
nodeSelector, err = nodeSelectorAsSelector(clusterCIDR.Spec.NodeSelector)
} else {
nodeSelector, err = nodeSelectorAsSelector(defaultNodeSelector())
}
if err != nil {
return "", err
}
return nodeSelector.String(), nil
}
func listClusterCIDRs(ctx context.Context, kubeClient clientset.Interface) (*networkingv1alpha1.ClusterCIDRList, error) {
var clusterCIDRList *networkingv1alpha1.ClusterCIDRList
// We must poll because the apiserver might not be up. Failing here causes
// the controller manager to restart.
startTimestamp := time.Now()
// start with 2s, multiply the duration by 1.6 each step, 11 steps = 9.7 minutes
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 1.6,
Steps: 11,
}
logger := klog.FromContext(ctx)
if pollErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
var err error
clusterCIDRList, err = kubeClient.NetworkingV1alpha1().ClusterCIDRs().List(ctx, metav1.ListOptions{
FieldSelector: fields.Everything().String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
logger.Error(err, "Failed to list all clusterCIDRs")
return false, nil
}
return true, nil
}); pollErr != nil {
logger.Error(pollErr, "Failed to list clusterCIDRs", "latency", time.Since(startTimestamp))
return nil, fmt.Errorf("failed to list all clusterCIDRs in %v, cannot proceed without updating CIDR map",
apiserverStartupGracePeriod)
}
return clusterCIDRList, nil
}
// nodeSelectorRequirementsAsLabelRequirements converts the NodeSelectorRequirement
// type to a labels.Requirement type.
func nodeSelectorRequirementsAsLabelRequirements(nsr v1.NodeSelectorRequirement) (*labels.Requirement, error) {
var op selection.Operator
switch nsr.Operator {
case v1.NodeSelectorOpIn:
op = selection.In
case v1.NodeSelectorOpNotIn:
op = selection.NotIn
case v1.NodeSelectorOpExists:
op = selection.Exists
case v1.NodeSelectorOpDoesNotExist:
op = selection.DoesNotExist
case v1.NodeSelectorOpGt:
op = selection.GreaterThan
case v1.NodeSelectorOpLt:
op = selection.LessThan
default:
return nil, fmt.Errorf("%q is not a valid node selector operator", nsr.Operator)
}
return labels.NewRequirement(nsr.Key, op, nsr.Values)
}
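// A small conversion sketch (the requirement below is hypothetical):
//
//	req, _ := nodeSelectorRequirementsAsLabelRequirements(v1.NodeSelectorRequirement{
//		Key:      "topology.kubernetes.io/zone",
//		Operator: v1.NodeSelectorOpIn,
//		Values:   []string{"us-east-1a"},
//	})
//	_ = req.String() // "topology.kubernetes.io/zone in (us-east-1a)"
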
// TODO: nodeSelect and labelSelector semantics are different and the function
// doesn't translate them correctly, this has to be fixed before Beta
// xref: https://issues.k8s.io/116419
// nodeSelectorAsSelector converts the NodeSelector api type into a struct that
// implements labels.Selector
// Note: This function should be kept in sync with the selector methods in
// pkg/labels/selector.go
func nodeSelectorAsSelector(ns *v1.NodeSelector) (labels.Selector, error) {
if ns == nil {
return labels.Nothing(), nil
}
if len(ns.NodeSelectorTerms) == 0 {
return labels.Everything(), nil
}
var requirements []labels.Requirement
for _, nsTerm := range ns.NodeSelectorTerms {
for _, expr := range nsTerm.MatchExpressions {
req, err := nodeSelectorRequirementsAsLabelRequirements(expr)
if err != nil {
return nil, err
}
requirements = append(requirements, *req)
}
for _, field := range nsTerm.MatchFields {
req, err := nodeSelectorRequirementsAsLabelRequirements(field)
if err != nil {
return nil, err
}
requirements = append(requirements, *req)
}
}
selector := labels.NewSelector()
selector = selector.Add(requirements...)
return selector, nil
}
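// An illustrative note on the TODO above (see https://issues.k8s.io/116419):
// NodeSelectorTerms are ORed in the NodeSelector API, but this helper flattens
// every term's requirements into a single ANDed label selector. For a
// hypothetical selector with two terms, {env in (prod)} and {env in (dev)},
// the resulting selector requires env to be both prod and dev at once and so
// matches no node, instead of matching nodes that satisfy either term. This is
// why the function is kept unexported and only used to build internal cidrMap keys.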