implement dual write on Service ClusterIP allocator
MultiCIDRServiceAllocator implements a new ClusterIP allocator based on IPAddress object to solve the problems and limitations caused by existing bitmap allocators. However, during the rollout of new versions, deployments need to support a skew of one version between kube-apiservers. To avoid the possible problem where there are multiple Services requests on the skewed apiservers and that both allocate the same IP to different Services, the new allocator will implement a dual-write strategy under the feature gate DisableAllocatorDualWrite. After the MultiCIDRServiceAllocator is GA, the DisableAllocatorDualWrite can be enabled safely as all apiservers will run with the new allocators. The graduation of DisableAllocatorDualWrite can also be used to clean up the opaque API object that contains the old bitmaps. If MultiCIDRServiceAllocator is enabled and DisableAllocatorDualWrite is disable and is a new environment, there is no bitmap object created, hence, the apiserver will initialize it to be able to write on it.
This commit is contained in:
@@ -50,7 +50,8 @@ func validateClusterIPFlags(options Extra) []error {
|
||||
}
|
||||
|
||||
// Complete() expected to have set Primary* and Secondary
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) ||
|
||||
!utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
// primary CIDR validation
|
||||
if err := validateMaxCIDRRange(options.PrimaryServiceClusterIPRange, maxCIDRBits, "--service-cluster-ip-range"); err != nil {
|
||||
errs = append(errs, err)
|
||||
@@ -72,7 +73,8 @@ func validateClusterIPFlags(options Extra) []error {
|
||||
if !dualstack {
|
||||
errs = append(errs, errors.New("--service-cluster-ip-range[0] and --service-cluster-ip-range[1] must be of different IP family"))
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) ||
|
||||
!utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
if err := validateMaxCIDRRange(options.SecondaryServiceClusterIPRange, maxCIDRBits, "--service-cluster-ip-range[1]"); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
@@ -60,10 +60,11 @@ func makeOptionsWithCIDRs(serviceCIDR string, secondaryServiceCIDR string) *Serv
|
||||
|
||||
func TestClusterServiceIPRange(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
options *ServerRunOptions
|
||||
expectErrors bool
|
||||
gate bool
|
||||
name string
|
||||
options *ServerRunOptions
|
||||
expectErrors bool
|
||||
ipAllocatorGate bool
|
||||
disableDualWriteGate bool
|
||||
}{
|
||||
{
|
||||
name: "no service cidr",
|
||||
@@ -91,22 +92,46 @@ func TestClusterServiceIPRange(t *testing.T) {
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/8", ""),
|
||||
},
|
||||
{
|
||||
name: "service cidr IPv4 is too big but gate enbled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/8", ""),
|
||||
gate: true,
|
||||
name: "service cidr IPv4 is too big but gate enbled",
|
||||
expectErrors: true,
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/8", ""),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: false,
|
||||
},
|
||||
{
|
||||
name: "service cidr IPv6 is too big but gate enbled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("2001:db8::/64", ""),
|
||||
gate: true,
|
||||
name: "service cidr IPv6 is too big but only ipallocator gate enabled",
|
||||
expectErrors: true,
|
||||
options: makeOptionsWithCIDRs("2001:db8::/64", ""),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: false,
|
||||
},
|
||||
{
|
||||
name: "service cidr IPv6 is too big and gate enbled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("2001:db8::/12", ""),
|
||||
gate: true,
|
||||
name: "service cidr IPv6 is too big but only ipallocator gate enabled",
|
||||
expectErrors: true,
|
||||
options: makeOptionsWithCIDRs("2001:db8::/12", ""),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: false,
|
||||
},
|
||||
{
|
||||
name: "service cidr IPv4 is too big but gate enabled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/8", ""),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: true,
|
||||
},
|
||||
{
|
||||
name: "service cidr IPv6 is too big but gate enabled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("2001:db8::/64", ""),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: true,
|
||||
},
|
||||
{
|
||||
name: "service cidr IPv6 is too big and gate enabled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("2001:db8::/12", ""),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: true,
|
||||
},
|
||||
{
|
||||
name: "dual-stack secondary cidr too big",
|
||||
@@ -114,10 +139,18 @@ func TestClusterServiceIPRange(t *testing.T) {
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/64"),
|
||||
},
|
||||
{
|
||||
name: "dual-stack secondary cidr too big gate enabled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/48"),
|
||||
gate: true,
|
||||
name: "dual-stack secondary cidr too big but only ipallocator gate enabled",
|
||||
expectErrors: true,
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/48"),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: false,
|
||||
},
|
||||
{
|
||||
name: "dual-stack secondary cidr too big gate enabled",
|
||||
expectErrors: false,
|
||||
options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/48"),
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: true,
|
||||
},
|
||||
{
|
||||
name: "more than two entries",
|
||||
@@ -149,7 +182,8 @@ func TestClusterServiceIPRange(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, tc.gate)
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, tc.ipAllocatorGate)
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, tc.disableDualWriteGate)
|
||||
|
||||
errs := validateClusterIPFlags(tc.options.Extra)
|
||||
if len(errs) > 0 && !tc.expectErrors {
|
||||
|
@@ -178,6 +178,14 @@ const (
|
||||
// Add support for CDI Device IDs in the Device Plugin API.
|
||||
DevicePluginCDIDevices featuregate.Feature = "DevicePluginCDIDevices"
|
||||
|
||||
// owner: @aojea
|
||||
// alpha: v1.31
|
||||
//
|
||||
// The apiservers with the MultiCIDRServiceAllocator feature enable, in order to support live migration from the old bitmap ClusterIP
|
||||
// allocators to the new IPAddress allocators introduced by the MultiCIDRServiceAllocator feature, performs a dual-write on
|
||||
// both allocators. This feature gate disables the dual write on the new Cluster IP allocators.
|
||||
DisableAllocatorDualWrite featuregate.Feature = "DisableAllocatorDualWrite"
|
||||
|
||||
// owner: @andrewsykim
|
||||
// alpha: v1.22
|
||||
// beta: v1.29
|
||||
@@ -1003,6 +1011,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
CronJobsScheduledAnnotation: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
DisableAllocatorDualWrite: {Default: false, PreRelease: featuregate.Alpha}, // remove after MultiCIDRServiceAllocator is GA
|
||||
|
||||
DisableCloudProviders: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||
|
||||
DisableKubeletCloudCredentialProviders: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||
|
@@ -36,6 +36,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
|
||||
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/cluster/ports"
|
||||
@@ -351,6 +352,37 @@ func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryCl
|
||||
if err != nil {
|
||||
return rangeRegistries{}, nil, nil, nil, err
|
||||
}
|
||||
var bitmapAllocator ipallocator.Interface
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
bitmapAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
|
||||
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
|
||||
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// It is possible to start apiserver clusters with the new allocator and dual write enable on new environments.
|
||||
// If this is the case we need to initialize the bitmap or it will fail to allocate IP addresses because
|
||||
// the ResourceVersion of the opaque API object is zero.
|
||||
rangeRegistry, err := etcd.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rangeRegistry.Range = serviceClusterIPRange.String()
|
||||
if len(rangeRegistry.ResourceVersion) == 0 {
|
||||
klog.Infof("kube-apiserver started with IP allocator and dual write enabled but bitmap allocator does not exist, recreating it ...")
|
||||
err := etcd.CreateOrUpdate(rangeRegistry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
registries.clusterIP = etcd
|
||||
return etcd, nil
|
||||
})
|
||||
if err != nil {
|
||||
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
// TODO(aojea) Revisit the initialization of the allocators
|
||||
// since right now it depends on the service-cidr flags and
|
||||
// sets the default IPFamily that may not be coherent with the
|
||||
@@ -360,6 +392,7 @@ func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryCl
|
||||
c.Informers.Networking().V1alpha1().ServiceCIDRs(),
|
||||
c.Informers.Networking().V1alpha1().IPAddresses(),
|
||||
netutils.IsIPv6CIDR(&serviceClusterIPRange),
|
||||
bitmapAllocator,
|
||||
)
|
||||
if err != nil {
|
||||
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
|
||||
@@ -391,6 +424,37 @@ func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryCl
|
||||
if err != nil {
|
||||
return rangeRegistries{}, nil, nil, nil, err
|
||||
}
|
||||
var bitmapAllocator ipallocator.Interface
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
bitmapAllocator, err = ipallocator.New(&c.Services.SecondaryClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
|
||||
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
|
||||
// TODO etcdallocator package to return a storage interface via the storageFactory
|
||||
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// It is possible to start apiserver clusters with the new allocator and dual write enable on new environments.
|
||||
// If this is the case we need to initialize the bitmap or it will fail to allocate IP addresses because
|
||||
// the ResourceVersion of the opaque API object is zero.
|
||||
rangeRegistry, err := etcd.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rangeRegistry.Range = serviceClusterIPRange.String()
|
||||
if len(rangeRegistry.ResourceVersion) == 0 {
|
||||
klog.Infof("kube-apiserver started with IP allocator and dual write enabled but bitmap allocator does not exist, recreating it ...")
|
||||
err := etcd.CreateOrUpdate(rangeRegistry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
registries.secondaryClusterIP = etcd
|
||||
return etcd, nil
|
||||
})
|
||||
if err != nil {
|
||||
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %w", err)
|
||||
}
|
||||
}
|
||||
// TODO(aojea) Revisit the initialization of the allocators
|
||||
// since right now it depends on the service-cidr flags and
|
||||
// sets the default IPFamily that may not be coherent with the
|
||||
@@ -400,6 +464,7 @@ func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryCl
|
||||
c.Informers.Networking().V1alpha1().ServiceCIDRs(),
|
||||
c.Informers.Networking().V1alpha1().IPAddresses(),
|
||||
netutils.IsIPv6CIDR(&c.Services.SecondaryClusterIPRange),
|
||||
bitmapAllocator,
|
||||
)
|
||||
if err != nil {
|
||||
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
|
||||
|
@@ -30,6 +30,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
|
||||
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
|
||||
networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
|
||||
@@ -38,6 +39,7 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/api/servicecidr"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
@@ -69,6 +71,9 @@ type MetaAllocator struct {
|
||||
|
||||
ipFamily api.IPFamily
|
||||
metrics bool // enable the metrics collection
|
||||
|
||||
// TODO(aojea): remove with the feature gate DisableAllocatorDualWrite
|
||||
bitmapAllocator Interface
|
||||
}
|
||||
|
||||
type item struct {
|
||||
@@ -86,9 +91,10 @@ func NewMetaAllocator(
|
||||
serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
|
||||
ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
|
||||
isIPv6 bool,
|
||||
bitmapAllocator Interface,
|
||||
) (*MetaAllocator, error) {
|
||||
|
||||
c := newMetaAllocator(client, serviceCIDRInformer, ipAddressInformer, isIPv6)
|
||||
c := newMetaAllocator(client, serviceCIDRInformer, ipAddressInformer, isIPv6, bitmapAllocator)
|
||||
go c.run()
|
||||
return c, nil
|
||||
}
|
||||
@@ -98,6 +104,7 @@ func newMetaAllocator(client networkingv1alpha1client.NetworkingV1alpha1Interfac
|
||||
serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
|
||||
ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
|
||||
isIPv6 bool,
|
||||
bitmapAllocator Interface,
|
||||
) *MetaAllocator {
|
||||
// TODO: make the NewMetaAllocator agnostic of the IP family
|
||||
family := api.IPv4Protocol
|
||||
@@ -116,10 +123,11 @@ func newMetaAllocator(client networkingv1alpha1client.NetworkingV1alpha1Interfac
|
||||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: ControllerName},
|
||||
),
|
||||
internalStopCh: make(chan struct{}),
|
||||
allocators: make(map[string]*item),
|
||||
ipFamily: family,
|
||||
metrics: false,
|
||||
internalStopCh: make(chan struct{}),
|
||||
allocators: make(map[string]*item),
|
||||
ipFamily: family,
|
||||
metrics: false,
|
||||
bitmapAllocator: bitmapAllocator,
|
||||
}
|
||||
|
||||
_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@@ -329,15 +337,27 @@ func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
cidr := c.bitmapAllocator.CIDR()
|
||||
if cidr.Contains(ip) {
|
||||
err := c.bitmapAllocator.Allocate(ip)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return allocator.AllocateService(service, ip)
|
||||
}
|
||||
|
||||
// Allocate attempts to reserve the provided IP. ErrNotInRange or
|
||||
// ErrAllocated will be returned if the IP is not valid for this range
|
||||
// or has already been reserved. ErrFull will be returned if there
|
||||
// are no addresses left.
|
||||
// Only for testing, it will fail to create the IPAddress object because
|
||||
// the Service reference is required.s
|
||||
func (c *MetaAllocator) Allocate(ip net.IP) error {
|
||||
allocator, err := c.getAllocator(ip, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return allocator.Allocate(ip)
|
||||
return c.AllocateService(nil, ip)
|
||||
|
||||
}
|
||||
|
||||
func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) {
|
||||
@@ -356,32 +376,26 @@ func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error
|
||||
}
|
||||
ip, err := item.allocator.AllocateNextService(service)
|
||||
if err == nil {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
cidr := c.bitmapAllocator.CIDR()
|
||||
if cidr.Contains(ip) {
|
||||
err := c.bitmapAllocator.Allocate(ip)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return ip, nil
|
||||
}
|
||||
}
|
||||
return nil, ErrFull
|
||||
}
|
||||
|
||||
// AllocateNext return an IP address that wasn't allocated yet.
|
||||
// Only for testing, it will fail to create the IPAddress object because
|
||||
// the Service reference is required
|
||||
func (c *MetaAllocator) AllocateNext() (net.IP, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// TODO(aojea) add strategy to return a random allocator but
|
||||
// taking into consideration the number of addresses of each allocator.
|
||||
// Per example, if we have allocator A and B with 256 and 1024 possible
|
||||
// addresses each, the chances to get B has to be 4 times the chances to
|
||||
// get A so we can spread the load of IPs randomly.
|
||||
// However, we need to validate the best strategy before going to Beta.
|
||||
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
|
||||
for cidr, item := range c.allocators {
|
||||
if netutils.IsIPv6String(cidr) != isIPv6 {
|
||||
continue
|
||||
}
|
||||
ip, err := item.allocator.AllocateNext()
|
||||
if err == nil {
|
||||
return ip, nil
|
||||
}
|
||||
}
|
||||
return nil, ErrFull
|
||||
return c.AllocateNextService(nil)
|
||||
}
|
||||
|
||||
func (c *MetaAllocator) Release(ip net.IP) error {
|
||||
@@ -389,6 +403,12 @@ func (c *MetaAllocator) Release(ip net.IP) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
cidr := c.bitmapAllocator.CIDR()
|
||||
if cidr.Contains(ip) {
|
||||
_ = c.bitmapAllocator.Release(ip)
|
||||
}
|
||||
}
|
||||
return allocator.Release(ip)
|
||||
|
||||
}
|
||||
@@ -424,6 +444,9 @@ func (c *MetaAllocator) Destroy() {
|
||||
select {
|
||||
case <-c.internalStopCh:
|
||||
default:
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.DisableAllocatorDualWrite) {
|
||||
c.bitmapAllocator.Destroy()
|
||||
}
|
||||
close(c.internalStopCh)
|
||||
}
|
||||
}
|
||||
|
@@ -28,9 +28,12 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
k8stesting "k8s.io/client-go/testing"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
@@ -85,7 +88,7 @@ func newTestMetaAllocator() (*MetaAllocator, error) {
|
||||
return false, ip, err
|
||||
}))
|
||||
|
||||
c := newMetaAllocator(client.NetworkingV1alpha1(), serviceCIDRInformer, ipInformer, false)
|
||||
c := newMetaAllocator(client.NetworkingV1alpha1(), serviceCIDRInformer, ipInformer, false, nil)
|
||||
|
||||
c.serviceCIDRSynced = func() bool { return true }
|
||||
c.ipAddressSynced = func() bool { return true }
|
||||
@@ -94,6 +97,7 @@ func newTestMetaAllocator() (*MetaAllocator, error) {
|
||||
}
|
||||
|
||||
func TestCIDRAllocateMultiple(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, true)
|
||||
r, err := newTestMetaAllocator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -191,6 +195,7 @@ func TestCIDRAllocateMultiple(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCIDRAllocateShadow(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, true)
|
||||
r, err := newTestMetaAllocator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -263,6 +268,7 @@ func TestCIDRAllocateShadow(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCIDRAllocateGrow(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, true)
|
||||
r, err := newTestMetaAllocator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -354,6 +360,7 @@ func TestCIDRAllocateGrow(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCIDRAllocateShrink(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, true)
|
||||
r, err := newTestMetaAllocator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -466,6 +473,134 @@ func TestCIDRAllocateShrink(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestCIDRAllocateDualWrite(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, false)
|
||||
r, err := newTestMetaAllocator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Destroy()
|
||||
|
||||
if f := r.Free(); f != 0 {
|
||||
t.Errorf("free: %d", f)
|
||||
}
|
||||
if _, err := r.AllocateNext(); err == nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
cidr := newServiceCIDR("test", "192.168.0.0/28")
|
||||
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r.enqueServiceCIDR(cidr)
|
||||
// wait for the cidr to be processed and set the informer synced
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
|
||||
if err != nil {
|
||||
t.Logf("unexpected error %v", err)
|
||||
return false, nil
|
||||
}
|
||||
allocator.ipAddressSynced = func() bool { return true }
|
||||
return allocator.ready.Load(), nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create a bitmap allocator that will mirror the ip allocator
|
||||
_, ipnet, err := netutils.ParseCIDRSloppy(cidr.Spec.CIDRs[0])
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected failure: %v", err)
|
||||
}
|
||||
bitmapAllocator, err := NewInMemory(ipnet)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected failure: %v", err)
|
||||
}
|
||||
r.bitmapAllocator = bitmapAllocator
|
||||
|
||||
found := sets.NewString()
|
||||
count := 0
|
||||
for r.Free() > 0 {
|
||||
ip, err := r.AllocateNext()
|
||||
if err != nil {
|
||||
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
|
||||
}
|
||||
if r.Free() != bitmapAllocator.Free() {
|
||||
t.Fatalf("ip and bitmap allocator out of sync: %d %d", r.Free(), bitmapAllocator.Free())
|
||||
}
|
||||
count++
|
||||
if found.Has(ip.String()) {
|
||||
t.Fatalf("allocated %s twice: %d", ip, count)
|
||||
}
|
||||
found.Insert(ip.String())
|
||||
}
|
||||
if count != 14 {
|
||||
t.Fatalf("expected 14 IPs got %d", count)
|
||||
}
|
||||
if _, err := r.AllocateNext(); err == nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCIDRAllocateDualWriteCollision(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DisableAllocatorDualWrite, false)
|
||||
r, err := newTestMetaAllocator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Destroy()
|
||||
|
||||
if f := r.Free(); f != 0 {
|
||||
t.Errorf("free: %d", f)
|
||||
}
|
||||
if _, err := r.AllocateNext(); err == nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
cidr := newServiceCIDR("test", "192.168.0.0/28")
|
||||
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r.enqueServiceCIDR(cidr)
|
||||
// wait for the cidr to be processed and set the informer synced
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
|
||||
if err != nil {
|
||||
t.Logf("unexpected error %v", err)
|
||||
return false, nil
|
||||
}
|
||||
allocator.ipAddressSynced = func() bool { return true }
|
||||
return allocator.ready.Load(), nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create a bitmap allocator that will mirror the ip allocator
|
||||
_, ipnet, err := netutils.ParseCIDRSloppy(cidr.Spec.CIDRs[0])
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected failure: %v", err)
|
||||
}
|
||||
bitmapAllocator, err := NewInMemory(ipnet)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected failure: %v", err)
|
||||
}
|
||||
r.bitmapAllocator = bitmapAllocator
|
||||
|
||||
// preallocate one IP in the bitmap allocator
|
||||
err = bitmapAllocator.Allocate(netutils.ParseIPSloppy("192.168.0.5"))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error allocating an IP on the bitmap allocator: %v", err)
|
||||
}
|
||||
// the ipallocator must not be able to allocate
|
||||
err = r.Allocate(netutils.ParseIPSloppy("192.168.0.5"))
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected allocation: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: add IPv6 and dual stack test cases
|
||||
func newServiceCIDR(name, cidr string) *networkingv1alpha1.ServiceCIDR {
|
||||
return &networkingv1alpha1.ServiceCIDR{
|
||||
|
@@ -20,6 +20,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -28,104 +30,149 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
func TestServiceAlloc(t *testing.T) {
|
||||
func TestServiceAllocation(t *testing.T) {
|
||||
// Create an IPv4 single stack control-plane
|
||||
serviceCIDR := "192.168.0.0/29"
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
client, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
|
||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||
opts.ServiceClusterIPRanges = serviceCIDR
|
||||
var testcases = []struct {
|
||||
name string
|
||||
ipAllocatorGate bool
|
||||
disableDualWriteGate bool
|
||||
}{
|
||||
{
|
||||
name: "Bitmap allocator",
|
||||
ipAllocatorGate: false,
|
||||
disableDualWriteGate: false,
|
||||
},
|
||||
})
|
||||
defer tearDownFn()
|
||||
|
||||
svc := func(i int) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("svc-%v", i),
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
Ports: []v1.ServicePort{
|
||||
{Port: 80},
|
||||
{
|
||||
name: "IP allocator and dual write",
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: false,
|
||||
},
|
||||
{
|
||||
name: "IP allocator only",
|
||||
ipAllocatorGate: true,
|
||||
disableDualWriteGate: true,
|
||||
},
|
||||
{
|
||||
name: "disable dual write with bitmap allocator",
|
||||
ipAllocatorGate: false,
|
||||
disableDualWriteGate: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range testcases {
|
||||
t.Run(fmt.Sprintf(tc.name), func(t *testing.T) {
|
||||
etcdOptions := framework.SharedEtcd()
|
||||
apiServerOptions := kubeapiservertesting.NewDefaultTestServerOptions()
|
||||
s1 := kubeapiservertesting.StartTestServerOrDie(t,
|
||||
apiServerOptions,
|
||||
[]string{
|
||||
"--runtime-config=networking.k8s.io/v1alpha1=true",
|
||||
"--service-cluster-ip-range=" + serviceCIDR,
|
||||
"--advertise-address=10.0.0.2",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=%v,%s=%v", features.MultiCIDRServiceAllocator, tc.ipAllocatorGate, features.DisableAllocatorDualWrite, tc.disableDualWriteGate),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
etcdOptions)
|
||||
defer s1.TearDownFn()
|
||||
|
||||
// Wait until the default "kubernetes" service is created.
|
||||
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
|
||||
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
return false, err
|
||||
}
|
||||
return !apierrors.IsNotFound(err), nil
|
||||
}); err != nil {
|
||||
t.Fatalf("creating kubernetes service timed out")
|
||||
}
|
||||
client, err := clientset.NewForConfig(s1.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// make 5 more services to take up all IPs
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc(i), metav1.CreateOptions{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
svc := func(i int) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("svc-%v", i),
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
Ports: []v1.ServicePort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Make another service. It will fail because we're out of cluster IPs
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc(8), metav1.CreateOptions{}); err != nil {
|
||||
if !strings.Contains(err.Error(), "range is full") {
|
||||
t.Errorf("unexpected error text: %v", err)
|
||||
}
|
||||
} else {
|
||||
svcs, err := client.CoreV1().Services(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected success, and error getting the services: %v", err)
|
||||
}
|
||||
allIPs := []string{}
|
||||
for _, s := range svcs.Items {
|
||||
allIPs = append(allIPs, s.Spec.ClusterIP)
|
||||
}
|
||||
t.Fatalf("unexpected creation success. The following IPs exist: %#v. It should only be possible to allocate 2 IP addresses in this cluster.\n\n%#v", allIPs, svcs)
|
||||
}
|
||||
// Wait until the default "kubernetes" service is created.
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 15*time.Second, true, func(context.Context) (bool, error) {
|
||||
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
return false, err
|
||||
}
|
||||
return !apierrors.IsNotFound(err), nil
|
||||
}); err != nil {
|
||||
t.Fatalf("creating kubernetes service timed out: %v", err)
|
||||
}
|
||||
|
||||
// Delete the first service.
|
||||
if err := client.CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), svc(1).ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
}
|
||||
// make 5 more services to take up all IPs
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc(i), metav1.CreateOptions{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// This time creating the second service should work.
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc(8), metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
// Make another service. It will fail because we're out of cluster IPs
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc(8), metav1.CreateOptions{}); err != nil {
|
||||
if !strings.Contains(err.Error(), "range is full") {
|
||||
t.Errorf("unexpected error text: %v", err)
|
||||
}
|
||||
} else {
|
||||
svcs, err := client.CoreV1().Services(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected success, and error getting the services: %v", err)
|
||||
}
|
||||
allIPs := []string{}
|
||||
for _, s := range svcs.Items {
|
||||
allIPs = append(allIPs, s.Spec.ClusterIP)
|
||||
}
|
||||
t.Fatalf("unexpected creation success. The following IPs exist: %#v. It should only be possible to allocate 2 IP addresses in this cluster.\n\n%#v", allIPs, svcs)
|
||||
}
|
||||
|
||||
// Delete the first service.
|
||||
if err := client.CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), svc(1).ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// This time creating the second service should work.
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc(8), metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("got unexpected error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceAllocIPAddress(t *testing.T) {
|
||||
func TestServiceAllocIPAddressLargeCIDR(t *testing.T) {
|
||||
// Create an IPv6 single stack control-plane with a large range
|
||||
serviceCIDR := "2001:db8::/64"
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, true)
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
client, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
|
||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||
opts.ServiceClusterIPRanges = serviceCIDR
|
||||
opts.GenericServerRunOptions.AdvertiseAddress = netutils.ParseIPSloppy("2001:db8::10")
|
||||
opts.APIEnablement.RuntimeConfig.Set("networking.k8s.io/v1alpha1=true")
|
||||
etcdOptions := framework.SharedEtcd()
|
||||
apiServerOptions := kubeapiservertesting.NewDefaultTestServerOptions()
|
||||
s1 := kubeapiservertesting.StartTestServerOrDie(t,
|
||||
apiServerOptions,
|
||||
[]string{
|
||||
"--runtime-config=networking.k8s.io/v1alpha1=true",
|
||||
"--service-cluster-ip-range=" + serviceCIDR,
|
||||
"--advertise-address=2001:db8::10",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
// bitmap allocator does not support large service CIDRs set DisableAllocatorDualWrite to false
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=true", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite),
|
||||
},
|
||||
})
|
||||
defer tearDownFn()
|
||||
etcdOptions)
|
||||
defer s1.TearDownFn()
|
||||
|
||||
client, err := clientset.NewForConfig(s1.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
svc := func(i int) *v1.Service {
|
||||
return &v1.Service{
|
||||
@@ -168,10 +215,10 @@ func TestServiceAllocIPAddress(t *testing.T) {
|
||||
// because it is not reasonable to create 2^64 services
|
||||
lastSvc := svc(8)
|
||||
lastSvc.Spec.ClusterIP = "2001:db8::ffff:ffff:ffff:ffff"
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), lastSvc, metav1.CreateOptions{}); err != nil {
|
||||
if _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(tCtx, lastSvc, metav1.CreateOptions{}); err != nil {
|
||||
t.Errorf("unexpected error text: %v", err)
|
||||
}
|
||||
_, err := client.NetworkingV1alpha1().IPAddresses().Get(context.TODO(), lastSvc.Spec.ClusterIP, metav1.GetOptions{})
|
||||
_, err = client.NetworkingV1alpha1().IPAddresses().Get(tCtx, lastSvc.Spec.ClusterIP, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -179,9 +226,6 @@ func TestServiceAllocIPAddress(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMigrateService(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, true)
|
||||
//logs.GlogSetter("7")
|
||||
|
||||
etcdOptions := framework.SharedEtcd()
|
||||
apiServerOptions := kubeapiservertesting.NewDefaultTestServerOptions()
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t,
|
||||
@@ -191,6 +235,7 @@ func TestMigrateService(t *testing.T) {
|
||||
"--service-cluster-ip-range=10.0.0.0/24",
|
||||
"--advertise-address=10.1.1.1",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=false", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite),
|
||||
},
|
||||
etcdOptions)
|
||||
defer s.TearDownFn()
|
||||
@@ -224,7 +269,7 @@ func TestMigrateService(t *testing.T) {
|
||||
}
|
||||
t.Logf("Service stored in etcd %v", string(svcJSON))
|
||||
|
||||
kubeclient, err := kubernetes.NewForConfig(s.ClientConfig)
|
||||
kubeclient, err := clientset.NewForConfig(s.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -252,7 +297,9 @@ func TestMigrateService(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestSkewedAllocators(t *testing.T) {
|
||||
// TestSkewedAllocatorsRollback creating an apiserver with the new allocator and
|
||||
// later starting an old apiserver with the bitmap allocator.
|
||||
func TestSkewedAllocatorsRollback(t *testing.T) {
|
||||
svc := func(i int) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@@ -275,11 +322,11 @@ func TestSkewedAllocators(t *testing.T) {
|
||||
"--runtime-config=networking.k8s.io/v1alpha1=true",
|
||||
"--service-cluster-ip-range=10.0.0.0/24",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true", features.MultiCIDRServiceAllocator)},
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=true", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite)},
|
||||
etcdOptions)
|
||||
defer s1.TearDownFn()
|
||||
|
||||
kubeclient1, err := kubernetes.NewForConfig(s1.ClientConfig)
|
||||
kubeclient1, err := clientset.NewForConfig(s1.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -307,7 +354,7 @@ func TestSkewedAllocators(t *testing.T) {
|
||||
etcdOptions)
|
||||
defer s2.TearDownFn()
|
||||
|
||||
kubeclient2, err := kubernetes.NewForConfig(s2.ClientConfig)
|
||||
kubeclient2, err := clientset.NewForConfig(s2.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -335,6 +382,162 @@ func TestSkewedAllocators(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
// TestSkewAllocatorsRollout test that two different apiservers, one with
|
||||
// the feature gate enable and other with it disable, can not allocate
|
||||
// the same IP to two different Services
|
||||
func TestSkewAllocatorsRollout(t *testing.T) {
|
||||
svc := func(name string, ip string) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
ClusterIP: ip,
|
||||
Ports: []v1.ServicePort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
etcdOptions := framework.SharedEtcd()
|
||||
apiServerOptions := kubeapiservertesting.NewDefaultTestServerOptions()
|
||||
// Order matters here because the apiserver allocator logic needs to cast
|
||||
// the Allocator interface to be able to pass the Service reference.
|
||||
|
||||
// oldServer uses bitmap allocator
|
||||
oldServer := kubeapiservertesting.StartTestServerOrDie(t, apiServerOptions,
|
||||
[]string{
|
||||
"--runtime-config=networking.k8s.io/v1alpha1=false",
|
||||
"--service-cluster-ip-range=10.0.0.0/16",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=false", features.MultiCIDRServiceAllocator)},
|
||||
etcdOptions)
|
||||
defer oldServer.TearDownFn()
|
||||
|
||||
kubeclientOld, err := clientset.NewForConfig(oldServer.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
// s1 uses IPAddress allocator
|
||||
newServer := kubeapiservertesting.StartTestServerOrDie(t, apiServerOptions,
|
||||
[]string{
|
||||
"--runtime-config=networking.k8s.io/v1alpha1=true",
|
||||
"--service-cluster-ip-range=10.0.0.0/16",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=false", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite)},
|
||||
etcdOptions)
|
||||
defer newServer.TearDownFn()
|
||||
|
||||
kubeclientNew, err := clientset.NewForConfig(newServer.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
namespace := "test-ns"
|
||||
ns := framework.CreateNamespaceOrDie(kubeclientNew, namespace, t)
|
||||
defer framework.DeleteNamespaceOrDie(kubeclientNew, ns, t)
|
||||
|
||||
// create two Services in parallel , with the same ClusterIP, in each apiserver N times.
|
||||
concurrency := 100
|
||||
var errorsOld, errorsNew atomic.Uint64
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 5; i < concurrency+5; i++ {
|
||||
ip := fmt.Sprintf("10.0.0.%d", i)
|
||||
service1 := svc(fmt.Sprintf("svc-%d-new", i), ip)
|
||||
service2 := svc(fmt.Sprintf("svc-%d-old", i), ip)
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := kubeclientNew.CoreV1().Services(namespace).Create(context.TODO(), service1, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Logf("Service %s with ip %s result: %v", service1.Name, service1.Spec.ClusterIP, err)
|
||||
errorsNew.Add(1)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := kubeclientOld.CoreV1().Services(namespace).Create(context.TODO(), service2, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Logf("Service %s with ip %s result: %v", service2.Name, service2.Spec.ClusterIP, err)
|
||||
errorsOld.Add(1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
errorsTotal := errorsOld.Load() + errorsNew.Load()
|
||||
t.Logf("errors received, old allocator %d new allocator %d", errorsOld.Load(), errorsNew.Load())
|
||||
if errorsTotal != uint64(concurrency) {
|
||||
t.Fatalf("expected %d Services creation to have failed, got %d", concurrency, errorsTotal)
|
||||
}
|
||||
|
||||
// It takes some time for Services to be available,
|
||||
servicesList := []v1.Service{}
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
|
||||
svcs, err := kubeclientNew.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
if len(svcs.Items) != concurrency {
|
||||
t.Logf("expected %d Services to exist, got %d", concurrency, len(svcs.Items))
|
||||
return false, nil
|
||||
}
|
||||
servicesList = svcs.Items
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("No expected Services objects created: %v", err)
|
||||
}
|
||||
|
||||
// It takes some time for the repairip loop to create the corresponding IPAddress objects
|
||||
// ClusterIPs are synchronized through the bitmap.
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
|
||||
ips, err := kubeclientNew.NetworkingV1alpha1().IPAddresses().List(context.Background(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
// count the kubernetes.default service too
|
||||
if len(ips.Items) != concurrency+1 {
|
||||
t.Logf("expected %d IPAddresses to exist, got %d: %v", concurrency+1, len(ips.Items), ips.Items)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("No expected IPAddress objects created: %v", err)
|
||||
}
|
||||
|
||||
allIPs := map[string]string{}
|
||||
for _, s := range servicesList {
|
||||
if svc, ok := allIPs[s.Spec.ClusterIP]; ok {
|
||||
t.Fatalf("duplicate IP %s for Services %s and %s", s.Spec.ClusterIP, svc, s.Name)
|
||||
} else {
|
||||
allIPs[s.Spec.ClusterIP] = s.Name
|
||||
}
|
||||
}
|
||||
|
||||
// Check all the IPAddress objects are created
|
||||
for i := 5; i < concurrency+5; i++ {
|
||||
ip := fmt.Sprintf("10.0.0.%d", i)
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
|
||||
// The repair loop must create the IP address associated
|
||||
_, err = kubeclientNew.NetworkingV1alpha1().IPAddresses().Get(context.Background(), ip, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("No expected IPAddress objects created: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlagsIPAllocator(t *testing.T) {
|
||||
svc := func(i int) *v1.Service {
|
||||
return &v1.Service{
|
||||
@@ -361,7 +564,7 @@ func TestFlagsIPAllocator(t *testing.T) {
|
||||
etcdOptions)
|
||||
defer s1.TearDownFn()
|
||||
|
||||
kubeclient1, err := kubernetes.NewForConfig(s1.ClientConfig)
|
||||
kubeclient1, err := clientset.NewForConfig(s1.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
@@ -30,7 +30,6 @@ import (
|
||||
)
|
||||
|
||||
func TestEnableDisableServiceCIDR(t *testing.T) {
|
||||
|
||||
svc := func(i int) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@@ -119,6 +118,7 @@ func TestEnableDisableServiceCIDR(t *testing.T) {
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=false", features.MultiCIDRServiceAllocator)},
|
||||
etcdOptions)
|
||||
defer s3.TearDownFn()
|
||||
|
||||
client3, err := clientset.NewForConfig(s3.ClientConfig)
|
||||
if err != nil {
|
||||
|
@@ -27,10 +27,8 @@ import (
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/controller/servicecidrs"
|
||||
"k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr"
|
||||
@@ -49,7 +47,6 @@ import (
|
||||
// 6. start the new apiserver with the new ServiceCIDRs on the flags and shutdown the old one
|
||||
// 7. delete the kubernetes.default service, the new apiserver will recreate it within the new ServiceCIDR
|
||||
func TestMigrateServiceCIDR(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, true)
|
||||
tCtx := ktesting.Init(t)
|
||||
|
||||
cidr1 := "192.168.0.0/29"
|
||||
@@ -64,6 +61,7 @@ func TestMigrateServiceCIDR(t *testing.T) {
|
||||
"--service-cluster-ip-range=" + cidr1,
|
||||
"--advertise-address=10.1.1.1",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=true", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite),
|
||||
},
|
||||
etcdOptions)
|
||||
|
||||
|
@@ -24,17 +24,13 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
// TestServiceAllocPerformance measure the latency to create N services with a parallelism of K
|
||||
@@ -88,18 +84,25 @@ func TestServiceAllocPerformance(t *testing.T) {
|
||||
|
||||
for _, gate := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("feature-gate=%v", gate), func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, gate)
|
||||
|
||||
tCtx := ktesting.Init(t)
|
||||
client, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
|
||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||
// use the larget range possible , this is limited by the old allocator
|
||||
opts.ServiceClusterIPRanges = "10.0.0.0/12"
|
||||
opts.GenericServerRunOptions.AdvertiseAddress = netutils.ParseIPSloppy("10.0.0.1")
|
||||
opts.APIEnablement.RuntimeConfig.Set("networking.k8s.io/v1alpha1=true") // nolint: errcheck
|
||||
etcdOptions := framework.SharedEtcd()
|
||||
apiServerOptions := kubeapiservertesting.NewDefaultTestServerOptions()
|
||||
s1 := kubeapiservertesting.StartTestServerOrDie(t,
|
||||
apiServerOptions,
|
||||
[]string{
|
||||
"--runtime-config=networking.k8s.io/v1alpha1=true",
|
||||
"--service-cluster-ip-range=" + "10.0.0.0/12",
|
||||
"--advertise-address=10.0.0.1",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=true", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite),
|
||||
},
|
||||
})
|
||||
defer tearDownFn()
|
||||
etcdOptions)
|
||||
|
||||
defer s1.TearDownFn()
|
||||
|
||||
client, err := clientset.NewForConfig(s1.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
legacyregistry.Reset()
|
||||
|
||||
|
@@ -29,10 +29,8 @@ import (
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/controller/servicecidrs"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
@@ -40,8 +38,6 @@ import (
|
||||
)
|
||||
|
||||
func TestServiceAllocNewServiceCIDR(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, true)
|
||||
|
||||
etcdOptions := framework.SharedEtcd()
|
||||
apiServerOptions := kubeapiservertesting.NewDefaultTestServerOptions()
|
||||
s := kubeapiservertesting.StartTestServerOrDie(t,
|
||||
@@ -51,6 +47,7 @@ func TestServiceAllocNewServiceCIDR(t *testing.T) {
|
||||
"--service-cluster-ip-range=192.168.0.0/29",
|
||||
"--advertise-address=10.1.1.1",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=true", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite),
|
||||
},
|
||||
etcdOptions)
|
||||
defer s.TearDownFn()
|
||||
@@ -134,7 +131,6 @@ func TestServiceAllocNewServiceCIDR(t *testing.T) {
|
||||
// Deletes the Service with the IPAddress blocking the deletion
|
||||
// cidr3 must not exist at this point
|
||||
func TestServiceCIDRDeletion(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MultiCIDRServiceAllocator, true)
|
||||
cidr1 := "192.168.0.0/29" // same as the default
|
||||
cidr2 := "10.0.0.0/24" // new range
|
||||
cidr3 := "10.0.0.0/16" // contains cidr2
|
||||
@@ -148,6 +144,7 @@ func TestServiceCIDRDeletion(t *testing.T) {
|
||||
"--service-cluster-ip-range=" + cidr1,
|
||||
"--advertise-address=172.16.1.1",
|
||||
"--disable-admission-plugins=ServiceAccount",
|
||||
fmt.Sprintf("--feature-gates=%s=true,%s=true", features.MultiCIDRServiceAllocator, features.DisableAllocatorDualWrite),
|
||||
},
|
||||
etcdOptions)
|
||||
defer s.TearDownFn()
|
||||
|
Reference in New Issue
Block a user