SPLIT: kube-apiserver/corerest: split out service IP setup logic
This commit is contained in:
		@@ -591,7 +591,7 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
 | 
			
		||||
		SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
 | 
			
		||||
		ServiceNodePortRange:    c.ExtraConfig.ServiceNodePortRange,
 | 
			
		||||
	}
 | 
			
		||||
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
 | 
			
		||||
	rangeRegistries, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error building core storage: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -612,7 +612,7 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, legacyRESTStorage)
 | 
			
		||||
	bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, rangeRegistries)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error creating bootstrap controller: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -53,7 +53,6 @@ import (
 | 
			
		||||
	pvcstore "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage"
 | 
			
		||||
	podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
 | 
			
		||||
	podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 | 
			
		||||
	controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
 | 
			
		||||
	resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage"
 | 
			
		||||
	secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage"
 | 
			
		||||
@@ -231,109 +230,14 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var rangeRegistries kubernetesservice.RangeRegistries
 | 
			
		||||
 | 
			
		||||
	var serviceClusterIPRegistry rangeallocation.RangeRegistry
 | 
			
		||||
	serviceClusterIPRange := c.ServiceIPRange
 | 
			
		||||
	if serviceClusterIPRange.IP == nil {
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
 | 
			
		||||
	rangeRegistries, primaryServiceClusterIPAllocator, serviceClusterIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
 | 
			
		||||
	}
 | 
			
		||||
	var serviceClusterIPAllocator, secondaryServiceClusterIPAllocator ipallocator.Interface
 | 
			
		||||
 | 
			
		||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
 | 
			
		||||
		serviceClusterIPAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 | 
			
		||||
			var mem allocator.Snapshottable
 | 
			
		||||
			mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 | 
			
		||||
			// TODO etcdallocator package to return a storage interface via the storageFactory
 | 
			
		||||
			etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			serviceClusterIPRegistry = etcd
 | 
			
		||||
			return etcd, nil
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
 | 
			
		||||
		}
 | 
			
		||||
		serviceClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	serviceClusterIPAllocator.EnableMetrics()
 | 
			
		||||
	rangeRegistries.ServiceClusterIPRegistry = serviceClusterIPRegistry
 | 
			
		||||
 | 
			
		||||
	// allocator for secondary service ip range
 | 
			
		||||
	if c.SecondaryServiceIPRange.IP != nil {
 | 
			
		||||
		var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
 | 
			
		||||
		if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
 | 
			
		||||
			secondaryServiceClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 | 
			
		||||
				var mem allocator.Snapshottable
 | 
			
		||||
				mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 | 
			
		||||
				// TODO etcdallocator package to return a storage interface via the storageFactory
 | 
			
		||||
				etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
				secondaryServiceClusterIPRegistry = etcd
 | 
			
		||||
				return etcd, nil
 | 
			
		||||
			})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
 | 
			
		||||
			}
 | 
			
		||||
			secondaryServiceClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		secondaryServiceClusterIPAllocator.EnableMetrics()
 | 
			
		||||
		rangeRegistries.SecondaryServiceClusterIPRegistry = secondaryServiceClusterIPRegistry
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var serviceNodePortRegistry rangeallocation.RangeRegistry
 | 
			
		||||
	serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 | 
			
		||||
		mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 | 
			
		||||
		// TODO etcdallocator package to return a storage interface via the storageFactory
 | 
			
		||||
		etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		serviceNodePortRegistry = etcd
 | 
			
		||||
		return etcd, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	serviceNodePortAllocator.EnableMetrics()
 | 
			
		||||
	rangeRegistries.ServiceNodePortRegistry = serviceNodePortRegistry
 | 
			
		||||
 | 
			
		||||
	serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{
 | 
			
		||||
		serviceClusterIPAllocator.IPFamily(): serviceClusterIPAllocator,
 | 
			
		||||
	}
 | 
			
		||||
	if secondaryServiceClusterIPAllocator != nil {
 | 
			
		||||
		serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
 | 
			
		||||
		restOptionsGetter,
 | 
			
		||||
		serviceClusterIPAllocator.IPFamily(),
 | 
			
		||||
		serviceIPAllocators,
 | 
			
		||||
		primaryServiceClusterIPAllocator.IPFamily(),
 | 
			
		||||
		serviceClusterIPAllocators,
 | 
			
		||||
		serviceNodePortAllocator,
 | 
			
		||||
		endpointsStorage,
 | 
			
		||||
		podStorage.Pod,
 | 
			
		||||
@@ -446,6 +350,97 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
 | 
			
		||||
	return rangeRegistries, apiGroupInfo, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernetesservice.RangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
 | 
			
		||||
	clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{}
 | 
			
		||||
 | 
			
		||||
	serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	serviceClusterIPRange := c.ServiceIPRange
 | 
			
		||||
	if serviceClusterIPRange.IP == nil {
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
 | 
			
		||||
		primaryClusterIPAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 | 
			
		||||
			var mem allocator.Snapshottable
 | 
			
		||||
			mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 | 
			
		||||
			// TODO etcdallocator package to return a storage interface via the storageFactory
 | 
			
		||||
			etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			registries.ServiceClusterIPRegistry = etcd
 | 
			
		||||
			return etcd, nil
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
 | 
			
		||||
		}
 | 
			
		||||
		primaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	primaryClusterIPAllocator.EnableMetrics()
 | 
			
		||||
	clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator
 | 
			
		||||
 | 
			
		||||
	var secondaryClusterIPAllocator ipallocator.Interface
 | 
			
		||||
	if c.SecondaryServiceIPRange.IP != nil {
 | 
			
		||||
		if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
 | 
			
		||||
			var err error
 | 
			
		||||
			secondaryClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 | 
			
		||||
				var mem allocator.Snapshottable
 | 
			
		||||
				mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 | 
			
		||||
				// TODO etcdallocator package to return a storage interface via the storageFactory
 | 
			
		||||
				etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return nil, err
 | 
			
		||||
				}
 | 
			
		||||
				registries.SecondaryServiceClusterIPRegistry = etcd
 | 
			
		||||
				return etcd, nil
 | 
			
		||||
			})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
 | 
			
		||||
			}
 | 
			
		||||
			secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		secondaryClusterIPAllocator.EnableMetrics()
 | 
			
		||||
		clusterIPAllocators[secondaryClusterIPAllocator.IPFamily()] = secondaryClusterIPAllocator
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	nodePortAllocator, err = portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 | 
			
		||||
		mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 | 
			
		||||
		// TODO etcdallocator package to return a storage interface via the storageFactory
 | 
			
		||||
		etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		registries.ServiceNodePortRegistry = etcd
 | 
			
		||||
		return etcd, nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	nodePortAllocator.EnableMetrics()
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p LegacyRESTStorageProvider) GroupName() string {
 | 
			
		||||
	return api.GroupName
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user