Merge pull request #115253 from danwinship/proxy-update-healthchecknodeport
Split out HealthCheckNodePort stuff from service/endpoint map Update()
This commit is contained in:
commit
611273a5bb
@ -293,8 +293,6 @@ type endpointsChange struct {
|
||||
|
||||
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
|
||||
type UpdateEndpointMapResult struct {
|
||||
// HCEndpointsLocalIPSize maps an endpoints name to the length of its local IPs.
|
||||
HCEndpointsLocalIPSize map[types.NamespacedName]int
|
||||
// StaleEndpoints identifies if an endpoints service pair is stale.
|
||||
StaleEndpoints []ServiceEndpoint
|
||||
// StaleServiceNames identifies if a service is stale.
|
||||
@ -313,15 +311,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp
|
||||
|
||||
em.apply(
|
||||
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
|
||||
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to endpointsMap.
|
||||
result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
|
||||
localIPs := em.getLocalReadyEndpointIPs()
|
||||
for nsn, ips := range localIPs {
|
||||
result.HCEndpointsLocalIPSize[nsn] = len(ips)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@ -366,7 +355,7 @@ func (em EndpointsMap) unmerge(other EndpointsMap) {
|
||||
}
|
||||
}
|
||||
|
||||
// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
||||
// getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
||||
func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.String {
|
||||
localIPs := make(map[types.NamespacedName]sets.String)
|
||||
for svcPortName, epList := range em {
|
||||
@ -389,6 +378,25 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
|
||||
return localIPs
|
||||
}
|
||||
|
||||
// LocalReadyEndpoints returns a map of Service names to the number of local ready
|
||||
// endpoints for that service.
|
||||
func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to endpointsMap.
|
||||
|
||||
// (Note that we need to call getLocalEndpointIPs first to squash the data by IP,
|
||||
// because the EndpointsMap is sorted by IP+port, not just IP, and we want to
|
||||
// consider a Service pointing to 10.0.0.1:80 and 10.0.0.1:443 to have 1 endpoint,
|
||||
// not 2.)
|
||||
|
||||
eps := make(map[types.NamespacedName]int)
|
||||
localIPs := em.getLocalReadyEndpointIPs()
|
||||
for nsn, ips := range localIPs {
|
||||
eps[nsn] = len(ips)
|
||||
}
|
||||
return eps
|
||||
}
|
||||
|
||||
// detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
|
||||
// is used to store stale udp service in order to clear udp conntrack later.
|
||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
|
||||
|
@ -505,7 +505,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult map[ServicePortName][]*BaseEndpointInfo
|
||||
expectedStaleEndpoints []ServiceEndpoint
|
||||
expectedStaleServiceNames map[ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
expectedLocalEndpoints map[types.NamespacedName]int
|
||||
expectedChangedEndpoints sets.String
|
||||
}{{
|
||||
name: "empty",
|
||||
@ -513,7 +513,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
}, {
|
||||
name: "no change, unnamed port",
|
||||
@ -535,7 +535,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
}, {
|
||||
name: "no change, named port, local",
|
||||
@ -557,7 +557,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
@ -589,7 +589,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
}, {
|
||||
name: "no change, multiple slices, multiple ports, local",
|
||||
@ -625,7 +625,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
@ -695,7 +695,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
},
|
||||
@ -718,7 +718,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
@ -741,7 +741,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "add an IP and port",
|
||||
@ -770,7 +770,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
@ -808,7 +808,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "add a slice to an endpoint",
|
||||
@ -837,7 +837,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
@ -869,7 +869,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "rename a port",
|
||||
@ -896,7 +896,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "renumber a port",
|
||||
@ -921,7 +921,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "complex add and remove",
|
||||
@ -1009,7 +1009,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
|
||||
@ -1031,7 +1031,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
},
|
||||
}
|
||||
@ -1108,8 +1108,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
|
||||
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {
|
||||
t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -1615,10 +1615,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||
}
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||
}
|
||||
|
||||
|
@ -4079,21 +4079,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 10, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
// oneService is a modification of services[0] with removed first port.
|
||||
oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
|
||||
@ -4112,10 +4113,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
// from the three deleted services here, we still have the ClusterIP for
|
||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||
@ -4128,6 +4125,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected stale UDP service service %s", ip)
|
||||
}
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
@ -4152,14 +4154,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
@ -4179,13 +4182,14 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
@ -4219,13 +4223,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.OnServiceUpdate(servicev1, servicev2)
|
||||
@ -4233,12 +4238,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
@ -4247,12 +4253,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.OnServiceUpdate(servicev2, servicev1)
|
||||
@ -4260,13 +4267,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
|
||||
@ -4673,7 +4681,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedStaleEndpoints []proxy.ServiceEndpoint
|
||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
expectedLocalEndpoints map[types.NamespacedName]int
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
name: "nothing",
|
||||
@ -4681,7 +4689,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[1]: no change, named port, local
|
||||
name: "no change, named port, local",
|
||||
@ -4699,7 +4707,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4725,7 +4733,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[3]: no change, multiple subsets, multiple ports, local
|
||||
name: "no change, multiple subsets, multiple ports, local",
|
||||
@ -4755,7 +4763,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4817,7 +4825,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
},
|
||||
@ -4836,7 +4844,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4855,7 +4863,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[7]: add an IP and port
|
||||
name: "add an IP and port",
|
||||
@ -4880,7 +4888,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4914,7 +4922,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[9]: add a subset
|
||||
name: "add a subset",
|
||||
@ -4937,7 +4945,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4963,7 +4971,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[11]: rename a port
|
||||
name: "rename a port",
|
||||
@ -4986,7 +4994,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[12]: renumber a port
|
||||
name: "renumber a port",
|
||||
@ -5007,7 +5015,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[13]: complex add and remove
|
||||
name: "complex add and remove",
|
||||
@ -5071,7 +5079,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -5089,7 +5097,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
},
|
||||
}
|
||||
|
||||
@ -5157,8 +5165,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {
|
||||
t.Errorf("[%d] expected local endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -5319,9 +5328,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 1 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 1 {
|
||||
t.Errorf("unexpected number of local ready endpoints, expected 1 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
|
||||
// set all endpoints to terminating
|
||||
@ -5373,9 +5383,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
|
||||
result = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 0 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 0 {
|
||||
t.Errorf("unexpected number of local ready endpoints, expected 0 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1519,10 +1519,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||
}
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||
}
|
||||
|
||||
|
@ -2543,21 +2543,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
// oneService is a modification of services[0] with removed first port.
|
||||
oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
|
||||
@ -2576,10 +2577,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
// from the three deleted services here, we still have the ClusterIP for
|
||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||
@ -2592,6 +2589,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected stale UDP service service %s", ip)
|
||||
}
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
@ -2623,14 +2625,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
@ -2652,13 +2655,15 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
@ -2694,27 +2699,31 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.OnServiceUpdate(servicev1, servicev2)
|
||||
result = fp.svcPortMap.Update(fp.serviceChanges)
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
fp.OnServiceUpdate(servicev2, servicev2)
|
||||
@ -2722,26 +2731,30 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.OnServiceUpdate(servicev2, servicev1)
|
||||
result = fp.svcPortMap.Update(fp.serviceChanges)
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionAffinity(t *testing.T) {
|
||||
@ -3142,7 +3155,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
|
||||
expectedStaleEndpoints []proxy.ServiceEndpoint
|
||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
expectedReadyEndpoints map[types.NamespacedName]int
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
name: "nothing",
|
||||
@ -3150,7 +3163,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[1]: no change, named port, local
|
||||
name: "no change, named port, local",
|
||||
@ -3168,7 +3181,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3194,7 +3207,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[3]: no change, multiple subsets, multiple ports, local
|
||||
name: "no change, multiple subsets, multiple ports, local",
|
||||
@ -3224,7 +3237,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3286,7 +3299,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
},
|
||||
@ -3305,7 +3318,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3324,7 +3337,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[7]: add an IP and port
|
||||
name: "add an IP and port",
|
||||
@ -3349,7 +3362,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3383,7 +3396,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[9]: add a subset
|
||||
name: "add a subset",
|
||||
@ -3406,7 +3419,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3432,7 +3445,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[11]: rename a port
|
||||
name: "rename a port",
|
||||
@ -3455,7 +3468,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[12]: renumber a port
|
||||
name: "renumber a port",
|
||||
@ -3476,7 +3489,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[13]: complex add and remove
|
||||
name: "complex add and remove",
|
||||
@ -3540,7 +3553,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3558,7 +3571,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
},
|
||||
}
|
||||
|
||||
@ -3629,8 +3642,9 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedReadyEndpoints, localReadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -4539,9 +4553,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 1 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 1 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
|
||||
// set all endpoints to terminating
|
||||
@ -4593,9 +4608,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
|
||||
result = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 0 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 0 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -337,9 +337,6 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String {
|
||||
|
||||
// UpdateServiceMapResult is the updated results after applying service changes.
|
||||
type UpdateServiceMapResult struct {
|
||||
// HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node.
|
||||
// The value(uint16) of HCServices map is the service health check node port.
|
||||
HCServiceNodePorts map[types.NamespacedName]uint16
|
||||
// UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports.
|
||||
// Callers can use this to abort timeout-waits or clear connection-tracking information.
|
||||
UDPStaleClusterIP sets.String
|
||||
@ -349,17 +346,21 @@ type UpdateServiceMapResult struct {
|
||||
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
|
||||
result.UDPStaleClusterIP = sets.NewString()
|
||||
sm.apply(changes, result.UDPStaleClusterIP)
|
||||
return result
|
||||
}
|
||||
|
||||
// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
|
||||
// for all Services in sm with non-zero HealthCheckNodePort.
|
||||
func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to svcPortMap.
|
||||
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
|
||||
ports := make(map[types.NamespacedName]uint16)
|
||||
for svcPortName, info := range sm {
|
||||
if info.HealthCheckNodePort() != 0 {
|
||||
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
|
||||
ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
return ports
|
||||
}
|
||||
|
||||
// ServicePortMap maps a service to its ServicePort.
|
||||
|
@ -568,15 +568,15 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateServiceTypeExternalName(t *testing.T) {
|
||||
@ -599,13 +599,15 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
@ -671,22 +673,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 8 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("ns1", "only-local-load-balancer")
|
||||
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("ns1", "only-local-load-balancer")
|
||||
if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
// oneService is a modification of services[0] with removed first port.
|
||||
oneService := makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
|
||||
@ -709,8 +711,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
@ -761,14 +764,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.updateService(servicev1, servicev2)
|
||||
pending = fp.serviceChanges.PendingChanges()
|
||||
@ -779,13 +784,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
fp.updateService(servicev2, servicev2)
|
||||
@ -797,13 +804,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.updateService(servicev2, servicev1)
|
||||
pending = fp.serviceChanges.PendingChanges()
|
||||
@ -814,11 +823,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
@ -1644,10 +1644,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||
}
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user