diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 23fe42fa0f9..6597f8f340d 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -293,8 +293,6 @@ type endpointsChange struct { // UpdateEndpointMapResult is the updated results after applying endpoints changes. type UpdateEndpointMapResult struct { - // HCEndpointsLocalIPSize maps an endpoints name to the length of its local IPs. - HCEndpointsLocalIPSize map[types.NamespacedName]int // StaleEndpoints identifies if an endpoints service pair is stale. StaleEndpoints []ServiceEndpoint // StaleServiceNames identifies if a service is stale. @@ -313,15 +311,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp em.apply( changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes) - - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to endpointsMap. - result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int) - localIPs := em.getLocalReadyEndpointIPs() - for nsn, ips := range localIPs { - result.HCEndpointsLocalIPSize[nsn] = len(ips) - } - return result } @@ -366,7 +355,7 @@ func (em EndpointsMap) unmerge(other EndpointsMap) { } } -// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy. +// getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy. func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.String { localIPs := make(map[types.NamespacedName]sets.String) for svcPortName, epList := range em { @@ -389,6 +378,25 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets. return localIPs } +// LocalReadyEndpoints returns a map of Service names to the number of local ready +// endpoints for that service. +func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int { + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to endpointsMap. + + // (Note that we need to call getLocalEndpointIPs first to squash the data by IP, + // because the EndpointsMap is sorted by IP+port, not just IP, and we want to + // consider a Service pointing to 10.0.0.1:80 and 10.0.0.1:443 to have 1 endpoint, + // not 2.) + + eps := make(map[types.NamespacedName]int) + localIPs := em.getLocalReadyEndpointIPs() + for nsn, ips := range localIPs { + eps[nsn] = len(ips) + } + return eps +} + // detectStaleConnections modifies and with detected stale connections. // is used to store stale udp service in order to clear udp conntrack later. func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) { diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index cd5662cb11a..3e7388bdbe3 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -505,7 +505,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult map[ServicePortName][]*BaseEndpointInfo expectedStaleEndpoints []ServiceEndpoint expectedStaleServiceNames map[ServicePortName]bool - expectedHealthchecks map[types.NamespacedName]int + expectedLocalEndpoints map[types.NamespacedName]int expectedChangedEndpoints sets.String }{{ name: "empty", @@ -513,7 +513,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { name: "no change, unnamed port", @@ -535,7 +535,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { name: "no change, named port, local", @@ -557,7 +557,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString(), @@ -589,7 +589,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple slices, multiple ports, local", @@ -625,7 +625,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString(), @@ -695,7 +695,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []ServiceEndpoint{}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, @@ -718,7 +718,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), @@ -741,7 +741,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add an IP and port", @@ -770,7 +770,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), @@ -808,7 +808,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add a slice to an endpoint", @@ -837,7 +837,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1"), @@ -869,7 +869,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "rename a port", @@ -896,7 +896,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "renumber a port", @@ -921,7 +921,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "complex add and remove", @@ -1009,7 +1009,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), @@ -1031,7 +1031,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, } @@ -1108,8 +1108,10 @@ func TestUpdateEndpointsMap(t *testing.T) { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) + + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { + t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints) } }) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index fbde2f8db8b..107c2445983 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1570,10 +1570,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { klog.ErrorS(err, "Error syncing healthcheck services") } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { klog.ErrorS(err, "Error syncing healthcheck endpoints") } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6b050a14713..e409ec0208f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -4039,21 +4039,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 10, got %v", fp.svcPortMap) } - // The only-local-loadbalancer ones get added - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) - } else { - nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) - } - } - if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + // The only-local-loadbalancer ones get added + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) + } else { + nsn := makeNSN("somewhere", "only-local-load-balancer") + if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) + } + } + // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { @@ -4072,10 +4073,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) - } - // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. @@ -4088,6 +4085,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected stale UDP service service %s", ip) } } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceHeadless(t *testing.T) { @@ -4112,14 +4114,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) - } - if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) + } } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { @@ -4139,13 +4142,14 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceUpdate(t *testing.T) { @@ -4179,13 +4183,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) @@ -4193,12 +4198,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } // No change; make sure the service map stays the same and there are // no health-check changes @@ -4207,12 +4213,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) @@ -4220,13 +4227,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) { @@ -4633,7 +4641,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool - expectedHealthchecks map[types.NamespacedName]int + expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", @@ -4641,7 +4649,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -4659,7 +4667,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4685,7 +4693,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -4715,7 +4723,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4777,7 +4785,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, @@ -4796,7 +4804,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4815,7 +4823,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -4840,7 +4848,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4874,7 +4882,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -4897,7 +4905,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -4923,7 +4931,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -4946,7 +4954,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -4967,7 +4975,7 @@ func TestUpdateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -5031,7 +5039,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, }, { @@ -5049,7 +5057,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, } @@ -5117,8 +5125,9 @@ func TestUpdateEndpointsMap(t *testing.T) { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { + t.Errorf("[%d] expected local endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints) } }) } @@ -5277,9 +5286,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceAdd(endpointSlice) - result := fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 1 { - t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 1 { + t.Errorf("unexpected number of local ready endpoints, expected 1 but got: %d", len(localReadyEndpoints)) } // set all endpoints to terminating @@ -5331,9 +5341,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) - result = fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 0 { - t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 0 { + t.Errorf("unexpected number of local ready endpoints, expected 0 but got: %d", len(localReadyEndpoints)) } } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 21b4e550422..7798662675e 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1572,10 +1572,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { klog.ErrorS(err, "Error syncing healthcheck services") } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { klog.ErrorS(err, "Error syncing healthcheck endpoints") } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 0e1e8b5383e..bba797e9a04 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -2589,21 +2589,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 12, got %v", fp.svcPortMap) } - // The only-local-loadbalancer ones get added - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) - } else { - nsn := makeNSN("somewhere", "only-local-load-balancer") - if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) - } - } - if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + // The only-local-loadbalancer ones get added + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) + } else { + nsn := makeNSN("somewhere", "only-local-load-balancer") + if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) + } + } + // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) { @@ -2622,10 +2623,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) - } - // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. @@ -2638,6 +2635,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected stale UDP service service %s", ip) } } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceHeadless(t *testing.T) { @@ -2669,14 +2671,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) - } - if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) + } } func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { @@ -2698,13 +2701,15 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapServiceUpdate(t *testing.T) { @@ -2740,27 +2745,31 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } + // Change service to load-balancer fp.OnServiceUpdate(servicev1, servicev2) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // No change; make sure the service map stays the same and there are // no health-check changes fp.OnServiceUpdate(servicev2, servicev2) @@ -2768,26 +2777,30 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // And back to ClusterIP fp.OnServiceUpdate(servicev2, servicev1) result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestSessionAffinity(t *testing.T) { @@ -3188,7 +3201,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo expectedStaleEndpoints []proxy.ServiceEndpoint expectedStaleServiceNames map[proxy.ServicePortName]bool - expectedHealthchecks map[types.NamespacedName]int + expectedReadyEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing name: "nothing", @@ -3196,7 +3209,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -3214,7 +3227,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3240,7 +3253,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -3270,7 +3283,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3332,7 +3345,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, @@ -3351,7 +3364,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3370,7 +3383,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -3395,7 +3408,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3429,7 +3442,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -3452,7 +3465,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, }, { @@ -3478,7 +3491,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -3501,7 +3514,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -3522,7 +3535,7 @@ func Test_updateEndpointsMap(t *testing.T) { ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -3586,7 +3599,7 @@ func Test_updateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{ + expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, }, { @@ -3604,7 +3617,7 @@ func Test_updateEndpointsMap(t *testing.T) { expectedStaleServiceNames: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, - expectedHealthchecks: map[types.NamespacedName]int{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, } @@ -3675,8 +3688,9 @@ func Test_updateEndpointsMap(t *testing.T) { t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) } } - if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) { - t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) { + t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedReadyEndpoints, localReadyEndpoints) } }) } @@ -4665,9 +4679,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceAdd(endpointSlice) - result := fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 1 { - t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 1 { + t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(localReadyEndpoints)) } // set all endpoints to terminating @@ -4719,9 +4734,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) { } fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating) - result = fp.endpointsMap.Update(fp.endpointsChanges) - if len(result.HCEndpointsLocalIPSize) != 0 { - t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize)) + _ = fp.endpointsMap.Update(fp.endpointsChanges) + localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints() + if len(localReadyEndpoints) != 0 { + t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(localReadyEndpoints)) } } diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index ceae54eb202..1d2d8071988 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -337,9 +337,6 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String { // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { - // HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node. - // The value(uint16) of HCServices map is the service health check node port. - HCServiceNodePorts map[types.NamespacedName]uint16 // UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports. // Callers can use this to abort timeout-waits or clear connection-tracking information. UDPStaleClusterIP sets.String @@ -349,17 +346,21 @@ type UpdateServiceMapResult struct { func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { result.UDPStaleClusterIP = sets.NewString() sm.apply(changes, result.UDPStaleClusterIP) + return result +} +// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values +// for all Services in sm with non-zero HealthCheckNodePort. +func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to svcPortMap. - result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) + ports := make(map[types.NamespacedName]uint16) for svcPortName, info := range sm { if info.HealthCheckNodePort() != 0 { - result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) + ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) } } - - return result + return ports } // ServicePortMap maps a service to its ServicePort. diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index a45ae5e490a..4b694425047 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -568,15 +568,15 @@ func TestServiceMapUpdateHeadless(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts)) - } - if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts)) + } } func TestUpdateServiceTypeExternalName(t *testing.T) { @@ -599,13 +599,15 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - // No proxied services, so no healthchecks - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) } + + // No proxied services, so no healthchecks + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } func TestBuildServiceMapAddRemove(t *testing.T) { @@ -671,22 +673,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) { if len(fp.svcPortMap) != 8 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - - // The only-local-loadbalancer ones get added - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts) - } else { - nsn := makeNSN("ns1", "only-local-load-balancer") - if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 { - t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts) - } - } - if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + // The only-local-loadbalancer ones get added + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts) + } else { + nsn := makeNSN("ns1", "only-local-load-balancer") + if port, found := healthCheckNodePorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts) + } + } + // Remove some stuff // oneService is a modification of services[0] with removed first port. oneService := makeTestService("ns2", "cluster-ip", func(svc *v1.Service) { @@ -709,8 +711,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts) + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts) } // All services but one were deleted. While you'd expect only the ClusterIPs @@ -761,14 +764,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } + // Change service to load-balancer fp.updateService(servicev1, servicev2) pending = fp.serviceChanges.PendingChanges() @@ -779,13 +784,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // No change; make sure the service map stays the same and there are // no health-check changes fp.updateService(servicev2, servicev2) @@ -797,13 +804,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 1 { - t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) } + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts) + } + // And back to ClusterIP fp.updateService(servicev2, servicev1) pending = fp.serviceChanges.PendingChanges() @@ -814,11 +823,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.HCServiceNodePorts) != 0 { - t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts) - } if len(result.UDPStaleClusterIP) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) } + + healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() + if len(healthCheckNodePorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts) + } } diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 16cc4317b22..b302e137dee 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -1640,10 +1640,10 @@ func (proxier *Proxier) syncProxyRules() { // Update service healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. - if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil { klog.ErrorS(err, "Error syncing healthcheck services") } - if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { + if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil { klog.ErrorS(err, "Error syncing healthcheck endpoints") }