Merge pull request #64146 from Lion-Wei/ipvs-lb
Automatic merge from submit-queue (batch tested with PRs 64034, 64072, 64146, 64059, 64161). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. fix session affinity for LoadBalancer service with ESIPP **What this PR does / why we need it**: fix session affinity for LoadBalancer service with ESIPP **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #63351 **Special notes for your reviewer**: In cases that loadbalancer type service with externaltrafficpolicy=local and session-affinity specified, traffic to loadbalancer should only route to backends that in the same node with kube-proxy. **Release note**: ```release-note NONE ```
This commit is contained in:
		@@ -877,7 +877,7 @@ func (proxier *Proxier) syncProxyRules() {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			if err := proxier.syncService(svcNameString, serv, true); err == nil {
 | 
								if err := proxier.syncService(svcNameString, serv, true); err == nil {
 | 
				
			||||||
				activeIPVSServices[serv.String()] = true
 | 
									activeIPVSServices[serv.String()] = true
 | 
				
			||||||
				if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil {
 | 
									if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
 | 
				
			||||||
					glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
 | 
										glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
@@ -976,8 +976,10 @@ func (proxier *Proxier) syncProxyRules() {
 | 
				
			|||||||
					serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
 | 
										serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				if err := proxier.syncService(svcNameString, serv, true); err == nil {
 | 
									if err := proxier.syncService(svcNameString, serv, true); err == nil {
 | 
				
			||||||
 | 
										// check if service need skip endpoints that not in same host as kube-proxy
 | 
				
			||||||
 | 
										onlyLocal := svcInfo.SessionAffinityType == api.ServiceAffinityClientIP && svcInfo.OnlyNodeLocalEndpoints
 | 
				
			||||||
					activeIPVSServices[serv.String()] = true
 | 
										activeIPVSServices[serv.String()] = true
 | 
				
			||||||
					if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil {
 | 
										if err := proxier.syncEndpoint(svcName, onlyLocal, serv); err != nil {
 | 
				
			||||||
						glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
 | 
											glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				} else {
 | 
									} else {
 | 
				
			||||||
@@ -1105,7 +1107,7 @@ func (proxier *Proxier) syncProxyRules() {
 | 
				
			|||||||
				// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
 | 
									// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
 | 
				
			||||||
				if err := proxier.syncService(svcNameString, serv, false); err == nil {
 | 
									if err := proxier.syncService(svcNameString, serv, false); err == nil {
 | 
				
			||||||
					activeIPVSServices[serv.String()] = true
 | 
										activeIPVSServices[serv.String()] = true
 | 
				
			||||||
					if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints, serv); err != nil {
 | 
										if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
 | 
				
			||||||
						glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
 | 
											glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				} else {
 | 
									} else {
 | 
				
			||||||
@@ -1498,6 +1500,9 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, epInfo := range proxier.endpointsMap[svcPortName] {
 | 
						for _, epInfo := range proxier.endpointsMap[svcPortName] {
 | 
				
			||||||
 | 
							if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		newEndpoints.Insert(epInfo.String())
 | 
							newEndpoints.Insert(epInfo.String())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user