kubernetes/pkg/controlplane/apiserver/config.go

196 lines
8.0 KiB
Go

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"time"
oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/version"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/kubeapiserver"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
)
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func BuildGenericConfig(
s *options.ServerRunOptions,
schemes []*runtime.Scheme,
getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
return
}
if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
return
}
}
// wrap the definitions to revert any changes from disabled features
getOpenAPIDefinitions = openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(getOpenAPIDefinitions)
namer := openapinamer.NewDefinitionNamer(schemes...)
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
if genericConfig.EgressSelector != nil {
s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
lastErr = s.Audit.ApplyTo(genericConfig)
if lastErr != nil {
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
return
}
// BuildAuthorizer constructs the authorizer
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)
if EgressSelector != nil {
egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
if err != nil {
return nil, nil, err
}
authorizationConfig.CustomDial = egressDialer
}
return authorizationConfig.New()
}
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
func BuildPriorityAndFairness(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 {
return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight)
}
return utilflowcontrol.New(
versionedInformer,
extclient.FlowcontrolV1beta3(),
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}