add ClusterSelector to services
This commit is contained in:
		@@ -23,6 +23,7 @@ go_library(
 | 
			
		||||
        "//federation/pkg/dnsprovider:go_default_library",
 | 
			
		||||
        "//federation/pkg/dnsprovider/rrstype:go_default_library",
 | 
			
		||||
        "//federation/pkg/federation-controller/util:go_default_library",
 | 
			
		||||
        "//federation/pkg/federation-controller/util/clusterselector:go_default_library",
 | 
			
		||||
        "//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
 | 
			
		||||
        "//pkg/api:go_default_library",
 | 
			
		||||
        "//pkg/api/v1:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -42,6 +42,7 @@ import (
 | 
			
		||||
	v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
 | 
			
		||||
	fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
@@ -475,7 +476,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus {
 | 
			
		||||
	operations := make([]fedutil.FederatedOperation, 0)
 | 
			
		||||
	for _, cluster := range clusters {
 | 
			
		||||
		// Aggregate all operations to perform on all federated clusters
 | 
			
		||||
		operation, err := s.getOperationsToPerformOnCluster(cluster, fedService)
 | 
			
		||||
		operation, err := getOperationsToPerformOnCluster(s.federatedInformer, cluster, fedService, clusterselector.SendToCluster)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return statusRecoverableError
 | 
			
		||||
		}
 | 
			
		||||
@@ -529,43 +530,40 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus {
 | 
			
		||||
	return statusAllOk
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type clusterSelectorFunc func(map[string]string, map[string]string) (bool, error)
 | 
			
		||||
 | 
			
		||||
// getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service
 | 
			
		||||
func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Cluster, fedService *v1.Service) (*fedutil.FederatedOperation, error) {
 | 
			
		||||
func getOperationsToPerformOnCluster(informer fedutil.FederatedInformer, cluster *v1beta1.Cluster, fedService *v1.Service, selector clusterSelectorFunc) (*fedutil.FederatedOperation, error) {
 | 
			
		||||
	var operation *fedutil.FederatedOperation
 | 
			
		||||
	var operationType fedutil.FederatedOperationType = ""
 | 
			
		||||
 | 
			
		||||
	key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String()
 | 
			
		||||
	clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
 | 
			
		||||
	clusterServiceObj, found, err := informer.GetTargetStore().GetByKey(cluster.Name, key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err))
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if !serviceFound {
 | 
			
		||||
		desiredService := &v1.Service{
 | 
			
		||||
			ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta),
 | 
			
		||||
			Spec:       *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
 | 
			
		||||
		}
 | 
			
		||||
		desiredService.ResourceVersion = ""
 | 
			
		||||
 | 
			
		||||
		glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
 | 
			
		||||
	send, err := selector(cluster.Labels, fedService.ObjectMeta.Annotations)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		glog.Errorf("Error processing ClusterSelector cluster: %s for service map: %s error: %s", cluster.Name, key, err.Error())
 | 
			
		||||
		return nil, err
 | 
			
		||||
	} else if !send {
 | 
			
		||||
		glog.V(5).Infof("Skipping cluster: %s for service: %s reason: cluster selectors do not match: %-v %-v", cluster.Name, key, cluster.ObjectMeta.Labels, fedService.ObjectMeta.Annotations[v1beta1.FederationClusterSelectorAnnotation])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
		operation = &fedutil.FederatedOperation{
 | 
			
		||||
			Type:        fedutil.OperationTypeAdd,
 | 
			
		||||
			Obj:         desiredService,
 | 
			
		||||
			ClusterName: cluster.Name,
 | 
			
		||||
			Key:         key,
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
	desiredService := &v1.Service{
 | 
			
		||||
		ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta),
 | 
			
		||||
		Spec:       *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
 | 
			
		||||
	}
 | 
			
		||||
	switch {
 | 
			
		||||
	case found && send:
 | 
			
		||||
		clusterService, ok := clusterServiceObj.(*v1.Service)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			runtime.HandleError(fmt.Errorf("Unexpected error for %q: %v", key, err))
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		desiredService := &v1.Service{
 | 
			
		||||
			ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(clusterService.ObjectMeta),
 | 
			
		||||
			Spec:       *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating
 | 
			
		||||
		desiredService.Spec.ClusterIP = clusterService.Spec.ClusterIP
 | 
			
		||||
		for _, cPort := range clusterService.Spec.Ports {
 | 
			
		||||
@@ -583,21 +581,32 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu
 | 
			
		||||
 | 
			
		||||
		// Update existing service, if needed.
 | 
			
		||||
		if !Equivalent(desiredService, clusterService) {
 | 
			
		||||
			operationType = fedutil.OperationTypeUpdate
 | 
			
		||||
 | 
			
		||||
			glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService)
 | 
			
		||||
 | 
			
		||||
			// ResourceVersion of cluster service can be different from federated service,
 | 
			
		||||
			// so do not update ResourceVersion while updating cluster service
 | 
			
		||||
			desiredService.ResourceVersion = clusterService.ResourceVersion
 | 
			
		||||
 | 
			
		||||
			operation = &fedutil.FederatedOperation{
 | 
			
		||||
				Type:        fedutil.OperationTypeUpdate,
 | 
			
		||||
				Obj:         desiredService,
 | 
			
		||||
				ClusterName: cluster.Name,
 | 
			
		||||
				Key:         key,
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService)
 | 
			
		||||
		}
 | 
			
		||||
	case found && !send:
 | 
			
		||||
		operationType = fedutil.OperationTypeDelete
 | 
			
		||||
	case !found && send:
 | 
			
		||||
		operationType = fedutil.OperationTypeAdd
 | 
			
		||||
		desiredService.ResourceVersion = ""
 | 
			
		||||
 | 
			
		||||
		glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(operationType) > 0 {
 | 
			
		||||
		operation = &fedutil.FederatedOperation{
 | 
			
		||||
			Type:        operationType,
 | 
			
		||||
			Obj:         desiredService,
 | 
			
		||||
			ClusterName: cluster.Name,
 | 
			
		||||
			Key:         key,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return operation, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -53,6 +53,8 @@ const (
 | 
			
		||||
	serviceEndpoint2 = "192.168.1.1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var awfulError error = errors.NewGone("Something bad happened")
 | 
			
		||||
 | 
			
		||||
func TestServiceController(t *testing.T) {
 | 
			
		||||
	glog.Infof("Creating fake infrastructure")
 | 
			
		||||
	fedClient := &fakefedclientset.Clientset{}
 | 
			
		||||
@@ -207,6 +209,53 @@ func TestServiceController(t *testing.T) {
 | 
			
		||||
	close(stop)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestGetOperationsToPerformOnCluster(t *testing.T) {
 | 
			
		||||
	obj := NewService("test-service-1", 80)
 | 
			
		||||
	cluster1 := NewClusterWithRegionZone("cluster1", v1.ConditionTrue, "region1", "zone1")
 | 
			
		||||
	fedClient := &fakefedclientset.Clientset{}
 | 
			
		||||
	sc := New(fedClient)
 | 
			
		||||
 | 
			
		||||
	testCases := map[string]struct {
 | 
			
		||||
		expectedSendErr bool
 | 
			
		||||
		sendToCluster   bool
 | 
			
		||||
		operationType   fedutil.FederatedOperationType
 | 
			
		||||
	}{
 | 
			
		||||
		"sendToCluster error returned": {
 | 
			
		||||
			expectedSendErr: true,
 | 
			
		||||
		},
 | 
			
		||||
		"Missing object and not matching ClusterSelector should result in no operations": {
 | 
			
		||||
			sendToCluster: false,
 | 
			
		||||
		},
 | 
			
		||||
		"Missing object and matching ClusterSelector should result in add operation": {
 | 
			
		||||
			operationType: fedutil.OperationTypeAdd,
 | 
			
		||||
			sendToCluster: true,
 | 
			
		||||
		},
 | 
			
		||||
		// Update and Delete scenarios are tested in TestServiceController
 | 
			
		||||
	}
 | 
			
		||||
	for testName, testCase := range testCases {
 | 
			
		||||
		t.Run(testName, func(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
			operations, err := getOperationsToPerformOnCluster(sc.federatedInformer, cluster1, obj, func(map[string]string, map[string]string) (bool, error) {
 | 
			
		||||
				if testCase.expectedSendErr {
 | 
			
		||||
					return false, awfulError
 | 
			
		||||
				}
 | 
			
		||||
				return testCase.sendToCluster, nil
 | 
			
		||||
			})
 | 
			
		||||
			if testCase.expectedSendErr {
 | 
			
		||||
				require.Error(t, err, "An error was expected")
 | 
			
		||||
			} else {
 | 
			
		||||
				require.NoError(t, err, "An error was not expected")
 | 
			
		||||
			}
 | 
			
		||||
			if len(testCase.operationType) == 0 {
 | 
			
		||||
				require.Nil(t, operations, "An operation was not expected")
 | 
			
		||||
			} else {
 | 
			
		||||
				require.NotNil(t, operations, "A single operation was expected")
 | 
			
		||||
				require.Equal(t, testCase.operationType, operations.Type, "Unexpected operation returned")
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewService(name string, port int32) *v1.Service {
 | 
			
		||||
	return &v1.Service{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user