replace contrustor of ad controller with config.complete.new flow

review:
1. import order
2. only set nil field on complete
3. replace hardcoded default namespace
This commit is contained in:
yue9944882
2018-08-08 21:06:30 +08:00
committed by zuoxiu.jm
parent 0df5462db6
commit 6a8faa6e40
4 changed files with 166 additions and 116 deletions

View File

@@ -1,9 +1,4 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
@@ -13,6 +8,7 @@ go_library(
"server.go",
],
importpath = "k8s.io/kubernetes/cmd/kube-apiserver/app",
visibility = ["//visibility:public"],
deps = [
"//cmd/kube-apiserver/app/options:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
@@ -32,7 +28,6 @@ go_library(
"//pkg/master/controller/crdregistration:go_default_library",
"//pkg/master/reconcilers:go_default_library",
"//pkg/master/tunneler:go_default_library",
"//pkg/quota/v1/install:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/registry/rbac/rest:go_default_library",
"//pkg/serviceaccount:go_default_library",
@@ -53,7 +48,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/initializer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
@@ -67,11 +61,8 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
@@ -105,4 +96,5 @@ filegroup(
"//cmd/kube-apiserver/app/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -42,7 +42,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
webhookinit "k8s.io/apiserver/pkg/admission/plugin/webhook/initializer"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
@@ -54,11 +53,8 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
apiserverflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/webhook"
cacheddiscovery "k8s.io/client-go/discovery/cached"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
certutil "k8s.io/client-go/util/cert"
cloudprovider "k8s.io/cloud-provider"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
@@ -81,7 +77,6 @@ import (
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/master/tunneler"
quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
"k8s.io/kubernetes/pkg/registry/cachesize"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/serviceaccount"
@@ -506,24 +501,6 @@ func buildGenericConfig(
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
if s.EnableAggregatorRouting {
serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
versionedInformers.Core().V1().Services().Lister(),
versionedInformers.Core().V1().Endpoints().Lister(),
)
} else {
serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
versionedInformers.Core().V1().Services().Lister(),
)
}
// resolve kubernetes.default.svc locally
localHost, err := url.Parse(genericConfig.LoopbackClientConfig.Host)
if err != nil {
lastErr = err
return
}
serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authentication config: %v", err)
@@ -539,35 +516,14 @@ func buildGenericConfig(
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
webhookAuthResolverWrapper := func(delegate webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver {
return &webhook.AuthenticationInfoResolverDelegator{
ClientConfigForFunc: func(server string) (*rest.Config, error) {
if server == "kubernetes.default.svc" {
return genericConfig.LoopbackClientConfig, nil
}
return delegate.ClientConfigFor(server)
},
ClientConfigForServiceFunc: func(serviceName, serviceNamespace string) (*rest.Config, error) {
if serviceName == "kubernetes" && serviceNamespace == "default" {
return genericConfig.LoopbackClientConfig, nil
}
ret, err := delegate.ClientConfigForService(serviceName, serviceNamespace)
if err != nil {
return nil, err
}
if proxyTransport != nil && proxyTransport.DialContext != nil {
ret.Dial = proxyTransport.DialContext
}
return ret, err
},
}
admissionConfig := &kubeapiserveradmission.AdmissionConfig{
ExternalInformers: versionedInformers,
LoopbackClientConfig: genericConfig.LoopbackClientConfig,
CloudConfigFile: s.CloudProvider.CloudConfigFile,
}
pluginInitializers, admissionPostStartHook, err = BuildAdmissionPluginInitializers(
s,
client,
serviceResolver,
webhookAuthResolverWrapper,
)
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
@@ -586,42 +542,6 @@ func buildGenericConfig(
return
}
// BuildAdmissionPluginInitializers constructs the admission plugin initializer
func BuildAdmissionPluginInitializers(
s *options.ServerRunOptions,
client internalclientset.Interface,
serviceResolver aggregatorapiserver.ServiceResolver,
webhookAuthWrapper webhook.AuthenticationInfoResolverWrapper,
) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
var err error
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
// We have a functional client so we can use that to build our discovery backed REST mapper
// Use a discovery client capable of being refreshed.
discoveryClient := cacheddiscovery.NewMemCacheClient(client.Discovery())
discoveryRESTMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
admissionPostStartHook := func(context genericapiserver.PostStartHookContext) error {
discoveryRESTMapper.Reset()
go utilwait.Until(discoveryRESTMapper.Reset, 30*time.Second, context.StopCh)
return nil
}
quotaConfiguration := quotainstall.NewQuotaConfigurationForAdmission()
kubePluginInitializer := kubeapiserveradmission.NewPluginInitializer(cloudConfig, discoveryRESTMapper, quotaConfiguration)
webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthWrapper, serviceResolver)
return []admission.PluginInitializer{webhookPluginInitializer, kubePluginInitializer}, admissionPostStartHook, nil
}
// BuildAuthenticator constructs the authenticator
func BuildAuthenticator(s *options.ServerRunOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (authenticator.Request, *spec.SecurityDefinitions, error) {
authenticatorConfig := s.Authentication.ToAuthenticationConfig()
@@ -728,6 +648,25 @@ func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
return options, nil
}
func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
var serviceResolver webhook.ServiceResolver
if enabledAggregatorRouting {
serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
informer.Core().V1().Services().Lister(),
informer.Core().V1().Endpoints().Lister(),
)
} else {
serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
informer.Core().V1().Services().Lister(),
)
}
// resolve kubernetes.default.svc locally
if localHost, err := url.Parse(hostname); err == nil {
serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
}
return serviceResolver
}
func readCAorNil(file string) ([]byte, error) {
if len(file) == 0 {
return nil, nil