
Automatic merge from submit-queue Use full package path for definition name in OpenAPI spec We were using the short package name (the last part of the package name) plus the type name as the OpenAPI spec definition name. That can result in duplicate names and make the spec invalid. To be sure we will always have unique names, we are going to use the full package name as the definition name. Also, the "x-kubernetes-tag" custom field is added to definitions to list the Group/Version/Kind for the definitions that have one. This will help clients discover definitions more easily. Lastly, we've added a reference from the old definition names to the new ones to keep backward compatibility. The list of old definitions will not be updated. **Release note**: - Rename OpenAPI definition names to the type's full package name to prevent duplicates - Create the OpenAPI extension "x-kubernetes-group-version-kind" for definitions to store Group/Version/Kind - Deprecate old definition names and create a reference to the new definitions. Old definitions will be removed in the next release.
349 lines
14 KiB
Go
349 lines
14 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
// Package app does all of the work necessary to create a Kubernetes
|
|
// APIServer by binding together the API, master and APIServer infrastructure.
|
|
// It can be configured and called directly or via the hyperkube framework.
|
|
package app
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/pborman/uuid"
|
|
"github.com/spf13/cobra"
|
|
"github.com/spf13/pflag"
|
|
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apiserver/pkg/admission"
|
|
"k8s.io/apiserver/pkg/server/filters"
|
|
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
|
"k8s.io/kubernetes/pkg/api"
|
|
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
|
"k8s.io/kubernetes/pkg/apis/batch"
|
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
|
"k8s.io/kubernetes/pkg/capabilities"
|
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
|
"k8s.io/kubernetes/pkg/controller/informers"
|
|
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
|
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
|
|
genericapiserver "k8s.io/kubernetes/pkg/genericapiserver/server"
|
|
"k8s.io/kubernetes/pkg/kubeapiserver"
|
|
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
|
|
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
|
|
"k8s.io/kubernetes/pkg/master"
|
|
"k8s.io/kubernetes/pkg/master/tunneler"
|
|
"k8s.io/kubernetes/pkg/registry/cachesize"
|
|
"k8s.io/kubernetes/pkg/version"
|
|
)
|
|
|
|
// NewAPIServerCommand creates a *cobra.Command object with default parameters
|
|
func NewAPIServerCommand() *cobra.Command {
|
|
s := options.NewServerRunOptions()
|
|
s.AddFlags(pflag.CommandLine)
|
|
cmd := &cobra.Command{
|
|
Use: "kube-apiserver",
|
|
Long: `The Kubernetes API server validates and configures data
|
|
for the api objects which include pods, services, replicationcontrollers, and
|
|
others. The API Server services REST operations and provides the frontend to the
|
|
cluster's shared state through which all other components interact.`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
},
|
|
}
|
|
|
|
return cmd
|
|
}
|
|
|
|
// Run runs the specified APIServer. This should never exit.
|
|
func Run(s *options.ServerRunOptions) error {
|
|
// set defaults
|
|
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
|
|
return err
|
|
}
|
|
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
|
|
if err != nil {
|
|
return fmt.Errorf("error determining service IP ranges: %v", err)
|
|
}
|
|
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
|
|
return fmt.Errorf("error creating self-signed certificates: %v", err)
|
|
}
|
|
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
|
|
return fmt.Errorf("error setting the external host value: %v", err)
|
|
}
|
|
|
|
s.Authentication.ApplyAuthorization(s.Authorization)
|
|
|
|
// validate options
|
|
if errs := s.Validate(); len(errs) != 0 {
|
|
return utilerrors.NewAggregate(errs)
|
|
}
|
|
|
|
// create config from options
|
|
genericConfig := genericapiserver.NewConfig().
|
|
ApplyOptions(s.GenericServerRunOptions).
|
|
ApplyInsecureServingOptions(s.InsecureServing)
|
|
|
|
if _, err := genericConfig.ApplySecureServingOptions(s.SecureServing); err != nil {
|
|
return fmt.Errorf("failed to configure https: %s", err)
|
|
}
|
|
if err = s.Authentication.Apply(genericConfig); err != nil {
|
|
return fmt.Errorf("failed to configure authentication: %s", err)
|
|
}
|
|
|
|
capabilities.Initialize(capabilities.Capabilities{
|
|
AllowPrivileged: s.AllowPrivileged,
|
|
// TODO(vmarmol): Implement support for HostNetworkSources.
|
|
PrivilegedSources: capabilities.PrivilegedSources{
|
|
HostNetworkSources: []string{},
|
|
HostPIDSources: []string{},
|
|
HostIPCSources: []string{},
|
|
},
|
|
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
|
|
})
|
|
|
|
// Setup nodeTunneler if needed
|
|
var nodeTunneler tunneler.Tunneler
|
|
var proxyDialerFn utilnet.DialFunc
|
|
if len(s.SSHUser) > 0 {
|
|
// Get ssh key distribution func, if supported
|
|
var installSSHKey tunneler.InstallSSHKey
|
|
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
|
|
if err != nil {
|
|
return fmt.Errorf("cloud provider could not be initialized: %v", err)
|
|
}
|
|
if cloud != nil {
|
|
if instances, supported := cloud.Instances(); supported {
|
|
installSSHKey = instances.AddSSHKeyToAllInstances
|
|
}
|
|
}
|
|
if s.KubeletConfig.Port == 0 {
|
|
return fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
|
|
}
|
|
if s.KubeletConfig.ReadOnlyPort == 0 {
|
|
return fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
|
|
}
|
|
// Set up the nodeTunneler
|
|
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
|
|
// kubelet listen-addresses, we need to plumb through options.
|
|
healthCheckPath := &url.URL{
|
|
Scheme: "http",
|
|
Host: net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.KubeletConfig.ReadOnlyPort), 10)),
|
|
Path: "healthz",
|
|
}
|
|
nodeTunneler = tunneler.New(s.SSHUser, s.SSHKeyfile, healthCheckPath, installSSHKey)
|
|
|
|
// Use the nodeTunneler's dialer to connect to the kubelet
|
|
s.KubeletConfig.Dial = nodeTunneler.Dial
|
|
// Use the nodeTunneler's dialer when proxying to pods, services, and nodes
|
|
proxyDialerFn = nodeTunneler.Dial
|
|
}
|
|
|
|
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
|
|
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
|
|
|
if s.Etcd.StorageConfig.DeserializationCacheSize == 0 {
|
|
// When size of cache is not explicitly set, estimate its size based on
|
|
// target memory usage.
|
|
glog.V(2).Infof("Initializing deserialization cache size based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
|
|
|
// This is the heuristics that from memory capacity is trying to infer
|
|
// the maximum number of nodes in the cluster and set cache sizes based
|
|
// on that value.
|
|
// From our documentation, we officially recomment 120GB machines for
|
|
// 2000 nodes, and we scale from that point. Thus we assume ~60MB of
|
|
// capacity per node.
|
|
// TODO: We may consider deciding that some percentage of memory will
|
|
// be used for the deserialization cache and divide it by the max object
|
|
// size to compute its size. We may even go further and measure
|
|
// collective sizes of the objects in the cache.
|
|
clusterSize := s.GenericServerRunOptions.TargetRAMMB / 60
|
|
s.Etcd.StorageConfig.DeserializationCacheSize = 25 * clusterSize
|
|
if s.Etcd.StorageConfig.DeserializationCacheSize < 1000 {
|
|
s.Etcd.StorageConfig.DeserializationCacheSize = 1000
|
|
}
|
|
}
|
|
|
|
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
|
|
if err != nil {
|
|
return fmt.Errorf("error generating storage version map: %s", err)
|
|
}
|
|
storageFactory, err := kubeapiserver.BuildDefaultStorageFactory(
|
|
s.Etcd.StorageConfig, s.GenericServerRunOptions.DefaultStorageMediaType, api.Codecs,
|
|
genericapiserver.NewDefaultResourceEncodingConfig(), storageGroupsToEncodingVersion,
|
|
// FIXME: this GroupVersionResource override should be configurable
|
|
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
|
|
master.DefaultAPIResourceConfigSource(), s.GenericServerRunOptions.RuntimeConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("error in initializing storage factory: %s", err)
|
|
}
|
|
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
|
|
for _, override := range s.Etcd.EtcdServersOverrides {
|
|
tokens := strings.Split(override, "#")
|
|
if len(tokens) != 2 {
|
|
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
|
continue
|
|
}
|
|
|
|
apiresource := strings.Split(tokens[0], "/")
|
|
if len(apiresource) != 2 {
|
|
glog.Errorf("invalid resource definition: %s", tokens[0])
|
|
continue
|
|
}
|
|
group := apiresource[0]
|
|
resource := apiresource[1]
|
|
groupResource := schema.GroupResource{Group: group, Resource: resource}
|
|
|
|
servers := strings.Split(tokens[1], ";")
|
|
storageFactory.SetEtcdLocation(groupResource, servers)
|
|
}
|
|
|
|
// Default to the private server key for service account token signing
|
|
if len(s.Authentication.ServiceAccounts.KeyFiles) == 0 && s.SecureServing.ServerCert.CertKey.KeyFile != "" {
|
|
if kubeauthenticator.IsValidServiceAccountKeyFile(s.SecureServing.ServerCert.CertKey.KeyFile) {
|
|
s.Authentication.ServiceAccounts.KeyFiles = []string{s.SecureServing.ServerCert.CertKey.KeyFile}
|
|
} else {
|
|
glog.Warning("No TLS key provided, service account token authentication disabled")
|
|
}
|
|
}
|
|
|
|
authenticatorConfig := s.Authentication.ToAuthenticationConfig()
|
|
if s.Authentication.ServiceAccounts.Lookup {
|
|
// If we need to look up service accounts and tokens,
|
|
// go directly to etcd to avoid recursive auth insanity
|
|
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get serviceaccounts storage: %v", err)
|
|
}
|
|
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
|
|
}
|
|
|
|
apiAuthenticator, securityDefinitions, err := authenticatorConfig.New()
|
|
if err != nil {
|
|
return fmt.Errorf("invalid Authentication Config: %v", err)
|
|
}
|
|
|
|
privilegedLoopbackToken := uuid.NewRandom().String()
|
|
selfClientConfig, err := genericapiserver.NewSelfClientConfig(genericConfig.SecureServingInfo, genericConfig.InsecureServingInfo, privilegedLoopbackToken)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create clientset: %v", err)
|
|
}
|
|
client, err := internalclientset.NewForConfig(selfClientConfig)
|
|
if err != nil {
|
|
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
|
|
if len(kubeAPIVersions) == 0 {
|
|
return fmt.Errorf("failed to create clientset: %v", err)
|
|
}
|
|
|
|
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
|
|
// groups. This leads to a nil client above and undefined behaviour further down.
|
|
// TODO: get rid of KUBE_API_VERSIONS or define sane behaviour if set
|
|
glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions)
|
|
}
|
|
sharedInformers := informers.NewSharedInformerFactory(nil, client, 10*time.Minute)
|
|
|
|
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
|
|
apiAuthorizer, err := authorizationConfig.New()
|
|
if err != nil {
|
|
return fmt.Errorf("invalid Authorization Config: %v", err)
|
|
}
|
|
|
|
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
|
|
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer)
|
|
admissionConfigProvider, err := kubeadmission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read plugin config: %v", err)
|
|
}
|
|
admissionController, err := admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize plugins: %v", err)
|
|
}
|
|
|
|
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
|
Dial: proxyDialerFn,
|
|
TLSClientConfig: proxyTLSClientConfig,
|
|
})
|
|
kubeVersion := version.Get()
|
|
|
|
genericConfig.Version = &kubeVersion
|
|
genericConfig.LoopbackClientConfig = selfClientConfig
|
|
genericConfig.Authenticator = apiAuthenticator
|
|
genericConfig.Authorizer = apiAuthorizer
|
|
genericConfig.AdmissionControl = admissionController
|
|
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions)
|
|
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
|
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
|
|
genericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
|
|
genericConfig.EnableMetrics = true
|
|
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
|
sets.NewString("watch", "proxy"),
|
|
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
|
)
|
|
|
|
config := &master.Config{
|
|
GenericConfig: genericConfig,
|
|
|
|
APIResourceConfigSource: storageFactory.APIResourceConfigSource,
|
|
StorageFactory: storageFactory,
|
|
EnableWatchCache: s.GenericServerRunOptions.EnableWatchCache,
|
|
EnableCoreControllers: true,
|
|
DeleteCollectionWorkers: s.GenericServerRunOptions.DeleteCollectionWorkers,
|
|
EventTTL: s.EventTTL,
|
|
KubeletClientConfig: s.KubeletConfig,
|
|
EnableUISupport: true,
|
|
EnableLogsSupport: true,
|
|
ProxyTransport: proxyTransport,
|
|
|
|
Tunneler: nodeTunneler,
|
|
|
|
ServiceIPRange: serviceIPRange,
|
|
APIServerServiceIP: apiServerServiceIP,
|
|
APIServerServicePort: 443,
|
|
|
|
ServiceNodePortRange: s.ServiceNodePortRange,
|
|
KubernetesServiceNodePort: s.KubernetesServiceNodePort,
|
|
|
|
MasterCount: s.MasterCount,
|
|
}
|
|
|
|
if s.GenericServerRunOptions.EnableWatchCache {
|
|
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", s.GenericServerRunOptions.TargetRAMMB)
|
|
cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
|
|
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
|
|
}
|
|
|
|
m, err := config.Complete().New()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sharedInformers.Start(wait.NeverStop)
|
|
m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
|
|
return nil
|
|
}
|