Merge pull request #30419 from quinton-hoole/2016-08-10-federated-ingress-controller
Automatic merge from submit-queue Federated Ingress Controller Based on new federated controller libraries. cc @kubernetes/sig-cluster-federation @mfanjie @nikhiljindal @mwielgus @mml @madhusudancs FYI
This commit is contained in:
		@@ -30,6 +30,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/dnsprovider"
 | 
			
		||||
	clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
 | 
			
		||||
	ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
 | 
			
		||||
	namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
 | 
			
		||||
	replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset"
 | 
			
		||||
	secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret"
 | 
			
		||||
@@ -136,20 +137,19 @@ func Run(s *options.CMServer) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {
 | 
			
		||||
	glog.Infof("Loading client config for cluster controller %q", "cluster-controller")
 | 
			
		||||
	ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
 | 
			
		||||
	glog.Infof("Running cluster controller")
 | 
			
		||||
	go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run()
 | 
			
		||||
	dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Fatalf("Cloud provider could not be initialized: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
 | 
			
		||||
	servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName)
 | 
			
		||||
	if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
 | 
			
		||||
		glog.Errorf("Failed to start service controller: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	glog.Infof("Loading client config for namespace controller %q", "namespace-controller")
 | 
			
		||||
	nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller"))
 | 
			
		||||
	namespaceController := namespacecontroller.NewNamespaceController(nsClientset)
 | 
			
		||||
	glog.Infof("Running namespace controller")
 | 
			
		||||
	namespaceController.Run(wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	secretcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "secret-controller"))
 | 
			
		||||
@@ -160,5 +160,19 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
 | 
			
		||||
	replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset)
 | 
			
		||||
	go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	glog.Infof("Loading client config for ingress controller %q", "ingress-controller")
 | 
			
		||||
	ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "ingress-controller"))
 | 
			
		||||
	ingressController := ingresscontroller.NewIngressController(ingClientset)
 | 
			
		||||
	glog.Infof("Running ingress controller")
 | 
			
		||||
	ingressController.Run(wait.NeverStop)
 | 
			
		||||
 | 
			
		||||
	glog.Infof("Loading client config for service controller %q", servicecontroller.UserAgentName)
 | 
			
		||||
	scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
 | 
			
		||||
	servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName)
 | 
			
		||||
	glog.Infof("Running service controller")
 | 
			
		||||
	if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
 | 
			
		||||
		glog.Errorf("Failed to start service controller: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	select {}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,358 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package ingress
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/cache"
 | 
			
		||||
	kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/controller/framework"
 | 
			
		||||
	pkg_runtime "k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/types"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/flowcontrol"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/watch"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	allClustersKey        = "ALL_CLUSTERS"
 | 
			
		||||
	staticIPAnnotationKey = "ingress.kubernetes.io/static-ip" // TODO: Get this directly from the Kubernetes Ingress Controller constant
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type IngressController struct {
 | 
			
		||||
	// For triggering single ingress reconcilation. This is used when there is an
 | 
			
		||||
	// add/update/delete operation on an ingress in either federated API server or
 | 
			
		||||
	// in some member of the federation.
 | 
			
		||||
	ingressDeliverer *util.DelayingDeliverer
 | 
			
		||||
 | 
			
		||||
	// For triggering reconcilation of all ingresses. This is used when
 | 
			
		||||
	// a new cluster becomes available.
 | 
			
		||||
	clusterDeliverer *util.DelayingDeliverer
 | 
			
		||||
 | 
			
		||||
	// Contains ingresses present in members of federation.
 | 
			
		||||
	ingressFederatedInformer util.FederatedInformer
 | 
			
		||||
	// For updating members of federation.
 | 
			
		||||
	federatedUpdater util.FederatedUpdater
 | 
			
		||||
	// Definitions of ingresses that should be federated.
 | 
			
		||||
	ingressInformerStore cache.Store
 | 
			
		||||
	// Informer controller for ingresses that should be federated.
 | 
			
		||||
	ingressInformerController framework.ControllerInterface
 | 
			
		||||
 | 
			
		||||
	// Client to federated api server.
 | 
			
		||||
	federatedApiClient federation_release_1_4.Interface
 | 
			
		||||
 | 
			
		||||
	// Backoff manager for ingresses
 | 
			
		||||
	ingressBackoff *flowcontrol.Backoff
 | 
			
		||||
 | 
			
		||||
	ingressReviewDelay    time.Duration
 | 
			
		||||
	clusterAvailableDelay time.Duration
 | 
			
		||||
	smallDelay            time.Duration
 | 
			
		||||
	updateTimeout         time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewIngressController returns a new ingress controller
 | 
			
		||||
func NewIngressController(client federation_release_1_4.Interface) *IngressController {
 | 
			
		||||
	ic := &IngressController{
 | 
			
		||||
		federatedApiClient:    client,
 | 
			
		||||
		ingressReviewDelay:    time.Second * 10,
 | 
			
		||||
		clusterAvailableDelay: time.Second * 20,
 | 
			
		||||
		smallDelay:            time.Second * 3,
 | 
			
		||||
		updateTimeout:         time.Second * 30,
 | 
			
		||||
		ingressBackoff:        flowcontrol.NewBackOff(5*time.Second, time.Minute),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Build deliverers for triggering reconcilations.
 | 
			
		||||
	ic.ingressDeliverer = util.NewDelayingDeliverer()
 | 
			
		||||
	ic.clusterDeliverer = util.NewDelayingDeliverer()
 | 
			
		||||
 | 
			
		||||
	// Start informer in federated API servers on ingresses that should be federated.
 | 
			
		||||
	ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
 | 
			
		||||
				return client.Extensions().Ingresses(api.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return client.Extensions().Ingresses(api.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&extensions_v1beta1.Ingress{},
 | 
			
		||||
		controller.NoResyncPeriodFunc(),
 | 
			
		||||
		util.NewTriggerOnAllChanges(
 | 
			
		||||
			func(obj pkg_runtime.Object) {
 | 
			
		||||
				ic.deliverIngressObj(obj, 0, false)
 | 
			
		||||
			},
 | 
			
		||||
		))
 | 
			
		||||
 | 
			
		||||
	// Federated informer on ingresses in members of federation.
 | 
			
		||||
	ic.ingressFederatedInformer = util.NewFederatedInformer(
 | 
			
		||||
		client,
 | 
			
		||||
		func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
 | 
			
		||||
			return framework.NewInformer(
 | 
			
		||||
				&cache.ListWatch{
 | 
			
		||||
					ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
 | 
			
		||||
						return targetClient.Extensions().Ingresses(api.NamespaceAll).List(options)
 | 
			
		||||
					},
 | 
			
		||||
					WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
 | 
			
		||||
						return targetClient.Extensions().Ingresses(api.NamespaceAll).Watch(options)
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				&extensions_v1beta1.Ingress{},
 | 
			
		||||
				controller.NoResyncPeriodFunc(),
 | 
			
		||||
				// Trigger reconcilation whenever something in federated cluster is changed. In most cases it
 | 
			
		||||
				// would be just confirmation that some ingress operation suceeded.
 | 
			
		||||
				util.NewTriggerOnAllChanges(
 | 
			
		||||
					func(obj pkg_runtime.Object) {
 | 
			
		||||
						ic.deliverIngressObj(obj, ic.ingressReviewDelay, false)
 | 
			
		||||
					},
 | 
			
		||||
				))
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		&util.ClusterLifecycleHandlerFuncs{
 | 
			
		||||
			ClusterAvailable: func(cluster *federation_api.Cluster) {
 | 
			
		||||
				// When new cluster becomes available process all the ingresses again.
 | 
			
		||||
				ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay))
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Federated updater along with Create/Update/Delete operations.
 | 
			
		||||
	ic.federatedUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer,
 | 
			
		||||
		func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			ingress := obj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
			glog.V(4).Infof("Attempting to create Ingress: %v", ingress)
 | 
			
		||||
			_, err := client.Extensions().Ingresses(ingress.Namespace).Create(ingress)
 | 
			
		||||
			return err
 | 
			
		||||
		},
 | 
			
		||||
		func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			ingress := obj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
			glog.V(4).Infof("Attempting to update Ingress: %v", ingress)
 | 
			
		||||
			_, err := client.Extensions().Ingresses(ingress.Namespace).Update(ingress)
 | 
			
		||||
			return err
 | 
			
		||||
		},
 | 
			
		||||
		func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
 | 
			
		||||
			ingress := obj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
			glog.V(4).Infof("Attempting to delete Ingress: %v", ingress)
 | 
			
		||||
			err := client.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &api.DeleteOptions{})
 | 
			
		||||
			return err
 | 
			
		||||
		})
 | 
			
		||||
	return ic
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ic *IngressController) Run(stopChan <-chan struct{}) {
 | 
			
		||||
	glog.Infof("Starting Ingress Controller")
 | 
			
		||||
	go ic.ingressInformerController.Run(stopChan)
 | 
			
		||||
	glog.Infof("... Starting Ingress Federated Informer")
 | 
			
		||||
	ic.ingressFederatedInformer.Start()
 | 
			
		||||
	go func() {
 | 
			
		||||
		<-stopChan
 | 
			
		||||
		glog.Infof("Stopping Ingress Controller")
 | 
			
		||||
		ic.ingressFederatedInformer.Stop()
 | 
			
		||||
	}()
 | 
			
		||||
	ic.ingressDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
 | 
			
		||||
		ingress := item.Value.(types.NamespacedName)
 | 
			
		||||
		glog.V(4).Infof("Ingress change delivered, reconciling: %v", ingress)
 | 
			
		||||
		ic.reconcileIngress(ingress)
 | 
			
		||||
	})
 | 
			
		||||
	ic.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
 | 
			
		||||
		glog.V(4).Infof("Cluster change delivered, reconciling ingresses")
 | 
			
		||||
		ic.reconcileIngressesOnClusterChange()
 | 
			
		||||
	})
 | 
			
		||||
	go func() {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-time.After(time.Minute):
 | 
			
		||||
			glog.V(4).Infof("Ingress controller is garbage collecting")
 | 
			
		||||
			ic.ingressBackoff.GC()
 | 
			
		||||
		case <-stopChan:
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ic *IngressController) deliverIngressObj(obj interface{}, delay time.Duration, failed bool) {
 | 
			
		||||
	ingress := obj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
	ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, delay, failed)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ic *IngressController) deliverIngress(ingress types.NamespacedName, delay time.Duration, failed bool) {
 | 
			
		||||
	glog.V(4).Infof("Delivering ingress: %s", ingress)
 | 
			
		||||
	key := ingress.String()
 | 
			
		||||
	if failed {
 | 
			
		||||
		ic.ingressBackoff.Next(key, time.Now())
 | 
			
		||||
		delay = delay + ic.ingressBackoff.Get(key)
 | 
			
		||||
	} else {
 | 
			
		||||
		ic.ingressBackoff.Reset(key)
 | 
			
		||||
	}
 | 
			
		||||
	ic.ingressDeliverer.DeliverAfter(key, ingress, delay)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
 | 
			
		||||
// synced with the coresponding api server.
 | 
			
		||||
func (ic *IngressController) isSynced() bool {
 | 
			
		||||
	if !ic.ingressFederatedInformer.ClustersSynced() {
 | 
			
		||||
		glog.V(2).Infof("Cluster list not synced")
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	clusters, err := ic.ingressFederatedInformer.GetReadyClusters()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to get ready clusters: %v", err)
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	if !ic.ingressFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
 | 
			
		||||
		glog.V(2).Infof("Target store not synced")
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("Cluster list is synced")
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// The function triggers reconcilation of all federated ingresses.
 | 
			
		||||
func (ic *IngressController) reconcileIngressesOnClusterChange() {
 | 
			
		||||
	glog.V(4).Infof("Reconciling ingresses on cluster change")
 | 
			
		||||
	if !ic.isSynced() {
 | 
			
		||||
		ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay))
 | 
			
		||||
	}
 | 
			
		||||
	for _, obj := range ic.ingressInformerStore.List() {
 | 
			
		||||
		ingress := obj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
		ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, ic.smallDelay, false)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
	glog.V(4).Infof("Reconciling ingress %q", ingress)
 | 
			
		||||
	if !ic.isSynced() {
 | 
			
		||||
		ic.deliverIngress(ingress, ic.clusterAvailableDelay, false)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	key := ingress.String()
 | 
			
		||||
	baseIngressObj, exist, err := ic.ingressInformerStore.GetByKey(key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to query main ingress store for %v: %v", ingress, err)
 | 
			
		||||
		ic.deliverIngress(ingress, 0, true)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if !exist {
 | 
			
		||||
		// Not federated ingress, ignoring.
 | 
			
		||||
		glog.V(4).Infof("Ingress %q is not federated.  Ignoring.", ingress)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	baseIngress := baseIngressObj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
 | 
			
		||||
	clusters, err := ic.ingressFederatedInformer.GetReadyClusters()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to get cluster list: %v", err)
 | 
			
		||||
		ic.deliverIngress(ingress, ic.clusterAvailableDelay, false)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	operations := make([]util.FederatedOperation, 0)
 | 
			
		||||
 | 
			
		||||
	for clusterIndex, cluster := range clusters {
 | 
			
		||||
		_, baseIPExists := baseIngress.ObjectMeta.Annotations[staticIPAnnotationKey]
 | 
			
		||||
		clusterIngressObj, found, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Errorf("Failed to get %s from %s: %v", ingress, cluster.Name, err)
 | 
			
		||||
			ic.deliverIngress(ingress, 0, true)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		desiredIngress := &extensions_v1beta1.Ingress{
 | 
			
		||||
			ObjectMeta: baseIngress.ObjectMeta,
 | 
			
		||||
			Spec:       baseIngress.Spec,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if !found {
 | 
			
		||||
			// We can't supply server-created fields when creating a new object.
 | 
			
		||||
			desiredIngress.ObjectMeta.ResourceVersion = ""
 | 
			
		||||
			desiredIngress.ObjectMeta.UID = ""
 | 
			
		||||
			// We always first create an ingress in the first available cluster.  Once that ingress
 | 
			
		||||
			// has been created and allocated a global IP (visible via an annotation),
 | 
			
		||||
			// we record that annotation on the federated ingress, and create all other cluster
 | 
			
		||||
			// ingresses with that same global IP.
 | 
			
		||||
			// Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated
 | 
			
		||||
			// index 0, but eventually all ingresses will share the single global IP recorded in the annotation
 | 
			
		||||
			// of the federated ingress.
 | 
			
		||||
			if baseIPExists || (clusterIndex == 0) {
 | 
			
		||||
				operations = append(operations, util.FederatedOperation{
 | 
			
		||||
					Type:        util.OperationTypeAdd,
 | 
			
		||||
					Obj:         desiredIngress,
 | 
			
		||||
					ClusterName: cluster.Name,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
			glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required", ingress, cluster.Name)
 | 
			
		||||
			clusterIPName, clusterIPExists := clusterIngress.ObjectMeta.Annotations[staticIPAnnotationKey]
 | 
			
		||||
			if !baseIPExists && clusterIPExists {
 | 
			
		||||
				// Add annotation to federated ingress via API.
 | 
			
		||||
				original, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Get(baseIngress.Name)
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					original.ObjectMeta.Annotations[staticIPAnnotationKey] = clusterIPName
 | 
			
		||||
					if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(original); err != nil {
 | 
			
		||||
						glog.Errorf("Failed to add static IP annotation to federated ingress %q: %v", ingress, err)
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
					glog.Errorf("Failed to get federated ingress %q: %v", ingress, err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			// Update existing ingress, if needed.
 | 
			
		||||
			if !util.ObjectMetaIsEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) ||
 | 
			
		||||
				!reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) {
 | 
			
		||||
				// TODO: In some cases Ingress controllers in the clusters add annotations, so we ideally need to exclude those from
 | 
			
		||||
				// the equivalence comparison to cut down on unnecessary updates.
 | 
			
		||||
				glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress)
 | 
			
		||||
				// We need to use server-created fields from the cluster, not the desired object when updating.
 | 
			
		||||
				desiredIngress.ObjectMeta.ResourceVersion = clusterIngress.ObjectMeta.ResourceVersion
 | 
			
		||||
				desiredIngress.ObjectMeta.UID = clusterIngress.ObjectMeta.UID
 | 
			
		||||
				// Merge any annotations on the federated ingress onto the underlying cluster ingress,
 | 
			
		||||
				// overwriting duplicates.
 | 
			
		||||
				// TODO: We should probably use a PATCH operation for this instead.
 | 
			
		||||
				for key, val := range baseIngress.ObjectMeta.Annotations {
 | 
			
		||||
					desiredIngress.ObjectMeta.Annotations[key] = val
 | 
			
		||||
				}
 | 
			
		||||
				operations = append(operations, util.FederatedOperation{
 | 
			
		||||
					Type:        util.OperationTypeUpdate,
 | 
			
		||||
					Obj:         desiredIngress,
 | 
			
		||||
					ClusterName: cluster.Name,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(operations) == 0 {
 | 
			
		||||
		// Everything is in order
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
 | 
			
		||||
	err = ic.federatedUpdater.Update(operations, ic.updateTimeout)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Failed to execute updates for %s: %v", ingress, err)
 | 
			
		||||
		ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Evertyhing is in order but lets be double sure - TODO: quinton: Why? This seems like a hack.
 | 
			
		||||
	ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,180 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package ingress
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	// federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
 | 
			
		||||
	fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	api_v1 "k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
	extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 | 
			
		||||
	kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
 | 
			
		||||
	fake_kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/testing/core"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/watch"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestIngressController(t *testing.T) {
 | 
			
		||||
	cluster1 := mkCluster("cluster1", api_v1.ConditionTrue)
 | 
			
		||||
	cluster2 := mkCluster("cluster2", api_v1.ConditionTrue)
 | 
			
		||||
 | 
			
		||||
	fakeClient := &fake_federation_release_1_4.Clientset{}
 | 
			
		||||
	RegisterList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
 | 
			
		||||
	RegisterList("ingresses", &fakeClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
 | 
			
		||||
	ingressWatch := RegisterWatch("ingresses", &fakeClient.Fake)
 | 
			
		||||
	clusterWatch := RegisterWatch("clusters", &fakeClient.Fake)
 | 
			
		||||
 | 
			
		||||
	cluster1Client := &fake_kube_release_1_4.Clientset{}
 | 
			
		||||
	cluster1Watch := RegisterWatch("ingresses", &cluster1Client.Fake)
 | 
			
		||||
	RegisterList("ingresses", &cluster1Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
 | 
			
		||||
	cluster1CreateChan := RegisterCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1Watch)
 | 
			
		||||
	cluster1UpdateChan := RegisterCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1Watch)
 | 
			
		||||
 | 
			
		||||
	cluster2Client := &fake_kube_release_1_4.Clientset{}
 | 
			
		||||
	cluster2Watch := RegisterWatch("ingresses", &cluster2Client.Fake)
 | 
			
		||||
	RegisterList("ingresses", &cluster2Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
 | 
			
		||||
	cluster2CreateChan := RegisterCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2Watch)
 | 
			
		||||
 | 
			
		||||
	ingressController := NewIngressController(fakeClient)
 | 
			
		||||
	informer := toFederatedInformerForTestOnly(ingressController.ingressFederatedInformer)
 | 
			
		||||
	informer.SetClientFactory(func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) {
 | 
			
		||||
		switch cluster.Name {
 | 
			
		||||
		case cluster1.Name:
 | 
			
		||||
			return cluster1Client, nil
 | 
			
		||||
		case cluster2.Name:
 | 
			
		||||
			return cluster2Client, nil
 | 
			
		||||
		default:
 | 
			
		||||
			return nil, fmt.Errorf("Unknown cluster")
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	ingressController.clusterAvailableDelay = time.Second
 | 
			
		||||
	ingressController.ingressReviewDelay = 50 * time.Millisecond
 | 
			
		||||
	ingressController.smallDelay = 20 * time.Millisecond
 | 
			
		||||
	ingressController.updateTimeout = 5 * time.Second
 | 
			
		||||
 | 
			
		||||
	stop := make(chan struct{})
 | 
			
		||||
	ingressController.Run(stop)
 | 
			
		||||
 | 
			
		||||
	ing1 := extensions_v1beta1.Ingress{
 | 
			
		||||
		ObjectMeta: api_v1.ObjectMeta{
 | 
			
		||||
			Name:      "test-ingress",
 | 
			
		||||
			Namespace: "mynamespace",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test add federated ingress.
 | 
			
		||||
	ingressWatch.Add(&ing1)
 | 
			
		||||
	createdIngress := GetIngressFromChan(cluster1CreateChan)
 | 
			
		||||
	assert.NotNil(t, createdIngress)
 | 
			
		||||
	assert.True(t, reflect.DeepEqual(&ing1, createdIngress))
 | 
			
		||||
 | 
			
		||||
	// Test update federated ingress.
 | 
			
		||||
	ing1.Annotations = map[string]string{
 | 
			
		||||
		"A": "B",
 | 
			
		||||
	}
 | 
			
		||||
	ingressWatch.Modify(&ing1)
 | 
			
		||||
	updatedIngress := GetIngressFromChan(cluster1UpdateChan)
 | 
			
		||||
	assert.NotNil(t, updatedIngress)
 | 
			
		||||
	assert.True(t, reflect.DeepEqual(&ing1, updatedIngress))
 | 
			
		||||
 | 
			
		||||
	// Test add cluster
 | 
			
		||||
	ing1.Annotations[staticIPAnnotationKey] = "foo" // Make sure that the base object has a static IP name first.
 | 
			
		||||
	ingressWatch.Modify(&ing1)
 | 
			
		||||
	clusterWatch.Add(cluster2)
 | 
			
		||||
	createdIngress2 := GetIngressFromChan(cluster2CreateChan)
 | 
			
		||||
	assert.NotNil(t, createdIngress2)
 | 
			
		||||
	assert.True(t, reflect.DeepEqual(&ing1, createdIngress2))
 | 
			
		||||
 | 
			
		||||
	close(stop)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly {
 | 
			
		||||
	inter := informer.(interface{})
 | 
			
		||||
	return inter.(util.FederatedInformerForTestOnly)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func mkCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster {
 | 
			
		||||
	return &federation_api.Cluster{
 | 
			
		||||
		ObjectMeta: api_v1.ObjectMeta{
 | 
			
		||||
			Name: name,
 | 
			
		||||
		},
 | 
			
		||||
		Status: federation_api.ClusterStatus{
 | 
			
		||||
			Conditions: []federation_api.ClusterCondition{
 | 
			
		||||
				{Type: federation_api.ClusterReady, Status: readyStatus},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterWatch(resource string, client *core.Fake) *watch.FakeWatcher {
 | 
			
		||||
	watcher := watch.NewFake()
 | 
			
		||||
	client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil })
 | 
			
		||||
	return watcher
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterList(resource string, client *core.Fake, obj runtime.Object) {
 | 
			
		||||
	client.AddReactor("list", resource, func(action core.Action) (bool, runtime.Object, error) {
 | 
			
		||||
		return true, obj, nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterCopyOnCreate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object {
 | 
			
		||||
	objChan := make(chan runtime.Object, 100)
 | 
			
		||||
	client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) {
 | 
			
		||||
		createAction := action.(core.CreateAction)
 | 
			
		||||
		obj := createAction.GetObject()
 | 
			
		||||
		go func() {
 | 
			
		||||
			watcher.Add(obj)
 | 
			
		||||
			objChan <- obj
 | 
			
		||||
		}()
 | 
			
		||||
		return true, obj, nil
 | 
			
		||||
	})
 | 
			
		||||
	return objChan
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterCopyOnUpdate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object {
 | 
			
		||||
	objChan := make(chan runtime.Object, 100)
 | 
			
		||||
	client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
 | 
			
		||||
		updateAction := action.(core.UpdateAction)
 | 
			
		||||
		obj := updateAction.GetObject()
 | 
			
		||||
		go func() {
 | 
			
		||||
			watcher.Modify(obj)
 | 
			
		||||
			objChan <- obj
 | 
			
		||||
		}()
 | 
			
		||||
		return true, obj, nil
 | 
			
		||||
	})
 | 
			
		||||
	return objChan
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func GetIngressFromChan(c chan runtime.Object) *extensions_v1beta1.Ingress {
 | 
			
		||||
	select {
 | 
			
		||||
	case obj := <-c:
 | 
			
		||||
		ingress := obj.(*extensions_v1beta1.Ingress)
 | 
			
		||||
		return ingress
 | 
			
		||||
	case <-time.After(time.Minute):
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -38,7 +38,7 @@ import (
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	clusterSyncPeriod = 10 * time.Minute
 | 
			
		||||
	userAgentName     = "federation-service-controller"
 | 
			
		||||
	userAgentName     = "federation-controller"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// An object with an origin information.
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										43
									
								
								federation/pkg/federation-controller/util/util.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								federation/pkg/federation-controller/util/util.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,43 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2016 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package util
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"reflect"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
ObjectMetaIsEquivalent determines whether two ObjectMeta's (typically one from a federated API object,
 | 
			
		||||
and the other from a cluster object) are equivalent.
 | 
			
		||||
*/
 | 
			
		||||
func ObjectMetaIsEquivalent(m1, m2 v1.ObjectMeta) bool {
 | 
			
		||||
	// First make all of the read-only fields equal, then perform a deep equality comparison
 | 
			
		||||
	m1.SelfLink = m2.SelfLink                   // Might be different in different cluster contexts.
 | 
			
		||||
	m1.UID = m2.UID                             // Definitely different in different cluster contexts
 | 
			
		||||
	m1.ResourceVersion = m2.ResourceVersion     // Definitely different in different cluster contexts
 | 
			
		||||
	m1.Generation = m2.Generation               // Might be different in different cluster contexts.
 | 
			
		||||
	m1.CreationTimestamp = m2.CreationTimestamp // Definitely different in different cluster contexts.
 | 
			
		||||
	m1.DeletionTimestamp = m2.DeletionTimestamp // Might be different in different cluster contexts.
 | 
			
		||||
	m1.OwnerReferences = nil                    // Might be different in different cluster contexts.
 | 
			
		||||
	m2.OwnerReferences = nil
 | 
			
		||||
	m1.Finalizers = nil // Might be different in different cluster contexts.
 | 
			
		||||
	m2.Finalizers = nil
 | 
			
		||||
 | 
			
		||||
	return reflect.DeepEqual(m1, m2)
 | 
			
		||||
}
 | 
			
		||||
@@ -16,6 +16,11 @@ limitations under the License.
 | 
			
		||||
 | 
			
		||||
package types
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// NamespacedName comprises a resource name, with a mandatory namespace,
 | 
			
		||||
// rendered as "<namespace>/<name>".  Being a type captures intent and
 | 
			
		||||
// helps make sure that UIDs, namespaced names and non-namespaced names
 | 
			
		||||
@@ -29,7 +34,27 @@ type NamespacedName struct {
 | 
			
		||||
	Name      string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	Separator = '/'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// String returns the general purpose string representation
 | 
			
		||||
func (n NamespacedName) String() string {
 | 
			
		||||
	return n.Namespace + "/" + n.Name
 | 
			
		||||
	return fmt.Sprintf("%s%c%s", n.Namespace, Separator, n.Name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewNamespacedNameFromString parses the provided string and returns a NamespacedName.
 | 
			
		||||
// The expected format is as per String() above.
 | 
			
		||||
// If the input string is invalid, the returned NamespacedName has all empty string field values.
 | 
			
		||||
// This allows a single-value return from this function, while still allowing error checks in the caller.
 | 
			
		||||
// Note that an input string which does not include exactly one Separator is not a valid input (as it could never
 | 
			
		||||
// have neem returned by String() )
 | 
			
		||||
func NewNamespacedNameFromString(s string) NamespacedName {
 | 
			
		||||
	nn := NamespacedName{}
 | 
			
		||||
	result := strings.Split(s, string(Separator))
 | 
			
		||||
	if len(result) == 2 {
 | 
			
		||||
		nn.Namespace = result[0]
 | 
			
		||||
		nn.Name = result[1]
 | 
			
		||||
	}
 | 
			
		||||
	return nn
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user