Merge pull request #119082 from sttts/sttts-corerest-generic-split
kube-apiserver/corerest: normalize service IP range plumbing
This commit is contained in:
		| @@ -575,21 +575,23 @@ func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc { | ||||
| // InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled. | ||||
| func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error { | ||||
| 	legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{ | ||||
| 		StorageFactory:              c.ExtraConfig.StorageFactory, | ||||
| 		ProxyTransport:              c.ExtraConfig.ProxyTransport, | ||||
| 		KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig, | ||||
| 		EventTTL:                    c.ExtraConfig.EventTTL, | ||||
| 		ServiceIPRange:              c.ExtraConfig.ServiceIPRange, | ||||
| 		SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange, | ||||
| 		ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange, | ||||
| 		LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig, | ||||
| 		ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer, | ||||
| 		ExtendExpiration:            c.ExtraConfig.ExtendExpiration, | ||||
| 		ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration, | ||||
| 		APIAudiences:                c.GenericConfig.Authentication.APIAudiences, | ||||
| 		Informers:                   c.ExtraConfig.VersionedInformers, | ||||
| 		GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{ | ||||
| 			StorageFactory:              c.ExtraConfig.StorageFactory, | ||||
| 			EventTTL:                    c.ExtraConfig.EventTTL, | ||||
| 			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig, | ||||
| 			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer, | ||||
| 			ExtendExpiration:            c.ExtraConfig.ExtendExpiration, | ||||
| 			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration, | ||||
| 			APIAudiences:                c.GenericConfig.Authentication.APIAudiences, | ||||
| 			Informers:                   c.ExtraConfig.VersionedInformers, | ||||
| 		}, | ||||
| 		ProxyTransport:          c.ExtraConfig.ProxyTransport, | ||||
| 		KubeletClientConfig:     c.ExtraConfig.KubeletClientConfig, | ||||
| 		ServiceIPRange:          c.ExtraConfig.ServiceIPRange, | ||||
| 		SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange, | ||||
| 		ServiceNodePortRange:    c.ExtraConfig.ServiceNodePortRange, | ||||
| 	} | ||||
| 	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter) | ||||
| 	rangeRegistries, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error building core storage: %v", err) | ||||
| 	} | ||||
| @@ -610,7 +612,7 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, legacyRESTStorage) | ||||
| 	bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, rangeRegistries) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error creating bootstrap controller: %v", err) | ||||
| 	} | ||||
|   | ||||
| @@ -153,14 +153,16 @@ func TestLegacyRestStorageStrategies(t *testing.T) { | ||||
| 	defer etcdserver.Terminate(t) | ||||
|  | ||||
| 	storageProvider := corerest.LegacyRESTStorageProvider{ | ||||
| 		StorageFactory:       apiserverCfg.ExtraConfig.StorageFactory, | ||||
| 		GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{ | ||||
| 			StorageFactory:       apiserverCfg.ExtraConfig.StorageFactory, | ||||
| 			EventTTL:             apiserverCfg.ExtraConfig.EventTTL, | ||||
| 			LoopbackClientConfig: apiserverCfg.GenericConfig.LoopbackClientConfig, | ||||
| 			Informers:            apiserverCfg.ExtraConfig.VersionedInformers, | ||||
| 		}, | ||||
| 		ProxyTransport:       apiserverCfg.ExtraConfig.ProxyTransport, | ||||
| 		KubeletClientConfig:  apiserverCfg.ExtraConfig.KubeletClientConfig, | ||||
| 		EventTTL:             apiserverCfg.ExtraConfig.EventTTL, | ||||
| 		ServiceIPRange:       apiserverCfg.ExtraConfig.ServiceIPRange, | ||||
| 		ServiceNodePortRange: apiserverCfg.ExtraConfig.ServiceNodePortRange, | ||||
| 		LoopbackClientConfig: apiserverCfg.GenericConfig.LoopbackClientConfig, | ||||
| 		Informers:            apiserverCfg.ExtraConfig.VersionedInformers, | ||||
| 	} | ||||
|  | ||||
| 	_, apiGroupInfo, err := storageProvider.NewLegacyRESTStorage(serverstorage.NewResourceConfig(), apiserverCfg.GenericConfig.RESTOptionsGetter) | ||||
|   | ||||
| @@ -53,7 +53,6 @@ import ( | ||||
| 	pvcstore "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage" | ||||
| 	podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage" | ||||
| 	podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage" | ||||
| 	"k8s.io/kubernetes/pkg/registry/core/rangeallocation" | ||||
| 	controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage" | ||||
| 	resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage" | ||||
| 	secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage" | ||||
| @@ -67,20 +66,12 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/serviceaccount" | ||||
| ) | ||||
|  | ||||
| // LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but | ||||
| // does NOT implement the "normal" RESTStorageProvider (yet!) | ||||
| type LegacyRESTStorageProvider struct { | ||||
| // GenericLegacyRESTStorageProvider provides information needed to build RESTStorage | ||||
| // for generic resources in core, but does NOT implement the "normal" | ||||
| // RESTStorageProvider (yet!) | ||||
| type GenericLegacyRESTStorageProvider struct { | ||||
| 	StorageFactory serverstorage.StorageFactory | ||||
| 	// Used for custom proxy dialing, and proxy TLS options | ||||
| 	ProxyTransport      http.RoundTripper | ||||
| 	KubeletClientConfig kubeletclient.KubeletClientConfig | ||||
| 	EventTTL            time.Duration | ||||
|  | ||||
| 	// ServiceIPRange is used to build cluster IPs for discovery. | ||||
| 	ServiceIPRange net.IPNet | ||||
| 	// allocates ips for secondary service cidr in dual  stack clusters | ||||
| 	SecondaryServiceIPRange net.IPNet | ||||
| 	ServiceNodePortRange    utilnet.PortRange | ||||
| 	EventTTL       time.Duration | ||||
|  | ||||
| 	ServiceAccountIssuer        serviceaccount.TokenGenerator | ||||
| 	ServiceAccountMaxExpiration time.Duration | ||||
| @@ -92,16 +83,24 @@ type LegacyRESTStorageProvider struct { | ||||
| 	Informers            informers.SharedInformerFactory | ||||
| } | ||||
|  | ||||
| // LegacyRESTStorage returns stateful information about particular instances of REST storage to | ||||
| // master.go for wiring controllers. | ||||
| // TODO remove this by running the controller as a poststarthook | ||||
| type LegacyRESTStorage struct { | ||||
| 	ServiceClusterIPAllocator          rangeallocation.RangeRegistry | ||||
| 	SecondaryServiceClusterIPAllocator rangeallocation.RangeRegistry | ||||
| 	ServiceNodePortAllocator           rangeallocation.RangeRegistry | ||||
| // LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but | ||||
| // does NOT implement the "normal" RESTStorageProvider (yet!) | ||||
| type LegacyRESTStorageProvider struct { | ||||
| 	GenericLegacyRESTStorageProvider | ||||
|  | ||||
| 	// Used for custom proxy dialing, and proxy TLS options | ||||
| 	ProxyTransport      http.RoundTripper | ||||
| 	KubeletClientConfig kubeletclient.KubeletClientConfig | ||||
|  | ||||
| 	// ServiceIPRange is used to build cluster IPs for discovery. | ||||
| 	ServiceIPRange net.IPNet | ||||
|  | ||||
| 	// allocates ips for secondary service cidr in dual  stack clusters | ||||
| 	SecondaryServiceIPRange net.IPNet | ||||
| 	ServiceNodePortRange    utilnet.PortRange | ||||
| } | ||||
|  | ||||
| func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (kubernetesservice.RangeRegistries, genericapiserver.APIGroupInfo, error) { | ||||
| func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) { | ||||
| 	apiGroupInfo := genericapiserver.APIGroupInfo{ | ||||
| 		PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""), | ||||
| 		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{}, | ||||
| @@ -110,252 +109,45 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource | ||||
| 		NegotiatedSerializer:         legacyscheme.Codecs, | ||||
| 	} | ||||
|  | ||||
| 	podDisruptionClient, err := policyclient.NewForConfig(c.LoopbackClientConfig) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	restStorage := kubernetesservice.RangeRegistries{} | ||||
|  | ||||
| 	podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds())) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		return genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		return genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	secretStorage, err := secretstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		return genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	configMapStorage, err := configmapstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		return genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	podStorage, err := podstore.NewStorage( | ||||
| 		restOptionsGetter, | ||||
| 		nodeStorage.KubeletConnectionInfo, | ||||
| 		c.ProxyTransport, | ||||
| 		podDisruptionClient, | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		return genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	var serviceAccountStorage *serviceaccountstore.REST | ||||
| 	if c.ServiceAccountIssuer != nil { | ||||
| 		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, secretStorage.Store, c.ExtendExpiration) | ||||
| 		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, nil, secretStorage.Store, c.ExtendExpiration) | ||||
| 	} else { | ||||
| 		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	var serviceClusterIPRegistry rangeallocation.RangeRegistry | ||||
| 	serviceClusterIPRange := c.ServiceIPRange | ||||
| 	if serviceClusterIPRange.IP == nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing") | ||||
| 	} | ||||
|  | ||||
| 	serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services")) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	var serviceClusterIPAllocator, secondaryServiceClusterIPAllocator ipallocator.Interface | ||||
|  | ||||
| 	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) { | ||||
| 		serviceClusterIPAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { | ||||
| 			var mem allocator.Snapshottable | ||||
| 			mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) | ||||
| 			// TODO etcdallocator package to return a storage interface via the storageFactory | ||||
| 			etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations"))) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			serviceClusterIPRegistry = etcd | ||||
| 			return etcd, nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err) | ||||
| 		} | ||||
| 	} else { | ||||
| 		networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		} | ||||
| 		serviceClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses()) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	serviceClusterIPAllocator.EnableMetrics() | ||||
| 	restStorage.ServiceClusterIPRegistry = serviceClusterIPRegistry | ||||
|  | ||||
| 	// allocator for secondary service ip range | ||||
| 	if c.SecondaryServiceIPRange.IP != nil { | ||||
| 		var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry | ||||
| 		if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) { | ||||
| 			secondaryServiceClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { | ||||
| 				var mem allocator.Snapshottable | ||||
| 				mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) | ||||
| 				// TODO etcdallocator package to return a storage interface via the storageFactory | ||||
| 				etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations"))) | ||||
| 				if err != nil { | ||||
| 					return nil, err | ||||
| 				} | ||||
| 				secondaryServiceClusterIPRegistry = etcd | ||||
| 				return etcd, nil | ||||
| 			}) | ||||
| 			if err != nil { | ||||
| 				return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) | ||||
| 			} | ||||
| 		} else { | ||||
| 			networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig) | ||||
| 			if err != nil { | ||||
| 				return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 			} | ||||
| 			secondaryServiceClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses()) | ||||
| 			if err != nil { | ||||
| 				return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		secondaryServiceClusterIPAllocator.EnableMetrics() | ||||
| 		restStorage.SecondaryServiceClusterIPRegistry = secondaryServiceClusterIPRegistry | ||||
| 	} | ||||
|  | ||||
| 	var serviceNodePortRegistry rangeallocation.RangeRegistry | ||||
| 	serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { | ||||
| 		mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) | ||||
| 		// TODO etcdallocator package to return a storage interface via the storageFactory | ||||
| 		etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations"))) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		serviceNodePortRegistry = etcd | ||||
| 		return etcd, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err) | ||||
| 	} | ||||
| 	serviceNodePortAllocator.EnableMetrics() | ||||
| 	restStorage.ServiceNodePortRegistry = serviceNodePortRegistry | ||||
|  | ||||
| 	controllerStorage, err := controllerstore.NewStorage(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{ | ||||
| 		serviceClusterIPAllocator.IPFamily(): serviceClusterIPAllocator, | ||||
| 	} | ||||
| 	if secondaryServiceClusterIPAllocator != nil { | ||||
| 		serviceIPAllocators[secondaryServiceClusterIPAllocator.IPFamily()] = secondaryServiceClusterIPAllocator | ||||
| 	} | ||||
|  | ||||
| 	serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST( | ||||
| 		restOptionsGetter, | ||||
| 		serviceClusterIPAllocator.IPFamily(), | ||||
| 		serviceIPAllocators, | ||||
| 		serviceNodePortAllocator, | ||||
| 		endpointsStorage, | ||||
| 		podStorage.Pod, | ||||
| 		c.ProxyTransport) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		return genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	storage := map[string]rest.Storage{} | ||||
| 	if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = podStorage.Pod | ||||
| 		storage[resource+"/attach"] = podStorage.Attach | ||||
| 		storage[resource+"/status"] = podStorage.Status | ||||
| 		storage[resource+"/log"] = podStorage.Log | ||||
| 		storage[resource+"/exec"] = podStorage.Exec | ||||
| 		storage[resource+"/portforward"] = podStorage.PortForward | ||||
| 		storage[resource+"/proxy"] = podStorage.Proxy | ||||
| 		storage[resource+"/binding"] = podStorage.Binding | ||||
| 		if podStorage.Eviction != nil { | ||||
| 			storage[resource+"/eviction"] = podStorage.Eviction | ||||
| 		} | ||||
| 		storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers | ||||
|  | ||||
| 	} | ||||
| 	if resource := "bindings"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = podStorage.LegacyBinding | ||||
| 	} | ||||
|  | ||||
| 	if resource := "podtemplates"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = podTemplateStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = controllerStorage.Controller | ||||
| 		storage[resource+"/status"] = controllerStorage.Status | ||||
| 		if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) { | ||||
| 			storage[resource+"/scale"] = controllerStorage.Scale | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if resource := "services"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = serviceRESTStorage | ||||
| 		storage[resource+"/proxy"] = serviceRESTProxy | ||||
| 		storage[resource+"/status"] = serviceStatusStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "endpoints"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = endpointsStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "nodes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = nodeStorage.Node | ||||
| 		storage[resource+"/proxy"] = nodeStorage.Proxy | ||||
| 		storage[resource+"/status"] = nodeStorage.Status | ||||
| 	} | ||||
|  | ||||
| 	if resource := "events"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = eventStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "limitranges"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = limitRangeStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "resourcequotas"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = resourceQuotaStorage | ||||
| 		storage[resource+"/status"] = resourceQuotaStatusStorage | ||||
| @@ -378,6 +170,165 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if resource := "configmaps"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = configMapStorage | ||||
| 	} | ||||
|  | ||||
| 	if len(storage) > 0 { | ||||
| 		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage | ||||
| 	} | ||||
|  | ||||
| 	return apiGroupInfo, nil | ||||
| } | ||||
| func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (kubernetesservice.RangeRegistries, genericapiserver.APIGroupInfo, error) { | ||||
| 	apiGroupInfo, err := c.GenericLegacyRESTStorageProvider.NewRESTStorage(apiResourceConfigSource, restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	podDisruptionClient, err := policyclient.NewForConfig(c.LoopbackClientConfig) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	podStorage, err := podstore.NewStorage( | ||||
| 		restOptionsGetter, | ||||
| 		nodeStorage.KubeletConnectionInfo, | ||||
| 		c.ProxyTransport, | ||||
| 		podDisruptionClient, | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	rangeRegistries, primaryServiceClusterIPAllocator, serviceClusterIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators() | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
| 	serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST( | ||||
| 		restOptionsGetter, | ||||
| 		primaryServiceClusterIPAllocator.IPFamily(), | ||||
| 		serviceClusterIPAllocators, | ||||
| 		serviceNodePortAllocator, | ||||
| 		endpointsStorage, | ||||
| 		podStorage.Pod, | ||||
| 		c.ProxyTransport) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 	} | ||||
|  | ||||
| 	storage := apiGroupInfo.VersionedResourcesStorageMap["v1"] | ||||
| 	if storage == nil { | ||||
| 		storage = map[string]rest.Storage{} | ||||
| 	} | ||||
|  | ||||
| 	// potentially override the generic serviceaccount storage with one that supports pods | ||||
| 	var serviceAccountStorage *serviceaccountstore.REST | ||||
| 	if c.ServiceAccountIssuer != nil { | ||||
| 		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, storage["secrets"].(rest.Getter), c.ExtendExpiration) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = podStorage.Pod | ||||
| 		storage[resource+"/attach"] = podStorage.Attach | ||||
| 		storage[resource+"/status"] = podStorage.Status | ||||
| 		storage[resource+"/log"] = podStorage.Log | ||||
| 		storage[resource+"/exec"] = podStorage.Exec | ||||
| 		storage[resource+"/portforward"] = podStorage.PortForward | ||||
| 		storage[resource+"/proxy"] = podStorage.Proxy | ||||
| 		storage[resource+"/binding"] = podStorage.Binding | ||||
| 		if podStorage.Eviction != nil { | ||||
| 			storage[resource+"/eviction"] = podStorage.Eviction | ||||
| 		} | ||||
| 		storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers | ||||
| 	} | ||||
| 	if resource := "bindings"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = podStorage.LegacyBinding | ||||
| 	} | ||||
|  | ||||
| 	if resource := "podtemplates"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = podTemplateStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		controllerStorage, err := controllerstore.NewStorage(restOptionsGetter) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err | ||||
| 		} | ||||
|  | ||||
| 		storage[resource] = controllerStorage.Controller | ||||
| 		storage[resource+"/status"] = controllerStorage.Status | ||||
| 		if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) { | ||||
| 			storage[resource+"/scale"] = controllerStorage.Scale | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// potentially override generic storage for service account (with pod support) | ||||
| 	if resource := "serviceaccounts"; serviceAccountStorage != nil && apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		// don't leak go routines | ||||
| 		storage[resource].Destroy() | ||||
| 		if storage[resource+"/token"] != nil { | ||||
| 			storage[resource+"/token"].Destroy() | ||||
| 		} | ||||
|  | ||||
| 		storage[resource] = serviceAccountStorage | ||||
| 		if serviceAccountStorage.Token != nil { | ||||
| 			storage[resource+"/token"] = serviceAccountStorage.Token | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if resource := "services"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = serviceRESTStorage | ||||
| 		storage[resource+"/proxy"] = serviceRESTProxy | ||||
| 		storage[resource+"/status"] = serviceStatusStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "endpoints"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = endpointsStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "nodes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = nodeStorage.Node | ||||
| 		storage[resource+"/proxy"] = nodeStorage.Proxy | ||||
| 		storage[resource+"/status"] = nodeStorage.Status | ||||
| 	} | ||||
|  | ||||
| 	if resource := "limitranges"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = limitRangeStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "persistentvolumes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = persistentVolumeStorage | ||||
| 		storage[resource+"/status"] = persistentVolumeStatusStorage | ||||
| @@ -388,10 +339,6 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource | ||||
| 		storage[resource+"/status"] = persistentVolumeClaimStatusStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "configmaps"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = configMapStorage | ||||
| 	} | ||||
|  | ||||
| 	if resource := "componentstatuses"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) { | ||||
| 		storage[resource] = componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate) | ||||
| 	} | ||||
| @@ -400,7 +347,98 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource | ||||
| 		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage | ||||
| 	} | ||||
|  | ||||
| 	return restStorage, apiGroupInfo, nil | ||||
| 	return rangeRegistries, apiGroupInfo, nil | ||||
| } | ||||
|  | ||||
| func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernetesservice.RangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) { | ||||
| 	clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{} | ||||
|  | ||||
| 	serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services")) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, nil, nil, nil, err | ||||
| 	} | ||||
|  | ||||
| 	serviceClusterIPRange := c.ServiceIPRange | ||||
| 	if serviceClusterIPRange.IP == nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing") | ||||
| 	} | ||||
|  | ||||
| 	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) { | ||||
| 		primaryClusterIPAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { | ||||
| 			var mem allocator.Snapshottable | ||||
| 			mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) | ||||
| 			// TODO etcdallocator package to return a storage interface via the storageFactory | ||||
| 			etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations"))) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			registries.ServiceClusterIPRegistry = etcd | ||||
| 			return etcd, nil | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err) | ||||
| 		} | ||||
| 	} else { | ||||
| 		networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, nil, nil, nil, err | ||||
| 		} | ||||
| 		primaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses()) | ||||
| 		if err != nil { | ||||
| 			return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	primaryClusterIPAllocator.EnableMetrics() | ||||
| 	clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator | ||||
|  | ||||
| 	var secondaryClusterIPAllocator ipallocator.Interface | ||||
| 	if c.SecondaryServiceIPRange.IP != nil { | ||||
| 		if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) { | ||||
| 			var err error | ||||
| 			secondaryClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { | ||||
| 				var mem allocator.Snapshottable | ||||
| 				mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) | ||||
| 				// TODO etcdallocator package to return a storage interface via the storageFactory | ||||
| 				etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations"))) | ||||
| 				if err != nil { | ||||
| 					return nil, err | ||||
| 				} | ||||
| 				registries.SecondaryServiceClusterIPRegistry = etcd | ||||
| 				return etcd, nil | ||||
| 			}) | ||||
| 			if err != nil { | ||||
| 				return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) | ||||
| 			} | ||||
| 		} else { | ||||
| 			networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig) | ||||
| 			if err != nil { | ||||
| 				return kubernetesservice.RangeRegistries{}, nil, nil, nil, err | ||||
| 			} | ||||
| 			secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses()) | ||||
| 			if err != nil { | ||||
| 				return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		secondaryClusterIPAllocator.EnableMetrics() | ||||
| 		clusterIPAllocators[secondaryClusterIPAllocator.IPFamily()] = secondaryClusterIPAllocator | ||||
| 	} | ||||
|  | ||||
| 	nodePortAllocator, err = portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) { | ||||
| 		mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset) | ||||
| 		// TODO etcdallocator package to return a storage interface via the storageFactory | ||||
| 		etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations"))) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		registries.ServiceNodePortRegistry = etcd | ||||
| 		return etcd, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err) | ||||
| 	} | ||||
| 	nodePortAllocator.EnableMetrics() | ||||
|  | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (p LegacyRESTStorageProvider) GroupName() string { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot