add federation service controller
This commit is contained in:
		| @@ -51,6 +51,7 @@ federation-controller-manager | |||||||
| ``` | ``` | ||||||
|       --address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces) |       --address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces) | ||||||
|       --cluster-monitor-period=40s: The period for syncing ClusterStatus in ClusterController. |       --cluster-monitor-period=40s: The period for syncing ClusterStatus in ClusterController. | ||||||
|  |       --concurrent-service-syncs=10: The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load | ||||||
|       --federated-api-burst=30: Burst to use while talking with federation apiserver |       --federated-api-burst=30: Burst to use while talking with federation apiserver | ||||||
|       --federated-api-qps=20: QPS to use while talking with federation apiserver |       --federated-api-qps=20: QPS to use while talking with federation apiserver | ||||||
|       --kube-api-content-type="": ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now. |       --kube-api-content-type="": ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now. | ||||||
| @@ -65,7 +66,7 @@ federation-controller-manager | |||||||
|       --profiling[=true]: Enable profiling via web interface host:port/debug/pprof/ |       --profiling[=true]: Enable profiling via web interface host:port/debug/pprof/ | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| ###### Auto generated by spf13/cobra on 25-May-2016 | ###### Auto generated by spf13/cobra on 29-May-2016 | ||||||
|  |  | ||||||
|  |  | ||||||
| <!-- BEGIN MUNGE: GENERATED_ANALYTICS --> | <!-- BEGIN MUNGE: GENERATED_ANALYTICS --> | ||||||
|   | |||||||
							
								
								
									
										35
									
								
								federation/client/cache/cluster_cache.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										35
									
								
								federation/client/cache/cluster_cache.go
									
									
									
									
										vendored
									
									
								
							| @@ -17,7 +17,8 @@ limitations under the License. | |||||||
