From d901992eaef0b3b3a5689dca90c1af0b5a747a47 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 22 Jan 2023 10:25:22 -0500 Subject: [PATCH] Split out HealthCheckNodePort stuff from service/endpoint map Update() In addition to actually updating their data from the provided list of changes, EndpointsMap.Update() and ServicePortMap.Update() return a struct with some information about things that changed because of that update (eg services with stale conntrack entries). For some reason, they were also returning information about HealthCheckNodePorts, but they were returning *static* information based on the current (post-Update) state of the map, not information about what had *changed* in the update. Since this doesn't match how the other data in the struct is used (and since there's no reason to have the data only be returned when you call Update() anyway) , split it out. --- pkg/proxy/endpoints.go | 32 ++++--- pkg/proxy/endpoints_test.go | 40 +++++---- pkg/proxy/iptables/proxier.go | 4 +- pkg/proxy/iptables/proxier_test.go | 129 ++++++++++++++------------- pkg/proxy/ipvs/proxier.go | 4 +- pkg/proxy/ipvs/proxier_test.go | 134 ++++++++++++++++------------- pkg/proxy/service.go | 15 ++-- pkg/proxy/service_test.go | 81 +++++++++-------- pkg/proxy/winkernel/proxier.go | 4 +- 9 files changed, 246 insertions(+), 197 deletions(-) diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 23fe42fa0f9..6597f8f340d 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -293,8 +293,6 @@ type endpointsChange struct { // UpdateEndpointMapResult is the updated results after applying endpoints changes. type UpdateEndpointMapResult struct { - // HCEndpointsLocalIPSize maps an endpoints name to the length of its local IPs. - HCEndpointsLocalIPSize map[types.NamespacedName]int // StaleEndpoints identifies if an endpoints service pair is stale. StaleEndpoints []ServiceEndpoint // StaleServiceNames identifies if a service is stale. @@ -313,15 +311,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp em.apply( changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes) - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. - result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int) - localIPs := em.getLocalReadyEndpointIPs() - for nsn, ips := range localIPs { - result.HCEndpointsLocalIPSize[nsn] = len(ips) - } - return result } @@ -366,7 +355,7 @@ func (em EndpointsMap) unmerge(other EndpointsMap) { } } -// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy. +// getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy. func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.String { localIPs := make(map[types.NamespacedName]sets.String) for svcPortName, epList := range em { @@ -389,6 +378,25 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets. return localIPs } +// LocalReadyEndpoints returns a map of Service names to the number of local ready +// endpoints for that service. +func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int { + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to endpointsMap. + + // (Note that we need to call getLocalEndpointIPs first to squash the data by IP, + // because the EndpointsMap is sorted by IP+port, not just IP, and we want to + // consider a Service pointing to 10.0.0.1:80 and 10.0.0.1:443 to have 1 endpoint, + // not 2.) + + eps := make(map[types.NamespacedName]int) + localIPs := em.getLocalReadyEndpointIPs() + for nsn, ips := range localIPs { + eps[nsn] = len(ips) + } + return eps +} + // detectStaleConnections modifies and with detected stale connections. // is used to store stale udp service in order to clear udp conntrack later. func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) { diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index cd5662cb11a..3e7388bdbe3 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -505,7 +505,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult map[ServicePortName][]*BaseEndpointInfo expectedStaleEndpoints []ServiceEndpoint expectedStaleServiceNames map[ServicePortName]bool - expectedHealthchecks map[types.NamespacedName]int + expectedLocalEndpoints map[types.NamespacedName]int expectedChangedEndpoints sets.String }{{ name: "empty", @@ -513,7 +513,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { name: "no change, unnamed port", @@ -535,7 +535,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { name: "no change, named port, local", @@ -557,7 +557,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString(), @@ -589,7 +589,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple slices, multiple ports, local", @@ -625,7 +625,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString(), @@ -695,7 +695,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, @@ -718,7 +718,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), @@ -741,7 +741,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add an IP and port", @@ -770,7 +770,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), @@ -808,7 +808,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add a slice to an endpoint", @@ -837,7 +837,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), @@ -869,7 +869,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "rename a port", @@ -896,7 +896,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "renumber a port", @@ -921,7 +921,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "complex add and remove", @@ -1009,7 +1009,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), @@ -1031,7 +1031,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, } @@ -1108,8 +1108,10 @@ func TestUpdateEndpointsMap(t *testing.T) { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) + + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { + t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints) } }) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index fbde2f8db8b..107c2445983 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1570,10 +1570,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { klog.ErrorS(err, "Error syncing healthcheck services") } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { klog.ErrorS(err, "Error syncing healthcheck endpoints") } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6b050a14713..e409ec0208f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -4039,21 +4039,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 10, got %v", fp.svcPortMap) } - // The only-local-loadbalancer ones get added - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) - } else { - nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) - } - } - if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + // The only-local-loadbalancer ones get added + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) + } else { + nsn := makeNSN("somewhere", "only-local-load-balancer") + if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) + } + } + // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { @@ -4072,10 +4073,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) - } - // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. @@ -4088,6 +4085,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected stale UDP service service %s", ip) } } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceHeadless(t *testing.T) { @@ -4112,14 +4114,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) - } - if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) + } } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { @@ -4139,13 +4142,14 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceUpdate(t *testing.T) { @@ -4179,13 +4183,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) @@ -4193,12 +4198,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } // No change; make sure the service map stays the same and there are // no health-check changes @@ -4207,12 +4213,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) @@ -4220,13 +4227,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) { @@ -4633,7 +4641,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool - expectedHealthchecks map[types.NamespacedName]int + expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", @@ -4641,7 +4649,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -4659,7 +4667,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4685,7 +4693,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -4715,7 +4723,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4777,7 +4785,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, @@ -4796,7 +4804,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4815,7 +4823,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -4840,7 +4848,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4874,7 +4882,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -4897,7 +4905,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4923,7 +4931,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -4946,7 +4954,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -4967,7 +4975,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -5031,7 +5039,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, }, { @@ -5049,7 +5057,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, } @@ -5117,8 +5125,9 @@ func TestUpdateEndpointsMap(t *testing.T) { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { + t.Errorf("[%d] expected local endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints) } }) } @@ -5277,9 +5286,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceAdd(endpointSlice) - result := fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 1 { - t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 1 { + t.Errorf("unexpected number of local ready endpoints, expected 1 but got: %d", len(localReadyEndpoints)) } // set all endpoints to terminating @@ -5331,9 +5341,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) - result = fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 0 { - t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 0 { + t.Errorf("unexpected number of local ready endpoints, expected 0 but got: %d", len(localReadyEndpoints)) } } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 21b4e550422..7798662675e 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1572,10 +1572,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { klog.ErrorS(err, "Error syncing healthcheck services") } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { klog.ErrorS(err, "Error syncing healthcheck endpoints") } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 0e1e8b5383e..bba797e9a04 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -2589,21 +2589,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 12, got %v", fp.svcPortMap) } - // The only-local-loadbalancer ones get added - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) - } else { - nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) - } - } - if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + // The only-local-loadbalancer ones get added + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) + } else { + nsn := makeNSN("somewhere", "only-local-load-balancer") + if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) + } + } + // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { @@ -2622,10 +2623,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) - } - // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. @@ -2638,6 +2635,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected stale UDP service service %s", ip) } } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceHeadless(t *testing.T) { @@ -2669,14 +2671,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) - } - if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) + } } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { @@ -2698,13 +2701,15 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceUpdate(t *testing.T) { @@ -2740,27 +2745,31 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } + // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) @@ -2768,26 +2777,30 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestSessionAffinity(t *testing.T) { @@ -3188,7 +3201,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool - expectedHealthchecks map[types.NamespacedName]int + expectedReadyEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", @@ -3196,7 +3209,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -3214,7 +3227,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3240,7 +3253,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -3270,7 +3283,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3332,7 +3345,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, @@ -3351,7 +3364,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3370,7 +3383,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -3395,7 +3408,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3429,7 +3442,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -3452,7 +3465,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3478,7 +3491,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -3501,7 +3514,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -3522,7 +3535,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -3586,7 +3599,7 @@ func Test_updateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, }, { @@ -3604,7 +3617,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, } @@ -3675,8 +3688,9 @@ func Test_updateEndpointsMap(t *testing.T) { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedReadyEndpoints, localReadyEndpoints) } }) } @@ -4665,9 +4679,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceAdd(endpointSlice) - result := fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 1 { - t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 1 { + t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(localReadyEndpoints)) } // set all endpoints to terminating @@ -4719,9 +4734,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) - result = fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 0 { - t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 0 { + t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(localReadyEndpoints)) } } diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index ceae54eb202..1d2d8071988 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -337,9 +337,6 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String { // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { - // HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node. - // The value(uint16) of HCServices map is the service health check node port. - HCServiceNodePorts map[types.NamespacedName]uint16 // UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports. // Callers can use this to abort timeout-waits or clear connection-tracking information. UDPStaleClusterIP sets.String @@ -349,17 +346,21 @@ type UpdateServiceMapResult struct { func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { result.UDPStaleClusterIP = sets.NewString() sm.apply(changes, result.UDPStaleClusterIP) + return result +} +// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values +// for all Services in sm with non-zero HealthCheckNodePort. +func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to svcPortMap. - result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) + ports := make(map[types.NamespacedName]uint16) for svcPortName, info := range sm { if info.HealthCheckNodePort() != 0 { - result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) + ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) } } - - return result + return ports } // ServicePortMap maps a service to its ServicePort. diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index a45ae5e490a..4b694425047 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -568,15 +568,15 @@ func TestServiceMapUpdateHeadless(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) - } - if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) + } } func TestUpdateServiceTypeExternalName(t *testing.T) { @@ -599,13 +599,15 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapAddRemove(t *testing.T) { @@ -671,22 +673,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) { if len(fp.svcPortMap) != 8 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - - // The only-local-loadbalancer ones get added - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) - } else { - nsn := makeNSN("ns1", "only-local-load-balancer") - if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) - } - } - if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + // The only-local-loadbalancer ones get added + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) + } else { + nsn := makeNSN("ns1", "only-local-load-balancer") + if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) + } + } + // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("ns2", "cluster-ip", func(svc *v1.Service) { @@ -709,8 +711,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) } // All services but one were deleted. While you'd expect only the ClusterIPs @@ -761,14 +764,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } + // Change service to load-balancer fp.updateService(servicev1, servicev2) pending = fp.serviceChanges.PendingChanges() @@ -779,13 +784,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // No change; make sure the service map stays the same and there are // no health-check changes fp.updateService(servicev2, servicev2) @@ -797,13 +804,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // And back to ClusterIP fp.updateService(servicev2, servicev1) pending = fp.serviceChanges.PendingChanges() @@ -814,11 +823,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 16cc4317b22..b302e137dee 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -1640,10 +1640,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { klog.ErrorS(err, "Error syncing healthcheck services") } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { klog.ErrorS(err, "Error syncing healthcheck endpoints") }