Adding support for ModifyLoadbalancer in windows kubeproxy.
This commit is contained in:
		| @@ -20,7 +20,6 @@ limitations under the License. | ||||
| package winkernel | ||||
|  | ||||
| import ( | ||||
| 	"github.com/Microsoft/hcsshim" | ||||
| 	"github.com/Microsoft/hcsshim/hcn" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
| @@ -41,6 +40,7 @@ type HcnService interface { | ||||
| 	ListLoadBalancers() ([]hcn.HostComputeLoadBalancer, error) | ||||
| 	GetLoadBalancerByID(loadBalancerId string) (*hcn.HostComputeLoadBalancer, error) | ||||
| 	CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) (*hcn.HostComputeLoadBalancer, error) | ||||
| 	UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error) | ||||
| 	DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error | ||||
| 	// Features functions | ||||
| 	GetSupportedFeatures() hcn.SupportedFeatures | ||||
| @@ -104,6 +104,10 @@ func (hcnObj hcnImpl) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc | ||||
| 	return loadBalancer.Create() | ||||
| } | ||||
|  | ||||
| func (hcnObj hcnImpl) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsID string) (*hcn.HostComputeLoadBalancer, error) { | ||||
| 	return loadBalancer.Update(hnsID) | ||||
| } | ||||
|  | ||||
| func (hcnObj hcnImpl) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error { | ||||
| 	return loadBalancer.Delete() | ||||
| } | ||||
| @@ -121,15 +125,16 @@ func (hcnObj hcnImpl) DsrSupported() error { | ||||
| } | ||||
|  | ||||
| func (hcnObj hcnImpl) DeleteAllHnsLoadBalancerPolicy() { | ||||
| 	plists, err := hcsshim.HNSListPolicyListRequest() | ||||
| 	lbs, err := hcnObj.ListLoadBalancers() | ||||
| 	if err != nil { | ||||
| 		klog.V(2).ErrorS(err, "Deleting all existing loadbalancers failed.") | ||||
| 		return | ||||
| 	} | ||||
| 	for _, plist := range plists { | ||||
| 		klog.V(3).InfoS("Remove policy", "policies", plist) | ||||
| 		_, err = plist.Delete() | ||||
| 	klog.V(3).InfoS("Deleting all existing loadbalancers", "lbCount", len(lbs)) | ||||
| 	for _, lb := range lbs { | ||||
| 		err = lb.Delete() | ||||
| 		if err != nil { | ||||
| 			klog.ErrorS(err, "Failed to delete policy list") | ||||
| 			klog.V(2).ErrorS(err, "Error deleting existing loadbalancer", "lb", lb) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -40,6 +40,7 @@ type HostNetworkService interface { | ||||
| 	deleteEndpoint(hnsID string) error | ||||
| 	getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) | ||||
| 	getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) | ||||
| 	updateLoadBalancer(hnsID string, sourceVip, vip string, endpoints []endpointInfo, flags loadBalancerFlags, protocol, internalPort, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) | ||||
| 	deleteLoadBalancer(hnsID string) error | ||||
| } | ||||
|  | ||||
| @@ -54,6 +55,33 @@ var ( | ||||
| 	LoadBalancerPortMappingFlagsVipExternalIP hcn.LoadBalancerPortMappingFlags = 16 | ||||
| ) | ||||
|  | ||||
| func getLoadBalancerPolicyFlags(flags loadBalancerFlags) (lbPortMappingFlags hcn.LoadBalancerPortMappingFlags, lbFlags hcn.LoadBalancerFlags) { | ||||
| 	lbPortMappingFlags = hcn.LoadBalancerPortMappingFlagsNone | ||||
| 	if flags.isILB { | ||||
| 		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsILB | ||||
| 	} | ||||
| 	if flags.useMUX { | ||||
| 		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsUseMux | ||||
| 	} | ||||
| 	if flags.preserveDIP { | ||||
| 		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsPreserveDIP | ||||
| 	} | ||||
| 	if flags.localRoutedVIP { | ||||
| 		lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP | ||||
| 	} | ||||
| 	if flags.isVipExternalIP { | ||||
| 		lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP | ||||
| 	} | ||||
| 	lbFlags = hcn.LoadBalancerFlagsNone | ||||
| 	if flags.isDSR { | ||||
| 		lbFlags |= hcn.LoadBalancerFlagsDSR | ||||
| 	} | ||||
| 	if flags.isIPv6 { | ||||
| 		lbFlags |= LoadBalancerFlagsIPv6 | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) { | ||||
| 	hnsnetwork, err := hns.hcn.GetNetworkByName(name) | ||||
| 	if err != nil { | ||||
| @@ -406,6 +434,84 @@ func (hns hns) getLoadBalancer(endpoints []endpointInfo, flags loadBalancerFlags | ||||
| 	return lbInfo, err | ||||
| } | ||||
|  | ||||
| func (hns hns) updateLoadBalancer(hnsID string, | ||||
| 	sourceVip, | ||||
| 	vip string, | ||||
| 	endpoints []endpointInfo, | ||||
| 	flags loadBalancerFlags, | ||||
| 	protocol, | ||||
| 	internalPort, | ||||
| 	externalPort uint16, | ||||
| 	previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) { | ||||
| 	klog.V(3).InfoS("Updating existing loadbalancer called", "hnsLbID", hnsID, "endpointCount", len(endpoints), "vip", vip, "sourceVip", sourceVip, "internalPort", internalPort, "externalPort", externalPort) | ||||
|  | ||||
| 	var id loadBalancerIdentifier | ||||
| 	vips := []string{} | ||||
| 	// Compute hash from backends (endpoint IDs) | ||||
| 	hash, err := hashEndpoints(endpoints) | ||||
| 	if err != nil { | ||||
| 		klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if len(vip) > 0 { | ||||
| 		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash} | ||||
| 		vips = append(vips, vip) | ||||
| 	} else { | ||||
| 		id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash} | ||||
| 	} | ||||
|  | ||||
| 	if lb, found := previousLoadBalancers[id]; found { | ||||
| 		klog.V(1).InfoS("Found cached Hns loadbalancer policy resource", "policies", lb) | ||||
| 		return lb, nil | ||||
| 	} | ||||
|  | ||||
| 	lbPortMappingFlags, lbFlags := getLoadBalancerPolicyFlags(flags) | ||||
|  | ||||
| 	lbDistributionType := hcn.LoadBalancerDistributionNone | ||||
|  | ||||
| 	if flags.sessionAffinity { | ||||
| 		lbDistributionType = hcn.LoadBalancerDistributionSourceIP | ||||
| 	} | ||||
|  | ||||
| 	loadBalancer := &hcn.HostComputeLoadBalancer{ | ||||
| 		SourceVIP: sourceVip, | ||||
| 		PortMappings: []hcn.LoadBalancerPortMapping{ | ||||
| 			{ | ||||
| 				Protocol:         uint32(protocol), | ||||
| 				InternalPort:     internalPort, | ||||
| 				ExternalPort:     externalPort, | ||||
| 				DistributionType: lbDistributionType, | ||||
| 				Flags:            lbPortMappingFlags, | ||||
| 			}, | ||||
| 		}, | ||||
| 		FrontendVIPs: vips, | ||||
| 		SchemaVersion: hcn.SchemaVersion{ | ||||
| 			Major: 2, | ||||
| 			Minor: 0, | ||||
| 		}, | ||||
| 		Flags: lbFlags, | ||||
| 	} | ||||
|  | ||||
| 	for _, ep := range endpoints { | ||||
| 		loadBalancer.HostComputeEndpoints = append(loadBalancer.HostComputeEndpoints, ep.hnsID) | ||||
| 	} | ||||
|  | ||||
| 	lb, err := hns.hcn.UpdateLoadBalancer(loadBalancer, hnsID) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		klog.V(2).ErrorS(err, "Error updating existing loadbalancer", "hnsLbID", hnsID, "error", err, "endpoints", endpoints) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	klog.V(1).InfoS("Update loadbalancer is successful", "loadBalancer", lb) | ||||
| 	lbInfo := &loadBalancerInfo{ | ||||
| 		hnsID: lb.Id, | ||||
| 	} | ||||
| 	// Add to map of load balancers | ||||
| 	previousLoadBalancers[id] = lbInfo | ||||
| 	return lbInfo, err | ||||
| } | ||||
|  | ||||
| func (hns hns) deleteLoadBalancer(hnsID string) error { | ||||
| 	lb, err := hns.hcn.GetLoadBalancerByID(hnsID) | ||||
| 	if err != nil { | ||||
| @@ -440,7 +546,7 @@ func hashEndpoints[T string | endpointInfo](endpoints []T) (hash [20]byte, err e | ||||
| 		case endpointInfo: | ||||
| 			id = strings.ToUpper(x.hnsID) | ||||
| 		case string: | ||||
| 			id = x | ||||
| 			id = strings.ToUpper(x) | ||||
| 		} | ||||
| 		if len(id) > 0 { | ||||
| 			// We XOR the hashes of endpoints, since they are an unordered set. | ||||
|   | ||||
| @@ -155,6 +155,7 @@ const ( | ||||
| func newHostNetworkService(hcnImpl HcnService) (HostNetworkService, hcn.SupportedFeatures) { | ||||
| 	var h HostNetworkService | ||||
| 	supportedFeatures := hcnImpl.GetSupportedFeatures() | ||||
| 	klog.V(3).InfoS("HNS Supported features", "hnsSupportedFeatures", supportedFeatures) | ||||
| 	if supportedFeatures.Api.V2 { | ||||
| 		h = hns{ | ||||
| 			hcn: hcnImpl, | ||||
| @@ -344,19 +345,37 @@ func conjureMac(macPrefix string, ip net.IP) string { | ||||
| 	return "02-11-22-33-44-55" | ||||
| } | ||||
|  | ||||
| // This will keep the track of all terminated endpoints. | ||||
| // This is done by adding the endpoints from old endpoint map and removing the endpoints from new endpoint map. | ||||
| // This way, we have entries which are only present in old endpoint map and not in new endpoint map. | ||||
| func (proxier *Proxier) updateTerminatedEndpoints(eps []proxy.Endpoint, isOldEndpointsMap bool) { | ||||
| 	for _, ep := range eps { | ||||
| 		if !ep.IsLocal() { | ||||
| 			if isOldEndpointsMap { | ||||
| 				proxier.terminatedEndpoints[ep.IP()] = true | ||||
| 			} else { | ||||
| 				delete(proxier.terminatedEndpoints, ep.IP()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) { | ||||
| 	// This will optimize remote endpoint and loadbalancer deletion based on the annotation | ||||
| 	var svcPortMap = make(map[proxy.ServicePortName]bool) | ||||
| 	clear(proxier.terminatedEndpoints) | ||||
| 	var logLevel klog.Level = 5 | ||||
| 	for svcPortName, eps := range oldEndpointsMap { | ||||
| 		logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps) | ||||
| 		svcPortMap[svcPortName] = true | ||||
| 		proxier.updateTerminatedEndpoints(eps, true) | ||||
| 		proxier.onEndpointsMapChange(&svcPortName, false) | ||||
| 	} | ||||
|  | ||||
| 	for svcPortName, eps := range newEndpointsMap { | ||||
| 		logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps) | ||||
| 		// redundantCleanup true means cleanup is called second time on the same svcPort | ||||
| 		proxier.updateTerminatedEndpoints(eps, false) | ||||
| 		redundantCleanup := svcPortMap[svcPortName] | ||||
| 		proxier.onEndpointsMapChange(&svcPortName, redundantCleanup) | ||||
| 	} | ||||
| @@ -615,6 +634,7 @@ type Proxier struct { | ||||
| 	forwardHealthCheckVip bool | ||||
| 	rootHnsEndpointName   string | ||||
| 	mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration | ||||
| 	terminatedEndpoints   map[string]bool // This maintains entries of endpoints which are terminated. Key is ip address:portnumber | ||||
| } | ||||
|  | ||||
| type localPort struct { | ||||
| @@ -773,6 +793,7 @@ func NewProxier( | ||||
| 		rootHnsEndpointName:   config.RootHnsEndpointName, | ||||
| 		forwardHealthCheckVip: config.ForwardHealthCheckVip, | ||||
| 		mapStaleLoadbalancers: make(map[string]bool), | ||||
| 		terminatedEndpoints:   make(map[string]bool), | ||||
| 	} | ||||
|  | ||||
| 	serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) | ||||
| @@ -1085,6 +1106,25 @@ func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[s | ||||
| 	queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint | ||||
| } | ||||
|  | ||||
| func (proxier *Proxier) requiresUpdateLoadbalancer(lbHnsID string, endpointCount int) bool { | ||||
| 	return proxier.supportedFeatures.ModifyLoadbalancer && lbHnsID != "" && endpointCount > 0 | ||||
| } | ||||
|  | ||||
| // handleUpdateLoadbalancerFailure will handle the error returned by updatePolicy. If the error is due to unsupported feature, | ||||
| // then it will set the supportedFeatures.ModifyLoadbalancer to false. return true means skip the iteration. | ||||
| func (proxier *Proxier) handleUpdateLoadbalancerFailure(err error, hnsID, svcIP string, endpointCount int) (skipIteration bool) { | ||||
| 	if err != nil { | ||||
| 		if hcn.IsNotImplemented(err) { | ||||
| 			klog.Warning("Update loadbalancer policies is not implemented.", "hnsID", hnsID, "svcIP", svcIP, "endpointCount", endpointCount) | ||||
| 			proxier.supportedFeatures.ModifyLoadbalancer = false | ||||
| 		} else { | ||||
| 			klog.ErrorS(err, "Update loadbalancer policy failed", "hnsID", hnsID, "svcIP", svcIP, "endpointCount", endpointCount) | ||||
| 			skipIteration = true | ||||
| 		} | ||||
| 	} | ||||
| 	return skipIteration | ||||
| } | ||||
|  | ||||
| // This is where all of the hns save/restore calls happen. | ||||
| // assumes proxier.mu is held | ||||
| func (proxier *Proxier) syncProxyRules() { | ||||
| @@ -1368,7 +1408,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
|  | ||||
| 		if len(svcInfo.hnsID) > 0 { | ||||
| 			// This should not happen | ||||
| 			klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID) | ||||
| 			klog.InfoS("Load Balancer already exists.", "hnsID", svcInfo.hnsID) | ||||
| 		} | ||||
|  | ||||
| 		// In ETP:Cluster, if all endpoints are under termination, | ||||
| @@ -1396,7 +1436,6 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 		} | ||||
|  | ||||
| 		endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing | ||||
| 		proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers) | ||||
|  | ||||
| 		// clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer. | ||||
| 		clusterIPEndpoints := hnsEndpoints | ||||
| @@ -1405,6 +1444,25 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 			clusterIPEndpoints = hnsLocalEndpoints | ||||
| 		} | ||||
|  | ||||
| 		if proxier.requiresUpdateLoadbalancer(svcInfo.hnsID, len(clusterIPEndpoints)) { | ||||
| 			hnsLoadBalancer, err = hns.updateLoadBalancer( | ||||
| 				svcInfo.hnsID, | ||||
| 				sourceVip, | ||||
| 				svcInfo.ClusterIP().String(), | ||||
| 				clusterIPEndpoints, | ||||
| 				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP}, | ||||
| 				Enum(svcInfo.Protocol()), | ||||
| 				uint16(svcInfo.targetPort), | ||||
| 				uint16(svcInfo.Port()), | ||||
| 				queriedLoadBalancers, | ||||
| 			) | ||||
| 			if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, svcInfo.hnsID, svcInfo.ClusterIP().String(), len(clusterIPEndpoints)); skipIteration { | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if !proxier.requiresUpdateLoadbalancer(svcInfo.hnsID, len(clusterIPEndpoints)) { | ||||
| 			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, svcInfo.ClusterIP().String(), Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers) | ||||
| 			if len(clusterIPEndpoints) > 0 { | ||||
|  | ||||
| 				// If all endpoints are terminating, then no need to create Cluster IP LoadBalancer | ||||
| @@ -1420,7 +1478,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 					queriedLoadBalancers, | ||||
| 				) | ||||
| 				if err != nil { | ||||
| 				klog.ErrorS(err, "Policy creation failed") | ||||
| 					klog.ErrorS(err, "ClusterIP policy creation failed") | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| @@ -1430,6 +1488,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 			} else { | ||||
| 				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints | ||||
| 		if svcInfo.NodePort() > 0 { | ||||
| @@ -1440,7 +1499,25 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 				nodePortEndpoints = hnsLocalEndpoints | ||||
| 			} | ||||
|  | ||||
| 			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers) | ||||
| 			if proxier.requiresUpdateLoadbalancer(svcInfo.nodePorthnsID, len(nodePortEndpoints)) && endpointsAvailableForLB { | ||||
| 				hnsLoadBalancer, err = hns.updateLoadBalancer( | ||||
| 					svcInfo.nodePorthnsID, | ||||
| 					sourceVip, | ||||
| 					"", | ||||
| 					nodePortEndpoints, | ||||
| 					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, | ||||
| 					Enum(svcInfo.Protocol()), | ||||
| 					uint16(svcInfo.targetPort), | ||||
| 					uint16(svcInfo.NodePort()), | ||||
| 					queriedLoadBalancers, | ||||
| 				) | ||||
| 				if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, svcInfo.nodePorthnsID, sourceVip, len(nodePortEndpoints)); skipIteration { | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			if !proxier.requiresUpdateLoadbalancer(svcInfo.nodePorthnsID, len(nodePortEndpoints)) { | ||||
| 				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, "", Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.NodePort()), nodePortEndpoints, queriedLoadBalancers) | ||||
|  | ||||
| 				if len(nodePortEndpoints) > 0 && endpointsAvailableForLB { | ||||
| 					// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer | ||||
| @@ -1455,7 +1532,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 						queriedLoadBalancers, | ||||
| 					) | ||||
| 					if err != nil { | ||||
| 					klog.ErrorS(err, "Policy creation failed") | ||||
| 						klog.ErrorS(err, "Nodeport policy creation failed") | ||||
| 						continue | ||||
| 					} | ||||
|  | ||||
| @@ -1465,6 +1542,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 					klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Create a Load Balancer Policy for each external IP | ||||
| 		for _, externalIP := range svcInfo.externalIPs { | ||||
| @@ -1474,7 +1552,25 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 				externalIPEndpoints = hnsLocalEndpoints | ||||
| 			} | ||||
|  | ||||
| 			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers) | ||||
| 			if proxier.requiresUpdateLoadbalancer(externalIP.hnsID, len(externalIPEndpoints)) && endpointsAvailableForLB { | ||||
| 				hnsLoadBalancer, err = hns.updateLoadBalancer( | ||||
| 					externalIP.hnsID, | ||||
| 					sourceVip, | ||||
| 					externalIP.ip, | ||||
| 					externalIPEndpoints, | ||||
| 					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, | ||||
| 					Enum(svcInfo.Protocol()), | ||||
| 					uint16(svcInfo.targetPort), | ||||
| 					uint16(svcInfo.Port()), | ||||
| 					queriedLoadBalancers, | ||||
| 				) | ||||
| 				if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, externalIP.hnsID, externalIP.ip, len(externalIPEndpoints)); skipIteration { | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			if !proxier.requiresUpdateLoadbalancer(externalIP.hnsID, len(externalIPEndpoints)) { | ||||
| 				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, externalIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers) | ||||
|  | ||||
| 				if len(externalIPEndpoints) > 0 && endpointsAvailableForLB { | ||||
| 					// If all endpoints are in terminating stage, then no need to External IP LoadBalancer | ||||
| @@ -1490,7 +1586,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 						queriedLoadBalancers, | ||||
| 					) | ||||
| 					if err != nil { | ||||
| 					klog.ErrorS(err, "Policy creation failed") | ||||
| 						klog.ErrorS(err, "ExternalIP policy creation failed") | ||||
| 						continue | ||||
| 					} | ||||
| 					externalIP.hnsID = hnsLoadBalancer.hnsID | ||||
| @@ -1499,6 +1595,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 					klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		// Create a Load Balancer Policy for each loadbalancer ingress | ||||
| 		for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs { | ||||
| 			// Try loading existing policies, if already available | ||||
| @@ -1507,7 +1604,25 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 				lbIngressEndpoints = hnsLocalEndpoints | ||||
| 			} | ||||
|  | ||||
| 			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers) | ||||
| 			if proxier.requiresUpdateLoadbalancer(lbIngressIP.hnsID, len(lbIngressEndpoints)) { | ||||
| 				hnsLoadBalancer, err = hns.updateLoadBalancer( | ||||
| 					lbIngressIP.hnsID, | ||||
| 					sourceVip, | ||||
| 					lbIngressIP.ip, | ||||
| 					lbIngressEndpoints, | ||||
| 					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol}, | ||||
| 					Enum(svcInfo.Protocol()), | ||||
| 					uint16(svcInfo.targetPort), | ||||
| 					uint16(svcInfo.Port()), | ||||
| 					queriedLoadBalancers, | ||||
| 				) | ||||
| 				if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, lbIngressIP.hnsID, lbIngressIP.ip, len(lbIngressEndpoints)); skipIteration { | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			if !proxier.requiresUpdateLoadbalancer(lbIngressIP.hnsID, len(lbIngressEndpoints)) { | ||||
| 				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers) | ||||
|  | ||||
| 				if len(lbIngressEndpoints) > 0 { | ||||
| 					hnsLoadBalancer, err := hns.getLoadBalancer( | ||||
| @@ -1521,7 +1636,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 						queriedLoadBalancers, | ||||
| 					) | ||||
| 					if err != nil { | ||||
| 					klog.ErrorS(err, "Policy creation failed") | ||||
| 						klog.ErrorS(err, "IngressIP policy creation failed") | ||||
| 						continue | ||||
| 					} | ||||
| 					lbIngressIP.hnsID = hnsLoadBalancer.hnsID | ||||
| @@ -1529,6 +1644,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 				} else { | ||||
| 					klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB { | ||||
| 				// Avoid creating health check loadbalancer if all the endpoints are terminating | ||||
| @@ -1537,10 +1653,30 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 					nodeport = svcInfo.HealthCheckNodePort() | ||||
| 				} | ||||
|  | ||||
| 				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers) | ||||
| 				gwEndpoints := []endpointInfo{*gatewayHnsendpoint} | ||||
|  | ||||
| 				if proxier.requiresUpdateLoadbalancer(lbIngressIP.healthCheckHnsID, len(gwEndpoints)) { | ||||
| 					hnsLoadBalancer, err = hns.updateLoadBalancer( | ||||
| 						lbIngressIP.healthCheckHnsID, | ||||
| 						sourceVip, | ||||
| 						lbIngressIP.ip, | ||||
| 						gwEndpoints, | ||||
| 						loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, | ||||
| 						Enum(svcInfo.Protocol()), | ||||
| 						uint16(nodeport), | ||||
| 						uint16(nodeport), | ||||
| 						queriedLoadBalancers, | ||||
| 					) | ||||
| 					if skipIteration := proxier.handleUpdateLoadbalancerFailure(err, lbIngressIP.healthCheckHnsID, lbIngressIP.ip, 1); skipIteration { | ||||
| 						continue | ||||
| 					} | ||||
| 				} | ||||
|  | ||||
| 				if !proxier.requiresUpdateLoadbalancer(lbIngressIP.healthCheckHnsID, len(gwEndpoints)) { | ||||
| 					proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, lbIngressIP.ip, Enum(svcInfo.Protocol()), uint16(nodeport), uint16(nodeport), gwEndpoints, queriedLoadBalancers) | ||||
|  | ||||
| 					hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( | ||||
| 					[]endpointInfo{*gatewayHnsendpoint}, | ||||
| 						gwEndpoints, | ||||
| 						loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, | ||||
| 						sourceVip, | ||||
| 						lbIngressIP.ip, | ||||
| @@ -1550,11 +1686,12 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 						queriedLoadBalancers, | ||||
| 					) | ||||
| 					if err != nil { | ||||
| 					klog.ErrorS(err, "Policy creation failed") | ||||
| 						klog.ErrorS(err, "Healthcheck loadbalancer policy creation failed") | ||||
| 						continue | ||||
| 					} | ||||
| 					lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID | ||||
| 					klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP) | ||||
| 				} | ||||
| 			} else { | ||||
| 				klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating) | ||||
| 			} | ||||
| @@ -1586,11 +1723,12 @@ func (proxier *Proxier) syncProxyRules() { | ||||
| 	} | ||||
|  | ||||
| 	// remove stale endpoint refcount entries | ||||
| 	for hnsID, referenceCount := range proxier.endPointsRefCount { | ||||
| 		if *referenceCount <= 0 { | ||||
| 			klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID) | ||||
| 			proxier.hns.deleteEndpoint(hnsID) | ||||
| 			delete(proxier.endPointsRefCount, hnsID) | ||||
| 	for epIP := range proxier.terminatedEndpoints { | ||||
| 		if epToDelete := queriedEndpoints[epIP]; epToDelete != nil && epToDelete.hnsID != "" { | ||||
| 			if refCount := proxier.endPointsRefCount.getRefCount(epToDelete.hnsID); refCount == nil || *refCount == 0 { | ||||
| 				klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", epToDelete.hnsID) | ||||
| 				proxier.hns.deleteEndpoint(epToDelete.hnsID) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	// This will cleanup stale load balancers which are pending delete | ||||
| @@ -1600,7 +1738,7 @@ func (proxier *Proxier) syncProxyRules() { | ||||
|  | ||||
| // deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not. | ||||
| // If it is needed, the function will delete the existing loadbalancer and return true, else false. | ||||
| func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { | ||||
| func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, vip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { | ||||
|  | ||||
| 	if !winProxyOptimization || *lbHnsID == "" { | ||||
| 		// Loadbalancer delete not needed | ||||
| @@ -1609,7 +1747,7 @@ func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winPr | ||||
|  | ||||
| 	lbID, lbIdErr := findLoadBalancerID( | ||||
| 		endpoints, | ||||
| 		sourceVip, | ||||
| 		vip, | ||||
| 		protocol, | ||||
| 		intPort, | ||||
| 		extPort, | ||||
|   | ||||
| @@ -118,6 +118,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn | ||||
| 		endPointsRefCount:     make(endPointsReferenceCountMap), | ||||
| 		forwardHealthCheckVip: true, | ||||
| 		mapStaleLoadbalancers: make(map[string]bool), | ||||
| 		terminatedEndpoints:   make(map[string]bool), | ||||
| 	} | ||||
|  | ||||
| 	serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) | ||||
| @@ -682,6 +683,291 @@ func TestCreateLoadBalancer(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestUpdateLoadBalancerWhenSupported(t *testing.T) { | ||||
| 	syncPeriod := 30 * time.Second | ||||
| 	proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) | ||||
| 	if proxier == nil { | ||||
| 		t.Error() | ||||
| 	} | ||||
|  | ||||
| 	proxier.supportedFeatures.ModifyLoadbalancer = true | ||||
|  | ||||
| 	svcIP := "10.20.30.41" | ||||
| 	svcPort := 80 | ||||
| 	svcNodePort := 3001 | ||||
| 	svcPortName := proxy.ServicePortName{ | ||||
| 		NamespacedName: makeNSN("ns1", "svc1"), | ||||
| 		Port:           "p80", | ||||
| 		Protocol:       v1.ProtocolTCP, | ||||
| 	} | ||||
|  | ||||
| 	makeServiceMap(proxier, | ||||
| 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { | ||||
| 			svc.Spec.Type = "NodePort" | ||||
| 			svc.Spec.ClusterIP = svcIP | ||||
| 			svc.Spec.Ports = []v1.ServicePort{{ | ||||
| 				Name:     svcPortName.Port, | ||||
| 				Port:     int32(svcPort), | ||||
| 				Protocol: v1.ProtocolTCP, | ||||
| 				NodePort: int32(svcNodePort), | ||||
| 			}} | ||||
| 		}), | ||||
| 	) | ||||
| 	populateEndpointSlices(proxier, | ||||
| 		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { | ||||
| 			eps.AddressType = discovery.AddressTypeIPv4 | ||||
| 			eps.Endpoints = []discovery.Endpoint{{ | ||||
| 				Addresses: []string{epIpAddressRemote}, | ||||
| 			}} | ||||
| 			eps.Ports = []discovery.EndpointPort{{ | ||||
| 				Name:     ptr.To(svcPortName.Port), | ||||
| 				Port:     ptr.To(int32(svcPort)), | ||||
| 				Protocol: ptr.To(v1.ProtocolTCP), | ||||
| 			}} | ||||
| 		}), | ||||
| 	) | ||||
|  | ||||
| 	proxier.setInitialized(true) | ||||
| 	proxier.syncProxyRules() | ||||
|  | ||||
| 	svc := proxier.svcPortMap[svcPortName] | ||||
| 	svcInfo, ok := svc.(*serviceInfo) | ||||
| 	if !ok { | ||||
| 		t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) | ||||
|  | ||||
| 	} else { | ||||
| 		if svcInfo.hnsID != loadbalancerGuid1 { | ||||
| 			t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	proxier.setInitialized(false) | ||||
|  | ||||
| 	proxier.OnEndpointSliceUpdate( | ||||
| 		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { | ||||
| 			eps.AddressType = discovery.AddressTypeIPv4 | ||||
| 			eps.Endpoints = []discovery.Endpoint{{ | ||||
| 				Addresses: []string{epIpAddressRemote}, | ||||
| 			}} | ||||
| 			eps.Ports = []discovery.EndpointPort{{ | ||||
| 				Name:     ptr.To(svcPortName.Port), | ||||
| 				Port:     ptr.To(int32(svcPort)), | ||||
| 				Protocol: ptr.To(v1.ProtocolTCP), | ||||
| 			}} | ||||
| 		}), | ||||
| 		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { | ||||
| 			eps.AddressType = discovery.AddressTypeIPv4 | ||||
| 			eps.Endpoints = []discovery.Endpoint{{ | ||||
| 				Addresses: []string{epPaAddress}, | ||||
| 			}} | ||||
| 			eps.Ports = []discovery.EndpointPort{{ | ||||
| 				Name:     ptr.To(svcPortName.Port), | ||||
| 				Port:     ptr.To(int32(svcPort)), | ||||
| 				Protocol: ptr.To(v1.ProtocolTCP), | ||||
| 			}} | ||||
| 		})) | ||||
|  | ||||
| 	proxier.mu.Lock() | ||||
| 	proxier.endpointSlicesSynced = true | ||||
| 	proxier.mu.Unlock() | ||||
|  | ||||
| 	proxier.setInitialized(true) | ||||
|  | ||||
| 	epObj, err := proxier.hcn.GetEndpointByID("EPID-3") | ||||
| 	if err != nil || epObj == nil { | ||||
| 		t.Errorf("Failed to fetch endpoint: EPID-3") | ||||
| 	} | ||||
|  | ||||
| 	proxier.syncProxyRules() | ||||
|  | ||||
| 	// The endpoint should be deleted as it is not present in the new endpoint slice | ||||
| 	epObj, err = proxier.hcn.GetEndpointByID("EPID-3") | ||||
| 	if err == nil || epObj != nil { | ||||
| 		t.Errorf("Failed to fetch endpoint: EPID-3") | ||||
| 	} | ||||
|  | ||||
| 	ep := proxier.endpointsMap[svcPortName][0] | ||||
| 	epInfo, ok := ep.(*endpointInfo) | ||||
|  | ||||
| 	epObj, err = proxier.hcn.GetEndpointByID("EPID-5") | ||||
| 	if err != nil || epObj == nil { | ||||
| 		t.Errorf("Failed to fetch endpoint: EPID-5") | ||||
| 	} | ||||
|  | ||||
| 	if !ok { | ||||
| 		t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) | ||||
|  | ||||
| 	} else { | ||||
| 		if epInfo.hnsID != "EPID-5" { | ||||
| 			t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if *epInfo.refCount != 1 { | ||||
| 		t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount) | ||||
| 	} | ||||
|  | ||||
| 	if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount { | ||||
| 		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount) | ||||
| 	} | ||||
|  | ||||
| 	svc = proxier.svcPortMap[svcPortName] | ||||
| 	svcInfo, ok = svc.(*serviceInfo) | ||||
| 	if !ok { | ||||
| 		t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) | ||||
|  | ||||
| 	} else { | ||||
| 		// Loadbalancer id should not change after the update | ||||
| 		if svcInfo.hnsID != loadbalancerGuid1 { | ||||
| 			t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func TestUpdateLoadBalancerWhenUnsupported(t *testing.T) { | ||||
| 	syncPeriod := 30 * time.Second | ||||
| 	proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) | ||||
| 	if proxier == nil { | ||||
| 		t.Error() | ||||
| 	} | ||||
|  | ||||
| 	// By default the value is false, for the readibility of the test case setting it to false again | ||||
| 	proxier.supportedFeatures.ModifyLoadbalancer = false | ||||
|  | ||||
| 	svcIP := "10.20.30.41" | ||||
| 	svcPort := 80 | ||||
| 	svcNodePort := 3001 | ||||
| 	svcPortName := proxy.ServicePortName{ | ||||
| 		NamespacedName: makeNSN("ns1", "svc1"), | ||||
| 		Port:           "p80", | ||||
| 		Protocol:       v1.ProtocolTCP, | ||||
| 	} | ||||
|  | ||||
| 	makeServiceMap(proxier, | ||||
| 		makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { | ||||
| 			svc.Spec.Type = "NodePort" | ||||
| 			svc.Spec.ClusterIP = svcIP | ||||
| 			svc.Spec.Ports = []v1.ServicePort{{ | ||||
| 				Name:     svcPortName.Port, | ||||
| 				Port:     int32(svcPort), | ||||
| 				Protocol: v1.ProtocolTCP, | ||||
| 				NodePort: int32(svcNodePort), | ||||
| 			}} | ||||
| 		}), | ||||
| 	) | ||||
| 	populateEndpointSlices(proxier, | ||||
| 		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { | ||||
| 			eps.AddressType = discovery.AddressTypeIPv4 | ||||
| 			eps.Endpoints = []discovery.Endpoint{{ | ||||
| 				Addresses: []string{epIpAddressRemote}, | ||||
| 			}} | ||||
| 			eps.Ports = []discovery.EndpointPort{{ | ||||
| 				Name:     ptr.To(svcPortName.Port), | ||||
| 				Port:     ptr.To(int32(svcPort)), | ||||
| 				Protocol: ptr.To(v1.ProtocolTCP), | ||||
| 			}} | ||||
| 		}), | ||||
| 	) | ||||
|  | ||||
| 	proxier.setInitialized(true) | ||||
| 	proxier.syncProxyRules() | ||||
|  | ||||
| 	svc := proxier.svcPortMap[svcPortName] | ||||
| 	svcInfo, ok := svc.(*serviceInfo) | ||||
| 	if !ok { | ||||
| 		t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) | ||||
|  | ||||
| 	} else { | ||||
| 		if svcInfo.hnsID != loadbalancerGuid1 { | ||||
| 			t.Errorf("%v does not match %v", svcInfo.hnsID, loadbalancerGuid1) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	proxier.setInitialized(false) | ||||
|  | ||||
| 	proxier.OnEndpointSliceUpdate( | ||||
| 		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { | ||||
| 			eps.AddressType = discovery.AddressTypeIPv4 | ||||
| 			eps.Endpoints = []discovery.Endpoint{{ | ||||
| 				Addresses: []string{epIpAddressRemote}, | ||||
| 			}} | ||||
| 			eps.Ports = []discovery.EndpointPort{{ | ||||
| 				Name:     ptr.To(svcPortName.Port), | ||||
| 				Port:     ptr.To(int32(svcPort)), | ||||
| 				Protocol: ptr.To(v1.ProtocolTCP), | ||||
| 			}} | ||||
| 		}), | ||||
| 		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { | ||||
| 			eps.AddressType = discovery.AddressTypeIPv4 | ||||
| 			eps.Endpoints = []discovery.Endpoint{{ | ||||
| 				Addresses: []string{epPaAddress}, | ||||
| 			}} | ||||
| 			eps.Ports = []discovery.EndpointPort{{ | ||||
| 				Name:     ptr.To(svcPortName.Port), | ||||
| 				Port:     ptr.To(int32(svcPort)), | ||||
| 				Protocol: ptr.To(v1.ProtocolTCP), | ||||
| 			}} | ||||
| 		})) | ||||
|  | ||||
| 	proxier.mu.Lock() | ||||
| 	proxier.endpointSlicesSynced = true | ||||
| 	proxier.mu.Unlock() | ||||
|  | ||||
| 	proxier.setInitialized(true) | ||||
|  | ||||
| 	epObj, err := proxier.hcn.GetEndpointByID("EPID-3") | ||||
| 	if err != nil || epObj == nil { | ||||
| 		t.Errorf("Failed to fetch endpoint: EPID-3") | ||||
| 	} | ||||
|  | ||||
| 	proxier.syncProxyRules() | ||||
|  | ||||
| 	// The endpoint should be deleted as it is not present in the new endpoint slice | ||||
| 	epObj, err = proxier.hcn.GetEndpointByID("EPID-3") | ||||
| 	if err == nil || epObj != nil { | ||||
| 		t.Errorf("Failed to fetch endpoint: EPID-3") | ||||
| 	} | ||||
|  | ||||
| 	ep := proxier.endpointsMap[svcPortName][0] | ||||
| 	epInfo, ok := ep.(*endpointInfo) | ||||
|  | ||||
| 	epObj, err = proxier.hcn.GetEndpointByID("EPID-5") | ||||
| 	if err != nil || epObj == nil { | ||||
| 		t.Errorf("Failed to fetch endpoint: EPID-5") | ||||
| 	} | ||||
|  | ||||
| 	if !ok { | ||||
| 		t.Errorf("Failed to cast endpointInfo %q", svcPortName.String()) | ||||
|  | ||||
| 	} else { | ||||
| 		if epInfo.hnsID != "EPID-5" { | ||||
| 			t.Errorf("%v does not match %v", epInfo.hnsID, "EPID-5") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if *epInfo.refCount != 1 { | ||||
| 		t.Errorf("Incorrect refcount. Current value: %v", *epInfo.refCount) | ||||
| 	} | ||||
|  | ||||
| 	if *proxier.endPointsRefCount["EPID-5"] != *epInfo.refCount { | ||||
| 		t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[endpointGuid1], *epInfo.refCount) | ||||
| 	} | ||||
|  | ||||
| 	svc = proxier.svcPortMap[svcPortName] | ||||
| 	svcInfo, ok = svc.(*serviceInfo) | ||||
| 	if !ok { | ||||
| 		t.Errorf("Failed to cast serviceInfo %q", svcPortName.String()) | ||||
|  | ||||
| 	} else { | ||||
| 		// Loadbalancer id should change after the update | ||||
| 		if svcInfo.hnsID != "LBID-3" { | ||||
| 			t.Errorf("%v does not match %v", svcInfo.hnsID, "LBID-3") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func TestCreateDsrLoadBalancer(t *testing.T) { | ||||
| 	syncPeriod := 30 * time.Second | ||||
| 	proxier := NewFakeProxier(syncPeriod, syncPeriod, "testhost", netutils.ParseIPSloppy("10.0.0.1"), NETWORK_TYPE_OVERLAY) | ||||
|   | ||||
| @@ -183,6 +183,15 @@ func (hcnObj HcnMock) CreateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalanc | ||||
| 	return loadBalancer, nil | ||||
| } | ||||
|  | ||||
| func (hcnObj HcnMock) UpdateLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer, hnsLbID string) (*hcn.HostComputeLoadBalancer, error) { | ||||
| 	if _, ok := loadbalancerMap[hnsLbID]; !ok { | ||||
| 		return nil, fmt.Errorf("LoadBalancer id %s Not Present", loadBalancer.Id) | ||||
| 	} | ||||
| 	loadBalancer.Id = hnsLbID | ||||
| 	loadbalancerMap[hnsLbID] = loadBalancer | ||||
| 	return loadBalancer, nil | ||||
| } | ||||
|  | ||||
| func (hcnObj HcnMock) DeleteLoadBalancer(loadBalancer *hcn.HostComputeLoadBalancer) error { | ||||
| 	if _, ok := loadbalancerMap[loadBalancer.Id]; !ok { | ||||
| 		return hcn.LoadBalancerNotFoundError{LoadBalancerId: loadBalancer.Id} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Prince Pereira
					Prince Pereira