pkg/controlplane: split apart generic server part of instance.go

Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
This commit is contained in:
Dr. Stefan Schimanski
2024-04-26 13:09:00 +02:00
parent c6b6163e2e
commit 3b6d2a66a4
11 changed files with 427 additions and 320 deletions

View File

@@ -19,7 +19,6 @@ package controlplane
import (
"fmt"
"net"
"os"
"reflect"
"strconv"
"time"
@@ -53,24 +52,15 @@ import (
storageapiv1alpha1 "k8s.io/api/storage/v1alpha1"
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
flowcontrolv1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1"
@@ -79,19 +69,12 @@ import (
flowcontrolv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr"
"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/utils/clock"
// RESTStorage installers
admissionregistrationrest "k8s.io/kubernetes/pkg/registry/admissionregistration/rest"
@@ -126,27 +109,14 @@ const (
// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
// 1. the lease is an identity lease (different from leader election leases)
// 2. which component owns this lease
IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity"
// TODO(sttts): remove this indirection
IdentityLeaseComponentLabelKey = controlplaneapiserver.IdentityLeaseComponentLabelKey
// KubeAPIServer defines variable used internally when referring to kube-apiserver component
KubeAPIServer = "kube-apiserver"
// KubeAPIServerIdentityLeaseLabelSelector selects kube-apiserver identity leases
KubeAPIServerIdentityLeaseLabelSelector = IdentityLeaseComponentLabelKey + "=" + KubeAPIServer
// repairLoopInterval defines the interval used to run the Services ClusterIP and NodePort repair loops
repairLoopInterval = 3 * time.Minute
)
var (
// IdentityLeaseGCPeriod is the interval which the lease GC controller checks for expired leases
// IdentityLeaseGCPeriod is exposed so integration tests can tune this value.
IdentityLeaseGCPeriod = 3600 * time.Second
// IdentityLeaseDurationSeconds is the duration of kube-apiserver lease in seconds
// IdentityLeaseDurationSeconds is exposed so integration tests can tune this value.
IdentityLeaseDurationSeconds = 3600
// IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds
// IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value.
IdentityLeaseRenewIntervalPeriod = 10 * time.Second
)
// Extra defines extra configuration for kube-apiserver
type Extra struct {
EndpointReconcilerConfig EndpointReconcilerConfig
@@ -222,9 +192,7 @@ type EndpointReconcilerConfig struct {
// Instance contains state for a Kubernetes cluster api server instance.
type Instance struct {
GenericAPIServer *genericapiserver.GenericAPIServer
ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
ControlPlane *controlplaneapiserver.Server
}
func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
@@ -340,49 +308,13 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
s, err := c.ControlPlane.Generic.New("kube-apiserver", delegationTarget)
cp, err := c.ControlPlane.New(KubeAPIServer, delegationTarget)
if err != nil {
return nil, err
}
if c.ControlPlane.Extra.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
// Metadata and keys are expected to only change across restarts at present,
// so we just marshal immediately and serve the cached JSON bytes.
md, err := serviceaccount.NewOpenIDMetadata(
c.ControlPlane.Extra.ServiceAccountIssuerURL,
c.ControlPlane.Extra.ServiceAccountJWKSURI,
c.ControlPlane.Generic.ExternalAddress,
c.ControlPlane.Extra.ServiceAccountPublicKeys,
)
if err != nil {
// If there was an error, skip installing the endpoints and log the
// error, but continue on. We don't return the error because the
// metadata responses require additional, backwards incompatible
// validation of command-line options.
msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
" enabled. Error: %v", err)
if c.ControlPlane.Extra.ServiceAccountIssuerURL != "" {
// The user likely expects this feature to be enabled if issuer URL is
// set and the feature gate is enabled. In the future, if there is no
// longer a feature gate and issuer URL is not set, the user may not
// expect this feature to be enabled. We log the former case as an Error
// and the latter case as an Info.
klog.Error(msg)
} else {
klog.Info(msg)
}
} else {
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(s.Handler.GoRestfulContainer)
}
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ControlPlane.Extra.ClusterAuthenticationInfo,
s := &Instance{
ControlPlane: cp,
}
client, err := kubernetes.NewForConfig(c.ControlPlane.Generic.LoopbackClientConfig)
@@ -426,7 +358,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
restStorageProviders := []RESTStorageProvider{
restStorageProviders := []controlplaneapiserver.RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.ControlPlane.Generic.Authentication.Authenticator, APIAudiences: c.ControlPlane.Generic.Authentication.APIAudiences},
@@ -451,15 +383,10 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
eventsrest.RESTStorageProvider{TTL: c.ControlPlane.EventTTL},
resourcerest.RESTStorageProvider{},
}
if err := m.InstallAPIs(c.ControlPlane.Extra.APIResourceConfigSource, c.ControlPlane.Generic.RESTOptionsGetter, restStorageProviders...); err != nil {
if err := s.ControlPlane.InstallAPIs(restStorageProviders...); err != nil {
return nil, err
}
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(c.ControlPlane.SystemNamespaces, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
_, publicServicePort, err := c.ControlPlane.Generic.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
@@ -475,17 +402,17 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.Extra.KubernetesServiceNodePort,
}, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Services())
s.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
s.ControlPlane.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil
})
s.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
s.ControlPlane.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
kubernetesServiceCtrl.Stop()
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kubernetes-service-cidr-controller", func(hookContext genericapiserver.PostStartHookContext) error {
s.ControlPlane.GenericAPIServer.AddPostStartHookOrDie("start-kubernetes-service-cidr-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := defaultservicecidr.NewController(
c.Extra.ServiceIPRange,
c.Extra.SecondaryServiceIPRange,
@@ -498,208 +425,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.ControlPlane.Generic.APIServerID,
peeraddress,
c.ControlPlane.Extra.PeerEndpointLeaseReconciler,
c.Extra.EndpointReconcilerConfig.Interval,
client)
if err != nil {
return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err)
}
m.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller",
func(hookContext genericapiserver.PostStartHookContext) error {
peerEndpointCtrl.Start(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller",
func() error {
peerEndpointCtrl.Stop()
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.ControlPlane.Extra.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, client)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
// prime values and start listeners
if m.ClusterAuthenticationInfo.ClientCA != nil {
m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
go controller.Run(ctx, 1)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
client,
holderIdentity,
int32(IdentityLeaseDurationSeconds),
nil,
IdentityLeaseRenewIntervalPeriod,
leaseName,
metav1.NamespaceSystem,
// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
labelAPIServerHeartbeatFunc(KubeAPIServer, peeraddress))
go controller.Run(ctx)
return nil
})
// TODO: move this into generic apiserver and make the lease identity value configurable
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
go apiserverleasegc.NewAPIServerLeaseGC(
client,
IdentityLeaseGCPeriod,
metav1.NamespaceSystem,
KubeAPIServerIdentityLeaseLabelSelector,
).Run(hookContext.StopCh)
return nil
})
}
m.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go legacytokentracking.NewController(client).Run(hookContext.StopCh)
return nil
})
return m, nil
}
func labelAPIServerHeartbeatFunc(identity string, peeraddress string) lease.ProcessLeaseFunc {
return func(lease *coordinationapiv1.Lease) error {
if lease.Labels == nil {
lease.Labels = map[string]string{}
}
if lease.Annotations == nil {
lease.Annotations = map[string]string{}
}
// This label indiciates the identity of the lease object.
lease.Labels[IdentityLeaseComponentLabelKey] = identity
hostname, err := os.Hostname()
if err != nil {
return err
}
// convenience label to easily map a lease object to a specific apiserver
lease.Labels[apiv1.LabelHostname] = hostname
// Include apiserver network location <ip_port> used by peers to proxy requests between kube-apiservers
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if peeraddress != "" {
lease.Annotations[apiv1.AnnotationPeerAdvertiseAddress] = peeraddress
}
}
return nil
}
}
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)
}
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
nonLegacy := []*genericapiserver.APIGroupInfo{}
// used later in the loop to filter the served resource by those that have expired.
resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*m.GenericAPIServer.Version)
if err != nil {
return err
}
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
}
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
// If we have no storage for any resource configured, this API group is effectively disabled.
// This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
klog.Infof("API group %q is not enabled, skipping.", groupName)
continue
}
// Remove resources that serving kinds that are removed.
// We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
// This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
resourceExpirationEvaluator.RemoveDeletedKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap)
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
continue
}
klog.V(1).Infof("Enabling API group %q.", groupName)
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
if err != nil {
klog.Fatalf("Error building PostStartHook: %v", err)
}
m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
}
if len(groupName) == 0 {
// the legacy group for core APIs is special that it is installed into /api via this special install method.
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
// everything else goes to /apis
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}
if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
return s, nil
}
var (
@@ -772,13 +498,3 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}
// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
if peerAdvertiseAddress.PeerAdvertiseIP != "" && peerAdvertiseAddress.PeerAdvertisePort != "" {
return net.JoinHostPort(peerAdvertiseAddress.PeerAdvertiseIP, peerAdvertiseAddress.PeerAdvertisePort)
} else {
return net.JoinHostPort(publicAddress.String(), strconv.Itoa(publicServicePort))
}
}