| package cache | package cache | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" | 	"github.com/golang/glog" | ||||||
|  | 	"k8s.io/kubernetes/federation/apis/federation" | ||||||
| 	kubeCache "k8s.io/kubernetes/pkg/client/cache" | 	kubeCache "k8s.io/kubernetes/pkg/client/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -27,9 +28,37 @@ type StoreToClusterLister struct { | |||||||
| 	kubeCache.Store | 	kubeCache.Store | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *StoreToClusterLister) List() (clusters federation_v1alpha1.ClusterList, err error) { | func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) { | ||||||
| 	for _, m := range s.Store.List() { | 	for _, m := range s.Store.List() { | ||||||
| 		clusters.Items = append(clusters.Items, *(m.(*federation_v1alpha1.Cluster))) | 		clusters.Items = append(clusters.Items, *(m.(*federation.Cluster))) | ||||||
| 	} | 	} | ||||||
| 	return clusters, nil | 	return clusters, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet | ||||||
|  | // some set of criteria defined by the function. | ||||||
|  | type ClusterConditionPredicate func(cluster federation.Cluster) bool | ||||||
|  |  | ||||||
|  | // storeToClusterConditionLister filters and returns nodes matching the given type and status from the store. | ||||||
|  | type storeToClusterConditionLister struct { | ||||||
|  | 	store     kubeCache.Store | ||||||
|  | 	predicate ClusterConditionPredicate | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ClusterCondition returns a storeToClusterConditionLister | ||||||
|  | func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredicate) storeToClusterConditionLister { | ||||||
|  | 	return storeToClusterConditionLister{s.Store, predicate} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister. | ||||||
|  | func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) { | ||||||
|  | 	for _, m := range s.store.List() { | ||||||
|  | 		cluster := *m.(*federation.Cluster) | ||||||
|  | 		if s.predicate(cluster) { | ||||||
|  | 			clusters.Items = append(clusters.Items, cluster) | ||||||
|  | 		} else { | ||||||
|  | 			glog.V(5).Infof("Cluster %s matches none of the conditions", cluster.Name) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return | ||||||
|  | } | ||||||
|   | |||||||
| @@ -28,11 +28,15 @@ import ( | |||||||
| 	"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" | 	"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" | ||||||
| 	"k8s.io/kubernetes/pkg/client/restclient" | 	"k8s.io/kubernetes/pkg/client/restclient" | ||||||
|  |  | ||||||
|  | 	internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" | ||||||
| 	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" | 	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3" | ||||||
|  | 	"k8s.io/kubernetes/federation/pkg/dnsprovider" | ||||||
| 	clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" | 	clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" | ||||||
|  | 	servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" | ||||||
| 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" | 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" | ||||||
| 	"k8s.io/kubernetes/pkg/healthz" | 	"k8s.io/kubernetes/pkg/healthz" | ||||||
| 	"k8s.io/kubernetes/pkg/util/configz" | 	"k8s.io/kubernetes/pkg/util/configz" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/wait" | ||||||
|  |  | ||||||
| 	"github.com/golang/glog" | 	"github.com/golang/glog" | ||||||
| 	"github.com/prometheus/client_golang/prometheus" | 	"github.com/prometheus/client_golang/prometheus" | ||||||
| @@ -103,7 +107,17 @@ func Run(s *options.CMServer) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { | func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { | ||||||
|  |  | ||||||
| 	federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) | 	federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) | ||||||
| 	go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run() | 	go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run() | ||||||
|  | 	dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Fatalf("Cloud provider could not be initialized: %v", err) | ||||||
|  | 	} | ||||||
|  | 	scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) | ||||||
|  | 	servicecontroller := servicecontroller.New(scclientset, dns) | ||||||
|  | 	if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil { | ||||||
|  | 		glog.Errorf("Failed to start service controller: %v", err) | ||||||
|  | 	} | ||||||
| 	select {} | 	select {} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -32,6 +32,14 @@ type ControllerManagerConfiguration struct { | |||||||
| 	Port int `json:"port"` | 	Port int `json:"port"` | ||||||
| 	// address is the IP address to serve on (set to 0.0.0.0 for all interfaces). | 	// address is the IP address to serve on (set to 0.0.0.0 for all interfaces). | ||||||
| 	Address string `json:"address"` | 	Address string `json:"address"` | ||||||
|  | 	// dnsProvider is the provider for dns services. | ||||||
|  | 	DnsProvider string `json:"dnsProvider"` | ||||||
|  | 	// dnsConfigFile is the path to the dns provider configuration file. | ||||||
|  | 	DnsConfigFile string `json:"ndsConfigFile"` | ||||||
|  | 	// concurrentServiceSyncs is the number of services that are | ||||||
|  | 	// allowed to sync concurrently. Larger number = more responsive service | ||||||
|  | 	// management, but more CPU (and network) load. | ||||||
|  | 	ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"` | ||||||
| 	// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller. | 	// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller. | ||||||
| 	ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"` | 	ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"` | ||||||
| 	// APIServerQPS is the QPS to use while talking with federation apiserver. | 	// APIServerQPS is the QPS to use while talking with federation apiserver. | ||||||
| @@ -65,6 +73,7 @@ func NewCMServer() *CMServer { | |||||||
| 		ControllerManagerConfiguration: ControllerManagerConfiguration{ | 		ControllerManagerConfiguration: ControllerManagerConfiguration{ | ||||||
| 			Port:                   FederatedControllerManagerPort, | 			Port:                   FederatedControllerManagerPort, | ||||||
| 			Address:                "0.0.0.0", | 			Address:                "0.0.0.0", | ||||||
|  | 			ConcurrentServiceSyncs: 10, | ||||||
| 			ClusterMonitorPeriod:   unversioned.Duration{Duration: 40 * time.Second}, | 			ClusterMonitorPeriod:   unversioned.Duration{Duration: 40 * time.Second}, | ||||||
| 			APIServerQPS:           20.0, | 			APIServerQPS:           20.0, | ||||||
| 			APIServerBurst:         30, | 			APIServerBurst:         30, | ||||||
| @@ -78,6 +87,7 @@ func NewCMServer() *CMServer { | |||||||
| func (s *CMServer) AddFlags(fs *pflag.FlagSet) { | func (s *CMServer) AddFlags(fs *pflag.FlagSet) { | ||||||
| 	fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") | 	fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") | ||||||
| 	fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") | 	fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") | ||||||
|  | 	fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") | ||||||
| 	fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") | 	fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.") | ||||||
| 	fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") | 	fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") | ||||||
| 	fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") | 	fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)") | ||||||
|   | |||||||
							
								
								
									
										207
									
								
								federation/pkg/federation-controller/service/cluster_helper.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										207
									
								
								federation/pkg/federation-controller/service/cluster_helper.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,207 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/federation/apis/federation" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | 	cache "k8s.io/kubernetes/pkg/client/cache" | ||||||
|  | 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||||||
|  | 	"k8s.io/kubernetes/pkg/client/restclient" | ||||||
|  | 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" | ||||||
|  | 	"k8s.io/kubernetes/pkg/controller/framework" | ||||||
|  | 	pkg_runtime "k8s.io/kubernetes/pkg/runtime" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/wait" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/workqueue" | ||||||
|  | 	"k8s.io/kubernetes/pkg/watch" | ||||||
|  |  | ||||||
|  | 	"github.com/golang/glog" | ||||||
|  | 	"reflect" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type clusterCache struct { | ||||||
|  | 	clientset *clientset.Clientset | ||||||
|  | 	cluster   *federation.Cluster | ||||||
|  | 	// A store of services, populated by the serviceController | ||||||
|  | 	serviceStore cache.StoreToServiceLister | ||||||
|  | 	// Watches changes to all services | ||||||
|  | 	serviceController *framework.Controller | ||||||
|  | 	// A store of endpoint, populated by the serviceController | ||||||
|  | 	endpointStore cache.StoreToEndpointsLister | ||||||
|  | 	// Watches changes to all endpoints | ||||||
|  | 	endpointController *framework.Controller | ||||||
|  | 	// services that need to be synced | ||||||
|  | 	serviceQueue *workqueue.Type | ||||||
|  | 	// endpoints that need to be synced | ||||||
|  | 	endpointQueue *workqueue.Type | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type clusterClientCache struct { | ||||||
|  | 	rwlock    sync.Mutex // protects serviceMap | ||||||
|  | 	clientMap map[string]*clusterCache | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) { | ||||||
|  | 	cachedClusterClient, ok := cc.clientMap[clusterName] | ||||||
|  | 	// only create when no existing cachedClusterClient | ||||||
|  | 	if ok { | ||||||
|  | 		if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) { | ||||||
|  | 			//rebuild clientset when cluster spec is changed | ||||||
|  | 			clientset, err := newClusterClientset(cluster) | ||||||
|  | 			if err != nil || clientset == nil { | ||||||
|  | 				glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) | ||||||
|  | 			} | ||||||
|  | 			glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName) | ||||||
|  | 			cachedClusterClient.clientset = clientset | ||||||
|  | 			go cachedClusterClient.serviceController.Run(wait.NeverStop) | ||||||
|  | 			go cachedClusterClient.endpointController.Run(wait.NeverStop) | ||||||
|  | 			glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName) | ||||||
|  | 		} else { | ||||||
|  | 			// do nothing when there is no spec change | ||||||
|  | 			glog.V(4).Infof("Keep clientset for cluster %s", clusterName) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		glog.V(4).Infof("No client cache for cluster %s, building new", clusterName) | ||||||
|  | 		clientset, err := newClusterClientset(cluster) | ||||||
|  | 		if err != nil || clientset == nil { | ||||||
|  | 			glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err) | ||||||
|  | 		} | ||||||
|  | 		cachedClusterClient = &clusterCache{ | ||||||
|  | 			cluster:       cluster, | ||||||
|  | 			clientset:     clientset, | ||||||
|  | 			serviceQueue:  workqueue.New(), | ||||||
|  | 			endpointQueue: workqueue.New(), | ||||||
|  | 		} | ||||||
|  | 		cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer( | ||||||
|  | 			&cache.ListWatch{ | ||||||
|  | 				ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { | ||||||
|  | 					return clientset.Core().Endpoints(api.NamespaceAll).List(options) | ||||||
|  | 				}, | ||||||
|  | 				WatchFunc: func(options api.ListOptions) (watch.Interface, error) { | ||||||
|  | 					return clientset.Core().Endpoints(api.NamespaceAll).Watch(options) | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Endpoints{}, | ||||||
|  | 			serviceSyncPeriod, | ||||||
|  | 			framework.ResourceEventHandlerFuncs{ | ||||||
|  | 				AddFunc: func(obj interface{}) { | ||||||
|  | 					cc.enqueueEndpoint(obj, clusterName) | ||||||
|  | 				}, | ||||||
|  | 				UpdateFunc: func(old, cur interface{}) { | ||||||
|  | 					cc.enqueueEndpoint(cur, clusterName) | ||||||
|  | 				}, | ||||||
|  | 				DeleteFunc: func(obj interface{}) { | ||||||
|  | 					cc.enqueueEndpoint(obj, clusterName) | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		) | ||||||
|  |  | ||||||
|  | 		cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer( | ||||||
|  | 			&cache.ListWatch{ | ||||||
|  | 				ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { | ||||||
|  | 					return clientset.Core().Services(api.NamespaceAll).List(options) | ||||||
|  | 				}, | ||||||
|  | 				WatchFunc: func(options api.ListOptions) (watch.Interface, error) { | ||||||
|  | 					return clientset.Core().Services(api.NamespaceAll).Watch(options) | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{}, | ||||||
|  | 			serviceSyncPeriod, | ||||||
|  | 			framework.ResourceEventHandlerFuncs{ | ||||||
|  | 				AddFunc: func(obj interface{}) { | ||||||
|  | 					cc.enqueueService(obj, clusterName) | ||||||
|  | 				}, | ||||||
|  | 				UpdateFunc: func(old, cur interface{}) { | ||||||
|  | 					oldService, ok := old.(*api.Service) | ||||||
|  |  | ||||||
|  | 					if !ok { | ||||||
|  | 						return | ||||||
|  | 					} | ||||||
|  | 					curService, ok := cur.(*api.Service) | ||||||
|  | 					if !ok { | ||||||
|  | 						return | ||||||
|  | 					} | ||||||
|  | 					if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) { | ||||||
|  | 						cc.enqueueService(cur, clusterName) | ||||||
|  | 					} | ||||||
|  | 				}, | ||||||
|  | 				DeleteFunc: func(obj interface{}) { | ||||||
|  | 					service, _ := obj.(*api.Service) | ||||||
|  | 					cc.enqueueService(obj, clusterName) | ||||||
|  | 					glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		) | ||||||
|  | 		cc.clientMap[clusterName] = cachedClusterClient | ||||||
|  | 		go cachedClusterClient.serviceController.Run(wait.NeverStop) | ||||||
|  | 		go cachedClusterClient.endpointController.Run(wait.NeverStop) | ||||||
|  | 		glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
|  | //TODO: copied from cluster controller, to make this as common function in pass 2 | ||||||
|  | // delFromClusterSet delete a cluster from clusterSet and | ||||||
|  | // delete the corresponding restclient from the map clusterKubeClientMap | ||||||
|  | func (cc *clusterClientCache) delFromClusterSet(obj interface{}) { | ||||||
|  | 	cluster, ok := obj.(*federation.Cluster) | ||||||
|  | 	cc.rwlock.Lock() | ||||||
|  | 	defer cc.rwlock.Unlock() | ||||||
|  | 	if ok { | ||||||
|  | 		delete(cc.clientMap, cluster.Name) | ||||||
|  | 	} else { | ||||||
|  | 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown) | ||||||
|  | 		if !ok { | ||||||
|  | 			glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		glog.Infof("Found tombstone for %v", obj) | ||||||
|  | 		delete(cc.clientMap, tombstone.Key) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // addToClusterSet inserts the new cluster to clusterSet and creates a corresponding | ||||||
|  | // restclient to map clusterKubeClientMap | ||||||
|  | func (cc *clusterClientCache) addToClientMap(obj interface{}) { | ||||||
|  | 	cluster := obj.(*federation.Cluster) | ||||||
|  | 	cc.rwlock.Lock() | ||||||
|  | 	defer cc.rwlock.Unlock() | ||||||
|  | 	cluster, ok := obj.(*federation.Cluster) | ||||||
|  | 	if !ok { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	pred := getClusterConditionPredicate() | ||||||
|  | 	// check status | ||||||
|  | 	// skip if not ready | ||||||
|  | 	if pred(*cluster) { | ||||||
|  | 		cc.startClusterLW(cluster, cluster.Name) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) { | ||||||
|  | 	clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "") | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	clusterConfig.QPS = KubeAPIQPS | ||||||
|  | 	clusterConfig.Burst = KubeAPIBurst | ||||||
|  | 	clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName)) | ||||||
|  | 	return clientset, nil | ||||||
|  | } | ||||||
							
								
								
									
										40
									
								
								federation/pkg/federation-controller/service/dns.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								federation/pkg/federation-controller/service/dns.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,40 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | // getClusterZoneName returns the name of the zone where the specified cluster exists (e.g. "us-east1-c" on GCE, or "us-east-1b" on AWS) | ||||||
|  | func getClusterZoneName(clusterName string) string { | ||||||
|  | 	// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet. | ||||||
|  | 	return "zone-of-cluster-" + clusterName | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // getClusterRegionName returns the name of the region where the specified cluster exists (e.g. us-east1 on GCE, or "us-east-1" on AWS) | ||||||
|  | func getClusterRegionName(clusterName string) string { | ||||||
|  | 	// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet. | ||||||
|  | 	return "region-of-cluster-" + clusterName | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation | ||||||
|  | func getFederationDNSZoneName() string { | ||||||
|  | 	return "mydomain.com" // TODO: quinton: Get this from the federation configuration. | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func ensureDNSRecords(clusterName string, cachedService *cachedService) error { | ||||||
|  | 	// Quinton: Pseudocode.... | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
							
								
								
									
										19
									
								
								federation/pkg/federation-controller/service/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								federation/pkg/federation-controller/service/doc.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,19 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | // Package service contains code for syncing Kubernetes services, | ||||||
|  | // and cloud DNS servers with the federated service registry. | ||||||
|  | package service | ||||||
							
								
								
									
										162
									
								
								federation/pkg/federation-controller/service/endpoint_helper.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										162
									
								
								federation/pkg/federation-controller/service/endpoint_helper.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,162 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | 	cache "k8s.io/kubernetes/pkg/client/cache" | ||||||
|  | 	"k8s.io/kubernetes/pkg/controller" | ||||||
|  |  | ||||||
|  | 	"github.com/golang/glog" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // worker runs a worker thread that just dequeues items, processes them, and marks them done. | ||||||
|  | // It enforces that the syncHandler is never invoked concurrently with the same key. | ||||||
|  | func (sc *ServiceController) clusterEndpointWorker() { | ||||||
|  | 	fedClient := sc.federationClient | ||||||
|  | 	for clusterName, cache := range sc.clusterCache.clientMap { | ||||||
|  | 		go func(cache *clusterCache, clusterName string) { | ||||||
|  | 			for { | ||||||
|  | 				func() { | ||||||
|  | 					key, quit := cache.endpointQueue.Get() | ||||||
|  | 					// update endpoint cache | ||||||
|  | 					if quit { | ||||||
|  | 						return | ||||||
|  | 					} | ||||||
|  | 					defer cache.endpointQueue.Done(key) | ||||||
|  | 					err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient) | ||||||
|  | 					if err != nil { | ||||||
|  | 						glog.V(2).Infof("Failed to sync endpoint: %+v", err) | ||||||
|  | 					} | ||||||
|  | 				}() | ||||||
|  | 			} | ||||||
|  | 		}(cache, clusterName) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Whenever there is change on endpoint, the federation service should be updated | ||||||
|  | // key is the namespaced name of endpoint | ||||||
|  | func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error { | ||||||
|  | 	cachedService, ok := serviceCache.get(key) | ||||||
|  | 	if !ok { | ||||||
|  | 		// here we filtered all non-federation services | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	endpointInterface, exists, err := clusterCache.endpointStore.GetByKey(key) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err) | ||||||
|  | 		clusterCache.endpointQueue.Add(key) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if exists { | ||||||
|  | 		endpoint, ok := endpointInterface.(*api.Endpoints) | ||||||
|  | 		if ok { | ||||||
|  | 			glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName) | ||||||
|  | 			err = cc.processEndpointUpdate(cachedService, endpoint, clusterName) | ||||||
|  | 		} else { | ||||||
|  | 			_, ok := endpointInterface.(cache.DeletedFinalStateUnknown) | ||||||
|  | 			if !ok { | ||||||
|  | 				return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface) | ||||||
|  | 			} | ||||||
|  | 			glog.Infof("Found tombstone for %v", key) | ||||||
|  | 			err = cc.processEndpointDeletion(cachedService, clusterName) | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned | ||||||
|  | 		glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName) | ||||||
|  | 		err = cc.processEndpointDeletion(cachedService, clusterName) | ||||||
|  | 	} | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Failed to sync service: %+v, put back to service queue", err) | ||||||
|  | 		clusterCache.endpointQueue.Add(key) | ||||||
|  | 	} | ||||||
|  | 	cachedService.resetDNSUpdateDelay() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string) error { | ||||||
|  | 	glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) | ||||||
|  | 	var err error | ||||||
|  | 	cachedService.rwlock.Lock() | ||||||
|  | 	defer cachedService.rwlock.Unlock() | ||||||
|  | 	_, ok := cachedService.endpointMap[clusterName] | ||||||
|  | 	// TODO remove ok checking? if service controller is restarted, then endpointMap for the cluster does not exist | ||||||
|  | 	// need to query dns info from dnsprovider and make sure of if deletion is needed | ||||||
|  | 	if ok { | ||||||
|  | 		// endpoints lost, clean dns record | ||||||
|  | 		glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) | ||||||
|  | 		// TODO: need to integrate with dns.go:ensureDNSRecords | ||||||
|  | 		for i := 0; i < clientRetryCount; i++ { | ||||||
|  | 			err := ensureDNSRecords(clusterName, cachedService) | ||||||
|  | 			if err == nil { | ||||||
|  | 				delete(cachedService.endpointMap, clusterName) | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  | 			time.Sleep(cachedService.nextDNSUpdateDelay()) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Update dns info when endpoint update event received | ||||||
|  | // We do not care about the endpoint info, what we need to make sure here is len(endpoints.subsets)>0 | ||||||
|  | func (cc *clusterClientCache) processEndpointUpdate(cachedService *cachedService, endpoint *api.Endpoints, clusterName string) error { | ||||||
|  | 	glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", endpoint.Namespace, endpoint.Name, clusterName) | ||||||
|  | 	cachedService.rwlock.Lock() | ||||||
|  | 	defer cachedService.rwlock.Unlock() | ||||||
|  | 	for _, subset := range endpoint.Subsets { | ||||||
|  | 		if len(subset.Addresses) > 0 { | ||||||
|  | 			cachedService.endpointMap[clusterName] = 1 | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	_, ok := cachedService.endpointMap[clusterName] | ||||||
|  | 	if !ok { | ||||||
|  | 		// first time get endpoints, update dns record | ||||||
|  | 		glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", endpoint.Namespace, endpoint.Name, clusterName) | ||||||
|  | 		cachedService.endpointMap[clusterName] = 1 | ||||||
|  | 		err := ensureDNSRecords(clusterName, cachedService) | ||||||
|  | 		if err != nil { | ||||||
|  | 			// TODO: need to integrate with dns.go:ensureDNSRecords | ||||||
|  | 			for i := 0; i < clientRetryCount; i++ { | ||||||
|  | 				time.Sleep(cachedService.nextDNSUpdateDelay()) | ||||||
|  | 				err := ensureDNSRecords(clusterName, cachedService) | ||||||
|  | 				if err == nil { | ||||||
|  | 					return nil | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // obj could be an *api.Endpoints, or a DeletionFinalStateUnknown marker item. | ||||||
|  | func (cc *clusterClientCache) enqueueEndpoint(obj interface{}, clusterName string) { | ||||||
|  | 	key, err := controller.KeyFunc(obj) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	_, ok := cc.clientMap[clusterName] | ||||||
|  | 	if ok { | ||||||
|  | 		cc.clientMap[clusterName].endpointQueue.Add(key) | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -0,0 +1,120 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func buildEndpoint(subsets [][]string) *api.Endpoints { | ||||||
|  | 	endpoint := &api.Endpoints{ | ||||||
|  | 		Subsets: []api.EndpointSubset{ | ||||||
|  | 			{Addresses: []api.EndpointAddress{}}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, element := range subsets { | ||||||
|  | 		address := api.EndpointAddress{IP: element[0], Hostname: element[1], TargetRef: nil} | ||||||
|  | 		endpoint.Subsets[0].Addresses = append(endpoint.Subsets[0].Addresses, address) | ||||||
|  | 	} | ||||||
|  | 	return endpoint | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestProcessEndpointUpdate(t *testing.T) { | ||||||
|  | 	cc := clusterClientCache{ | ||||||
|  | 		clientMap: make(map[string]*clusterCache), | ||||||
|  | 	} | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name          string | ||||||
|  | 		cachedService *cachedService | ||||||
|  | 		endpoint      *api.Endpoints | ||||||
|  | 		clusterName   string | ||||||
|  | 		expectResult  int | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			"no-cache", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState:   &api.Service{}, | ||||||
|  | 				endpointMap: make(map[string]int), | ||||||
|  | 			}, | ||||||
|  | 			buildEndpoint([][]string{{"ip1", ""}}), | ||||||
|  | 			"foo", | ||||||
|  | 			1, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"has-cache", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{}, | ||||||
|  | 				endpointMap: map[string]int{ | ||||||
|  | 					"foo": 1, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			buildEndpoint([][]string{{"ip1", ""}}), | ||||||
|  | 			"foo", | ||||||
|  | 			1, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName) | ||||||
|  | 		if test.expectResult != test.cachedService.endpointMap[test.clusterName] { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName]) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestProcessEndpointDeletion(t *testing.T) { | ||||||
|  | 	cc := clusterClientCache{ | ||||||
|  | 		clientMap: make(map[string]*clusterCache), | ||||||
|  | 	} | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name          string | ||||||
|  | 		cachedService *cachedService | ||||||
|  | 		endpoint      *api.Endpoints | ||||||
|  | 		clusterName   string | ||||||
|  | 		expectResult  int | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			"no-cache", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState:   &api.Service{}, | ||||||
|  | 				endpointMap: make(map[string]int), | ||||||
|  | 			}, | ||||||
|  | 			buildEndpoint([][]string{{"ip1", ""}}), | ||||||
|  | 			"foo", | ||||||
|  | 			0, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"has-cache", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{}, | ||||||
|  | 				endpointMap: map[string]int{ | ||||||
|  | 					"foo": 1, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			buildEndpoint([][]string{{"ip1", ""}}), | ||||||
|  | 			"foo", | ||||||
|  | 			0, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		cc.processEndpointDeletion(test.cachedService, test.clusterName) | ||||||
|  | 		if test.expectResult != test.cachedService.endpointMap[test.clusterName] { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectResult, test.cachedService.endpointMap[test.clusterName]) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										253
									
								
								federation/pkg/federation-controller/service/service_helper.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										253
									
								
								federation/pkg/federation-controller/service/service_helper.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,253 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api/errors" | ||||||
|  | 	cache "k8s.io/kubernetes/pkg/client/cache" | ||||||
|  | 	"k8s.io/kubernetes/pkg/controller" | ||||||
|  |  | ||||||
|  | 	"github.com/golang/glog" | ||||||
|  | 	"reflect" | ||||||
|  | 	"sort" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // worker runs a worker thread that just dequeues items, processes them, and marks them done. | ||||||
|  | // It enforces that the syncHandler is never invoked concurrently with the same key. | ||||||
|  | func (sc *ServiceController) clusterServiceWorker() { | ||||||
|  | 	fedClient := sc.federationClient | ||||||
|  | 	for clusterName, cache := range sc.clusterCache.clientMap { | ||||||
|  | 		go func(cache *clusterCache, clusterName string) { | ||||||
|  | 			for { | ||||||
|  | 				func() { | ||||||
|  | 					key, quit := cache.serviceQueue.Get() | ||||||
|  | 					defer cache.serviceQueue.Done(key) | ||||||
|  | 					if quit { | ||||||
|  | 						return | ||||||
|  | 					} | ||||||
|  | 					err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient) | ||||||
|  | 					if err != nil { | ||||||
|  | 						glog.Errorf("Failed to sync service: %+v", err) | ||||||
|  | 					} | ||||||
|  | 				}() | ||||||
|  | 			} | ||||||
|  | 		}(cache, clusterName) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Whenever there is change on service, the federation service should be updated | ||||||
|  | func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federationclientset.Interface) error { | ||||||
|  | 	// obj holds the latest service info from apiserver, return if there is no federation cache for the service | ||||||
|  | 	cachedService, ok := serviceCache.get(key) | ||||||
|  | 	if !ok { | ||||||
|  | 		// if serviceCache does not exists, that means the service is not created by federation, we should skip it | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Infof("Did not successfully get %v from store: %v, will retry later", key, err) | ||||||
|  | 		clusterCache.serviceQueue.Add(key) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	var needUpdate bool | ||||||
|  | 	if exists { | ||||||
|  | 		service, ok := serviceInterface.(*api.Service) | ||||||
|  | 		if ok { | ||||||
|  | 			glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 			needUpdate = cc.processServiceUpdate(cachedService, service, clusterName) | ||||||
|  | 		} else { | ||||||
|  | 			_, ok := serviceInterface.(cache.DeletedFinalStateUnknown) | ||||||
|  | 			if !ok { | ||||||
|  | 				return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", serviceInterface) | ||||||
|  | 			} | ||||||
|  | 			glog.Infof("Found tombstone for %v", key) | ||||||
|  | 			needUpdate = cc.processServiceDeletion(cachedService, clusterName) | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName) | ||||||
|  | 		needUpdate = cc.processServiceDeletion(cachedService, clusterName) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if needUpdate { | ||||||
|  | 		err := cc.persistFedServiceUpdate(cachedService, fedClient) | ||||||
|  | 		if err == nil { | ||||||
|  | 			cachedService.appliedState = cachedService.lastState | ||||||
|  | 			cachedService.resetFedUpdateDelay() | ||||||
|  | 		} else { | ||||||
|  | 			if err != nil { | ||||||
|  | 				glog.Errorf("Failed to sync service: %+v, put back to service queue", err) | ||||||
|  | 				clusterCache.serviceQueue.Add(key) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processServiceDeletion is triggered when a service is delete from underlying k8s cluster | ||||||
|  | // the deletion function will wip out the cached ingress info of the service from federation service ingress | ||||||
|  | // the function returns a bool to indicate if actual update happend on federation service cache | ||||||
|  | // and if the federation service cache is updated, the updated info should be post to federation apiserver | ||||||
|  | func (cc *clusterClientCache) processServiceDeletion(cachedService *cachedService, clusterName string) bool { | ||||||
|  | 	cachedService.rwlock.Lock() | ||||||
|  | 	defer cachedService.rwlock.Unlock() | ||||||
|  | 	cachedStatus, ok := cachedService.serviceStatusMap[clusterName] | ||||||
|  | 	// cached status found, remove ingress info from federation service cache | ||||||
|  | 	if ok { | ||||||
|  | 		cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer | ||||||
|  | 		removeIndexes := []int{} | ||||||
|  | 		for i, fed := range cachedFedServiceStatus.Ingress { | ||||||
|  | 			for _, new := range cachedStatus.Ingress { | ||||||
|  | 				// remove if same ingress record found | ||||||
|  | 				if new.IP == fed.IP && new.Hostname == fed.Hostname { | ||||||
|  | 					removeIndexes = append(removeIndexes, i) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		sort.Ints(removeIndexes) | ||||||
|  | 		for i := len(removeIndexes) - 1; i >= 0; i-- { | ||||||
|  | 			cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...) | ||||||
|  | 			glog.V(4).Infof("Remove old ingress %d for service %s/%s", removeIndexes[i], cachedService.lastState.Namespace, cachedService.lastState.Name) | ||||||
|  | 		} | ||||||
|  | 		delete(cachedService.serviceStatusMap, clusterName) | ||||||
|  | 		delete(cachedService.endpointMap, clusterName) | ||||||
|  | 		cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus | ||||||
|  | 		return true | ||||||
|  | 	} else { | ||||||
|  | 		glog.V(4).Infof("Service removal %s/%s from cluster %s observed.", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName) | ||||||
|  | 	} | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processServiceUpdate Update ingress info when service updated | ||||||
|  | // the function returns a bool to indicate if actual update happend on federation service cache | ||||||
|  | // and if the federation service cache is updated, the updated info should be post to federation apiserver | ||||||
|  | func (cc *clusterClientCache) processServiceUpdate(cachedService *cachedService, service *api.Service, clusterName string) bool { | ||||||
|  | 	glog.V(4).Infof("Processing service update for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 	cachedService.rwlock.Lock() | ||||||
|  | 	defer cachedService.rwlock.Unlock() | ||||||
|  | 	var needUpdate bool | ||||||
|  | 	newServiceLB := service.Status.LoadBalancer | ||||||
|  | 	cachedFedServiceStatus := cachedService.lastState.Status.LoadBalancer | ||||||
|  | 	if len(newServiceLB.Ingress) == 0 { | ||||||
|  | 		// not yet get LB IP | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	cachedStatus, ok := cachedService.serviceStatusMap[clusterName] | ||||||
|  | 	if ok { | ||||||
|  | 		if reflect.DeepEqual(cachedStatus, newServiceLB) { | ||||||
|  | 			glog.V(4).Infof("Same ingress info observed for service %s/%s: %+v ", service.Namespace, service.Name, cachedStatus.Ingress) | ||||||
|  | 		} else { | ||||||
|  | 			glog.V(4).Infof("Ingress info was changed for service %s/%s: cache: %+v, new: %+v ", | ||||||
|  | 				service.Namespace, service.Name, cachedStatus.Ingress, newServiceLB) | ||||||
|  | 			needUpdate = true | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		glog.V(4).Infof("Cached service status was not found for %s/%s, cluster %s, building one", service.Namespace, service.Name, clusterName) | ||||||
|  |  | ||||||
|  | 		// cache is not always reliable(cache will be cleaned when service controller restart) | ||||||
|  | 		// two cases will run into this branch: | ||||||
|  | 		// 1. new service loadbalancer info received -> no info in cache, and no in federation service | ||||||
|  | 		// 2. service controller being restarted -> no info in cache, but it is in federation service | ||||||
|  |  | ||||||
|  | 		// check if the lb info is already in federation service | ||||||
|  |  | ||||||
|  | 		cachedService.serviceStatusMap[clusterName] = newServiceLB | ||||||
|  | 		needUpdate = false | ||||||
|  | 		// iterate service ingress info | ||||||
|  | 		for _, new := range newServiceLB.Ingress { | ||||||
|  | 			var found bool | ||||||
|  | 			// if it is known by federation service | ||||||
|  | 			for _, fed := range cachedFedServiceStatus.Ingress { | ||||||
|  | 				if new.IP == fed.IP && new.Hostname == fed.Hostname { | ||||||
|  | 					found = true | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			if !found { | ||||||
|  | 				needUpdate = true | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if needUpdate { | ||||||
|  | 		// new status = cached federation status - cached status + new status from k8s cluster | ||||||
|  |  | ||||||
|  | 		removeIndexes := []int{} | ||||||
|  | 		for i, fed := range cachedFedServiceStatus.Ingress { | ||||||
|  | 			for _, new := range cachedStatus.Ingress { | ||||||
|  | 				// remove if same ingress record found | ||||||
|  | 				if new.IP == fed.IP && new.Hostname == fed.Hostname { | ||||||
|  | 					removeIndexes = append(removeIndexes, i) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		sort.Ints(removeIndexes) | ||||||
|  | 		for i := len(removeIndexes) - 1; i >= 0; i-- { | ||||||
|  | 			cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress[:removeIndexes[i]], cachedFedServiceStatus.Ingress[removeIndexes[i]+1:]...) | ||||||
|  | 		} | ||||||
|  | 		cachedFedServiceStatus.Ingress = append(cachedFedServiceStatus.Ingress, service.Status.LoadBalancer.Ingress...) | ||||||
|  | 		cachedService.lastState.Status.LoadBalancer = cachedFedServiceStatus | ||||||
|  | 		glog.V(4).Infof("Add new ingress info %+v for service %s/%s", service.Status.LoadBalancer, service.Namespace, service.Name) | ||||||
|  | 	} else { | ||||||
|  | 		glog.V(4).Infof("Same ingress info found for %s/%s, cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 	} | ||||||
|  | 	return needUpdate | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedService, fedClient federationclientset.Interface) error { | ||||||
|  | 	service := cachedService.lastState | ||||||
|  | 	glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name) | ||||||
|  | 	var err error | ||||||
|  | 	for i := 0; i < clientRetryCount; i++ { | ||||||
|  | 		_, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service) | ||||||
|  | 		if err == nil { | ||||||
|  | 			glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		if errors.IsNotFound(err) { | ||||||
|  | 			glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v", | ||||||
|  | 				service.Namespace, service.Name, err) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		if errors.IsConflict(err) { | ||||||
|  | 			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v", | ||||||
|  | 				service.Namespace, service.Name, err) | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		time.Sleep(cachedService.nextFedUpdateDelay()) | ||||||
|  | 	} | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. | ||||||
|  | func (cc *clusterClientCache) enqueueService(obj interface{}, clusterName string) { | ||||||
|  | 	key, err := controller.KeyFunc(obj) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	_, ok := cc.clientMap[clusterName] | ||||||
|  | 	if ok { | ||||||
|  | 		cc.clientMap[clusterName].serviceQueue.Add(key) | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -0,0 +1,162 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"reflect" | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func buildServiceStatus(ingresses [][]string) api.LoadBalancerStatus { | ||||||
|  | 	status := api.LoadBalancerStatus{ | ||||||
|  | 		Ingress: []api.LoadBalancerIngress{}, | ||||||
|  | 	} | ||||||
|  | 	for _, element := range ingresses { | ||||||
|  | 		ingress := api.LoadBalancerIngress{IP: element[0], Hostname: element[1]} | ||||||
|  | 		status.Ingress = append(status.Ingress, ingress) | ||||||
|  | 	} | ||||||
|  | 	return status | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestProcessServiceUpdate(t *testing.T) { | ||||||
|  | 	cc := clusterClientCache{ | ||||||
|  | 		clientMap: make(map[string]*clusterCache), | ||||||
|  | 	} | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name             string | ||||||
|  | 		cachedService    *cachedService | ||||||
|  | 		service          *api.Service | ||||||
|  | 		clusterName      string | ||||||
|  | 		expectNeedUpdate bool | ||||||
|  | 		expectStatus     api.LoadBalancerStatus | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			"no-cache", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState:        &api.Service{}, | ||||||
|  | 				serviceStatusMap: make(map[string]api.LoadBalancerStatus), | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, | ||||||
|  | 			"foo", | ||||||
|  | 			true, | ||||||
|  | 			buildServiceStatus([][]string{{"ip1", ""}}), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"same-ingress", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, | ||||||
|  | 				serviceStatusMap: map[string]api.LoadBalancerStatus{ | ||||||
|  | 					"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, | ||||||
|  | 			"foo1", | ||||||
|  | 			false, | ||||||
|  | 			buildServiceStatus([][]string{{"ip1", ""}}), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"diff-cluster", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{ | ||||||
|  | 					ObjectMeta: api.ObjectMeta{Name: "bar1"}, | ||||||
|  | 				}, | ||||||
|  | 				serviceStatusMap: map[string]api.LoadBalancerStatus{ | ||||||
|  | 					"foo2": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, | ||||||
|  | 			"foo1", | ||||||
|  | 			true, | ||||||
|  | 			buildServiceStatus([][]string{{"ip1", ""}}), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"diff-ingress", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}})}}, | ||||||
|  | 				serviceStatusMap: map[string]api.LoadBalancerStatus{ | ||||||
|  | 					"foo1": buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}}), | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}})}}, | ||||||
|  | 			"foo1", | ||||||
|  | 			true, | ||||||
|  | 			buildServiceStatus([][]string{{"ip2", ""}, {"ip3", ""}, {"ip5", ""}}), | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		result := cc.processServiceUpdate(test.cachedService, test.service, test.clusterName) | ||||||
|  | 		if test.expectNeedUpdate != result { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result) | ||||||
|  | 		} | ||||||
|  | 		if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestProcessServiceDeletion(t *testing.T) { | ||||||
|  | 	cc := clusterClientCache{ | ||||||
|  | 		clientMap: make(map[string]*clusterCache), | ||||||
|  | 	} | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name             string | ||||||
|  | 		cachedService    *cachedService | ||||||
|  | 		service          *api.Service | ||||||
|  | 		clusterName      string | ||||||
|  | 		expectNeedUpdate bool | ||||||
|  | 		expectStatus     api.LoadBalancerStatus | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			"same-ingress", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, | ||||||
|  | 				serviceStatusMap: map[string]api.LoadBalancerStatus{ | ||||||
|  | 					"foo1": {Ingress: []api.LoadBalancerIngress{{"ip1", ""}}}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}})}}, | ||||||
|  | 			"foo1", | ||||||
|  | 			true, | ||||||
|  | 			buildServiceStatus([][]string{}), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"diff-ingress", | ||||||
|  | 			&cachedService{ | ||||||
|  | 				lastState: &api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip4", ""}, {"ip1", ""}, {"ip2", ""}, {"ip3", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}})}}, | ||||||
|  | 				serviceStatusMap: map[string]api.LoadBalancerStatus{ | ||||||
|  | 					"foo1": buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}}), | ||||||
|  | 					"foo2": buildServiceStatus([][]string{{"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&api.Service{Status: api.ServiceStatus{LoadBalancer: buildServiceStatus([][]string{{"ip1", ""}, {"ip2", ""}, {"ip3", ""}})}}, | ||||||
|  | 			"foo1", | ||||||
|  | 			true, | ||||||
|  | 			buildServiceStatus([][]string{{"ip4", ""}, {"ip5", ""}, {"ip6", ""}, {"ip8", ""}}), | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		result := cc.processServiceDeletion(test.cachedService, test.clusterName) | ||||||
|  | 		if test.expectNeedUpdate != result { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectNeedUpdate, result) | ||||||
|  | 		} | ||||||
|  | 		if !reflect.DeepEqual(test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %+v, saw %+v", test.name, test.expectStatus, test.cachedService.lastState.Status.LoadBalancer) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -0,0 +1,874 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"reflect" | ||||||
|  |  | ||||||
|  | 	"github.com/golang/glog" | ||||||
|  | 	federation "k8s.io/kubernetes/federation/apis/federation" | ||||||
|  | 	federationcache "k8s.io/kubernetes/federation/client/cache" | ||||||
|  | 	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" | ||||||
|  | 	"k8s.io/kubernetes/federation/pkg/dnsprovider" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api/errors" | ||||||
|  | 	cache "k8s.io/kubernetes/pkg/client/cache" | ||||||
|  | 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||||||
|  | 	"k8s.io/kubernetes/pkg/client/record" | ||||||
|  | 	"k8s.io/kubernetes/pkg/controller" | ||||||
|  | 	"k8s.io/kubernetes/pkg/controller/framework" | ||||||
|  | 	pkg_runtime "k8s.io/kubernetes/pkg/runtime" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/runtime" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/sets" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/wait" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util/workqueue" | ||||||
|  | 	"k8s.io/kubernetes/pkg/watch" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/pkg/conversion" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	// TODO update to 10 mins before merge | ||||||
|  | 	serviceSyncPeriod = 30 * time.Second | ||||||
|  | 	clusterSyncPeriod = 100 * time.Second | ||||||
|  |  | ||||||
|  | 	// How long to wait before retrying the processing of a service change. | ||||||
|  | 	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster | ||||||
|  | 	// should be changed appropriately. | ||||||
|  | 	minRetryDelay = 5 * time.Second | ||||||
|  | 	maxRetryDelay = 300 * time.Second | ||||||
|  |  | ||||||
|  | 	// client retry count and interval is when accessing a remote kube-apiserver or federation apiserver | ||||||
|  | 	// how many times should be attempted and how long it should sleep when failure occurs | ||||||
|  | 	// the retry should be in short time so no exponential backoff | ||||||
|  | 	clientRetryCount = 5 | ||||||
|  |  | ||||||
|  | 	retryable = true | ||||||
|  |  | ||||||
|  | 	doNotRetry = time.Duration(0) | ||||||
|  |  | ||||||
|  | 	UserAgentName = "federation-service-controller" | ||||||
|  | 	KubeAPIQPS    = 20.0 | ||||||
|  | 	KubeAPIBurst  = 30 | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type cachedService struct { | ||||||
|  | 	lastState *api.Service | ||||||
|  | 	// The state as successfully applied to the DNS server | ||||||
|  | 	appliedState *api.Service | ||||||
|  | 	// cluster endpoint map hold subset info from kubernetes clusters | ||||||
|  | 	// key clusterName | ||||||
|  | 	// value is a flag that if there is ready address, 1 means there is ready address, 0 means no ready address | ||||||
|  | 	endpointMap map[string]int | ||||||
|  | 	// cluster service map hold serivice status info from kubernetes clusters | ||||||
|  | 	// key clusterName | ||||||
|  |  | ||||||
|  | 	serviceStatusMap map[string]api.LoadBalancerStatus | ||||||
|  |  | ||||||
|  | 	// Ensures only one goroutine can operate on this service at any given time. | ||||||
|  | 	rwlock sync.Mutex | ||||||
|  |  | ||||||
|  | 	// Controls error back-off for procceeding federation service to k8s clusters | ||||||
|  | 	lastRetryDelay time.Duration | ||||||
|  | 	// Controls error back-off for updating federation service back to federation apiserver | ||||||
|  | 	lastFedUpdateDelay time.Duration | ||||||
|  | 	// Controls error back-off for dns record update | ||||||
|  | 	lastDNSUpdateDelay time.Duration | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type serviceCache struct { | ||||||
|  | 	rwlock sync.Mutex // protects serviceMap | ||||||
|  | 	// federation service map contains all service received from federation apiserver | ||||||
|  | 	// key serviceName | ||||||
|  | 	fedServiceMap map[string]*cachedService | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type ServiceController struct { | ||||||
|  | 	dns              dnsprovider.Interface | ||||||
|  | 	federationClient federationclientset.Interface | ||||||
|  | 	zones            []dnsprovider.Zone | ||||||
|  | 	serviceCache     *serviceCache | ||||||
|  | 	clusterCache     *clusterClientCache | ||||||
|  | 	// A store of services, populated by the serviceController | ||||||
|  | 	serviceStore cache.StoreToServiceLister | ||||||
|  | 	// Watches changes to all services | ||||||
|  | 	serviceController *framework.Controller | ||||||
|  | 	// A store of services, populated by the serviceController | ||||||
|  | 	clusterStore federationcache.StoreToClusterLister | ||||||
|  | 	// Watches changes to all services | ||||||
|  | 	clusterController *framework.Controller | ||||||
|  | 	eventBroadcaster  record.EventBroadcaster | ||||||
|  | 	eventRecorder     record.EventRecorder | ||||||
|  | 	// services that need to be synced | ||||||
|  | 	queue           *workqueue.Type | ||||||
|  | 	knownClusterSet sets.String | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // New returns a new service controller to keep DNS provider service resources | ||||||
|  | // (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. | ||||||
|  |  | ||||||
|  | func New(federationClient federationclientset.Interface, dns dnsprovider.Interface) *ServiceController { | ||||||
|  | 	broadcaster := record.NewBroadcaster() | ||||||
|  | 	// federationClient event is not supported yet | ||||||
|  | 	// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) | ||||||
|  | 	recorder := broadcaster.NewRecorder(api.EventSource{Component: UserAgentName}) | ||||||
|  |  | ||||||
|  | 	s := &ServiceController{ | ||||||
|  | 		dns:              dns, | ||||||
|  | 		federationClient: federationClient, | ||||||
|  | 		serviceCache:     &serviceCache{fedServiceMap: make(map[string]*cachedService)}, | ||||||
|  | 		clusterCache: &clusterClientCache{ | ||||||
|  | 			rwlock:    sync.Mutex{}, | ||||||
|  | 			clientMap: make(map[string]*clusterCache), | ||||||
|  | 		}, | ||||||
|  | 		eventBroadcaster: broadcaster, | ||||||
|  | 		eventRecorder:    recorder, | ||||||
|  | 		queue:            workqueue.New(), | ||||||
|  | 		knownClusterSet:  make(sets.String), | ||||||
|  | 	} | ||||||
|  | 	s.serviceStore.Store, s.serviceController = framework.NewInformer( | ||||||
|  | 		&cache.ListWatch{ | ||||||
|  | 			ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { | ||||||
|  | 				return s.federationClient.Core().Services(api.NamespaceAll).List(options) | ||||||
|  | 			}, | ||||||
|  | 			WatchFunc: func(options api.ListOptions) (watch.Interface, error) { | ||||||
|  | 				return s.federationClient.Core().Services(api.NamespaceAll).Watch(options) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		&api.Service{}, | ||||||
|  | 		serviceSyncPeriod, | ||||||
|  | 		framework.ResourceEventHandlerFuncs{ | ||||||
|  | 			AddFunc: s.enqueueService, | ||||||
|  | 			UpdateFunc: func(old, cur interface{}) { | ||||||
|  | 				// there is case that old and new are equals but we still catch the event now. | ||||||
|  | 				if !reflect.DeepEqual(old, cur) { | ||||||
|  | 					s.enqueueService(cur) | ||||||
|  | 				} | ||||||
|  | 			}, | ||||||
|  | 			DeleteFunc: s.enqueueService, | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 	s.clusterStore.Store, s.clusterController = framework.NewInformer( | ||||||
|  | 		&cache.ListWatch{ | ||||||
|  | 			ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { | ||||||
|  | 				return s.federationClient.Federation().Clusters().List(options) | ||||||
|  | 			}, | ||||||
|  | 			WatchFunc: func(options api.ListOptions) (watch.Interface, error) { | ||||||
|  | 				return s.federationClient.Federation().Clusters().Watch(options) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		&federation.Cluster{}, | ||||||
|  | 		clusterSyncPeriod, | ||||||
|  | 		framework.ResourceEventHandlerFuncs{ | ||||||
|  | 			DeleteFunc: s.clusterCache.delFromClusterSet, | ||||||
|  | 			AddFunc:    s.clusterCache.addToClientMap, | ||||||
|  | 			UpdateFunc: func(old, cur interface{}) { | ||||||
|  | 				oldCluster, ok := old.(*federation.Cluster) | ||||||
|  | 				if !ok { | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				curCluster, ok := cur.(*federation.Cluster) | ||||||
|  | 				if !ok { | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 				if !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { | ||||||
|  | 					// update when spec is changed | ||||||
|  | 					s.clusterCache.addToClientMap(cur) | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				pred := getClusterConditionPredicate() | ||||||
|  | 				// only update when condition changed to ready from not-ready | ||||||
|  | 				if !pred(*oldCluster) && pred(*curCluster) { | ||||||
|  | 					s.clusterCache.addToClientMap(cur) | ||||||
|  | 				} | ||||||
|  | 				// did not handle ready -> not-ready | ||||||
|  | 				// how could we stop a controller? | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 	return s | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. | ||||||
|  | func (s *ServiceController) enqueueService(obj interface{}) { | ||||||
|  | 	key, err := controller.KeyFunc(obj) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	s.queue.Add(key) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Run starts a background goroutine that watches for changes to federation services | ||||||
|  | // and ensures that they have Kubernetes services created, updated or deleted appropriately. | ||||||
|  | // federationSyncPeriod controls how often we check the federation's services to | ||||||
|  | // ensure that the correct Kubernetes services (and associated DNS entries) exist. | ||||||
|  | // This is only necessary to fudge over failed watches. | ||||||
|  | // clusterSyncPeriod controls how often we check the federation's underlying clusters and | ||||||
|  | // their Kubernetes services to ensure that matching services created independently of the Federation | ||||||
|  | // (e.g. directly via the underlying cluster's API) are correctly accounted for. | ||||||
|  |  | ||||||
|  | // It's an error to call Run() more than once for a given ServiceController | ||||||
|  | // object. | ||||||
|  | func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { | ||||||
|  | 	defer runtime.HandleCrash() | ||||||
|  | 	go s.serviceController.Run(stopCh) | ||||||
|  | 	go s.clusterController.Run(stopCh) | ||||||
|  | 	for i := 0; i < workers; i++ { | ||||||
|  | 		go wait.Until(s.fedServiceWorker, time.Second, stopCh) | ||||||
|  | 	} | ||||||
|  | 	go wait.Until(s.clusterEndpointWorker, time.Second, stopCh) | ||||||
|  | 	go wait.Until(s.clusterServiceWorker, time.Second, stopCh) | ||||||
|  | 	go wait.Until(s.clusterSyncLoop, clusterSyncPeriod, stopCh) | ||||||
|  | 	<-stopCh | ||||||
|  | 	glog.Infof("Shutting down Federation Service Controller") | ||||||
|  | 	s.queue.ShutDown() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done. | ||||||
|  | // It enforces that the syncService is never invoked concurrently with the same key. | ||||||
|  | func (s *ServiceController) fedServiceWorker() { | ||||||
|  | 	for { | ||||||
|  | 		func() { | ||||||
|  | 			key, quit := s.queue.Get() | ||||||
|  | 			if quit { | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			defer s.queue.Done(key) | ||||||
|  | 			err := s.syncService(key.(string)) | ||||||
|  | 			if err != nil { | ||||||
|  | 				glog.Errorf("Error syncing service: %v", err) | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func wantsDNSRecords(service *api.Service) bool { | ||||||
|  | 	return service.Spec.Type == api.ServiceTypeLoadBalancer | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processServiceForCluster creates or updates service to all registered running clusters, | ||||||
|  | // update DNS records and update the service info with DNS entries to federation apiserver. | ||||||
|  | // the function returns any error caught | ||||||
|  | func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { | ||||||
|  | 	glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 	// Create or Update k8s Service | ||||||
|  | 	err := s.ensureClusterService(cachedService, clusterName, service, client) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.V(4).Infof("Failed to process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	glog.V(4).Infof("Successfully process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // updateFederationService Returns whatever error occurred along with a boolean indicator of whether it | ||||||
|  | // should be retried. | ||||||
|  | func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) { | ||||||
|  | 	// Clone federation service, and create them in underlying k8s cluster | ||||||
|  | 	clone, err := conversion.NewCloner().DeepCopy(cachedService.lastState) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err, !retryable | ||||||
|  | 	} | ||||||
|  | 	service, ok := clone.(*api.Service) | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// handle available clusters one by one | ||||||
|  | 	var hasErr bool | ||||||
|  | 	for clusterName, cache := range s.clusterCache.clientMap { | ||||||
|  | 		go func(cache *clusterCache, clusterName string) { | ||||||
|  | 			err = s.processServiceForCluster(cachedService, clusterName, service, cache.clientset) | ||||||
|  | 			if err != nil { | ||||||
|  | 				hasErr = true | ||||||
|  | 			} | ||||||
|  | 		}(cache, clusterName) | ||||||
|  | 	} | ||||||
|  | 	if hasErr { | ||||||
|  | 		// detail error has been dumpped inside the loop | ||||||
|  | 		return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", service.Namespace, service.Name), retryable | ||||||
|  | 	} | ||||||
|  | 	return nil, !retryable | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *ServiceController) deleteFederationService(cachedService *cachedService) (error, bool) { | ||||||
|  | 	// handle available clusters one by one | ||||||
|  | 	var hasErr bool | ||||||
|  | 	for clusterName, cluster := range s.clusterCache.clientMap { | ||||||
|  | 		err := s.deleteClusterService(clusterName, cachedService, cluster.clientset) | ||||||
|  | 		if err != nil { | ||||||
|  | 			hasErr = true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if hasErr { | ||||||
|  | 		// detail error has been dumpped inside the loop | ||||||
|  | 		return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", cachedService.lastState.Namespace, cachedService.lastState.Name), retryable | ||||||
|  | 	} | ||||||
|  | 	return nil, !retryable | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *ServiceController) deleteClusterService(clusterName string, cachedService *cachedService, clientset *clientset.Clientset) error { | ||||||
|  | 	service := cachedService.lastState | ||||||
|  | 	glog.V(4).Infof("Deleting service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 	var err error | ||||||
|  | 	for i := 0; i < clientRetryCount; i++ { | ||||||
|  | 		err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{}) | ||||||
|  | 		if err == nil || errors.IsNotFound(err) { | ||||||
|  | 			glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		time.Sleep(cachedService.nextRetryDelay()) | ||||||
|  | 	} | ||||||
|  | 	glog.V(4).Infof("Failed to delete service %s/%s from cluster %s, %+v", service.Namespace, service.Name, clusterName, err) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *ServiceController) ensureClusterService(cachedService *cachedService, clusterName string, service *api.Service, client *clientset.Clientset) error { | ||||||
|  | 	var err error | ||||||
|  | 	var needUpdate bool | ||||||
|  | 	for i := 0; i < clientRetryCount; i++ { | ||||||
|  | 		svc, err := client.Core().Services(service.Namespace).Get(service.Name) | ||||||
|  | 		if err == nil { | ||||||
|  | 			// service exists | ||||||
|  | 			glog.V(5).Infof("Found service %s/%s from cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 			//reserve immutable fields | ||||||
|  | 			service.Spec.ClusterIP = svc.Spec.ClusterIP | ||||||
|  |  | ||||||
|  | 			//reserve auto assigned field | ||||||
|  | 			for i, oldPort := range svc.Spec.Ports { | ||||||
|  | 				for _, port := range service.Spec.Ports { | ||||||
|  | 					if port.NodePort == 0 { | ||||||
|  | 						if !portEqualExcludeNodePort(&oldPort, &port) { | ||||||
|  | 							svc.Spec.Ports[i] = port | ||||||
|  | 							needUpdate = true | ||||||
|  | 						} | ||||||
|  | 					} else { | ||||||
|  | 						if !portEqualForLB(&oldPort, &port) { | ||||||
|  | 							svc.Spec.Ports[i] = port | ||||||
|  | 							needUpdate = true | ||||||
|  | 						} | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if needUpdate { | ||||||
|  | 				// we only apply spec update | ||||||
|  | 				svc.Spec = service.Spec | ||||||
|  | 				_, err = client.Core().Services(svc.Namespace).Update(svc) | ||||||
|  | 				if err == nil { | ||||||
|  | 					glog.V(5).Infof("Service %s/%s successfully updated to cluster %s", svc.Namespace, svc.Name, clusterName) | ||||||
|  | 					return nil | ||||||
|  | 				} else { | ||||||
|  | 					glog.V(4).Infof("Failed to update %+v", err) | ||||||
|  | 				} | ||||||
|  | 			} else { | ||||||
|  | 				glog.V(5).Infof("Service %s/%s is not updated to cluster %s as the spec are identical", svc.Namespace, svc.Name, clusterName) | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  | 		} else if errors.IsNotFound(err) { | ||||||
|  | 			// Create service if it is not found | ||||||
|  | 			glog.Infof("Service '%s/%s' is not found in cluster %s, trying to create new", | ||||||
|  | 				service.Namespace, service.Name, clusterName) | ||||||
|  | 			service.ResourceVersion = "" | ||||||
|  | 			_, err = client.Core().Services(service.Namespace).Create(service) | ||||||
|  | 			if err == nil { | ||||||
|  | 				glog.V(5).Infof("Service %s/%s successfully created to cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  | 			glog.V(4).Infof("Failed to create %+v", err) | ||||||
|  | 			if errors.IsAlreadyExists(err) { | ||||||
|  | 				glog.V(5).Infof("service %s/%s already exists in cluster %s", service.Namespace, service.Name, clusterName) | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if errors.IsConflict(err) { | ||||||
|  | 			glog.V(4).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v", | ||||||
|  | 				service.Namespace, service.Name, err) | ||||||
|  | 		} | ||||||
|  | 		// should we reuse same retry delay for all clusters? | ||||||
|  | 		time.Sleep(cachedService.nextRetryDelay()) | ||||||
|  | 	} | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceCache) allServices() []*cachedService { | ||||||
|  | 	s.rwlock.Lock() | ||||||
|  | 	defer s.rwlock.Unlock() | ||||||
|  | 	services := make([]*cachedService, 0, len(s.fedServiceMap)) | ||||||
|  | 	for _, v := range s.fedServiceMap { | ||||||
|  | 		services = append(services, v) | ||||||
|  | 	} | ||||||
|  | 	return services | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceCache) get(serviceName string) (*cachedService, bool) { | ||||||
|  | 	s.rwlock.Lock() | ||||||
|  | 	defer s.rwlock.Unlock() | ||||||
|  | 	service, ok := s.fedServiceMap[serviceName] | ||||||
|  | 	return service, ok | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceCache) getOrCreate(serviceName string) *cachedService { | ||||||
|  | 	s.rwlock.Lock() | ||||||
|  | 	defer s.rwlock.Unlock() | ||||||
|  | 	service, ok := s.fedServiceMap[serviceName] | ||||||
|  | 	if !ok { | ||||||
|  | 		service = &cachedService{ | ||||||
|  | 			endpointMap:      make(map[string]int), | ||||||
|  | 			serviceStatusMap: make(map[string]api.LoadBalancerStatus), | ||||||
|  | 		} | ||||||
|  | 		s.fedServiceMap[serviceName] = service | ||||||
|  | 	} | ||||||
|  | 	return service | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceCache) set(serviceName string, service *cachedService) { | ||||||
|  | 	s.rwlock.Lock() | ||||||
|  | 	defer s.rwlock.Unlock() | ||||||
|  | 	s.fedServiceMap[serviceName] = service | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceCache) delete(serviceName string) { | ||||||
|  | 	s.rwlock.Lock() | ||||||
|  | 	defer s.rwlock.Unlock() | ||||||
|  | 	delete(s.fedServiceMap, serviceName) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // needsUpdateDNS check if the dns records of the given service should be updated | ||||||
|  | func (s *ServiceController) needsUpdateDNS(oldService *api.Service, newService *api.Service) bool { | ||||||
|  | 	if !wantsDNSRecords(oldService) && !wantsDNSRecords(newService) { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	if wantsDNSRecords(oldService) != wantsDNSRecords(newService) { | ||||||
|  | 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v", | ||||||
|  | 			oldService.Spec.Type, newService.Spec.Type) | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity { | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	if !LoadBalancerIPsAreEqual(oldService, newService) { | ||||||
|  | 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v", | ||||||
|  | 			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP) | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) { | ||||||
|  | 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v", | ||||||
|  | 			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs)) | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	for i := range oldService.Spec.ExternalIPs { | ||||||
|  | 		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] { | ||||||
|  | 			s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v", | ||||||
|  | 				newService.Spec.ExternalIPs[i]) | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) { | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 	if oldService.UID != newService.UID { | ||||||
|  | 		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v", | ||||||
|  | 			oldService.UID, newService.UID) | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) { | ||||||
|  | 	// TODO: quinton: Probably applies for DNS SVC records.  Come back to this. | ||||||
|  | 	//var protocol api.Protocol | ||||||
|  |  | ||||||
|  | 	ports := []*api.ServicePort{} | ||||||
|  | 	for i := range service.Spec.Ports { | ||||||
|  | 		sp := &service.Spec.Ports[i] | ||||||
|  | 		// The check on protocol was removed here.  The DNS provider itself is now responsible for all protocol validation | ||||||
|  | 		ports = append(ports, sp) | ||||||
|  | 	} | ||||||
|  | 	return ports, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func portsEqualForLB(x, y *api.Service) bool { | ||||||
|  | 	xPorts, err := getPortsForLB(x) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	yPorts, err := getPortsForLB(y) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	return portSlicesEqualForLB(xPorts, yPorts) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func portSlicesEqualForLB(x, y []*api.ServicePort) bool { | ||||||
|  | 	if len(x) != len(y) { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for i := range x { | ||||||
|  | 		if !portEqualForLB(x[i], y[i]) { | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func portEqualForLB(x, y *api.ServicePort) bool { | ||||||
|  | 	// TODO: Should we check name?  (In theory, an LB could expose it) | ||||||
|  | 	if x.Name != y.Name { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if x.Protocol != y.Protocol { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if x.Port != y.Port { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	if x.NodePort != y.NodePort { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func portEqualExcludeNodePort(x, y *api.ServicePort) bool { | ||||||
|  | 	// TODO: Should we check name?  (In theory, an LB could expose it) | ||||||
|  | 	if x.Name != y.Name { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if x.Protocol != y.Protocol { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if x.Port != y.Port { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	return true | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func clustersFromList(list *federation.ClusterList) []string { | ||||||
|  | 	result := []string{} | ||||||
|  | 	for ix := range list.Items { | ||||||
|  | 		result = append(result, list.Items[ix].Name) | ||||||
|  | 	} | ||||||
|  | 	return result | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // getClusterConditionPredicate filter all clusters meet condition of | ||||||
|  | // condition.type=Ready and condition.status=true | ||||||
|  | func getClusterConditionPredicate() federationcache.ClusterConditionPredicate { | ||||||
|  | 	return func(cluster federation.Cluster) bool { | ||||||
|  | 		// If we have no info, don't accept | ||||||
|  | 		if len(cluster.Status.Conditions) == 0 { | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  | 		for _, cond := range cluster.Status.Conditions { | ||||||
|  | 			//We consider the cluster for load balancing only when its ClusterReady condition status | ||||||
|  | 			//is ConditionTrue | ||||||
|  | 			if cond.Type == federation.ClusterReady && cond.Status != api.ConditionTrue { | ||||||
|  | 				glog.V(4).Infof("Ignoring cluser %v with %v condition status %v", cluster.Name, cond.Type, cond.Status) | ||||||
|  | 				return false | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // clusterSyncLoop observes running clusters changes, and apply all services to new added cluster | ||||||
|  | // and add dns records for the changes | ||||||
|  | func (s *ServiceController) clusterSyncLoop() { | ||||||
|  | 	var servicesToUpdate []*cachedService | ||||||
|  | 	// should we remove cache for cluster from ready to not ready? should remove the condition predicate if no | ||||||
|  | 	clusters, err := s.clusterStore.ClusterCondition(getClusterConditionPredicate()).List() | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Infof("Fail to get cluster list") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	newClusters := clustersFromList(&clusters) | ||||||
|  | 	var newSet, increase sets.String | ||||||
|  | 	newSet = sets.NewString(newClusters...) | ||||||
|  | 	if newSet.Equal(s.knownClusterSet) { | ||||||
|  | 		// The set of cluster names in the services in the federation hasn't changed, but we can retry | ||||||
|  | 		// updating any services that we failed to update last time around. | ||||||
|  | 		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	glog.Infof("Detected change in list of cluster names. New  set: %v, Old set: %v", newSet, s.knownClusterSet) | ||||||
|  | 	increase = newSet.Difference(s.knownClusterSet) | ||||||
|  | 	// do nothing when cluster is removed. | ||||||
|  | 	if increase != nil { | ||||||
|  | 		for newCluster := range increase { | ||||||
|  | 			glog.Infof("New cluster observed %s", newCluster) | ||||||
|  | 			s.updateAllServicesToCluster(servicesToUpdate, newCluster) | ||||||
|  | 		} | ||||||
|  | 		// Try updating all services, and save the ones that fail to try again next | ||||||
|  | 		// round. | ||||||
|  | 		servicesToUpdate = s.serviceCache.allServices() | ||||||
|  | 		numServices := len(servicesToUpdate) | ||||||
|  | 		servicesToUpdate = s.updateDNSRecords(servicesToUpdate, newClusters) | ||||||
|  | 		glog.Infof("Successfully updated %d out of %d DNS records to direct traffic to the updated cluster", | ||||||
|  | 			numServices-len(servicesToUpdate), numServices) | ||||||
|  | 	} | ||||||
|  | 	s.knownClusterSet = newSet | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *ServiceController) updateAllServicesToCluster(services []*cachedService, clusterName string) { | ||||||
|  | 	cluster, ok := s.clusterCache.clientMap[clusterName] | ||||||
|  | 	if ok { | ||||||
|  | 		for _, cachedService := range services { | ||||||
|  | 			appliedState := cachedService.appliedState | ||||||
|  | 			s.processServiceForCluster(cachedService, clusterName, appliedState, cluster.clientset) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *ServiceController) removeAllServicesFromCluster(services []*cachedService, clusterName string) { | ||||||
|  | 	client, ok := s.clusterCache.clientMap[clusterName] | ||||||
|  | 	if ok { | ||||||
|  | 		for _, cachedService := range services { | ||||||
|  | 			s.deleteClusterService(clusterName, cachedService, client.clientset) | ||||||
|  | 		} | ||||||
|  | 		glog.Infof("Synced all services to cluster %s", clusterName) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // updateDNSRecords updates all existing federation service DNS Records so that | ||||||
|  | // they will match the list of cluster names provided. | ||||||
|  | // Returns the list of services that couldn't be updated. | ||||||
|  | func (s *ServiceController) updateDNSRecords(services []*cachedService, clusters []string) (servicesToRetry []*cachedService) { | ||||||
|  | 	for _, service := range services { | ||||||
|  | 		func() { | ||||||
|  | 			service.rwlock.Lock() | ||||||
|  | 			defer service.rwlock.Unlock() | ||||||
|  | 			// If the applied state is nil, that means it hasn't yet been successfully dealt | ||||||
|  | 			// with by the DNS Record reconciler. We can trust the DNS Record | ||||||
|  | 			// reconciler to ensure the federation service's DNS records are created to target | ||||||
|  | 			// the correct backend service IP's | ||||||
|  | 			if service.appliedState == nil { | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			if err := s.lockedUpdateDNSRecords(service, clusters); err != nil { | ||||||
|  | 				glog.Errorf("External error while updating DNS Records: %v.", err) | ||||||
|  | 				servicesToRetry = append(servicesToRetry, service) | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  | 	return servicesToRetry | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // lockedUpdateDNSRecords Updates the DNS records of a service, assuming we hold the mutex | ||||||
|  | // associated with the service. | ||||||
|  | // TODO: quinton: Still screwed up in the same way as above.  Fix. | ||||||
|  | func (s *ServiceController) lockedUpdateDNSRecords(service *cachedService, clusterNames []string) error { | ||||||
|  | 	if !wantsDNSRecords(service.appliedState) { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	for key := range s.clusterCache.clientMap { | ||||||
|  | 		for _, clusterName := range clusterNames { | ||||||
|  | 			if key == clusterName { | ||||||
|  | 				ensureDNSRecords(clusterName, service) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func LoadBalancerIPsAreEqual(oldService, newService *api.Service) bool { | ||||||
|  | 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Computes the next retry, using exponential backoff | ||||||
|  | // mutex must be held. | ||||||
|  | func (s *cachedService) nextRetryDelay() time.Duration { | ||||||
|  | 	s.lastRetryDelay = s.lastRetryDelay * 2 | ||||||
|  | 	if s.lastRetryDelay < minRetryDelay { | ||||||
|  | 		s.lastRetryDelay = minRetryDelay | ||||||
|  | 	} | ||||||
|  | 	if s.lastRetryDelay > maxRetryDelay { | ||||||
|  | 		s.lastRetryDelay = maxRetryDelay | ||||||
|  | 	} | ||||||
|  | 	return s.lastRetryDelay | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // resetRetryDelay Resets the retry exponential backoff.  mutex must be held. | ||||||
|  | func (s *cachedService) resetRetryDelay() { | ||||||
|  | 	s.lastRetryDelay = time.Duration(0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Computes the next retry, using exponential backoff | ||||||
|  | // mutex must be held. | ||||||
|  | func (s *cachedService) nextFedUpdateDelay() time.Duration { | ||||||
|  | 	s.lastFedUpdateDelay = s.lastFedUpdateDelay * 2 | ||||||
|  | 	if s.lastFedUpdateDelay < minRetryDelay { | ||||||
|  | 		s.lastFedUpdateDelay = minRetryDelay | ||||||
|  | 	} | ||||||
|  | 	if s.lastFedUpdateDelay > maxRetryDelay { | ||||||
|  | 		s.lastFedUpdateDelay = maxRetryDelay | ||||||
|  | 	} | ||||||
|  | 	return s.lastFedUpdateDelay | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // resetRetryDelay Resets the retry exponential backoff.  mutex must be held. | ||||||
|  | func (s *cachedService) resetFedUpdateDelay() { | ||||||
|  | 	s.lastFedUpdateDelay = time.Duration(0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Computes the next retry, using exponential backoff | ||||||
|  | // mutex must be held. | ||||||
|  | func (s *cachedService) nextDNSUpdateDelay() time.Duration { | ||||||
|  | 	s.lastDNSUpdateDelay = s.lastDNSUpdateDelay * 2 | ||||||
|  | 	if s.lastDNSUpdateDelay < minRetryDelay { | ||||||
|  | 		s.lastDNSUpdateDelay = minRetryDelay | ||||||
|  | 	} | ||||||
|  | 	if s.lastDNSUpdateDelay > maxRetryDelay { | ||||||
|  | 		s.lastDNSUpdateDelay = maxRetryDelay | ||||||
|  | 	} | ||||||
|  | 	return s.lastDNSUpdateDelay | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // resetRetryDelay Resets the retry exponential backoff.  mutex must be held. | ||||||
|  | func (s *cachedService) resetDNSUpdateDelay() { | ||||||
|  | 	s.lastDNSUpdateDelay = time.Duration(0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // syncService will sync the Service with the given key if it has had its expectations fulfilled, | ||||||
|  | // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be | ||||||
|  | // invoked concurrently with the same key. | ||||||
|  | func (s *ServiceController) syncService(key string) error { | ||||||
|  | 	startTime := time.Now() | ||||||
|  | 	var cachedService *cachedService | ||||||
|  | 	var retryDelay time.Duration | ||||||
|  | 	defer func() { | ||||||
|  | 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) | ||||||
|  | 	}() | ||||||
|  | 	// obj holds the latest service info from apiserver | ||||||
|  | 	obj, exists, err := s.serviceStore.Store.GetByKey(key) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Infof("Unable to retrieve service %v from store: %v", key, err) | ||||||
|  | 		s.queue.Add(key) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if !exists { | ||||||
|  | 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned | ||||||
|  | 		glog.Infof("Service has been deleted %v", key) | ||||||
|  | 		err, retryDelay = s.processServiceDeletion(key) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if exists { | ||||||
|  | 		service, ok := obj.(*api.Service) | ||||||
|  | 		if ok { | ||||||
|  | 			cachedService = s.serviceCache.getOrCreate(key) | ||||||
|  | 			err, retryDelay = s.processServiceUpdate(cachedService, service, key) | ||||||
|  | 		} else { | ||||||
|  | 			tombstone, ok := obj.(cache.DeletedFinalStateUnknown) | ||||||
|  | 			if !ok { | ||||||
|  | 				return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj) | ||||||
|  | 			} | ||||||
|  | 			glog.Infof("Found tombstone for %v", key) | ||||||
|  | 			err, retryDelay = s.processServiceDeletion(tombstone.Key) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if retryDelay != 0 { | ||||||
|  | 		s.enqueueService(obj) | ||||||
|  | 	} else if err != nil { | ||||||
|  | 		runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err)) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processServiceUpdate returns an error if processing the service update failed, along with a time.Duration | ||||||
|  | // indicating whether processing should be retried; zero means no-retry; otherwise | ||||||
|  | // we should retry in that Duration. | ||||||
|  | func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) { | ||||||
|  | 	// Ensure that no other goroutine will interfere with our processing of the | ||||||
|  | 	// service. | ||||||
|  | 	cachedService.rwlock.Lock() | ||||||
|  | 	defer cachedService.rwlock.Unlock() | ||||||
|  |  | ||||||
|  | 	// Update the cached service (used above for populating synthetic deletes) | ||||||
|  | 	// alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver | ||||||
|  | 	// if the same service is changed before this go routine finished, there will be another queue entry to handle that. | ||||||
|  | 	cachedService.lastState = service | ||||||
|  | 	err, retry := s.updateFederationService(key, cachedService) | ||||||
|  | 	if err != nil { | ||||||
|  | 		message := "Error occurs when updating service to all clusters" | ||||||
|  | 		if retry { | ||||||
|  | 			message += " (will retry): " | ||||||
|  | 		} else { | ||||||
|  | 			message += " (will not retry): " | ||||||
|  | 		} | ||||||
|  | 		message += err.Error() | ||||||
|  | 		s.eventRecorder.Event(service, api.EventTypeWarning, "UpdateServiceFail", message) | ||||||
|  | 		return err, cachedService.nextRetryDelay() | ||||||
|  | 	} | ||||||
|  | 	// Always update the cache upon success. | ||||||
|  | 	// NOTE: Since we update the cached service if and only if we successfully | ||||||
|  | 	// processed it, a cached service being nil implies that it hasn't yet | ||||||
|  | 	// been successfully processed. | ||||||
|  |  | ||||||
|  | 	cachedService.appliedState = service | ||||||
|  | 	s.serviceCache.set(key, cachedService) | ||||||
|  | 	glog.V(4).Infof("Successfully procceeded services %s", key) | ||||||
|  | 	cachedService.resetRetryDelay() | ||||||
|  | 	return nil, doNotRetry | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration | ||||||
|  | // indicating whether processing should be retried; zero means no-retry; otherwise | ||||||
|  | // we should retry in that Duration. | ||||||
|  | func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) { | ||||||
|  | 	glog.V(2).Infof("Process service deletion for %v", key) | ||||||
|  | 	cachedService, ok := s.serviceCache.get(key) | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry | ||||||
|  | 	} | ||||||
|  | 	service := cachedService.lastState | ||||||
|  | 	cachedService.rwlock.Lock() | ||||||
|  | 	defer cachedService.rwlock.Unlock() | ||||||
|  | 	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingDNSRecord", "Deleting DNS Records") | ||||||
|  | 	// TODO should we delete dns info here or wait for endpoint changes? prefer here | ||||||
|  | 	// or we do nothing for service deletion | ||||||
|  | 	//err := s.dns.balancer.EnsureLoadBalancerDeleted(service) | ||||||
|  | 	err, retry := s.deleteFederationService(cachedService) | ||||||
|  | 	if err != nil { | ||||||
|  | 		message := "Error occurs when deleting federation service" | ||||||
|  | 		if retry { | ||||||
|  | 			message += " (will retry): " | ||||||
|  | 		} else { | ||||||
|  | 			message += " (will not retry): " | ||||||
|  | 		} | ||||||
|  | 		s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingDNSRecordFailed", message) | ||||||
|  | 		return err, cachedService.nextRetryDelay() | ||||||
|  | 	} | ||||||
|  | 	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedDNSRecord", "Deleted DNS Records") | ||||||
|  | 	s.serviceCache.delete(key) | ||||||
|  |  | ||||||
|  | 	cachedService.resetRetryDelay() | ||||||
|  | 	return nil, doNotRetry | ||||||
|  | } | ||||||
| @@ -0,0 +1,67 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2016 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/federation/apis/federation" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestGetClusterConditionPredicate(t *testing.T) { | ||||||
|  | 	tests := []struct { | ||||||
|  | 		cluster      federation.Cluster | ||||||
|  | 		expectAccept bool | ||||||
|  | 		name         string | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			cluster:      federation.Cluster{}, | ||||||
|  | 			expectAccept: false, | ||||||
|  | 			name:         "empty", | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			cluster: federation.Cluster{ | ||||||
|  | 				Status: federation.ClusterStatus{ | ||||||
|  | 					Conditions: []federation.ClusterCondition{ | ||||||
|  | 						{Type: federation.ClusterReady, Status: api.ConditionTrue}, | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			expectAccept: true, | ||||||
|  | 			name:         "basic", | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			cluster: federation.Cluster{ | ||||||
|  | 				Status: federation.ClusterStatus{ | ||||||
|  | 					Conditions: []federation.ClusterCondition{ | ||||||
|  | 						{Type: federation.ClusterReady, Status: api.ConditionFalse}, | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			expectAccept: false, | ||||||
|  | 			name:         "notready", | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	pred := getClusterConditionPredicate() | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		accept := pred(test.cluster) | ||||||
|  | 		if accept != test.expectAccept { | ||||||
|  | 			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -63,6 +63,7 @@ concurrent-deployment-syncs | |||||||
| concurrent-endpoint-syncs | concurrent-endpoint-syncs | ||||||
| concurrent-namespace-syncs | concurrent-namespace-syncs | ||||||
| concurrent-replicaset-syncs | concurrent-replicaset-syncs | ||||||
|  | concurrent-service-syncs | ||||||
| concurrent-resource-quota-syncs | concurrent-resource-quota-syncs | ||||||
| config-sync-period | config-sync-period | ||||||
| configure-cbr0 | configure-cbr0 | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 mfanjie
					mfanjie