Merge pull request #37323 from nikhiljindal/firstClusterPR
Automatic merge from submit-queue Fixing the logic to select first cluster in federated ingress controller Ref https://github.com/kubernetes/kubernetes/issues/36074. Before this change, ingress controller was using cluster with clusterIndex = 0 as the first cluster to create the ingress in. But the ordering of clusters can change and hence ingress controller ended up creating the ingress in multiple clusters. This PR fixes it by using an annotation on federated ingress. Controller now picks up a cluster randomly as the first cluster and creates ingress there. This clusters name is stored as an annotation on the federated ingress. Contoller does not create an ingress in any other cluster if this annotation is set on the federated ingress and IP has not been propagated. Once IP has been propagated, controller creates the ingress in all clusters. cc @kubernetes/sig-cluster-federation @madhusudancs
This commit is contained in:
		| @@ -53,6 +53,11 @@ const ( | ||||
| 	uidConfigMapName        = "ingress-uid"                                 // Name of the config-map and key the ingress controller stores its uid in. | ||||
| 	uidConfigMapNamespace   = "kube-system" | ||||
| 	uidKey                  = "uid" | ||||
| 	// Annotation on the ingress in federation control plane that is used to keep | ||||
| 	// track of the first cluster in which we create ingress. | ||||
| 	// We wait for ingress to be created in this cluster before creating it any | ||||
| 	// other cluster. | ||||
| 	firstClusterAnnotation = "ingress.federation.kubernetes.io/first-cluster" | ||||
| ) | ||||
|  | ||||
| type IngressController struct { | ||||
| @@ -637,6 +642,31 @@ func (ic *IngressController) updateClusterIngressUIDToMasters(cluster *federatio | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ic *IngressController) isClusterReady(clusterName string) bool { | ||||
| 	cluster, isReady, err := ic.ingressFederatedInformer.GetReadyCluster(clusterName) | ||||
| 	return isReady && err == nil && cluster != nil | ||||
| } | ||||
|  | ||||
| // updateAnnotationOnIngress updates the annotation with the given key on the given federated ingress. | ||||
| // Queues the ingress for resync when done. | ||||
| func (ic *IngressController) updateAnnotationOnIngress(ingress *extensions_v1beta1.Ingress, key, value string) { | ||||
| 	if ingress.ObjectMeta.Annotations == nil { | ||||
| 		ingress.ObjectMeta.Annotations = make(map[string]string) | ||||
| 	} | ||||
| 	ingress.ObjectMeta.Annotations[key] = value | ||||
| 	ingressName := types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace} | ||||
| 	glog.V(4).Infof("Attempting to update annotation %s:%s on base federated ingress: %v", key, value, ingressName) | ||||
| 	if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(ingress.Namespace).Update(ingress); err != nil { | ||||
| 		glog.Errorf("Failed to update annotation %s:%s on federated ingress %q, will try again later: %v", key, value, ingressName, err) | ||||
| 		ic.deliverIngress(ingressName, ic.ingressReviewDelay, true) | ||||
| 		return | ||||
| 	} else { | ||||
| 		glog.V(4).Infof("Successfully updated annotation %s:%s on federated ingress %q, after update: %q", key, value, ingress, updatedFedIngress) | ||||
| 		ic.deliverIngress(ingressName, ic.smallDelay, false) | ||||
| 		return | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { | ||||
| 	glog.V(4).Infof("Reconciling ingress %q for all clusters", ingress) | ||||
| 	if !ic.isSynced() { | ||||
| @@ -699,8 +729,9 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { | ||||
|  | ||||
| 	operations := make([]util.FederatedOperation, 0) | ||||
|  | ||||
| 	for clusterIndex, cluster := range clusters { | ||||
| 	for _, cluster := range clusters { | ||||
| 		baseIPName, baseIPAnnotationExists := baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] | ||||
| 		firstClusterName, firstClusterExists := baseIngress.ObjectMeta.Annotations[firstClusterAnnotation] | ||||
| 		clusterIngressObj, clusterIngressFound, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("Failed to get cached ingress %s for cluster %s, will retry: %v", ingress, cluster.Name, err) | ||||
| @@ -733,22 +764,33 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { | ||||
| 			ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "CreateInCluster", | ||||
| 				"Creating ingress in cluster %s", cluster.Name) | ||||
|  | ||||
| 			// We always first create an ingress in the first available cluster.  Once that ingress | ||||
| 			// We always first create an ingress in the first available cluster. Once that ingress | ||||
| 			// has been created and allocated a global IP (visible via an annotation), | ||||
| 			// we record that annotation on the federated ingress, and create all other cluster | ||||
| 			// ingresses with that same global IP. | ||||
| 			// Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated | ||||
| 			// index 0, but eventually all ingresses will share the single global IP recorded in the annotation | ||||
| 			// of the federated ingress. | ||||
| 			if baseIPAnnotationExists || (clusterIndex == 0) { | ||||
| 				glog.V(4).Infof("No existing Ingress %s in cluster %s (index %d) and static IP annotation (%q) on base ingress - queuing a create operation", ingress, cluster.Name, clusterIndex, staticIPNameKeyWritable) | ||||
| 			// Note: If the first cluster becomes (e.g. temporarily) unavailable, the | ||||
| 			// second cluster will become the first cluster, but eventually all ingresses | ||||
| 			// will share the single global IP recorded in the annotation of the | ||||
| 			// federated ingress. | ||||
| 			haveFirstCluster := firstClusterExists && firstClusterName != "" && ic.isClusterReady(firstClusterName) | ||||
| 			if !haveFirstCluster { | ||||
| 				glog.V(4).Infof("No cluster has been chosen as the first cluster. Electing cluster %s as the first cluster to create ingress in", cluster.Name) | ||||
| 				ic.updateAnnotationOnIngress(baseIngress, firstClusterAnnotation, cluster.Name) | ||||
| 				return | ||||
| 			} | ||||
| 			if baseIPAnnotationExists || firstClusterName == cluster.Name { | ||||
| 				if baseIPAnnotationExists { | ||||
| 					glog.V(4).Infof("No existing Ingress %s in cluster %s and static IP annotation (%q) exists on base ingress - queuing a create operation", ingress, cluster.Name, staticIPNameKeyWritable) | ||||
| 				} else { | ||||
| 					glog.V(4).Infof("No existing Ingress %s in cluster %s and no static IP annotation (%q) on base ingress - queuing a create operation in first cluster", ingress, cluster.Name, staticIPNameKeyWritable) | ||||
| 				} | ||||
| 				operations = append(operations, util.FederatedOperation{ | ||||
| 					Type:        util.OperationTypeAdd, | ||||
| 					Obj:         desiredIngress, | ||||
| 					ClusterName: cluster.Name, | ||||
| 				}) | ||||
| 			} else { | ||||
| 				glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero.  Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex, ingress) | ||||
| 				glog.V(4).Infof("No annotation %q exists on ingress %q in federation and waiting for ingress in cluster %s. Not queueing create operation for ingress until annotation exists", staticIPNameKeyWritable, ingress, firstClusterName) | ||||
| 			} | ||||
| 		} else { | ||||
| 			clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress) | ||||
| @@ -760,20 +802,8 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { | ||||
| 			if (!baseIPAnnotationExists && clusterIPNameExists) || (!baseLBStatusExists && clusterLBStatusExists) { // copy the IP name from the readonly annotation on the cluster ingress, to the writable annotation on the federated ingress | ||||
| 				glog.V(4).Infof(logStr, "Transferring") | ||||
| 				if !baseIPAnnotationExists && clusterIPNameExists { | ||||
| 					if baseIngress.ObjectMeta.Annotations == nil { | ||||
| 						baseIngress.ObjectMeta.Annotations = make(map[string]string) | ||||
| 					} | ||||
| 					baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName | ||||
| 					glog.V(4).Infof("Attempting to update base federated ingress annotations: %v", baseIngress) | ||||
| 					if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil { | ||||
| 						glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err) | ||||
| 						ic.deliverIngress(ingress, ic.ingressReviewDelay, true) | ||||
| 						return | ||||
| 					} else { | ||||
| 						glog.V(4).Infof("Successfully updated federated ingress %q (added IP annotation), after update: %q", ingress, updatedFedIngress) | ||||
| 						ic.deliverIngress(ingress, ic.smallDelay, false) | ||||
| 						return | ||||
| 					} | ||||
| 					ic.updateAnnotationOnIngress(baseIngress, staticIPNameKeyWritable, clusterIPName) | ||||
| 					return | ||||
| 				} | ||||
| 				if !baseLBStatusExists && clusterLBStatusExists { | ||||
| 					lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer) | ||||
|   | ||||
| @@ -99,11 +99,17 @@ func TestIngressController(t *testing.T) { | ||||
| 	t.Log("Running Ingress Controller") | ||||
| 	ingressController.Run(stop) | ||||
|  | ||||
| 	// TODO: Here we are creating the ingress with first cluster annotation. | ||||
| 	// Add another test without that annotation when | ||||
| 	// https://github.com/kubernetes/kubernetes/issues/36540 is fixed. | ||||
| 	ing1 := extensions_v1beta1.Ingress{ | ||||
| 		ObjectMeta: api_v1.ObjectMeta{ | ||||
| 			Name:      "test-ingress", | ||||
| 			Namespace: "mynamespace", | ||||
| 			SelfLink:  "/api/v1/namespaces/mynamespace/ingress/test-ingress", | ||||
| 			Annotations: map[string]string{ | ||||
| 				firstClusterAnnotation: cluster1.Name, | ||||
| 			}, | ||||
| 		}, | ||||
| 		Status: extensions_v1beta1.IngressStatus{ | ||||
| 			LoadBalancer: api_v1.LoadBalancerStatus{ | ||||
| @@ -181,7 +187,6 @@ func TestIngressController(t *testing.T) { | ||||
| 	*/ | ||||
| 	// Test add cluster | ||||
| 	t.Log("Adding a second cluster") | ||||
| 	ing1.Annotations = make(map[string]string) | ||||
| 	ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first. | ||||
| 	fedIngressWatch.Modify(&ing1) | ||||
| 	clusterWatch.Add(cluster2) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue