Merge pull request #109090 from sarveshr7/multicidr-rangeallocator

Enhance NodeIPAM to support multiple ClusterCIDRs
Committed by Kubernetes Prow Robot on 2022-08-07 15:40:18 -07:00 (committed via GitHub)
110 changed files with 13336 additions and 79 deletions


@@ -22,16 +22,18 @@ import (
"net"
"time"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
informers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
)
// CIDRAllocatorType is the type of the allocator to use.
@@ -41,6 +43,9 @@ const (
// RangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations.
RangeAllocatorType CIDRAllocatorType = "RangeAllocator"
// MultiCIDRRangeAllocatorType is the allocator that uses an internal CIDR
// range allocator to do node CIDR range allocations from multiple ClusterCIDRs.
MultiCIDRRangeAllocatorType CIDRAllocatorType = "MultiCIDRRangeAllocator"
// CloudAllocatorType is the allocator that uses cloud platform
// support to do node CIDR range allocations.
CloudAllocatorType CIDRAllocatorType = "CloudAllocator"
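Note: as the New() factory below shows, selecting MultiCIDRRangeAllocator requires the MultiCIDRRangeAllocator feature gate. Purely as a hedged illustration (flag names from kube-controller-manager of this release line; verify against your version), enabling it would look like:

kube-controller-manager --allocate-node-cidrs=true \
  --cidr-allocator-type=MultiCIDRRangeAllocator \
  --feature-gates=MultiCIDRRangeAllocator=true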
@@ -87,7 +92,7 @@ type CIDRAllocator interface {
// CIDR if it doesn't currently have one or mark the CIDR as used if
// the node already has one.
AllocateOrOccupyCIDR(node *v1.Node) error
// ReleaseCIDR releases the CIDR of the removed node.
ReleaseCIDR(node *v1.Node) error
// Run starts all the working logic of the allocator.
Run(stopCh <-chan struct{})
@@ -96,18 +101,25 @@ type CIDRAllocator interface {
// CIDRAllocatorParams are the parameters required for creating a new
// CIDR range allocator.
type CIDRAllocatorParams struct {
// ClusterCIDRs is the list of cluster CIDRs.
ClusterCIDRs []*net.IPNet
// ServiceCIDR is the primary service CIDR for the cluster.
ServiceCIDR *net.IPNet
// SecondaryServiceCIDR is the secondary service CIDR for the cluster.
SecondaryServiceCIDR *net.IPNet
// NodeCIDRMaskSizes is the list of node CIDR mask sizes.
NodeCIDRMaskSizes []int
}
// CIDRs are reserved, then node resource is patched with them.
// nodeReservedCIDRs holds the reservation info for a node.
type nodeReservedCIDRs struct {
allocatedCIDRs []*net.IPNet
nodeName string
}
// New creates a new CIDR range allocator.
func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, clusterCIDRInformer networkinginformers.ClusterCIDRInformer, allocatorType CIDRAllocatorType, allocatorParams CIDRAllocatorParams) (CIDRAllocator, error) {
nodeList, err := listNodes(kubeClient)
if err != nil {
return nil, err
@@ -116,6 +128,12 @@ func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInfo
switch allocatorType {
case RangeAllocatorType:
return NewCIDRRangeAllocator(kubeClient, nodeInformer, allocatorParams, nodeList)
case MultiCIDRRangeAllocatorType:
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRRangeAllocator) {
return nil, fmt.Errorf("invalid CIDR allocator type: %v, feature gate %v must be enabled", allocatorType, features.MultiCIDRRangeAllocator)
}
return NewMultiCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDRInformer, allocatorParams, nodeList, nil)
case CloudAllocatorType:
return NewCloudCIDRAllocator(kubeClient, cloud, nodeInformer)
default:
@@ -144,3 +162,12 @@ func listNodes(kubeClient clientset.Interface) (*v1.NodeList, error) {
}
return nodeList, nil
}
// ipnetToStringList converts a slice of net.IPNet into a list of CIDRs in string format.
func ipnetToStringList(inCIDRs []*net.IPNet) []string {
outCIDRs := make([]string, len(inCIDRs))
for idx, inCIDR := range inCIDRs {
outCIDRs[idx] = inCIDR.String()
}
return outCIDRs
}
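As a quick sanity check of the new helper, a standalone sketch (the function is duplicated here so the snippet runs outside the package):

package main

import (
	"fmt"
	"net"
)

// ipnetToStringList mirrors the helper above: one CIDR string per net.IPNet.
func ipnetToStringList(inCIDRs []*net.IPNet) []string {
	outCIDRs := make([]string, len(inCIDRs))
	for idx, inCIDR := range inCIDRs {
		outCIDRs[idx] = inCIDR.String()
	}
	return outCIDRs
}

func main() {
	_, v4, _ := net.ParseCIDR("10.0.0.0/16")
	_, v6, _ := net.ParseCIDR("2001:db8::/64")
	fmt.Println(ipnetToStringList([]*net.IPNet{v4, v6})) // [10.0.0.0/16 2001:db8::/64]
}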


@@ -0,0 +1,140 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"math"
cidrset "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/multicidrset"
)
// A PriorityQueue implementation based on https://pkg.go.dev/container/heap#example-package-PriorityQueue
// A PriorityQueueItem is something we manage in a priority queue.
type PriorityQueueItem struct {
clusterCIDR *cidrset.ClusterCIDR
// labelMatchCount is the first determinant of priority.
labelMatchCount int
// selectorString is a string representation of the labelSelector associated with the cidrSet.
selectorString string
// index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds PriorityQueueItems.
type PriorityQueue []*PriorityQueueItem
func (pq PriorityQueue) Len() int { return len(pq) }
// Less compares the priority queue items, to store in a min heap.
// Less(i,j) == true denotes i has higher priority than j.
func (pq PriorityQueue) Less(i, j int) bool {
if pq[i].labelMatchCount != pq[j].labelMatchCount {
// P0: CidrSet with higher number of matching labels has the highest priority.
return pq[i].labelMatchCount > pq[j].labelMatchCount
}
// If the count of matching labels is equal, compare the max allocatable pod CIDRs.
if pq[i].maxAllocatable() != pq[j].maxAllocatable() {
// P1: CidrSet with fewer allocatable pod CIDRs has higher priority.
return pq[i].maxAllocatable() < pq[j].maxAllocatable()
}
// If the value of allocatable pod CIDRs is equal, compare the node mask size.
if pq[i].nodeMaskSize() != pq[j].nodeMaskSize() {
// P2: CidrSet with a PerNodeMaskSize having fewer IPs has higher priority.
// For example, `27` (32 IPs) picked before `25` (128 IPs).
return pq[i].nodeMaskSize() > pq[j].nodeMaskSize()
}
// If the per-node mask sizes are equal, compare the selector strings.
if pq[i].selectorString != pq[j].selectorString {
// P3: CidrSet having label with lower alphanumeric value has higher priority.
return pq[i].selectorString < pq[j].selectorString
}
// P4: CidrSet having an alpha-numerically smaller IP address value has a higher priority.
return pq[i].cidrLabel() < pq[j].cidrLabel()
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
if item, ok := x.(*PriorityQueueItem); ok {
item.index = n
*pq = append(*pq, item)
}
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak.
item.index = -1 // for safety.
*pq = old[0 : n-1]
return item
}
// maxAllocatable computes the minimum value of the MaxCIDRs for a ClusterCIDR.
// It compares the MaxCIDRs for each CIDR family and returns the minimum.
// e.g. IPv4 - 10.0.0.0/16 PerNodeMaskSize: 24 MaxCIDRs = 256
// IPv6 - ff:ff::/120 PerNodeMaskSize: 120 MaxCIDRs = 1
// MaxAllocatable for this ClusterCIDR = 1
func (pqi *PriorityQueueItem) maxAllocatable() int {
ipv4Allocatable := math.MaxInt
ipv6Allocatable := math.MaxInt
if pqi.clusterCIDR.IPv4CIDRSet != nil {
ipv4Allocatable = pqi.clusterCIDR.IPv4CIDRSet.MaxCIDRs
}
if pqi.clusterCIDR.IPv6CIDRSet != nil {
ipv6Allocatable = pqi.clusterCIDR.IPv6CIDRSet.MaxCIDRs
}
if ipv4Allocatable < ipv6Allocatable {
return ipv4Allocatable
}
return ipv6Allocatable
}
// nodeMaskSize returns IPv4 NodeMaskSize if present, else returns IPv6 NodeMaskSize.
// Note the requirement: 32 - IPv4 NodeMaskSize == 128 - IPv6 NodeMaskSize
// Due to the above requirement it does not matter which NodeMaskSize we compare.
func (pqi *PriorityQueueItem) nodeMaskSize() int {
if pqi.clusterCIDR.IPv4CIDRSet != nil {
return pqi.clusterCIDR.IPv4CIDRSet.NodeMaskSize
}
return pqi.clusterCIDR.IPv6CIDRSet.NodeMaskSize
}
// cidrLabel returns IPv4 CIDR if present, else returns IPv6 CIDR.
func (pqi *PriorityQueueItem) cidrLabel() string {
if pqi.clusterCIDR.IPv4CIDRSet != nil {
return pqi.clusterCIDR.IPv4CIDRSet.Label
}
return pqi.clusterCIDR.IPv6CIDRSet.Label
}
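The heap.Interface wiring above follows the standard container/heap pattern. A self-contained sketch with a simplified item type (hypothetical, not from this PR) shows how Push and Pop keep the highest-priority entry, i.e. the most matching labels, at the front:

package main

import (
	"container/heap"
	"fmt"
)

// item keeps just the first-level priority key from PriorityQueueItem.
type item struct {
	labelMatchCount int
	index           int
}

type pq []*item

func (q pq) Len() int           { return len(q) }
func (q pq) Less(i, j int) bool { return q[i].labelMatchCount > q[j].labelMatchCount }
func (q pq) Swap(i, j int)      { q[i], q[j] = q[j], q[i]; q[i].index = i; q[j].index = j }

func (q *pq) Push(x interface{}) {
	it := x.(*item)
	it.index = len(*q)
	*q = append(*q, it)
}

func (q *pq) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // avoid memory leak
	it.index = -1
	*q = old[:n-1]
	return it
}

func main() {
	q := make(pq, 0)
	heap.Push(&q, &item{labelMatchCount: 1})
	heap.Push(&q, &item{labelMatchCount: 3})
	heap.Push(&q, &item{labelMatchCount: 2})
	fmt.Println(heap.Pop(&q).(*item).labelMatchCount) // 3: most label matches wins
}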


@@ -0,0 +1,170 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipam
import (
"container/heap"
"testing"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/multicidrset"
utilnet "k8s.io/utils/net"
)
func createTestPriorityQueueItem(name, cidr, selectorString string, labelMatchCount, perNodeHostBits int) *PriorityQueueItem {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(cidr)
cidrSet, _ := multicidrset.NewMultiCIDRSet(clusterCIDR, perNodeHostBits)
return &PriorityQueueItem{
clusterCIDR: &multicidrset.ClusterCIDR{
Name: name,
IPv4CIDRSet: cidrSet,
},
labelMatchCount: labelMatchCount,
selectorString: selectorString,
}
}
func TestPriorityQueue(t *testing.T) {
pqi1 := createTestPriorityQueueItem("cidr1", "192.168.0.0/16", "foo=bar,name=test1", 1, 8)
pqi2 := createTestPriorityQueueItem("cidr2", "10.1.0.0/24", "foo=bar,name=test2", 2, 8)
pqi3 := createTestPriorityQueueItem("cidr3", "172.16.0.0/16", "foo=bar,name=test3", 2, 8)
pqi4 := createTestPriorityQueueItem("cidr4", "10.1.1.0/26", "abc=bar,name=test4", 2, 6)
pqi5 := createTestPriorityQueueItem("cidr5", "10.1.2.0/26", "foo=bar,name=test5", 2, 6)
pqi6 := createTestPriorityQueueItem("cidr6", "10.1.3.0/26", "abc=bar,name=test4", 2, 6)
for _, testQueue := range []struct {
name string
items []*PriorityQueueItem
want *PriorityQueueItem
}{
{"Test queue with single item", []*PriorityQueueItem{pqi1}, pqi1},
{"Test queue with items having different labelMatchCount", []*PriorityQueueItem{pqi1, pqi2}, pqi2},
{"Test queue with items having same labelMatchCount, different max Allocatable Pod CIDRs", []*PriorityQueueItem{pqi1, pqi2, pqi3}, pqi2},
{"Test queue with items having same labelMatchCount, max Allocatable Pod CIDRs, different PerNodeMaskSize", []*PriorityQueueItem{pqi1, pqi2, pqi4}, pqi4},
{"Test queue with items having same labelMatchCount, max Allocatable Pod CIDRs, PerNodeMaskSize, different labels", []*PriorityQueueItem{pqi1, pqi2, pqi4, pqi5}, pqi4},
{"Test queue with items having same labelMatchCount, max Allocatable Pod CIDRs, PerNodeMaskSize, labels, different IP addresses", []*PriorityQueueItem{pqi1, pqi2, pqi4, pqi5, pqi6}, pqi4},
} {
pq := make(PriorityQueue, 0)
for _, pqi := range testQueue.items {
heap.Push(&pq, pqi)
}
got := heap.Pop(&pq)
if got != testQueue.want {
t.Errorf("Error, wanted: %+v, got: %+v", testQueue.want, got)
}
}
}
func TestLess(t *testing.T) {
for _, testQueue := range []struct {
name string
items []*PriorityQueueItem
want bool
}{
{
name: "different labelMatchCount, i higher priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr1", "192.168.0.0/16", "foo=bar,name=test1", 2, 8),
createTestPriorityQueueItem("cidr2", "10.1.0.0/24", "foo=bar,name=test2", 1, 8),
},
want: true,
},
{
name: "different labelMatchCount, i lower priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr1", "192.168.0.0/16", "foo=bar,name=test1", 1, 8),
createTestPriorityQueueItem("cidr2", "10.1.0.0/24", "foo=bar,name=test2", 2, 8),
},
want: false,
},
{
name: "same labelMatchCount, different max allocatable cidrs, i higher priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr2", "10.1.0.0/24", "foo=bar,name=test2", 2, 8),
createTestPriorityQueueItem("cidr3", "172.16.0.0/16", "foo=bar,name=test3", 2, 8),
},
want: true,
},
{
name: "same labelMatchCount, different max allocatable cidrs, i lower priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr2", "10.1.0.0/16", "foo=bar,name=test2", 2, 8),
createTestPriorityQueueItem("cidr3", "172.16.0.0/24", "foo=bar,name=test3", 2, 8),
},
want: false,
},
{
name: "same labelMatchCount, max allocatable cidrs, different PerNodeMaskSize i higher priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr2", "10.1.0.0/26", "foo=bar,name=test2", 2, 6),
createTestPriorityQueueItem("cidr4", "10.1.1.0/24", "abc=bar,name=test4", 2, 8),
},
want: true,
},
{
name: "same labelMatchCount, max allocatable cidrs, different PerNodeMaskSize i lower priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr2", "10.1.0.0/24", "foo=bar,name=test2", 2, 8),
createTestPriorityQueueItem("cidr4", "10.1.1.0/26", "abc=bar,name=test4", 2, 6),
},
want: false,
},
{
name: "same labelMatchCount, max Allocatable Pod CIDRs, PerNodeMaskSize, different labels i higher priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr4", "10.1.1.0/26", "abc=bar,name=test4", 2, 6),
createTestPriorityQueueItem("cidr5", "10.1.2.0/26", "foo=bar,name=test5", 2, 6),
},
want: true,
},
{
name: "same labelMatchCount, max Allocatable Pod CIDRs, PerNodeMaskSize, different labels i lower priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr4", "10.1.1.0/26", "xyz=bar,name=test4", 2, 6),
createTestPriorityQueueItem("cidr5", "10.1.2.0/26", "foo=bar,name=test5", 2, 6),
},
want: false,
},
{
name: "same labelMatchCount, max Allocatable Pod CIDRs, PerNodeMaskSize, labels, different IP addresses i higher priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr4", "10.1.1.0/26", "abc=bar,name=test4", 2, 6),
createTestPriorityQueueItem("cidr6", "10.1.3.0/26", "abc=bar,name=test4", 2, 6),
},
want: true,
},
{
name: "same labelMatchCount, max Allocatable Pod CIDRs, PerNodeMaskSize, labels, different IP addresses i lower priority than j",
items: []*PriorityQueueItem{
createTestPriorityQueueItem("cidr4", "10.1.1.0/26", "xyz=bar,name=test4", 2, 6),
createTestPriorityQueueItem("cidr6", "10.0.3.0/26", "abc=bar,name=test4", 2, 6),
},
want: false,
},
} {
var pq PriorityQueue
pq = testQueue.items
got := pq.Less(0, 1)
if got != testQueue.want {
t.Errorf("Error, wanted: %v, got: %v\nTest %q \npq[0]: %+v \npq[1]: %+v ", testQueue.want, got, testQueue.name, pq[0], pq[1])
}
}
}

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -0,0 +1,78 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicidrset
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const nodeIpamSubsystem = "node_ipam_controller"
var (
cidrSetAllocations = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: nodeIpamSubsystem,
Name: "multicidrset_cidrs_allocations_total",
Help: "Counter measuring total number of CIDR allocations.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
cidrSetReleases = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: nodeIpamSubsystem,
Name: "multicidrset_cidrs_releases_total",
Help: "Counter measuring total number of CIDR releases.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
cidrSetUsage = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: nodeIpamSubsystem,
Name: "multicidrset_usage_cidrs",
Help: "Gauge measuring percentage of allocated CIDRs.",
StabilityLevel: metrics.ALPHA,
},
[]string{"clusterCIDR"},
)
cidrSetAllocationTriesPerRequest = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: nodeIpamSubsystem,
Name: "multicidrset_allocation_tries_per_request",
Help: "Histogram measuring CIDR allocation tries per request.",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(1, 5, 5),
},
[]string{"clusterCIDR"},
)
)
var registerMetrics sync.Once
// registerCidrsetMetrics registers the metrics that are to be monitored.
func registerCidrsetMetrics() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(cidrSetAllocations)
legacyregistry.MustRegister(cidrSetReleases)
legacyregistry.MustRegister(cidrSetUsage)
legacyregistry.MustRegister(cidrSetAllocationTriesPerRequest)
})
}
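For reference, metrics.ExponentialBuckets(1, 5, 5) above produces histogram upper bounds 1, 5, 25, 125, 625. A tiny sketch using the underlying Prometheus helper (which the component-base wrapper mirrors) confirms the layout:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same (start, factor, count) as the allocation-tries histogram above.
	fmt.Println(prometheus.ExponentialBuckets(1, 5, 5)) // [1 5 25 125 625]
}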


@@ -0,0 +1,361 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicidrset
import (
"encoding/binary"
"fmt"
"math/big"
"math/bits"
"net"
"sync"
netutils "k8s.io/utils/net"
)
// MultiCIDRSet manages a set of CIDR ranges from which blocks of IPs can
// be allocated.
type MultiCIDRSet struct {
sync.Mutex
// ClusterCIDR is the CIDR assigned to the cluster.
ClusterCIDR *net.IPNet
// NodeMaskSize is the mask size, in bits, assigned to the nodes.
// It caches the mask size to avoid the penalty of calling nodeMask.Size().
NodeMaskSize int
// MaxCIDRs is the maximum number of CIDRs that can be allocated.
MaxCIDRs int
// Label stores the CIDR in a string, it is used to identify the metrics such
// as Number of allocations, Total number of CIDR releases, Percentage of
// allocated CIDRs, Tries required for allocating a CIDR for a particular CIDRSet.
Label string
// AllocatedCIDRMap stores all the allocated CIDRs from the current CIDRSet.
// Stores a mapping of the next candidate CIDR for allocation to its
// allocation status. Next candidate is used only if allocation status is false.
AllocatedCIDRMap map[string]bool
// clusterMaskSize is the mask size, in bits, assigned to the cluster.
// It caches the mask size to avoid the penalty of calling clusterCIDR.Mask.Size().
clusterMaskSize int
// nodeMask is the network mask assigned to the nodes.
nodeMask net.IPMask
// allocatedCIDRs counts the number of CIDRs allocated.
allocatedCIDRs int
// nextCandidate points to the next CIDR that should be free.
nextCandidate int
}
// ClusterCIDR is an internal representation of the ClusterCIDR API object.
type ClusterCIDR struct {
// Name of the associated ClusterCIDR API object.
Name string
// IPv4CIDRSet is the MultiCIDRSet representation of ClusterCIDR.spec.ipv4
// of the associated ClusterCIDR API object.
IPv4CIDRSet *MultiCIDRSet
// IPv6CIDRSet is the MultiCIDRSet representation of ClusterCIDR.spec.ipv6
// of the associated ClusterCIDR API object.
IPv6CIDRSet *MultiCIDRSet
// AssociatedNodes is used to identify which nodes have CIDRs allocated from this ClusterCIDR.
// Stores a mapping of node name to association status.
AssociatedNodes map[string]bool
// Terminating is used to identify whether ClusterCIDR has been marked for termination.
Terminating bool
}
const (
// The subnet mask size cannot be greater than 16 more than the cluster mask size.
// TODO: https://github.com/kubernetes/kubernetes/issues/44918
// clusterSubnetMaxDiff is limited to 16 due to the uncompressed bitmap.
// Due to this limitation the subnet mask for an IPv6 cluster CIDR needs to be >= 48,
// as the default node mask size for IPv6 is 64 (a /48 cluster CIDR with /64 node
// masks gives a difference of exactly 16, the maximum allowed).
clusterSubnetMaxDiff = 16
// halfIPv6Len is the half of the IPv6 length.
halfIPv6Len = net.IPv6len / 2
)
// CIDRRangeNoCIDRsRemainingErr is an error type used to denote there is no more
// space to allocate CIDR ranges from the given CIDR.
type CIDRRangeNoCIDRsRemainingErr struct {
// CIDR represents the CIDR which is exhausted.
CIDR string
}
func (err *CIDRRangeNoCIDRsRemainingErr) Error() string {
return fmt.Sprintf("CIDR allocation failed; there are no remaining CIDRs left to allocate in the range %s", err.CIDR)
}
// CIDRSetSubNetTooBigErr is an error type to denote that subnet mask size is too
// big compared to the CIDR mask size.
type CIDRSetSubNetTooBigErr struct {
cidr string
subnetMaskSize int
clusterMaskSize int
}
func (err *CIDRSetSubNetTooBigErr) Error() string {
return fmt.Sprintf("Creation of New CIDR Set failed for %s. "+
"PerNodeMaskSize %d is too big for CIDR Mask %d, Maximum difference allowed "+
"is %d", err.cidr, err.subnetMaskSize, err.clusterMaskSize, clusterSubnetMaxDiff)
}
// NewMultiCIDRSet creates a new MultiCIDRSet.
func NewMultiCIDRSet(cidrConfig *net.IPNet, perNodeHostBits int) (*MultiCIDRSet, error) {
clusterMask := cidrConfig.Mask
clusterMaskSize, bits := clusterMask.Size()
var subNetMaskSize int
switch /*v4 or v6*/ {
case netutils.IsIPv4(cidrConfig.IP):
subNetMaskSize = 32 - perNodeHostBits
case netutils.IsIPv6(cidrConfig.IP):
subNetMaskSize = 128 - perNodeHostBits
}
if netutils.IsIPv6(cidrConfig.IP) && (subNetMaskSize-clusterMaskSize > clusterSubnetMaxDiff) {
return nil, &CIDRSetSubNetTooBigErr{
cidr: cidrConfig.String(),
subnetMaskSize: subNetMaskSize,
clusterMaskSize: clusterMaskSize,
}
}
// Register MultiCIDRSet metrics.
registerCidrsetMetrics()
return &MultiCIDRSet{
ClusterCIDR: cidrConfig,
nodeMask: net.CIDRMask(subNetMaskSize, bits),
clusterMaskSize: clusterMaskSize,
MaxCIDRs: 1 << uint32(subNetMaskSize-clusterMaskSize),
NodeMaskSize: subNetMaskSize,
Label: cidrConfig.String(),
AllocatedCIDRMap: make(map[string]bool, 0),
}, nil
}
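To make the sizing concrete, a standalone sketch reproducing the MaxCIDRs arithmetic from NewMultiCIDRSet for an IPv4 example (same formulas, outside the package):

package main

import (
	"fmt"
	"net"
)

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/16")
	perNodeHostBits := 8
	clusterMaskSize, _ := clusterCIDR.Mask.Size()
	nodeMaskSize := 32 - perNodeHostBits // IPv4 branch of the switch above
	maxCIDRs := 1 << (nodeMaskSize - clusterMaskSize)
	fmt.Printf("node mask /%d, max CIDRs %d\n", nodeMaskSize, maxCIDRs) // node mask /24, max CIDRs 256
}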
func (s *MultiCIDRSet) indexToCIDRBlock(index int) (*net.IPNet, error) {
var ip []byte
switch /*v4 or v6*/ {
case netutils.IsIPv4(s.ClusterCIDR.IP):
j := uint32(index) << uint32(32-s.NodeMaskSize)
ipInt := (binary.BigEndian.Uint32(s.ClusterCIDR.IP)) | j
ip = make([]byte, net.IPv4len)
binary.BigEndian.PutUint32(ip, ipInt)
case netutils.IsIPv6(s.ClusterCIDR.IP):
// leftClusterIP | rightClusterIP
// 2001:0DB8:1234:0000:0000:0000:0000:0000
const v6NBits = 128
const halfV6NBits = v6NBits / 2
leftClusterIP := binary.BigEndian.Uint64(s.ClusterCIDR.IP[:halfIPv6Len])
rightClusterIP := binary.BigEndian.Uint64(s.ClusterCIDR.IP[halfIPv6Len:])
ip = make([]byte, net.IPv6len)
if s.NodeMaskSize <= halfV6NBits {
// We only care about left side IP.
leftClusterIP |= uint64(index) << uint(halfV6NBits-s.NodeMaskSize)
} else {
if s.clusterMaskSize < halfV6NBits {
// see how many bits are needed to reach the left side.
btl := uint(s.NodeMaskSize - halfV6NBits)
indexMaxBit := uint(64 - bits.LeadingZeros64(uint64(index)))
if indexMaxBit > btl {
leftClusterIP |= uint64(index) >> btl
}
}
// The right side is calculated the same way in either case: when
// NodeMaskSize > 64, the index always contributes its low bits to the right half.
rightClusterIP |= uint64(index) << uint(v6NBits-s.NodeMaskSize)
}
binary.BigEndian.PutUint64(ip[:halfIPv6Len], leftClusterIP)
binary.BigEndian.PutUint64(ip[halfIPv6Len:], rightClusterIP)
default:
return nil, fmt.Errorf("invalid IP: %s", s.ClusterCIDR.IP)
}
return &net.IPNet{
IP: ip,
Mask: s.nodeMask,
}, nil
}
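The IPv4 branch of indexToCIDRBlock is easiest to follow with numbers; a standalone sketch replaying it for index 5 of a /16 cluster CIDR with /24 node CIDRs:

package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

func main() {
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/16")
	nodeMaskSize, index := 24, 5
	// Shift the index into the bits between the cluster mask and the node mask.
	j := uint32(index) << uint32(32-nodeMaskSize)
	ipInt := binary.BigEndian.Uint32(clusterCIDR.IP.To4()) | j
	ip := make([]byte, net.IPv4len)
	binary.BigEndian.PutUint32(ip, ipInt)
	fmt.Println(&net.IPNet{IP: ip, Mask: net.CIDRMask(nodeMaskSize, 32)}) // 10.0.5.0/24
}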
// NextCandidate returns the next free candidate CIDR and the last evaluated index
// for the current cidrSet. Returns an error if all CIDRs in the set are already allocated.
func (s *MultiCIDRSet) NextCandidate() (*net.IPNet, int, error) {
s.Lock()
defer s.Unlock()
if s.allocatedCIDRs == s.MaxCIDRs {
return nil, 0, &CIDRRangeNoCIDRsRemainingErr{
CIDR: s.Label,
}
}
candidate := s.nextCandidate
for i := 0; i < s.MaxCIDRs; i++ {
nextCandidateCIDR, err := s.indexToCIDRBlock(candidate)
if err != nil {
return nil, i, err
}
// Check if the nextCandidate is not already allocated.
if _, ok := s.AllocatedCIDRMap[nextCandidateCIDR.String()]; !ok {
s.nextCandidate = (candidate + 1) % s.MaxCIDRs
return nextCandidateCIDR, i, nil
}
candidate = (candidate + 1) % s.MaxCIDRs
}
return nil, s.MaxCIDRs, &CIDRRangeNoCIDRsRemainingErr{
CIDR: s.Label,
}
}
// getBeginningAndEndIndices returns the indices for the given CIDR, returned
// values are inclusive indices [beginning, end].
func (s *MultiCIDRSet) getBeginningAndEndIndices(cidr *net.IPNet) (int, int, error) {
if cidr == nil {
return -1, -1, fmt.Errorf("error getting indices for cluster cidr %v, cidr is nil", s.ClusterCIDR)
}
begin, end := 0, s.MaxCIDRs-1
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
var ipSize int
if !s.ClusterCIDR.Contains(cidr.IP.Mask(s.ClusterCIDR.Mask)) && !cidr.Contains(s.ClusterCIDR.IP.Mask(cidr.Mask)) {
return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.ClusterCIDR)
}
if s.clusterMaskSize < maskSize {
var err error
ipSize = net.IPv4len
if netutils.IsIPv6(cidr.IP) {
ipSize = net.IPv6len
}
begin, err = s.getIndexForCIDR(&net.IPNet{
IP: cidr.IP.Mask(s.nodeMask),
Mask: s.nodeMask,
})
if err != nil {
return -1, -1, err
}
ip := make([]byte, ipSize)
if netutils.IsIPv4(cidr.IP) {
ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask))
binary.BigEndian.PutUint32(ip, ipInt)
} else {
// ipIntLeft | ipIntRight
// 2001:0DB8:1234:0000:0000:0000:0000:0000
ipIntLeft := binary.BigEndian.Uint64(cidr.IP[:net.IPv6len/2]) | (^binary.BigEndian.Uint64(cidr.Mask[:net.IPv6len/2]))
ipIntRight := binary.BigEndian.Uint64(cidr.IP[net.IPv6len/2:]) | (^binary.BigEndian.Uint64(cidr.Mask[net.IPv6len/2:]))
binary.BigEndian.PutUint64(ip[:net.IPv6len/2], ipIntLeft)
binary.BigEndian.PutUint64(ip[net.IPv6len/2:], ipIntRight)
}
end, err = s.getIndexForCIDR(&net.IPNet{
IP: net.IP(ip).Mask(s.nodeMask),
Mask: s.nodeMask,
})
if err != nil {
return -1, -1, err
}
}
return begin, end, nil
}
// Release releases the given CIDR range.
func (s *MultiCIDRSet) Release(cidr *net.IPNet) error {
begin, end, err := s.getBeginningAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
// Remove from the allocated CIDR Map and decrement the counter only if currently
// marked allocated. Avoids double counting.
currCIDR, err := s.indexToCIDRBlock(i)
if err != nil {
return err
}
if _, ok := s.AllocatedCIDRMap[currCIDR.String()]; ok {
delete(s.AllocatedCIDRMap, currCIDR.String())
s.allocatedCIDRs--
cidrSetReleases.WithLabelValues(s.Label).Inc()
}
}
cidrSetUsage.WithLabelValues(s.Label).Set(float64(s.allocatedCIDRs) / float64(s.MaxCIDRs))
return nil
}
// Occupy marks the given CIDR range as used. Occupy succeeds even if the CIDR
// range was previously used.
func (s *MultiCIDRSet) Occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginningAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
// Add to the allocated CIDR Map and increment the counter only if not already
// marked allocated. Prevents double counting.
currCIDR, err := s.indexToCIDRBlock(i)
if err != nil {
return err
}
if _, ok := s.AllocatedCIDRMap[currCIDR.String()]; !ok {
s.AllocatedCIDRMap[currCIDR.String()] = true
cidrSetAllocations.WithLabelValues(s.Label).Inc()
s.allocatedCIDRs++
}
}
cidrSetUsage.WithLabelValues(s.Label).Set(float64(s.allocatedCIDRs) / float64(s.MaxCIDRs))
return nil
}
func (s *MultiCIDRSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
return s.getIndexForIP(cidr.IP)
}
func (s *MultiCIDRSet) getIndexForIP(ip net.IP) (int, error) {
if ip.To4() != nil {
cidrIndex := (binary.BigEndian.Uint32(s.ClusterCIDR.IP) ^ binary.BigEndian.Uint32(ip.To4())) >> uint32(32-s.NodeMaskSize)
if cidrIndex >= uint32(s.MaxCIDRs) {
return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.NodeMaskSize)
}
return int(cidrIndex), nil
}
if netutils.IsIPv6(ip) {
bigIP := big.NewInt(0).SetBytes(s.ClusterCIDR.IP)
bigIP = bigIP.Xor(bigIP, big.NewInt(0).SetBytes(ip))
cidrIndexBig := bigIP.Rsh(bigIP, uint(net.IPv6len*8-s.NodeMaskSize))
cidrIndex := cidrIndexBig.Uint64()
if cidrIndex >= uint64(s.MaxCIDRs) {
return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.NodeMaskSize)
}
return int(cidrIndex), nil
}
return 0, fmt.Errorf("invalid IP: %v", ip)
}
// UpdateEvaluatedCount records the number of candidate CIDRs evaluated for an allocation request.
func (s *MultiCIDRSet) UpdateEvaluatedCount(evaluated int) {
cidrSetAllocationTriesPerRequest.WithLabelValues(s.Label).Observe(float64(evaluated))
}
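The diffs for the allocator that consumes this set are suppressed above, so purely as a hedged sketch (function name and error handling are illustrative, not from the PR), an allocate-one-CIDR flow against this API could look like:

package ipamsketch

import (
	"net"

	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/multicidrset"
)

// allocateOne is hypothetical: find a free CIDR, record how many candidates
// were evaluated, then mark the CIDR as occupied.
func allocateOne(s *multicidrset.MultiCIDRSet) (*net.IPNet, error) {
	cidr, tries, err := s.NextCandidate()
	if err != nil {
		return nil, err
	}
	s.UpdateEvaluatedCount(tries + 1) // tries is the last evaluated index, so tries+1 candidates were checked
	if err := s.Occupy(cidr); err != nil {
		return nil, err
	}
	return cidr, nil
}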


@@ -0,0 +1,874 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicidrset
import (
"net"
"reflect"
"testing"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)
func allocateNext(s *MultiCIDRSet) (*net.IPNet, error) {
candidate, _, err := s.NextCandidate()
if err != nil {
return nil, err
}
err = s.Occupy(candidate)
return candidate, err
}
func TestCIDRSetFullyAllocated(t *testing.T) {
cases := []struct {
clusterCIDRStr string
perNodeHostBits int
expectedCIDR string
description string
}{
{
clusterCIDRStr: "127.123.234.0/28",
perNodeHostBits: 4,
expectedCIDR: "127.123.234.0/28",
description: "Fully allocated CIDR with IPv4",
},
{
clusterCIDRStr: "beef:1234::/112",
perNodeHostBits: 16,
expectedCIDR: "beef:1234::/112",
description: "Fully allocated CIDR with IPv6",
},
}
for _, tc := range cases {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(tc.clusterCIDRStr)
a, err := NewMultiCIDRSet(clusterCIDR, tc.perNodeHostBits)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
p, err := allocateNext(a)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
if p.String() != tc.expectedCIDR {
t.Fatalf("unexpected allocated cidr: %v, expecting %v for %v",
p.String(), tc.expectedCIDR, tc.description)
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
a.Release(p)
p, err = allocateNext(a)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
if p.String() != tc.expectedCIDR {
t.Fatalf("unexpected allocated cidr: %v, expecting %v for %v",
p.String(), tc.expectedCIDR, tc.description)
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
}
}
func TestIndexToCIDRBlock(t *testing.T) {
cases := []struct {
clusterCIDRStr string
perNodeHostBits int
index int
CIDRBlock string
description string
}{
{
clusterCIDRStr: "127.123.3.0/16",
perNodeHostBits: 8,
index: 0,
CIDRBlock: "127.123.0.0/24",
description: "1st IP address indexed with IPv4",
},
{
clusterCIDRStr: "127.123.0.0/16",
perNodeHostBits: 8,
index: 15,
CIDRBlock: "127.123.15.0/24",
description: "16th IP address indexed with IPv4",
},
{
clusterCIDRStr: "192.168.5.219/28",
perNodeHostBits: 0,
index: 5,
CIDRBlock: "192.168.5.213/32",
description: "5th IP address indexed with IPv4",
},
{
clusterCIDRStr: "2001:0db8:1234:3::/48",
perNodeHostBits: 64,
index: 0,
CIDRBlock: "2001:db8:1234::/64",
description: "1st IP address indexed with IPv6 /64",
},
{
clusterCIDRStr: "2001:0db8:1234::/48",
perNodeHostBits: 64,
index: 15,
CIDRBlock: "2001:db8:1234:f::/64",
description: "16th IP address indexed with IPv6 /64",
},
{
clusterCIDRStr: "2001:0db8:85a3::8a2e:0370:7334/50",
perNodeHostBits: 65,
index: 6425,
CIDRBlock: "2001:db8:85a3:3232::/63",
description: "6426th IP address indexed with IPv6 /63",
},
{
clusterCIDRStr: "2001:0db8::/32",
perNodeHostBits: 80,
index: 0,
CIDRBlock: "2001:db8::/48",
description: "1st IP address indexed with IPv6 /48",
},
{
clusterCIDRStr: "2001:0db8::/32",
perNodeHostBits: 80,
index: 15,
CIDRBlock: "2001:db8:f::/48",
description: "16th IP address indexed with IPv6 /48",
},
{
clusterCIDRStr: "2001:0db8:85a3::8a2e:0370:7334/32",
perNodeHostBits: 80,
index: 6425,
CIDRBlock: "2001:db8:1919::/48",
description: "6426th IP address indexed with IPv6 /48",
},
{
clusterCIDRStr: "2001:0db8:1234:ff00::/56",
perNodeHostBits: 56,
index: 0,
CIDRBlock: "2001:db8:1234:ff00::/72",
description: "1st IP address indexed with IPv6 /72",
},
{
clusterCIDRStr: "2001:0db8:1234:ff00::/56",
perNodeHostBits: 56,
index: 15,
CIDRBlock: "2001:db8:1234:ff00:f00::/72",
description: "16th IP address indexed with IPv6 /72",
},
{
clusterCIDRStr: "2001:0db8:1234:ff00::0370:7334/56",
perNodeHostBits: 56,
index: 6425,
CIDRBlock: "2001:db8:1234:ff19:1900::/72",
description: "6426th IP address indexed with IPv6 /72",
},
{
clusterCIDRStr: "2001:0db8:1234:0:1234::/80",
perNodeHostBits: 32,
index: 0,
CIDRBlock: "2001:db8:1234:0:1234::/96",
description: "1st IP address indexed with IPv6 /96",
},
{
clusterCIDRStr: "2001:0db8:1234:0:1234::/80",
perNodeHostBits: 32,
index: 15,
CIDRBlock: "2001:db8:1234:0:1234:f::/96",
description: "16th IP address indexed with IPv6 /96",
},
{
clusterCIDRStr: "2001:0db8:1234:ff00::0370:7334/80",
perNodeHostBits: 32,
index: 6425,
CIDRBlock: "2001:db8:1234:ff00:0:1919::/96",
description: "6426th IP address indexed with IPv6 /96",
},
}
for _, tc := range cases {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(tc.clusterCIDRStr)
a, err := NewMultiCIDRSet(clusterCIDR, tc.perNodeHostBits)
if err != nil {
t.Fatalf("error for %v ", tc.description)
}
cidr, err := a.indexToCIDRBlock(tc.index)
if err != nil {
t.Fatalf("error for %v ", tc.description)
}
if cidr.String() != tc.CIDRBlock {
t.Fatalf("error for %v index %d %s", tc.description, tc.index, cidr.String())
}
}
}
func TestCIDRSet_RandomishAllocation(t *testing.T) {
cases := []struct {
clusterCIDRStr string
description string
}{
{
clusterCIDRStr: "127.123.234.0/16",
description: "RandomishAllocation with IPv4",
},
{
clusterCIDRStr: "beef:1234::/112",
description: "RandomishAllocation with IPv6",
},
}
for _, tc := range cases {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(tc.clusterCIDRStr)
a, err := NewMultiCIDRSet(clusterCIDR, 8)
if err != nil {
t.Fatalf("Error allocating CIDRSet for %v", tc.description)
}
// allocate all the CIDRs.
var cidrs []*net.IPNet
for i := 0; i < 256; i++ {
if c, err := allocateNext(a); err == nil {
cidrs = append(cidrs, c)
} else {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// release all the CIDRs.
for i := 0; i < len(cidrs); i++ {
a.Release(cidrs[i])
}
// allocate the CIDRs again.
var rcidrs []*net.IPNet
for i := 0; i < 256; i++ {
if c, err := allocateNext(a); err == nil {
rcidrs = append(rcidrs, c)
} else {
t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description)
}
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
if !reflect.DeepEqual(cidrs, rcidrs) {
t.Fatalf("expected re-allocated cidrs are the same collection for %v", tc.description)
}
}
}
func TestCIDRSet_AllocationOccupied(t *testing.T) {
cases := []struct {
clusterCIDRStr string
description string
}{
{
clusterCIDRStr: "127.123.234.0/16",
description: "AllocationOccupied with IPv4",
},
{
clusterCIDRStr: "beef:1234::/112",
description: "AllocationOccupied with IPv6",
},
}
for _, tc := range cases {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(tc.clusterCIDRStr)
a, err := NewMultiCIDRSet(clusterCIDR, 8)
if err != nil {
t.Fatalf("Error allocating CIDRSet for %v", tc.description)
}
// allocate all the CIDRs.
var cidrs []*net.IPNet
var numCIDRs = 256
for i := 0; i < numCIDRs; i++ {
if c, err := allocateNext(a); err == nil {
cidrs = append(cidrs, c)
} else {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// release all the CIDRs.
for i := 0; i < len(cidrs); i++ {
a.Release(cidrs[i])
}
// occupy the last 128 CIDRs.
for i := numCIDRs / 2; i < numCIDRs; i++ {
a.Occupy(cidrs[i])
}
// occupy the first of the last 128 again.
a.Occupy(cidrs[numCIDRs/2])
// allocate the first 128 CIDRs again.
var rcidrs []*net.IPNet
for i := 0; i < numCIDRs/2; i++ {
if c, err := allocateNext(a); err == nil {
rcidrs = append(rcidrs, c)
} else {
t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description)
}
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// check Occupy() works properly.
for i := numCIDRs / 2; i < numCIDRs; i++ {
rcidrs = append(rcidrs, cidrs[i])
}
if !reflect.DeepEqual(cidrs, rcidrs) {
t.Fatalf("expected re-allocated cidrs are the same collection for %v", tc.description)
}
}
}
func TestDoubleOccupyRelease(t *testing.T) {
// Run a sequence of operations and check the number of occupied CIDRs
// after each one.
clusterCIDRStr := "10.42.0.0/16"
operations := []struct {
cidrStr string
operation string
numOccupied int
}{
// Occupy 1 element: +1
{
cidrStr: "10.42.5.0/24",
operation: "occupy",
numOccupied: 1,
},
// Occupy 1 more element: +1
{
cidrStr: "10.42.9.0/24",
operation: "occupy",
numOccupied: 2,
},
// Occupy 4 elements overlapping with one from the above: +3
{
cidrStr: "10.42.8.0/22",
operation: "occupy",
numOccupied: 5,
},
// Occupy an already-occupied element: no change
{
cidrStr: "10.42.9.0/24",
operation: "occupy",
numOccupied: 5,
},
// Release an occupied element: -1
{
cidrStr: "10.42.9.0/24",
operation: "release",
numOccupied: 4,
},
// Release an unoccupied element: no change
{
cidrStr: "10.42.9.0/24",
operation: "release",
numOccupied: 4,
},
// Release 4 elements, only one of which is occupied: -1
{
cidrStr: "10.42.4.0/22",
operation: "release",
numOccupied: 3,
},
}
// Check that there are exactly that many allocatable CIDRs after all
// operations have been executed.
numAllocatable24s := (1 << 8) - 3
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(clusterCIDRStr)
a, err := NewMultiCIDRSet(clusterCIDR, 8)
if err != nil {
t.Fatalf("Error allocating CIDRSet")
}
// Execute the operations.
for _, op := range operations {
_, cidr, _ := utilnet.ParseCIDRSloppy(op.cidrStr)
switch op.operation {
case "occupy":
a.Occupy(cidr)
case "release":
a.Release(cidr)
default:
t.Fatalf("test error: unknown operation %v", op.operation)
}
if a.allocatedCIDRs != op.numOccupied {
t.Fatalf("CIDR %v Expected %d occupied CIDRS, got %d", cidr, op.numOccupied, a.allocatedCIDRs)
}
}
// Make sure that we can allocate exactly `numAllocatable24s` elements.
for i := 0; i < numAllocatable24s; i++ {
_, err := allocateNext(a)
if err != nil {
t.Fatalf("Expected to be able to allocate %d CIDRS, failed after %d", numAllocatable24s, i)
}
}
_, err = allocateNext(a)
if err == nil {
t.Fatalf("Expected to be able to allocate exactly %d CIDRS, got one more", numAllocatable24s)
}
}
func TestGetBitforCIDR(t *testing.T) {
cases := []struct {
clusterCIDRStr string
perNodeHostBits int
subNetCIDRStr string
expectedBit int
expectErr bool
description string
}{
{
clusterCIDRStr: "127.0.0.0/8",
perNodeHostBits: 16,
subNetCIDRStr: "127.0.0.0/16",
expectedBit: 0,
expectErr: false,
description: "Get 0 Bit with IPv4",
},
{
clusterCIDRStr: "be00::/8",
perNodeHostBits: 112,
subNetCIDRStr: "be00::/16",
expectedBit: 0,
expectErr: false,
description: "Get 0 Bit with IPv6",
},
{
clusterCIDRStr: "127.0.0.0/8",
perNodeHostBits: 16,
subNetCIDRStr: "127.123.0.0/16",
expectedBit: 123,
expectErr: false,
description: "Get 123rd Bit with IPv4",
},
{
clusterCIDRStr: "be00::/8",
perNodeHostBits: 112,
subNetCIDRStr: "beef::/16",
expectedBit: 0xef,
expectErr: false,
description: "Get xef Bit with IPv6",
},
{
clusterCIDRStr: "127.0.0.0/8",
perNodeHostBits: 16,
subNetCIDRStr: "127.168.0.0/16",
expectedBit: 168,
expectErr: false,
description: "Get 168th Bit with IPv4",
},
{
clusterCIDRStr: "be00::/8",
perNodeHostBits: 112,
subNetCIDRStr: "be68::/16",
expectedBit: 0x68,
expectErr: false,
description: "Get x68th Bit with IPv6",
},
{
clusterCIDRStr: "127.0.0.0/8",
perNodeHostBits: 16,
subNetCIDRStr: "127.224.0.0/16",
expectedBit: 224,
expectErr: false,
description: "Get 224th Bit with IPv4",
},
{
clusterCIDRStr: "be00::/8",
perNodeHostBits: 112,
subNetCIDRStr: "be24::/16",
expectedBit: 0x24,
expectErr: false,
description: "Get x24th Bit with IPv6",
},
{
clusterCIDRStr: "192.168.0.0/16",
perNodeHostBits: 8,
subNetCIDRStr: "192.168.12.0/24",
expectedBit: 12,
expectErr: false,
description: "Get 12th Bit with IPv4",
},
{
clusterCIDRStr: "beef::/16",
perNodeHostBits: 104,
subNetCIDRStr: "beef:1200::/24",
expectedBit: 0x12,
expectErr: false,
description: "Get x12th Bit with IPv6",
},
{
clusterCIDRStr: "192.168.0.0/16",
perNodeHostBits: 8,
subNetCIDRStr: "192.168.151.0/24",
expectedBit: 151,
expectErr: false,
description: "Get 151st Bit with IPv4",
},
{
clusterCIDRStr: "beef::/16",
perNodeHostBits: 104,
subNetCIDRStr: "beef:9700::/24",
expectedBit: 0x97,
expectErr: false,
description: "Get x97st Bit with IPv6",
},
{
clusterCIDRStr: "192.168.0.0/16",
perNodeHostBits: 8,
subNetCIDRStr: "127.168.224.0/24",
expectErr: true,
description: "Get error with IPv4",
},
{
clusterCIDRStr: "beef::/16",
perNodeHostBits: 104,
subNetCIDRStr: "2001:db00::/24",
expectErr: true,
description: "Get error with IPv6",
},
}
for _, tc := range cases {
_, clusterCIDR, err := utilnet.ParseCIDRSloppy(tc.clusterCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
cs, err := NewMultiCIDRSet(clusterCIDR, tc.perNodeHostBits)
if err != nil {
t.Fatalf("Error allocating CIDRSet for %v", tc.description)
}
_, subnetCIDR, err := utilnet.ParseCIDRSloppy(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
got, err := cs.getIndexForCIDR(subnetCIDR)
if err == nil && tc.expectErr {
klog.Errorf("expected error but got null for %v", tc.description)
continue
}
if err != nil && !tc.expectErr {
klog.Errorf("unexpected error: %v for %v", err, tc.description)
continue
}
if got != tc.expectedBit {
klog.Errorf("expected %v, but got %v for %v", tc.expectedBit, got, tc.description)
}
}
}
func TestCIDRSetv6(t *testing.T) {
cases := []struct {
clusterCIDRStr string
perNodeHostBits int
expectedCIDR string
expectedCIDR2 string
expectErr bool
description string
}{
{
clusterCIDRStr: "127.0.0.0/8",
perNodeHostBits: 0,
expectErr: false,
expectedCIDR: "127.0.0.0/32",
expectedCIDR2: "127.0.0.1/32",
description: "Max cluster subnet size with IPv4",
},
{
clusterCIDRStr: "beef:1234::/32",
perNodeHostBits: 79,
expectErr: true,
description: "Max cluster subnet size with IPv6",
},
{
clusterCIDRStr: "2001:beef:1234:369b::/60",
perNodeHostBits: 64,
expectedCIDR: "2001:beef:1234:3690::/64",
expectedCIDR2: "2001:beef:1234:3691::/64",
expectErr: false,
description: "Allocate a few IPv6",
},
}
for _, tc := range cases {
t.Run(tc.description, func(t *testing.T) {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(tc.clusterCIDRStr)
a, err := NewMultiCIDRSet(clusterCIDR, tc.perNodeHostBits)
if gotErr := err != nil; gotErr != tc.expectErr {
t.Fatalf("NewMultiCIDRSet(%v, %v) = %v, %v; gotErr = %t, want %t", clusterCIDR, tc.perNodeHostBits, a, err, gotErr, tc.expectErr)
}
if a == nil {
return
}
p, err := allocateNext(a)
if err == nil && tc.expectErr {
t.Errorf("allocateNext(a) = nil, want error")
}
if err != nil && !tc.expectErr {
t.Errorf("allocateNext(a) = %+v, want no error", err)
}
if !tc.expectErr {
if p != nil && p.String() != tc.expectedCIDR {
t.Fatalf("allocateNext(a) got %+v, want %+v", p.String(), tc.expectedCIDR)
}
}
p2, err := allocateNext(a)
if err == nil && tc.expectErr {
t.Errorf("allocateNext(a) = nil, want error")
}
if err != nil && !tc.expectErr {
t.Errorf("allocateNext(a) = %+v, want no error", err)
}
if !tc.expectErr {
if p2 != nil && p2.String() != tc.expectedCIDR2 {
t.Fatalf("allocateNext(a) got %+v, want %+v", p2.String(), tc.expectedCIDR)
}
}
})
}
}
func TestMultiCIDRSetMetrics(t *testing.T) {
cidr := "10.0.0.0/16"
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(cidr)
// We have 256 free cidrs
a, err := NewMultiCIDRSet(clusterCIDR, 8)
if err != nil {
t.Fatalf("unexpected error creating MultiCIDRSet: %v", err)
}
clearMetrics(map[string]string{"clusterCIDR": cidr})
// Allocate next all.
for i := 1; i <= 256; i++ {
_, err := allocateNext(a)
if err != nil {
t.Fatalf("unexpected error allocating a new CIDR: %v", err)
}
em := testMetrics{
usage: float64(i) / float64(256),
allocs: float64(i),
releases: 0,
allocTries: 0,
}
expectMetrics(t, cidr, em)
}
// Release all CIDRs.
a.Release(clusterCIDR)
em := testMetrics{
usage: 0,
allocs: 256,
releases: 256,
allocTries: 0,
}
expectMetrics(t, cidr, em)
// Allocate all CIDRs.
a.Occupy(clusterCIDR)
em = testMetrics{
usage: 1,
allocs: 512,
releases: 256,
allocTries: 0,
}
expectMetrics(t, cidr, em)
}
func TestMultiCIDRSetMetricsHistogram(t *testing.T) {
cidr := "10.0.0.0/16"
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(cidr)
// We have 256 free cidrs.
a, err := NewMultiCIDRSet(clusterCIDR, 8)
if err != nil {
t.Fatalf("unexpected error creating MultiCIDRSet: %v", err)
}
clearMetrics(map[string]string{"clusterCIDR": cidr})
// Allocate half of the range.
// Occupy does not update the nextCandidate.
_, halfClusterCIDR, _ := utilnet.ParseCIDRSloppy("10.0.0.0/17")
a.Occupy(halfClusterCIDR)
em := testMetrics{
usage: 0.5,
allocs: 128,
releases: 0,
}
expectMetrics(t, cidr, em)
// Allocate next should iterate until the next free CIDR, skipping exactly
// the number of CIDRs we occupied previously.
_, err = allocateNext(a)
if err != nil {
t.Fatalf("unexpected error allocating a new CIDR: %v", err)
}
em = testMetrics{
usage: float64(129) / float64(256),
allocs: 129,
releases: 0,
}
expectMetrics(t, cidr, em)
}
func TestMultiCIDRSetMetricsDual(t *testing.T) {
// create IPv4 cidrSet.
cidrIPv4 := "10.0.0.0/16"
_, clusterCIDRv4, _ := utilnet.ParseCIDRSloppy(cidrIPv4)
a, err := NewMultiCIDRSet(clusterCIDRv4, 8)
if err != nil {
t.Fatalf("unexpected error creating MultiCIDRSet: %v", err)
}
clearMetrics(map[string]string{"clusterCIDR": cidrIPv4})
// create IPv6 cidrSet.
cidrIPv6 := "2001:db8::/48"
_, clusterCIDRv6, _ := utilnet.ParseCIDRSloppy(cidrIPv6)
b, err := NewMultiCIDRSet(clusterCIDRv6, 64)
if err != nil {
t.Fatalf("unexpected error creating MultiCIDRSet: %v", err)
}
clearMetrics(map[string]string{"clusterCIDR": cidrIPv6})
// Allocate all.
a.Occupy(clusterCIDRv4)
em := testMetrics{
usage: 1,
allocs: 256,
releases: 0,
allocTries: 0,
}
expectMetrics(t, cidrIPv4, em)
b.Occupy(clusterCIDRv6)
em = testMetrics{
usage: 1,
allocs: 65536,
releases: 0,
allocTries: 0,
}
expectMetrics(t, cidrIPv6, em)
// Release all.
a.Release(clusterCIDRv4)
em = testMetrics{
usage: 0,
allocs: 256,
releases: 256,
allocTries: 0,
}
expectMetrics(t, cidrIPv4, em)
b.Release(clusterCIDRv6)
em = testMetrics{
usage: 0,
allocs: 65536,
releases: 65536,
allocTries: 0,
}
expectMetrics(t, cidrIPv6, em)
}
// Metrics helpers.
func clearMetrics(labels map[string]string) {
cidrSetAllocations.Delete(labels)
cidrSetReleases.Delete(labels)
cidrSetUsage.Delete(labels)
cidrSetAllocationTriesPerRequest.Delete(labels)
}
type testMetrics struct {
usage float64
allocs float64
releases float64
allocTries float64
}
func expectMetrics(t *testing.T, label string, em testMetrics) {
var m testMetrics
var err error
m.usage, err = testutil.GetGaugeMetricValue(cidrSetUsage.WithLabelValues(label))
if err != nil {
t.Errorf("failed to get %s value, err: %v", cidrSetUsage.Name, err)
}
m.allocs, err = testutil.GetCounterMetricValue(cidrSetAllocations.WithLabelValues(label))
if err != nil {
t.Errorf("failed to get %s value, err: %v", cidrSetAllocations.Name, err)
}
m.releases, err = testutil.GetCounterMetricValue(cidrSetReleases.WithLabelValues(label))
if err != nil {
t.Errorf("failed to get %s value, err: %v", cidrSetReleases.Name, err)
}
m.allocTries, err = testutil.GetHistogramMetricValue(cidrSetAllocationTriesPerRequest.WithLabelValues(label))
if err != nil {
t.Errorf("failed to get %s value, err: %v", cidrSetAllocationTriesPerRequest.Name, err)
}
if m != em {
t.Fatalf("metrics error: expected %v, received %v", em, m)
}
}
// Benchmarks
func benchmarkAllocateAllIPv6(cidr string, perNodeHostBits int, b *testing.B) {
_, clusterCIDR, _ := utilnet.ParseCIDRSloppy(cidr)
a, _ := NewMultiCIDRSet(clusterCIDR, perNodeHostBits)
for n := 0; n < b.N; n++ {
// Allocate the whole range + 1.
for i := 0; i <= a.MaxCIDRs; i++ {
allocateNext(a)
}
// Release all.
a.Release(clusterCIDR)
}
}
func BenchmarkAllocateAll_48_52(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/48", 52, b) }
func BenchmarkAllocateAll_48_56(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/48", 56, b) }
func BenchmarkAllocateAll_48_60(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/48", 60, b) }
func BenchmarkAllocateAll_48_64(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/48", 64, b) }
func BenchmarkAllocateAll_64_68(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/64", 68, b) }
func BenchmarkAllocateAll_64_72(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/64", 72, b) }
func BenchmarkAllocateAll_64_76(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/64", 76, b) }
func BenchmarkAllocateAll_64_80(b *testing.B) { benchmarkAllocateAllIPv6("2001:db8::/64", 80, b) }


@@ -41,13 +41,6 @@ import (
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
)
// cidrs are reserved, then node resource is patched with them
// this type holds the reservation info for a node
type nodeReservedCIDRs struct {
allocatedCIDRs []*net.IPNet
nodeName string
}
type rangeAllocator struct {
client clientset.Interface
// cluster cidrs as passed in during controller creation
@@ -333,7 +326,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
var err error
var node *v1.Node
defer r.removeNodeFromProcessing(data.nodeName)
cidrsString := ipnetToStringList(data.allocatedCIDRs)
node, err = r.nodeLister.Get(data.nodeName)
if err != nil {
klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDRs: %v", data.nodeName, err)
@@ -391,12 +384,3 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
}
return err
}
// converts a slice of cidrs into <c-1>,<c-2>,<c-n>
func cidrsAsString(inCIDRs []*net.IPNet) []string {
outCIDRs := make([]string, len(inCIDRs))
for idx, inCIDR := range inCIDRs {
outCIDRs[idx] = inCIDR.String()
}
return outCIDRs
}


@@ -25,40 +25,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test"
"k8s.io/kubernetes/pkg/controller/testutil"
netutils "k8s.io/utils/net"
)
const testNodePollInterval = 10 * time.Millisecond
var alwaysReady = func() bool { return true }
func waitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number int, timeout time.Duration) error {
return wait.Poll(nodePollInterval, timeout, func() (bool, error) {
if len(nodeHandler.GetUpdatedNodesCopy()) >= number {
return true, nil
}
return false, nil
})
}
// Creates a fakeNodeInformer using the provided fakeNodeHandler.
func getFakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformers.NodeInformer {
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node)
}
return fakeNodeInformer
}
type testCase struct {
description string
fakeNodeHandler *testutil.FakeNodeHandler
@@ -305,7 +277,7 @@ func TestOccupyPreExistingCIDR(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
// Initialize the range allocator.
fakeNodeInformer := test.FakeNodeInformer(tc.fakeNodeHandler)
nodeList, _ := tc.fakeNodeHandler.List(context.TODO(), metav1.ListOptions{})
_, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, fakeNodeInformer, tc.allocatorParams, nodeList)
if err == nil && tc.ctrlCreateFail {
@@ -321,7 +293,7 @@ func TestOccupyPreExistingCIDR(t *testing.T) {
func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
// Non-parallel test (overrides global var)
oldNodePollInterval := nodePollInterval
nodePollInterval = test.NodePollInterval
defer func() {
nodePollInterval = oldNodePollInterval
}()
@@ -537,7 +509,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
// test function
testFunc := func(tc testCase) {
fakeNodeInformer := test.FakeNodeInformer(tc.fakeNodeHandler)
nodeList, _ := tc.fakeNodeHandler.List(context.TODO(), metav1.ListOptions{})
// Initialize the range allocator.
allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, fakeNodeInformer, tc.allocatorParams, nodeList)
@@ -550,7 +522,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.nodesSynced = test.AlwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop)
@@ -580,7 +552,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
if updateCount != 1 {
t.Fatalf("test error: all tests must update exactly one node")
}
if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, updateCount, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
}
@@ -639,7 +611,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
testFunc := func(tc testCase) {
// Initialize the range allocator.
allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, test.FakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil)
if err != nil {
t.Logf("%v: failed to create CIDRRangeAllocator with error %v", tc.description, err)
}
@@ -648,7 +620,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.nodesSynced = test.AlwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop)
@@ -708,7 +680,7 @@ type releaseTestCase struct {
func TestReleaseCIDRSuccess(t *testing.T) {
// Non-parallel test (overrides global var)
oldNodePollInterval := nodePollInterval
nodePollInterval = test.NodePollInterval
defer func() {
nodePollInterval = oldNodePollInterval
}()
@@ -784,13 +756,13 @@ func TestReleaseCIDRSuccess(t *testing.T) {
testFunc := func(tc releaseTestCase) {
// Initialize the range allocator.
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, test.FakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil)
rangeAllocator, ok := allocator.(*rangeAllocator)
if !ok {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.nodesSynced = test.AlwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop)
@@ -813,7 +785,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
if err != nil {
t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
}
if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
}
} else {
@@ -841,7 +813,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
if err = allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil {
t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
}
if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
}


@@ -18,10 +18,21 @@ package test
import (
"net"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/testutil"
netutils "k8s.io/utils/net"
)
const NodePollInterval = 10 * time.Millisecond
var AlwaysReady = func() bool { return true }
// MustParseCIDR returns the CIDR range parsed from s or panics if the string
// cannot be parsed.
func MustParseCIDR(s string) *net.IPNet {
@@ -31,3 +42,25 @@ func MustParseCIDR(s string) *net.IPNet {
}
return ret
}
// FakeNodeInformer creates a fakeNodeInformer using the provided fakeNodeHandler.
func FakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformers.NodeInformer {
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node)
}
return fakeNodeInformer
}
func WaitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number int, timeout time.Duration) error {
return wait.Poll(NodePollInterval, timeout, func() (bool, error) {
if len(nodeHandler.GetUpdatedNodesCopy()) >= number {
return true, nil
}
return false, nil
})
}


@@ -20,20 +20,18 @@ import (
"net"
"time"
"k8s.io/klog/v2"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
)
@@ -74,6 +72,7 @@ type Controller struct {
// currently, this should be handled as a fatal error.
func NewNodeIpamController(
nodeInformer coreinformers.NodeInformer,
clusterCIDRInformer networkinginformers.ClusterCIDRInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
clusterCIDRs []*net.IPNet,
@@ -136,7 +135,7 @@ func NewNodeIpamController(
NodeCIDRMaskSizes: nodeCIDRMaskSizes,
}
ic.cidrAllocator, err = ipam.New(kubeClient, cloud, nodeInformer, clusterCIDRInformer, ic.allocatorType, allocatorParams)
if err != nil {
return nil, err
}


@@ -48,6 +48,7 @@ func newTestNodeIpamController(clusterCIDR []*net.IPNet, serviceCIDR *net.IPNet,
fakeClient := &fake.Clientset{}
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
fakeClusterCIDRInformer := fakeInformerFactory.Networking().V1alpha1().ClusterCIDRs()
for _, node := range fakeNodeHandler.Existing {
fakeNodeInformer.Informer().GetStore().Add(node)
@@ -55,7 +56,7 @@ func newTestNodeIpamController(clusterCIDR []*net.IPNet, serviceCIDR *net.IPNet,
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
return NewNodeIpamController(
fakeNodeInformer, fakeClusterCIDRInformer, fakeGCE, clientSet,
clusterCIDR, serviceCIDR, secondaryServiceCIDR, nodeCIDRMaskSizes, allocatorType,
)
}
@@ -78,6 +79,9 @@ func TestNewNodeIpamControllerWithCIDRMasks(t *testing.T) {
{"valid_range_allocator_dualstack", "10.0.0.0/21,2000::/10", "10.1.0.0/21", emptyServiceCIDR, []int{24, 98}, ipam.RangeAllocatorType, false},
{"valid_range_allocator_dualstack_dualstackservice", "10.0.0.0/21,2000::/10", "10.1.0.0/21", "3000::/10", []int{24, 98}, ipam.RangeAllocatorType, false},
{"valid_multi_cidr_range_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.MultiCIDRRangeAllocatorType, false},
{"valid_multi_cidr_range_allocator_dualstack", "10.0.0.0/21,2000::/10", "10.1.0.0/21", emptyServiceCIDR, []int{24, 98}, ipam.MultiCIDRRangeAllocatorType, false},
{"valid_cloud_allocator", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.CloudAllocatorType, false},
{"valid_ipam_from_cluster", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromClusterAllocatorType, false},
{"valid_ipam_from_cloud", "10.0.0.0/21", "10.1.0.0/21", emptyServiceCIDR, []int{24}, ipam.IPAMFromCloudAllocatorType, false},