Merge pull request #76165 from JacobTanenbaum/minor-cleanups
Minor cleanups in pkg/proxy/endpoints.go
This commit is contained in:
commit
929adb69e3
@ -201,18 +201,18 @@ type UpdateEndpointMapResult struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UpdateEndpointsMap updates endpointsMap base on the given changes.
|
// UpdateEndpointsMap updates endpointsMap base on the given changes.
|
||||||
func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
|
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
|
||||||
result.StaleEndpoints = make([]ServiceEndpoint, 0)
|
result.StaleEndpoints = make([]ServiceEndpoint, 0)
|
||||||
result.StaleServiceNames = make([]ServicePortName, 0)
|
result.StaleServiceNames = make([]ServicePortName, 0)
|
||||||
result.LastChangeTriggerTimes = make([]time.Time, 0)
|
result.LastChangeTriggerTimes = make([]time.Time, 0)
|
||||||
|
|
||||||
endpointsMap.apply(
|
em.apply(
|
||||||
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
|
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
|
||||||
|
|
||||||
// TODO: If this will appear to be computationally expensive, consider
|
// TODO: If this will appear to be computationally expensive, consider
|
||||||
// computing this incrementally similarly to endpointsMap.
|
// computing this incrementally similarly to endpointsMap.
|
||||||
result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
|
result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
|
||||||
localIPs := GetLocalEndpointIPs(endpointsMap)
|
localIPs := em.getLocalEndpointIPs()
|
||||||
for nsn, ips := range localIPs {
|
for nsn, ips := range localIPs {
|
||||||
result.HCEndpointsLocalIPSize[nsn] = len(ips)
|
result.HCEndpointsLocalIPSize[nsn] = len(ips)
|
||||||
}
|
}
|
||||||
@ -294,8 +294,8 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S
|
|||||||
changes.lock.Lock()
|
changes.lock.Lock()
|
||||||
defer changes.lock.Unlock()
|
defer changes.lock.Unlock()
|
||||||
for _, change := range changes.items {
|
for _, change := range changes.items {
|
||||||
em.Unmerge(change.previous)
|
em.unmerge(change.previous)
|
||||||
em.Merge(change.current)
|
em.merge(change.current)
|
||||||
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
||||||
}
|
}
|
||||||
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
||||||
@ -307,23 +307,23 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
|
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
|
||||||
func (em EndpointsMap) Merge(other EndpointsMap) {
|
func (em EndpointsMap) merge(other EndpointsMap) {
|
||||||
for svcPortName := range other {
|
for svcPortName := range other {
|
||||||
em[svcPortName] = other[svcPortName]
|
em[svcPortName] = other[svcPortName]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
|
// Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
|
||||||
func (em EndpointsMap) Unmerge(other EndpointsMap) {
|
func (em EndpointsMap) unmerge(other EndpointsMap) {
|
||||||
for svcPortName := range other {
|
for svcPortName := range other {
|
||||||
delete(em, svcPortName)
|
delete(em, svcPortName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
||||||
func GetLocalEndpointIPs(endpointsMap EndpointsMap) map[types.NamespacedName]sets.String {
|
func (em EndpointsMap) getLocalEndpointIPs() map[types.NamespacedName]sets.String {
|
||||||
localIPs := make(map[types.NamespacedName]sets.String)
|
localIPs := make(map[types.NamespacedName]sets.String)
|
||||||
for svcPortName, epList := range endpointsMap {
|
for svcPortName, epList := range em {
|
||||||
for _, ep := range epList {
|
for _, ep := range epList {
|
||||||
if ep.GetIsLocal() {
|
if ep.GetIsLocal() {
|
||||||
nsn := svcPortName.NamespacedName
|
nsn := svcPortName.NamespacedName
|
||||||
|
@ -112,7 +112,7 @@ func TestGetLocalEndpointIPs(t *testing.T) {
|
|||||||
|
|
||||||
for tci, tc := range testCases {
|
for tci, tc := range testCases {
|
||||||
// outputs
|
// outputs
|
||||||
localIPs := GetLocalEndpointIPs(tc.endpointsMap)
|
localIPs := tc.endpointsMap.getLocalEndpointIPs()
|
||||||
|
|
||||||
if !reflect.DeepEqual(localIPs, tc.expected) {
|
if !reflect.DeepEqual(localIPs, tc.expected) {
|
||||||
t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs)
|
t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs)
|
||||||
@ -1213,7 +1213,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
fp.addEndpoints(tc.previousEndpoints[i])
|
fp.addEndpoints(tc.previousEndpoints[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
||||||
|
|
||||||
// Now let's call appropriate handlers to get to state we want to be.
|
// Now let's call appropriate handlers to get to state we want to be.
|
||||||
@ -1233,7 +1233,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
fp.updateEndpoints(prev, curr)
|
fp.updateEndpoints(prev, curr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||||
@ -1373,7 +1373,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
|
|||||||
|
|
||||||
tc.scenario(fp)
|
tc.scenario(fp)
|
||||||
|
|
||||||
result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
got := result.LastChangeTriggerTimes
|
got := result.LastChangeTriggerTimes
|
||||||
sortTimeSlice(got)
|
sortTimeSlice(got)
|
||||||
sortTimeSlice(tc.expected)
|
sortTimeSlice(tc.expected)
|
||||||
|
@ -684,7 +684,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// even if nothing changed in the meantime. In other words, callers are
|
// even if nothing changed in the meantime. In other words, callers are
|
||||||
// responsible for detecting no-op changes and not calling this function.
|
// responsible for detecting no-op changes and not calling this function.
|
||||||
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
|
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
|
||||||
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
|
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
|
||||||
|
|
||||||
staleServices := serviceUpdateResult.UDPStaleClusterIP
|
staleServices := serviceUpdateResult.UDPStaleClusterIP
|
||||||
// merge stale services gathered from updateEndpointsMap
|
// merge stale services gathered from updateEndpointsMap
|
||||||
|
@ -2185,7 +2185,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
fp.OnEndpointsAdd(tc.previousEndpoints[i])
|
fp.OnEndpointsAdd(tc.previousEndpoints[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
||||||
|
|
||||||
// Now let's call appropriate handlers to get to state we want to be.
|
// Now let's call appropriate handlers to get to state we want to be.
|
||||||
@ -2205,7 +2205,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
fp.OnEndpointsUpdate(prev, curr)
|
fp.OnEndpointsUpdate(prev, curr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||||
|
@ -753,7 +753,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// even if nothing changed in the meantime. In other words, callers are
|
// even if nothing changed in the meantime. In other words, callers are
|
||||||
// responsible for detecting no-op changes and not calling this function.
|
// responsible for detecting no-op changes and not calling this function.
|
||||||
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
|
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
|
||||||
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
|
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
|
||||||
|
|
||||||
staleServices := serviceUpdateResult.UDPStaleClusterIP
|
staleServices := serviceUpdateResult.UDPStaleClusterIP
|
||||||
// merge stale services gathered from updateEndpointsMap
|
// merge stale services gathered from updateEndpointsMap
|
||||||
|
@ -2486,7 +2486,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
fp.OnEndpointsAdd(tc.previousEndpoints[i])
|
fp.OnEndpointsAdd(tc.previousEndpoints[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
||||||
|
|
||||||
// Now let's call appropriate handlers to get to state we want to be.
|
// Now let's call appropriate handlers to get to state we want to be.
|
||||||
@ -2506,7 +2506,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
fp.OnEndpointsUpdate(prev, curr)
|
fp.OnEndpointsUpdate(prev, curr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||||
|
Loading…
Reference in New Issue
Block a user