pkg/controlplane: split up config into generic controlplane and kube-related part

Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
This commit is contained in:
Dr. Stefan Schimanski
2023-06-27 12:07:05 +02:00
parent ba05a8deb3
commit e37917fea7
26 changed files with 333 additions and 263 deletions

View File

@@ -19,7 +19,6 @@ package controlplane
import (
"fmt"
"net"
"net/http"
"os"
"reflect"
"strconv"
@@ -69,7 +68,6 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
@@ -80,6 +78,7 @@ import (
flowcontrolv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
flowcontrolv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
flowcontrolv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
@@ -125,6 +124,9 @@ const (
DefaultEndpointReconcilerInterval = 10 * time.Second
// DefaultEndpointReconcilerTTL is the default TTL timeout for the storage layer
DefaultEndpointReconcilerTTL = 15 * time.Second
// DefaultPeerEndpointReconcilerTTL is the default TTL timeout for peer endpoint
// leases on the storage layer
DefaultPeerEndpointReconcilerTTL = 15 * time.Second
// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
// 1. the lease is an identity lease (different from leader election leases)
// 2. which component owns this lease
@@ -144,36 +146,25 @@ var (
// IdentityLeaseDurationSeconds is the duration of kube-apiserver lease in seconds
// IdentityLeaseDurationSeconds is exposed so integration tests can tune this value.
IdentityLeaseDurationSeconds = 3600
// IdentityLeaseRenewIntervalSeconds is the interval of kube-apiserver renewing its lease in seconds
// IdentityLeaseRenewIntervalSeconds is exposed so integration tests can tune this value.
// IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds
// IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value.
IdentityLeaseRenewIntervalPeriod = 10 * time.Second
)
// ExtraConfig defines extra configuration for the master
type ExtraConfig struct {
ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
APIResourceConfigSource serverstorage.APIResourceConfigSource
StorageFactory serverstorage.StorageFactory
// Extra defines extra configuration for the master
type Extra struct {
EndpointReconcilerConfig EndpointReconcilerConfig
EventTTL time.Duration
KubeletClientConfig kubeletclient.KubeletClientConfig
EnableLogsSupport bool
ProxyTransport *http.Transport
// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
// serving certs when routing a request to the peer in the case the request can not be served
// locally due to version skew.
PeerCAFile string
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
@@ -219,17 +210,6 @@ type ExtraConfig struct {
// Selects which reconciler to use
EndpointReconcilerType reconcilers.Type
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountMaxExpiration time.Duration
ExtendExpiration bool
// ServiceAccountIssuerDiscovery
ServiceAccountIssuerURL string
ServiceAccountJWKSURI string
ServiceAccountPublicKeys []interface{}
VersionedInformers informers.SharedInformerFactory
// RepairServicesInterval interval used by the repair loops for
// the Services NodePort and ClusterIP resources
RepairServicesInterval time.Duration
@@ -237,13 +217,13 @@ type ExtraConfig struct {
// Config defines configuration for the master
type Config struct {
GenericConfig *genericapiserver.Config
ExtraConfig ExtraConfig
ControlPlane controlplaneapiserver.Config
Extra
}
type completedConfig struct {
GenericConfig genericapiserver.CompletedConfig
ExtraConfig *ExtraConfig
ControlPlane controlplaneapiserver.CompletedConfig
*Extra
}
// CompletedConfig embeds a private pointer that cannot be instantiated outside of this package
@@ -266,11 +246,11 @@ type Instance struct {
}
func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointSliceClient := discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointClient := corev1client.NewForConfigOrDie(c.ControlPlane.Generic.LoopbackClientConfig)
endpointSliceClient := discoveryclient.NewForConfigOrDie(c.ControlPlane.Generic.LoopbackClientConfig)
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointsAdapter)
return reconcilers.NewMasterCountEndpointReconciler(c.Extra.MasterCount, endpointsAdapter)
}
func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
@@ -278,12 +258,12 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
}
func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointSliceClient := discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointClient := corev1client.NewForConfigOrDie(c.ControlPlane.Generic.LoopbackClientConfig)
endpointSliceClient := discoveryclient.NewForConfigOrDie(c.ControlPlane.Generic.LoopbackClientConfig)
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
ttl := c.ExtraConfig.MasterEndpointReconcileTTL
config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
ttl := c.Extra.MasterEndpointReconcileTTL
config, err := c.ControlPlane.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
if err != nil {
klog.Fatalf("Error creating storage factory config: %v", err)
}
@@ -296,8 +276,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
}
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
klog.Infof("Using reconciler: %v", c.ExtraConfig.EndpointReconcilerType)
switch c.ExtraConfig.EndpointReconcilerType {
klog.Infof("Using reconciler: %v", c.Extra.EndpointReconcilerType)
switch c.Extra.EndpointReconcilerType {
// there are numerous test dependencies that depend on a default controller
case reconcilers.MasterCountReconcilerType:
return c.createMasterCountReconciler()
@@ -306,7 +286,7 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
case reconcilers.NoneEndpointReconcilerType:
return c.createNoneReconciler()
default:
klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
klog.Fatalf("Reconciler not implemented: %v", c.Extra.EndpointReconcilerType)
}
return nil
}
@@ -314,49 +294,50 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
cfg := completedConfig{
c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
&c.ExtraConfig,
c.ControlPlane.Complete(),
&c.Extra,
}
serviceIPRange, apiServerServiceIP, err := options.ServiceIPRange(cfg.ExtraConfig.ServiceIPRange)
serviceIPRange, apiServerServiceIP, err := options.ServiceIPRange(cfg.Extra.ServiceIPRange)
if err != nil {
klog.Fatalf("Error determining service IP ranges: %v", err)
}
if cfg.ExtraConfig.ServiceIPRange.IP == nil {
cfg.ExtraConfig.ServiceIPRange = serviceIPRange
if cfg.Extra.ServiceIPRange.IP == nil {
cfg.Extra.ServiceIPRange = serviceIPRange
}
if cfg.ExtraConfig.APIServerServiceIP == nil {
cfg.ExtraConfig.APIServerServiceIP = apiServerServiceIP
if cfg.Extra.APIServerServiceIP == nil {
cfg.Extra.APIServerServiceIP = apiServerServiceIP
}
discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.GenericConfig.ExternalAddress}
// override the default discovery addresses in the generic controlplane adding service IP support
discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.ControlPlane.Generic.ExternalAddress}
discoveryAddresses.CIDRRules = append(discoveryAddresses.CIDRRules,
discovery.CIDRRule{IPRange: cfg.ExtraConfig.ServiceIPRange, Address: net.JoinHostPort(cfg.ExtraConfig.APIServerServiceIP.String(), strconv.Itoa(cfg.ExtraConfig.APIServerServicePort))})
cfg.GenericConfig.DiscoveryAddresses = discoveryAddresses
discovery.CIDRRule{IPRange: cfg.Extra.ServiceIPRange, Address: net.JoinHostPort(cfg.Extra.APIServerServiceIP.String(), strconv.Itoa(cfg.Extra.APIServerServicePort))})
cfg.ControlPlane.Generic.DiscoveryAddresses = discoveryAddresses
if cfg.ExtraConfig.ServiceNodePortRange.Size == 0 {
if cfg.Extra.ServiceNodePortRange.Size == 0 {
// TODO: Currently no way to specify an empty range (do we need to allow this?)
// We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
// but then that breaks the strict nestedness of ServiceType.
// Review post-v1
cfg.ExtraConfig.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.ExtraConfig.ServiceNodePortRange)
cfg.Extra.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.Extra.ServiceNodePortRange)
}
if cfg.ExtraConfig.EndpointReconcilerConfig.Interval == 0 {
cfg.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
if cfg.Extra.EndpointReconcilerConfig.Interval == 0 {
cfg.Extra.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
}
if cfg.ExtraConfig.MasterEndpointReconcileTTL == 0 {
cfg.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
if cfg.Extra.MasterEndpointReconcileTTL == 0 {
cfg.Extra.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
}
if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
if cfg.Extra.EndpointReconcilerConfig.Reconciler == nil {
cfg.Extra.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
}
if cfg.ExtraConfig.RepairServicesInterval == 0 {
cfg.ExtraConfig.RepairServicesInterval = repairLoopInterval
if cfg.Extra.RepairServicesInterval == 0 {
cfg.Extra.RepairServicesInterval = repairLoopInterval
}
return CompletedConfig{&cfg}
@@ -366,27 +347,27 @@ func (c *Config) Complete() CompletedConfig {
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
if reflect.DeepEqual(c.Extra.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
s, err := c.ControlPlane.Generic.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
if c.ExtraConfig.EnableLogsSupport {
if c.ControlPlane.Extra.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
// Metadata and keys are expected to only change across restarts at present,
// so we just marshal immediately and serve the cached JSON bytes.
md, err := serviceaccount.NewOpenIDMetadata(
c.ExtraConfig.ServiceAccountIssuerURL,
c.ExtraConfig.ServiceAccountJWKSURI,
c.GenericConfig.ExternalAddress,
c.ExtraConfig.ServiceAccountPublicKeys,
c.ControlPlane.Extra.ServiceAccountIssuerURL,
c.ControlPlane.Extra.ServiceAccountJWKSURI,
c.ControlPlane.Generic.ExternalAddress,
c.ControlPlane.Extra.ServiceAccountPublicKeys,
)
if err != nil {
// If there was an error, skip installing the endpoints and log the
@@ -396,7 +377,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
" enabled. Error: %v", err)
if c.ExtraConfig.ServiceAccountIssuerURL != "" {
if c.ControlPlane.Extra.ServiceAccountIssuerURL != "" {
// The user likely expects this feature to be enabled if issuer URL is
// set and the feature gate is enabled. In the future, if there is no
// longer a feature gate and issuer URL is not set, the user may not
@@ -413,37 +394,37 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
ClusterAuthenticationInfo: c.ControlPlane.Extra.ClusterAuthenticationInfo,
}
clientset, err := kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)
client, err := kubernetes.NewForConfig(c.ControlPlane.Generic.LoopbackClientConfig)
if err != nil {
return nil, err
}
// TODO: update to a version that caches success but will recheck on failure, unlike memcache discovery
discoveryClientForAdmissionRegistration := clientset.Discovery()
discoveryClientForAdmissionRegistration := client.Discovery()
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
StorageFactory: c.ControlPlane.Extra.StorageFactory,
EventTTL: c.ControlPlane.Extra.EventTTL,
LoopbackClientConfig: c.ControlPlane.Generic.LoopbackClientConfig,
ServiceAccountIssuer: c.ControlPlane.Extra.ServiceAccountIssuer,
ExtendExpiration: c.ControlPlane.Extra.ExtendExpiration,
ServiceAccountMaxExpiration: c.ControlPlane.Extra.ServiceAccountMaxExpiration,
APIAudiences: c.ControlPlane.Generic.Authentication.APIAudiences,
Informers: c.ControlPlane.Extra.VersionedInformers,
},
Proxy: corerest.ProxyConfig{
Transport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
Transport: c.ControlPlane.Extra.ProxyTransport,
KubeletClientConfig: c.Extra.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
NodePortRange: c.ExtraConfig.ServiceNodePortRange,
IPRepairInterval: c.ExtraConfig.RepairServicesInterval,
ClusterIPRange: c.Extra.ServiceIPRange,
SecondaryClusterIPRange: c.Extra.SecondaryServiceIPRange,
NodePortRange: c.Extra.ServiceNodePortRange,
IPRepairInterval: c.Extra.RepairServicesInterval,
},
})
if err != nil {
@@ -460,8 +441,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
restStorageProviders := []RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
authenticationrest.RESTStorageProvider{Authenticator: c.ControlPlane.Generic.Authentication.Authenticator, APIAudiences: c.ControlPlane.Generic.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer, RuleResolver: c.ControlPlane.Generic.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
@@ -470,47 +451,47 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
rbacrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
svmrest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.ControlPlane.Generic.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
admissionregistrationrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
eventsrest.RESTStorageProvider{TTL: c.ControlPlane.EventTTL},
resourcerest.RESTStorageProvider{},
}
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
if err := m.InstallAPIs(c.ControlPlane.Extra.APIResourceConfigSource, c.ControlPlane.Generic.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(clientset, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
go systemnamespaces.NewController(client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
_, publicServicePort, err := c.ControlPlane.Generic.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
kubernetesServiceCtrl := kubernetesservice.New(kubernetesservice.Config{
PublicIP: c.GenericConfig.PublicAddress,
PublicIP: c.ControlPlane.Generic.PublicAddress,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
EndpointReconciler: c.Extra.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.Extra.EndpointReconcilerConfig.Interval,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
ServiceIP: c.Extra.APIServerServiceIP,
ServicePort: c.Extra.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
}, clientset, c.ExtraConfig.VersionedInformers.Core().V1().Services())
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
KubernetesServiceNodePort: c.Extra.KubernetesServiceNodePort,
}, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Services())
s.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
s.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
kubernetesServiceCtrl.Stop()
return nil
})
@@ -518,9 +499,9 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kubernetes-service-cidr-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := defaultservicecidr.NewController(
c.ExtraConfig.ServiceIPRange,
c.ExtraConfig.SecondaryServiceIPRange,
clientset,
c.Extra.ServiceIPRange,
c.Extra.SecondaryServiceIPRange,
client,
)
// The default serviceCIDR must exist before the apiserver is healthy
// otherwise the allocators for Services will not work.
@@ -530,13 +511,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.GenericConfig.APIServerID,
c.ControlPlane.Generic.APIServerID,
peeraddress,
c.ExtraConfig.PeerEndpointLeaseReconciler,
c.ExtraConfig.EndpointReconcilerConfig.Interval,
clientset)
c.Extra.PeerEndpointLeaseReconciler,
c.Extra.EndpointReconcilerConfig.Interval,
client)
if err != nil {
return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err)
}
@@ -551,16 +532,16 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.ExtraConfig.PeerProxy != nil {
if c.Extra.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.ExtraConfig.PeerProxy.WaitForCacheSync(context.StopCh)
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset)
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, client)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
@@ -601,11 +582,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
clientset,
client,
holderIdentity,
int32(IdentityLeaseDurationSeconds),
nil,
@@ -620,7 +601,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// TODO: move this into generic apiserver and make the lease identity value configurable
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
go apiserverleasegc.NewAPIServerLeaseGC(
clientset,
client,
IdentityLeaseGCPeriod,
metav1.NamespaceSystem,
KubeAPIServerIdentityLeaseLabelSelector,
@@ -630,7 +611,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
m.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go legacytokentracking.NewController(clientset).Run(hookContext.StopCh)
go legacytokentracking.NewController(client).Run(hookContext.StopCh)
return nil
})
@@ -804,6 +785,18 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (peerreconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultPeerEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := peerreconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}
// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {