kubernetes/cmd/kube-apiserver/app/server.go

553 lines
22 KiB
Go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
package app
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
serveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/notfoundhandler"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/dynamic"
clientgoinformers "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/keyutil"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/metrics/prometheus/workqueue"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
netutils "k8s.io/utils/net"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/serviceaccount"
)
func init() {
utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
}
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
// stop printing usage when the command errors
SilenceUsage: true,
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
fs := cmd.Flags()
namedFlagSets := s.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
return cmd
}
// Run runs the specified APIServer. This should never exit.
func Run(options completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
config, err := NewConfig(options)
if err != nil {
return err
}
completed, err := config.Complete()
if err != nil {
return err
}
server, err := CreateServerChain(completed)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
return aggregatorServer, nil
}
// CreateProxyTransport creates the dialer infrastructure to connect to the nodes.
func CreateProxyTransport() *http.Transport {
var proxyDialerFn utilnet.DialFunc
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
DialContext: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
})
return proxyTransport
}
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
proxyTransport := CreateProxyTransport()
genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
s.ServerRunOptions,
[]*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
generatedopenapi.GetOpenAPIDefinitions,
)
if err != nil {
return nil, nil, nil, err
}
capabilities.Setup(s.AllowPrivileged, s.MaxConnectionBytesPerSec)
s.Metrics.Apply()
serviceaccount.RegisterMetrics()
config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
StorageFactory: storageFactory,
EventTTL: s.EventTTL,
KubeletClientConfig: s.KubeletConfig,
EnableLogsSupport: s.EnableLogsHandler,
ProxyTransport: proxyTransport,
ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
APIServerServicePort: 443,
ServiceNodePortRange: s.ServiceNodePortRange,
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType),
MasterCount: s.MasterCount,
ServiceAccountIssuer: s.ServiceAccountIssuer,
ServiceAccountMaxExpiration: s.ServiceAccountTokenMaxExpiration,
ExtendExpiration: s.Authentication.ServiceAccounts.ExtendExpiration,
VersionedInformers: versionedInformers,
},
}
clientCAProvider, err := s.Authentication.ClientCert.GetClientCAContentProvider()
if err != nil {
return nil, nil, nil, err
}
config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
requestHeaderConfig, err := s.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
if err != nil {
return nil, nil, nil, err
}
if requestHeaderConfig != nil {
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
}
// setup admission
admissionConfig := &kubeapiserveradmission.Config{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
serviceResolver := buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
schemaResolver := resolver.NewDefinitionsSchemaResolver(k8sscheme.Scheme, genericConfig.OpenAPIConfig.GetDefinitions)
pluginInitializers, admissionPostStartHook, err := admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider, schemaResolver)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}
clientgoExternalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create real client-go external client: %w", err)
}
dynamicExternalClient, err := dynamic.NewForConfig(genericConfig.LoopbackClientConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create real dynamic external client: %w", err)
}
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
clientgoExternalClient,
dynamicExternalClient,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to apply admission: %w", err)
}
if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
return nil, nil, nil, err
}
if config.GenericConfig.EgressSelector != nil {
// Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
// Use the config.GenericConfig.EgressSelector lookup as the transport used by the "proxy" subresources.
networkContext := egressselector.Cluster.AsNetworkContext()
dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
if err != nil {
return nil, nil, nil, err
}
c := proxyTransport.Clone()
c.DialContext = dialer
config.ExtraConfig.ProxyTransport = c
}
// Load and set the public keys.
var pubKeys []interface{}
for _, f := range s.Authentication.ServiceAccounts.KeyFiles {
keys, err := keyutil.PublicKeysFromFile(f)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
}
pubKeys = append(pubKeys, keys...)
}
config.ExtraConfig.ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuers[0]
config.ExtraConfig.ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
return config, serviceResolver, pluginInitializers, nil
}
// completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
type completedServerRunOptions struct {
*options.ServerRunOptions
}
// Complete set default ServerRunOptions.
// Should be called after kube-apiserver flags parsed.
func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
var options completedServerRunOptions
// set defaults
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
return options, err
}
// process s.ServiceClusterIPRange from list to Primary and Secondary
// we process secondary only if provided by user
apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
if err != nil {
return options, err
}
s.PrimaryServiceClusterIPRange = primaryServiceIPRange
s.SecondaryServiceClusterIPRange = secondaryServiceIPRange
s.APIServerServiceIP = apiServerServiceIP
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
return options, fmt.Errorf("error creating self-signed certificates: %v", err)
}
if len(s.GenericServerRunOptions.ExternalHost) == 0 {
if len(s.GenericServerRunOptions.AdvertiseAddress) > 0 {
s.GenericServerRunOptions.ExternalHost = s.GenericServerRunOptions.AdvertiseAddress.String()
} else {
hostname, err := os.Hostname()
if err != nil {
return options, fmt.Errorf("error finding host name: %v", err)
}
s.GenericServerRunOptions.ExternalHost = hostname
}
klog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
}
s.Authentication.ApplyAuthorization(s.Authorization)
// Use (ServiceAccountSigningKeyFile != "") as a proxy to the user enabling
// TokenRequest functionality. This defaulting was convenient, but messed up
// a lot of people when they rotated their serving cert with no idea it was
// connected to their service account keys. We are taking this opportunity to
// remove this problematic defaulting.
if s.ServiceAccountSigningKeyFile == "" {
// Default to the private server key for service account token signing
if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
} else {
klog.Warning("No TLS key provided, service account token authentication disabled")
}
}
}
if s.ServiceAccountSigningKeyFile != "" && len(s.Authentication.ServiceAccounts.Issuers) != 0 && s.Authentication.ServiceAccounts.Issuers[0] != "" {
sk, err := keyutil.PrivateKeyFromFile(s.ServiceAccountSigningKeyFile)
if err != nil {
return options, fmt.Errorf("failed to parse service-account-issuer-key-file: %v", err)
}
if s.Authentication.ServiceAccounts.MaxExpiration != 0 {
lowBound := time.Hour
upBound := time.Duration(1<<32) * time.Second
if s.Authentication.ServiceAccounts.MaxExpiration < lowBound ||
s.Authentication.ServiceAccounts.MaxExpiration > upBound {
return options, fmt.Errorf("the service-account-max-token-expiration must be between 1 hour and 2^32 seconds")
}
if s.Authentication.ServiceAccounts.ExtendExpiration {
if s.Authentication.ServiceAccounts.MaxExpiration < serviceaccount.WarnOnlyBoundTokenExpirationSeconds*time.Second {
klog.Warningf("service-account-extend-token-expiration is true, in order to correctly trigger safe transition logic, service-account-max-token-expiration must be set longer than %d seconds (currently %s)", serviceaccount.WarnOnlyBoundTokenExpirationSeconds, s.Authentication.ServiceAccounts.MaxExpiration)
}
if s.Authentication.ServiceAccounts.MaxExpiration < serviceaccount.ExpirationExtensionSeconds*time.Second {
klog.Warningf("service-account-extend-token-expiration is true, enabling tokens valid up to %d seconds, which is longer than service-account-max-token-expiration set to %s seconds", serviceaccount.ExpirationExtensionSeconds, s.Authentication.ServiceAccounts.MaxExpiration)
}
}
}
s.ServiceAccountIssuer, err = serviceaccount.JWTTokenGenerator(s.Authentication.ServiceAccounts.Issuers[0], sk)
if err != nil {
return options, fmt.Errorf("failed to build token generator: %v", err)
}
s.ServiceAccountTokenMaxExpiration = s.Authentication.ServiceAccounts.MaxExpiration
}
if s.Etcd.EnableWatchCache {
sizes := kubeapiserver.DefaultWatchCacheSizes()
// Ensure that overrides parse correctly.
userSpecified, err := serveroptions.ParseWatchCacheSizes(s.Etcd.WatchCacheSizes)
if err != nil {
return options, err
}
for resource, size := range userSpecified {
sizes[resource] = size
}
s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
if err != nil {
return options, err
}
}
for key, value := range s.APIEnablement.RuntimeConfig {
if key == "v1" || strings.HasPrefix(key, "v1/") ||
key == "api/v1" || strings.HasPrefix(key, "api/v1/") {
delete(s.APIEnablement.RuntimeConfig, key)
s.APIEnablement.RuntimeConfig["/v1"] = value
}
if key == "api/legacy" {
delete(s.APIEnablement.RuntimeConfig, key)
}
}
options.ServerRunOptions = s
return options, nil
}
var testServiceResolver webhook.ServiceResolver
// SetServiceResolverForTests allows the service resolver to be overridden during tests.
// Tests using this function must run serially as this function is not safe to call concurrently with server start.
func SetServiceResolverForTests(resolver webhook.ServiceResolver) func() {
if testServiceResolver != nil {
panic("test service resolver is set: tests are either running concurrently or clean up was skipped")
}
testServiceResolver = resolver
return func() {
testServiceResolver = nil
}
}
func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
if testServiceResolver != nil {
return testServiceResolver
}
var serviceResolver webhook.ServiceResolver
if enabledAggregatorRouting {
serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
informer.Core().V1().Services().Lister(),
informer.Core().V1().Endpoints().Lister(),
)
} else {
serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
informer.Core().V1().Services().Lister(),
)
}
// resolve kubernetes.default.svc locally
if localHost, err := url.Parse(hostname); err == nil {
serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
}
return serviceResolver
}
func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, net.IPNet, error) {
serviceClusterIPRangeList := []string{}
if serviceClusterIPRanges != "" {
serviceClusterIPRangeList = strings.Split(serviceClusterIPRanges, ",")
}
var apiServerServiceIP net.IP
var primaryServiceIPRange net.IPNet
var secondaryServiceIPRange net.IPNet
var err error
// nothing provided by user, use default range (only applies to the Primary)
if len(serviceClusterIPRangeList) == 0 {
var primaryServiceClusterCIDR net.IPNet
primaryServiceIPRange, apiServerServiceIP, err = controlplane.ServiceIPRange(primaryServiceClusterCIDR)
if err != nil {
return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges: %v", err)
}
return apiServerServiceIP, primaryServiceIPRange, net.IPNet{}, nil
}
_, primaryServiceClusterCIDR, err := netutils.ParseCIDRSloppy(serviceClusterIPRangeList[0])
if err != nil {
return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr")
}
primaryServiceIPRange, apiServerServiceIP, err = controlplane.ServiceIPRange(*primaryServiceClusterCIDR)
if err != nil {
return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err)
}
// user provided at least two entries
// note: validation asserts that the list is max of two dual stack entries
if len(serviceClusterIPRangeList) > 1 {
_, secondaryServiceClusterCIDR, err := netutils.ParseCIDRSloppy(serviceClusterIPRangeList[1])
if err != nil {
return net.IP{}, net.IPNet{}, net.IPNet{}, fmt.Errorf("service-cluster-ip-range[1] is not an ip net")
}
secondaryServiceIPRange = *secondaryServiceClusterCIDR
}
return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
}