scrub aggregator names to eliminate discovery

This commit is contained in:
deads2k
2017-02-06 14:20:15 -05:00
parent a461eab321
commit dc30d1750e
19 changed files with 64 additions and 457 deletions

View File

@@ -45,7 +45,6 @@ filegroup(
"//cmd/kube-aggregator/pkg/client/listers/apiregistration/internalversion:all-srcs",
"//cmd/kube-aggregator/pkg/client/listers/apiregistration/v1alpha1:all-srcs",
"//cmd/kube-aggregator/pkg/cmd/server:all-srcs",
"//cmd/kube-aggregator/pkg/legacy:all-srcs",
"//cmd/kube-aggregator/pkg/registry/apiservice:all-srcs",
],
tags = ["automanaged"],

View File

@@ -84,14 +84,14 @@ spec:
- name: volume-etcd-client-cert
secret:
defaultMode: 420
secretName: discovery-etcd
secretName: kube-aggregator-etcd
- name: volume-serving-cert
secret:
defaultMode: 420
secretName: serving-discovery
secretName: serving-kube-aggregator
- configMap:
defaultMode: 420
name: discovery-ca
name: kube-aggregator-ca
name: volume-serving-ca
- configMap:
defaultMode: 420

View File

@@ -43,7 +43,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
cmd := server.NewCommandStartDiscoveryServer(os.Stdout, os.Stderr)
cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr)
cmd.Flags().AddGoFlagSet(flag.CommandLine)
if err := cmd.Execute(); err != nil {
cmdutil.CheckErr(err)

View File

@@ -36,7 +36,7 @@ import (
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
discoveryclientset "k8s.io/kubernetes/cmd/kube-aggregator/pkg/client/clientset_generated/clientset"
aggregatorclient "k8s.io/kubernetes/cmd/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/client/informers"
listers "k8s.io/kubernetes/cmd/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
@@ -59,8 +59,8 @@ type Config struct {
RESTOptionsGetter generic.RESTOptionsGetter
}
// APIDiscoveryServer contains state for a Kubernetes cluster master/api server.
type APIDiscoveryServer struct {
// APIAggregator contains state for a Kubernetes cluster master/api server.
type APIAggregator struct {
GenericAPIServer *genericapiserver.GenericAPIServer
contextMapper genericapirequest.RequestContextMapper
@@ -73,13 +73,13 @@ type APIDiscoveryServer struct {
// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
proxyHandlers map[string]*proxyHandler
// lister is used to add group handling for /apis/<group> discovery lookups based on
// lister is used to add group handling for /apis/<group> aggregator lookups based on
// controller state
lister listers.APIServiceLister
// serviceLister is used by the discovery handler to determine whether or not to try to expose the group
// serviceLister is used by the aggregator handler to determine whether or not to try to expose the group
serviceLister v1listers.ServiceLister
// endpointsLister is used by the discovery handler to determine whether or not to try to expose the group
// endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group
endpointsLister v1listers.EndpointsLister
// proxyMux intercepts requests that need to be proxied to backing API servers
@@ -105,11 +105,11 @@ func (c *Config) SkipComplete() completedConfig {
return completedConfig{c}
}
// New returns a new instance of APIDiscoveryServer from the given config.
func (c completedConfig) New() (*APIDiscoveryServer, error) {
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) New() (*APIAggregator, error) {
informerFactory := informers.NewSharedInformerFactory(
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
discoveryclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
aggregatorclient.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
)
kubeInformers := kubeinformers.NewSharedInformerFactory(nil, c.CoreAPIServerClient, 5*time.Minute)
@@ -129,7 +129,7 @@ func (c completedConfig) New() (*APIDiscoveryServer, error) {
return nil, err
}
s := &APIDiscoveryServer{
s := &APIAggregator{
GenericAPIServer: genericServer,
contextMapper: c.GenericConfig.RequestContextMapper,
proxyClientCert: c.ProxyClientCert,
@@ -175,7 +175,7 @@ type handlerChainConfig struct {
}
// handlerChain is a method to build the handler chain for this API server. We need a custom handler chain so that we
// can have custom handling for `/apis`, since we're hosting discovery differently from anyone else and we're hosting
// can have custom handling for `/apis`, since we're hosting aggregation differently from anyone else and we're hosting
// the endpoints differently, since we're proxying all groups except for apiregistration.k8s.io.
func (h *handlerChainConfig) handlerChain(apiHandler http.Handler, c *genericapiserver.Config) (secure, insecure http.Handler) {
// add this as a filter so that we never collide with "already registered" failures on `/apis`
@@ -205,8 +205,8 @@ func (h *handlerChainConfig) handlerChain(apiHandler http.Handler, c *genericapi
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIDiscoveryServer) AddAPIService(apiService *apiregistration.APIService) {
// if the proxyHandler already exists, it needs to be updated. The discovery bits do not
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
// since they are wired against listers because they require multiple resources to respond
if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
proxyHandler.updateAPIService(apiService)
@@ -237,7 +237,7 @@ func (s *APIDiscoveryServer) AddAPIService(apiService *apiregistration.APIServic
return
}
// it's time to register the group discovery endpoint
// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
groupName: apiService.Spec.Group,
@@ -245,7 +245,7 @@ func (s *APIDiscoveryServer) AddAPIService(apiService *apiregistration.APIServic
serviceLister: s.serviceLister,
endpointsLister: s.endpointsLister,
}
// discovery is protected
// aggregation is protected
s.GenericAPIServer.HandlerContainer.UnlistedRoutes.Handle(groupPath, groupDiscoveryHandler)
s.GenericAPIServer.HandlerContainer.UnlistedRoutes.Handle(groupPath+"/", groupDiscoveryHandler)
@@ -253,7 +253,7 @@ func (s *APIDiscoveryServer) AddAPIService(apiService *apiregistration.APIServic
// RemoveAPIService removes the APIService from being handled. Later on it will disable the proxy endpoint.
// Right now it does nothing because our handler has to properly 404 itself since muxes don't unregister
func (s *APIDiscoveryServer) RemoveAPIService(apiServiceName string) {
func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
proxyHandler, exists := s.proxyHandlers[apiServiceName]
if !exists {
return

View File

@@ -89,7 +89,7 @@ func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
glog.Infof("Starting APIServiceRegistrationController")
// only start one worker thread since its a slow moving API and the discovery server adding bits
// only start one worker thread since its a slow moving API and the aggregation server adding bits
// aren't threadsafe
go wait.Until(c.runWorker, time.Second, stopCh)

View File

@@ -14,7 +14,6 @@ go_library(
deps = [
"//cmd/kube-aggregator/pkg/apis/apiregistration/v1alpha1:go_default_library",
"//cmd/kube-aggregator/pkg/apiserver:go_default_library",
"//cmd/kube-aggregator/pkg/legacy:go_default_library",
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library",

View File

@@ -35,7 +35,6 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/legacy"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@@ -43,9 +42,9 @@ import (
"k8s.io/kubernetes/cmd/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
)
const defaultEtcdPathPrefix = "/registry/kubernetes.io/kube-aggregator"
const defaultEtcdPathPrefix = "/registry/kube-aggregator.kubernetes.io/"
type DiscoveryServerOptions struct {
type AggregatorOptions struct {
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptions
Authentication *genericoptions.DelegatingAuthenticationOptions
@@ -61,8 +60,8 @@ type DiscoveryServerOptions struct {
}
// NewCommandStartMaster provides a CLI handler for 'start master' command
func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command {
o := &DiscoveryServerOptions{
func NewCommandStartAggregator(out, err io.Writer) *cobra.Command {
o := &AggregatorOptions{
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
@@ -77,12 +76,12 @@ func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command {
o.SecureServing.ServingOptions.BindPort = 443
cmd := &cobra.Command{
Short: "Launch a discovery summarizer and proxy server",
Long: "Launch a discovery summarizer and proxy server",
Short: "Launch a API aggregator and proxy server",
Long: "Launch a API aggregator and proxy server",
Run: func(c *cobra.Command, args []string) {
cmdutil.CheckErr(o.Complete())
cmdutil.CheckErr(o.Validate(args))
cmdutil.CheckErr(o.RunDiscoveryServer())
cmdutil.CheckErr(o.RunAggregator())
},
}
@@ -97,20 +96,15 @@ func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command {
return cmd
}
func (o DiscoveryServerOptions) Validate(args []string) error {
func (o AggregatorOptions) Validate(args []string) error {
return nil
}
func (o *DiscoveryServerOptions) Complete() error {
func (o *AggregatorOptions) Complete() error {
return nil
}
func (o DiscoveryServerOptions) RunDiscoveryServer() error {
// if we don't have an etcd to back the server, we must be a legacy server
if len(o.Etcd.StorageConfig.ServerList) == 0 {
return o.RunLegacyDiscoveryServer()
}
func (o AggregatorOptions) RunAggregator() error {
// TODO have a "real" external address
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
@@ -171,17 +165,6 @@ func (o DiscoveryServerOptions) RunDiscoveryServer() error {
return nil
}
// RunLegacyDiscoveryServer runs the legacy mode of discovery
func (o DiscoveryServerOptions) RunLegacyDiscoveryServer() error {
configFilePath := "config.json"
port := "9090"
s, err := legacy.NewDiscoverySummarizer(configFilePath)
if err != nil {
return err
}
return s.Run(port)
}
type restOptionsFactory struct {
storageConfig *storagebackend.Config
}

View File

@@ -1,37 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"discoverysummarizer.go",
"doc.go",
],
tags = ["automanaged"],
deps = [
"//cmd/kube-aggregator/pkg/legacy/apis/config/v1alpha1:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//cmd/kube-aggregator/pkg/legacy/apis/config/v1alpha1:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -1,27 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["types.go"],
tags = ["automanaged"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -1,43 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
// List of servers from which group versions should be summarized.
// This is used to represent the structure of the config file passed to discovery summarizer server.
type FederatedServerList struct {
Servers []FederatedServer `json:"servers"`
}
// Information about each individual server, whose group versions needs to be summarized.
type FederatedServer struct {
// The address that summarizer can reach to get discovery information from the server.
// This can be hostname, hostname:port, IP or IP:port.
ServerAddress string `json:"serverAddress"`
// The list of paths where server exposes group version discovery information.
// Summarizer will use these paths to figure out group versions supported by this server.
GroupVersionDiscoveryPaths []GroupVersionDiscoveryPath `json:"groupVersionDiscoveryPaths"`
}
// Information about each group version discovery path that needs to be summarized.
type GroupVersionDiscoveryPath struct {
// Path where the server exposes the discovery API to surface the group versions that it supports.
Path string `json:"path"`
// True if the path is for legacy group version.
// (i.e the path returns metav1.APIVersions instead of metav1.APIGroupList)
IsLegacy bool `json:"isLegacy"`
}

View File

@@ -1,211 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package legacy
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
config "k8s.io/kubernetes/cmd/kube-aggregator/pkg/legacy/apis/config/v1alpha1"
)
type DiscoverySummarizer interface {
Run(port string) error
}
type discoverySummarizerServer struct {
// The list of servers as read from the config file.
serverList config.FederatedServerList
groupVersionPaths map[string][]string
legacyVersionPaths map[string][]string
}
// Ensure that discoverySummarizerServer implements DiscoverySummarizer interface.
var _ DiscoverySummarizer = &discoverySummarizerServer{}
// Creates a server to summarize all group versions
// supported by the servers mentioned in the given config file.
// Call Run() to bring up the server.
func NewDiscoverySummarizer(configFilePath string) (DiscoverySummarizer, error) {
file, err := ioutil.ReadFile(configFilePath)
if err != nil {
return nil, fmt.Errorf("Error in reading config file: %v\n", err)
}
ds := discoverySummarizerServer{
groupVersionPaths: map[string][]string{},
legacyVersionPaths: map[string][]string{},
}
err = json.Unmarshal(file, &ds.serverList)
if err != nil {
return nil, fmt.Errorf("Error in marshalling config file to json: %v\n", err)
}
for _, server := range ds.serverList.Servers {
for _, groupVersionPath := range server.GroupVersionDiscoveryPaths {
if groupVersionPath.IsLegacy {
ds.legacyVersionPaths[groupVersionPath.Path] = append(ds.legacyVersionPaths[groupVersionPath.Path], server.ServerAddress)
} else {
ds.groupVersionPaths[groupVersionPath.Path] = append(ds.groupVersionPaths[groupVersionPath.Path], server.ServerAddress)
}
}
}
return &ds, nil
}
// Brings up the server at the given port.
// TODO: Add HTTPS support.
func (ds *discoverySummarizerServer) Run(port string) error {
http.HandleFunc("/", ds.indexHandler)
// Register a handler for all paths.
for path := range ds.groupVersionPaths {
p := path
fmt.Printf("setting up a handler for %s\n", p)
http.HandleFunc(p, ds.summarizeGroupVersionsHandler(p))
}
for path := range ds.legacyVersionPaths {
p := path
fmt.Printf("setting up a handler for %s\n", p)
http.HandleFunc(p, ds.summarizeLegacyVersionsHandler(p))
}
fmt.Printf("Server running on port %s\n", port)
return http.ListenAndServe(":"+port, nil)
}
// Handler for "/"
func (ds *discoverySummarizerServer) indexHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Success"))
}
// Handler for group versions summarizer.
func (ds *discoverySummarizerServer) summarizeGroupVersionsHandler(path string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var apiGroupList *metav1.APIGroupList
// TODO: We can cache calls to all servers.
groups := make(chan *metav1.APIGroupList)
errorChannel := make(chan error)
for _, serverAddress := range ds.groupVersionPaths[path] {
addr := serverAddress
go func(groups chan *metav1.APIGroupList, error_channel chan error) {
groupList, err := ds.getAPIGroupList(addr + path)
if err != nil {
errorChannel <- err
return
}
groups <- groupList
return
}(groups, errorChannel)
}
var groupList *metav1.APIGroupList
var err error
for range ds.groupVersionPaths[path] {
select {
case groupList = <-groups:
if apiGroupList == nil {
apiGroupList = &metav1.APIGroupList{}
*apiGroupList = *groupList
} else {
apiGroupList.Groups = append(apiGroupList.Groups, groupList.Groups...)
}
case err = <-errorChannel:
ds.writeErr(http.StatusBadGateway, err, w)
return
}
}
ds.writeRawJSON(http.StatusOK, *apiGroupList, w)
return
}
}
// Handler for legacy versions summarizer.
func (ds *discoverySummarizerServer) summarizeLegacyVersionsHandler(path string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if len(ds.legacyVersionPaths[path]) > 1 {
err := fmt.Errorf("invalid multiple servers serving legacy group %v", ds.legacyVersionPaths[path])
ds.writeErr(http.StatusInternalServerError, err, w)
}
serverAddress := ds.legacyVersionPaths[path][0]
apiVersions, err := ds.getAPIVersions(serverAddress + path)
if err != nil {
ds.writeErr(http.StatusBadGateway, err, w)
return
}
ds.writeRawJSON(http.StatusOK, apiVersions, w)
return
}
}
func (ds *discoverySummarizerServer) getAPIGroupList(serverAddress string) (*metav1.APIGroupList, error) {
response, err := http.Get(serverAddress)
if err != nil {
return nil, fmt.Errorf("Error in fetching %s: %v", serverAddress, err)
}
defer response.Body.Close()
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("Error reading response from %s: %v", serverAddress, err)
}
var apiGroupList metav1.APIGroupList
err = json.Unmarshal(contents, &apiGroupList)
if err != nil {
return nil, fmt.Errorf("Error in unmarshalling response from server %s: %v", serverAddress, err)
}
return &apiGroupList, nil
}
func (ds *discoverySummarizerServer) getAPIVersions(serverAddress string) (*metav1.APIVersions, error) {
response, err := http.Get(serverAddress)
if err != nil {
return nil, fmt.Errorf("Error in fetching %s: %v", serverAddress, err)
}
defer response.Body.Close()
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("Error reading response from %s: %v", serverAddress, err)
}
var apiVersions metav1.APIVersions
err = json.Unmarshal(contents, &apiVersions)
if err != nil {
return nil, fmt.Errorf("Error in unmarshalling response from server %s: %v", serverAddress, err)
}
return &apiVersions, nil
}
// TODO: Pass a runtime.Object here instead of interface{} and use the encoding/decoding stack from kubernetes apiserver.
func (ds *discoverySummarizerServer) writeRawJSON(statusCode int, object interface{}, w http.ResponseWriter) {
output, err := json.MarshalIndent(object, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(output)
}
func (ds *discoverySummarizerServer) writeErr(statusCode int, err error, w http.ResponseWriter) {
http.Error(w, err.Error(), statusCode)
}

View File

@@ -1,20 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package discoverysummarizer contains code for the legacy discovery summarizer
// (program to summarize discovery information from all federated api servers)
// as per https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/federated-api-servers.md
package legacy // import "k8s.io/kubernetes/cmd/kube-aggregator/pkg/legacy"