diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index d81b2cab9e2..38754d3ecfc 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -201,18 +201,18 @@ type UpdateEndpointMapResult struct { } // UpdateEndpointsMap updates endpointsMap base on the given changes. -func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { +func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { result.StaleEndpoints = make([]ServiceEndpoint, 0) result.StaleServiceNames = make([]ServicePortName, 0) result.LastChangeTriggerTimes = make([]time.Time, 0) - endpointsMap.apply( + em.apply( changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes) // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to endpointsMap. result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int) - localIPs := GetLocalEndpointIPs(endpointsMap) + localIPs := em.getLocalEndpointIPs() for nsn, ips := range localIPs { result.HCEndpointsLocalIPSize[nsn] = len(ips) } @@ -294,8 +294,8 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { - em.Unmerge(change.previous) - em.Merge(change.current) + em.unmerge(change.previous) + em.merge(change.current) detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) } changes.items = make(map[types.NamespacedName]*endpointsChange) @@ -307,23 +307,23 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S } // Merge ensures that the current EndpointsMap contains all pairs from the EndpointsMap passed in. -func (em EndpointsMap) Merge(other EndpointsMap) { +func (em EndpointsMap) merge(other EndpointsMap) { for svcPortName := range other { em[svcPortName] = other[svcPortName] } } // Unmerge removes the pairs from the current EndpointsMap which are contained in the EndpointsMap passed in. -func (em EndpointsMap) Unmerge(other EndpointsMap) { +func (em EndpointsMap) unmerge(other EndpointsMap) { for svcPortName := range other { delete(em, svcPortName) } } // GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy. -func GetLocalEndpointIPs(endpointsMap EndpointsMap) map[types.NamespacedName]sets.String { +func (em EndpointsMap) getLocalEndpointIPs() map[types.NamespacedName]sets.String { localIPs := make(map[types.NamespacedName]sets.String) - for svcPortName, epList := range endpointsMap { + for svcPortName, epList := range em { for _, ep := range epList { if ep.GetIsLocal() { nsn := svcPortName.NamespacedName diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 650aa1da994..68227425fef 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -112,7 +112,7 @@ func TestGetLocalEndpointIPs(t *testing.T) { for tci, tc := range testCases { // outputs - localIPs := GetLocalEndpointIPs(tc.endpointsMap) + localIPs := tc.endpointsMap.getLocalEndpointIPs() if !reflect.DeepEqual(localIPs, tc.expected) { t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs) @@ -1213,7 +1213,7 @@ func TestUpdateEndpointsMap(t *testing.T) { fp.addEndpoints(tc.previousEndpoints[i]) } } - UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + fp.endpointsMap.Update(fp.endpointsChanges) compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints) // Now let's call appropriate handlers to get to state we want to be. @@ -1233,7 +1233,7 @@ func TestUpdateEndpointsMap(t *testing.T) { fp.updateEndpoints(prev, curr) } } - result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { @@ -1373,7 +1373,7 @@ func TestLastChangeTriggerTime(t *testing.T) { tc.scenario(fp) - result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + result := fp.endpointsMap.Update(fp.endpointsChanges) got := result.LastChangeTriggerTimes sortTimeSlice(got) sortTimeSlice(tc.expected) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 69ee23c99e1..967f5f0e7fb 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -684,7 +684,7 @@ func (proxier *Proxier) syncProxyRules() { // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) - endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges) + endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index c096753d0d9..a23cc247f20 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -2185,7 +2185,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsAdd(tc.previousEndpoints[i]) } } - proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + fp.endpointsMap.Update(fp.endpointsChanges) compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints) // Now let's call appropriate handlers to get to state we want to be. @@ -2205,7 +2205,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index a4c23572c5b..48c2d499a3e 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -753,7 +753,7 @@ func (proxier *Proxier) syncProxyRules() { // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) - endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges) + endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 9b599241ea2..288dadc5476 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -2486,7 +2486,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsAdd(tc.previousEndpoints[i]) } } - proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + fp.endpointsMap.Update(fp.endpointsChanges) compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints) // Now let's call appropriate handlers to get to state we want to be. @@ -2506,7 +2506,7 @@ func Test_updateEndpointsMap(t *testing.T) { fp.OnEndpointsUpdate(prev, curr) } } - result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges) + result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {