Split out HealthCheckNodePort stuff from service/endpoint map Update()
In addition to actually updating their data from the provided list of changes, EndpointsMap.Update() and ServicePortMap.Update() return a struct with some information about things that changed because of that update (eg services with stale conntrack entries). For some reason, they were also returning information about HealthCheckNodePorts, but they were returning *static* information based on the current (post-Update) state of the map, not information about what had *changed* in the update. Since this doesn't match how the other data in the struct is used (and since there's no reason to have the data only be returned when you call Update() anyway) , split it out.
This commit is contained in:
parent
b3925d4741
commit
d901992eae
@ -293,8 +293,6 @@ type endpointsChange struct {
|
||||
|
||||
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
|
||||
type UpdateEndpointMapResult struct {
|
||||
// HCEndpointsLocalIPSize maps an endpoints name to the length of its local IPs.
|
||||
HCEndpointsLocalIPSize map[types.NamespacedName]int
|
||||
// StaleEndpoints identifies if an endpoints service pair is stale.
|
||||
StaleEndpoints []ServiceEndpoint
|
||||
// StaleServiceNames identifies if a service is stale.
|
||||
@ -313,15 +311,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp
|
||||
|
||||
em.apply(
|
||||
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
|
||||
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to endpointsMap.
|
||||
result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
|
||||
localIPs := em.getLocalReadyEndpointIPs()
|
||||
for nsn, ips := range localIPs {
|
||||
result.HCEndpointsLocalIPSize[nsn] = len(ips)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@ -366,7 +355,7 @@ func (em EndpointsMap) unmerge(other EndpointsMap) {
|
||||
}
|
||||
}
|
||||
|
||||
// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
||||
// getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
|
||||
func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.String {
|
||||
localIPs := make(map[types.NamespacedName]sets.String)
|
||||
for svcPortName, epList := range em {
|
||||
@ -389,6 +378,25 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
|
||||
return localIPs
|
||||
}
|
||||
|
||||
// LocalReadyEndpoints returns a map of Service names to the number of local ready
|
||||
// endpoints for that service.
|
||||
func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to endpointsMap.
|
||||
|
||||
// (Note that we need to call getLocalEndpointIPs first to squash the data by IP,
|
||||
// because the EndpointsMap is sorted by IP+port, not just IP, and we want to
|
||||
// consider a Service pointing to 10.0.0.1:80 and 10.0.0.1:443 to have 1 endpoint,
|
||||
// not 2.)
|
||||
|
||||
eps := make(map[types.NamespacedName]int)
|
||||
localIPs := em.getLocalReadyEndpointIPs()
|
||||
for nsn, ips := range localIPs {
|
||||
eps[nsn] = len(ips)
|
||||
}
|
||||
return eps
|
||||
}
|
||||
|
||||
// detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
|
||||
// is used to store stale udp service in order to clear udp conntrack later.
|
||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
|
||||
|
@ -505,7 +505,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult map[ServicePortName][]*BaseEndpointInfo
|
||||
expectedStaleEndpoints []ServiceEndpoint
|
||||
expectedStaleServiceNames map[ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
expectedLocalEndpoints map[types.NamespacedName]int
|
||||
expectedChangedEndpoints sets.String
|
||||
}{{
|
||||
name: "empty",
|
||||
@ -513,7 +513,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
}, {
|
||||
name: "no change, unnamed port",
|
||||
@ -535,7 +535,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
}, {
|
||||
name: "no change, named port, local",
|
||||
@ -557,7 +557,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
@ -589,7 +589,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
}, {
|
||||
name: "no change, multiple slices, multiple ports, local",
|
||||
@ -625,7 +625,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString(),
|
||||
@ -695,7 +695,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
},
|
||||
@ -718,7 +718,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
@ -741,7 +741,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "add an IP and port",
|
||||
@ -770,7 +770,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
@ -808,7 +808,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "add a slice to an endpoint",
|
||||
@ -837,7 +837,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
@ -869,7 +869,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "rename a port",
|
||||
@ -896,7 +896,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "renumber a port",
|
||||
@ -921,7 +921,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
}, {
|
||||
name: "complex add and remove",
|
||||
@ -1009,7 +1009,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
|
||||
@ -1031,7 +1031,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||
},
|
||||
}
|
||||
@ -1108,8 +1108,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
|
||||
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {
|
||||
t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -1570,10 +1570,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||
}
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||
}
|
||||
|
||||
|
@ -4039,21 +4039,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 10, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
// oneService is a modification of services[0] with removed first port.
|
||||
oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
|
||||
@ -4072,10 +4073,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
// from the three deleted services here, we still have the ClusterIP for
|
||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||
@ -4088,6 +4085,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected stale UDP service service %s", ip)
|
||||
}
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
@ -4112,14 +4114,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
@ -4139,13 +4142,14 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
@ -4179,13 +4183,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.OnServiceUpdate(servicev1, servicev2)
|
||||
@ -4193,12 +4198,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
@ -4207,12 +4213,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.OnServiceUpdate(servicev2, servicev1)
|
||||
@ -4220,13 +4227,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
|
||||
@ -4633,7 +4641,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedStaleEndpoints []proxy.ServiceEndpoint
|
||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
expectedLocalEndpoints map[types.NamespacedName]int
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
name: "nothing",
|
||||
@ -4641,7 +4649,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[1]: no change, named port, local
|
||||
name: "no change, named port, local",
|
||||
@ -4659,7 +4667,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4685,7 +4693,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[3]: no change, multiple subsets, multiple ports, local
|
||||
name: "no change, multiple subsets, multiple ports, local",
|
||||
@ -4715,7 +4723,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4777,7 +4785,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
},
|
||||
@ -4796,7 +4804,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4815,7 +4823,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[7]: add an IP and port
|
||||
name: "add an IP and port",
|
||||
@ -4840,7 +4848,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4874,7 +4882,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[9]: add a subset
|
||||
name: "add a subset",
|
||||
@ -4897,7 +4905,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -4923,7 +4931,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[11]: rename a port
|
||||
name: "rename a port",
|
||||
@ -4946,7 +4954,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[12]: renumber a port
|
||||
name: "renumber a port",
|
||||
@ -4967,7 +4975,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[13]: complex add and remove
|
||||
name: "complex add and remove",
|
||||
@ -5031,7 +5039,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -5049,7 +5057,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||
},
|
||||
}
|
||||
|
||||
@ -5117,8 +5125,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {
|
||||
t.Errorf("[%d] expected local endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -5277,9 +5286,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 1 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 1 {
|
||||
t.Errorf("unexpected number of local ready endpoints, expected 1 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
|
||||
// set all endpoints to terminating
|
||||
@ -5331,9 +5341,10 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
|
||||
result = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 0 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 0 {
|
||||
t.Errorf("unexpected number of local ready endpoints, expected 0 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1572,10 +1572,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||
}
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||
}
|
||||
|
||||
|
@ -2589,21 +2589,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
// oneService is a modification of services[0] with removed first port.
|
||||
oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
|
||||
@ -2622,10 +2623,6 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
// from the three deleted services here, we still have the ClusterIP for
|
||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||
@ -2638,6 +2635,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected stale UDP service service %s", ip)
|
||||
}
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
@ -2669,14 +2671,15 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
@ -2698,13 +2701,15 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
@ -2740,27 +2745,31 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.OnServiceUpdate(servicev1, servicev2)
|
||||
result = fp.svcPortMap.Update(fp.serviceChanges)
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
fp.OnServiceUpdate(servicev2, servicev2)
|
||||
@ -2768,26 +2777,30 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.OnServiceUpdate(servicev2, servicev1)
|
||||
result = fp.svcPortMap.Update(fp.serviceChanges)
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionAffinity(t *testing.T) {
|
||||
@ -3188,7 +3201,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
|
||||
expectedStaleEndpoints []proxy.ServiceEndpoint
|
||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
expectedReadyEndpoints map[types.NamespacedName]int
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
name: "nothing",
|
||||
@ -3196,7 +3209,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[1]: no change, named port, local
|
||||
name: "no change, named port, local",
|
||||
@ -3214,7 +3227,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3240,7 +3253,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[3]: no change, multiple subsets, multiple ports, local
|
||||
name: "no change, multiple subsets, multiple ports, local",
|
||||
@ -3270,7 +3283,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3332,7 +3345,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
},
|
||||
@ -3351,7 +3364,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3370,7 +3383,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[7]: add an IP and port
|
||||
name: "add an IP and port",
|
||||
@ -3395,7 +3408,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3429,7 +3442,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[9]: add a subset
|
||||
name: "add a subset",
|
||||
@ -3452,7 +3465,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3478,7 +3491,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[11]: rename a port
|
||||
name: "rename a port",
|
||||
@ -3501,7 +3514,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[12]: renumber a port
|
||||
name: "renumber a port",
|
||||
@ -3522,7 +3535,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[13]: complex add and remove
|
||||
name: "complex add and remove",
|
||||
@ -3586,7 +3599,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
}, {
|
||||
@ -3604,7 +3617,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||
},
|
||||
}
|
||||
|
||||
@ -3675,8 +3688,9 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedReadyEndpoints, localReadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -4665,9 +4679,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceAdd(endpointSlice)
|
||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 1 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 1 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 1 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
|
||||
// set all endpoints to terminating
|
||||
@ -4719,9 +4734,10 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
|
||||
fp.OnEndpointSliceUpdate(endpointSlice, endpointSliceTerminating)
|
||||
result = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
if len(result.HCEndpointsLocalIPSize) != 0 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(result.HCEndpointsLocalIPSize))
|
||||
_ = fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
localReadyEndpoints = fp.endpointsMap.LocalReadyEndpoints()
|
||||
if len(localReadyEndpoints) != 0 {
|
||||
t.Errorf("unexpected number of health check node ports, expected 0 but got: %d", len(localReadyEndpoints))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -337,9 +337,6 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String {
|
||||
|
||||
// UpdateServiceMapResult is the updated results after applying service changes.
|
||||
type UpdateServiceMapResult struct {
|
||||
// HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node.
|
||||
// The value(uint16) of HCServices map is the service health check node port.
|
||||
HCServiceNodePorts map[types.NamespacedName]uint16
|
||||
// UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports.
|
||||
// Callers can use this to abort timeout-waits or clear connection-tracking information.
|
||||
UDPStaleClusterIP sets.String
|
||||
@ -349,17 +346,21 @@ type UpdateServiceMapResult struct {
|
||||
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
|
||||
result.UDPStaleClusterIP = sets.NewString()
|
||||
sm.apply(changes, result.UDPStaleClusterIP)
|
||||
return result
|
||||
}
|
||||
|
||||
// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
|
||||
// for all Services in sm with non-zero HealthCheckNodePort.
|
||||
func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to svcPortMap.
|
||||
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
|
||||
ports := make(map[types.NamespacedName]uint16)
|
||||
for svcPortName, info := range sm {
|
||||
if info.HealthCheckNodePort() != 0 {
|
||||
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
|
||||
ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
return ports
|
||||
}
|
||||
|
||||
// ServicePortMap maps a service to its ServicePort.
|
||||
|
@ -568,15 +568,15 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(healthCheckNodePorts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateServiceTypeExternalName(t *testing.T) {
|
||||
@ -599,13 +599,15 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
@ -671,22 +673,22 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 8 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("ns1", "only-local-load-balancer")
|
||||
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", healthCheckNodePorts)
|
||||
} else {
|
||||
nsn := makeNSN("ns1", "only-local-load-balancer")
|
||||
if port, found := healthCheckNodePorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
// oneService is a modification of services[0] with removed first port.
|
||||
oneService := makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
|
||||
@ -709,8 +711,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
|
||||
}
|
||||
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
@ -761,14 +764,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.updateService(servicev1, servicev2)
|
||||
pending = fp.serviceChanges.PendingChanges()
|
||||
@ -779,13 +784,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
fp.updateService(servicev2, servicev2)
|
||||
@ -797,13 +804,15 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", healthCheckNodePorts)
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.updateService(servicev2, servicev1)
|
||||
pending = fp.serviceChanges.PendingChanges()
|
||||
@ -814,11 +823,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
if len(fp.svcPortMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||
}
|
||||
if len(result.HCServiceNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
||||
}
|
||||
if len(result.UDPStaleClusterIP) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
||||
}
|
||||
|
||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||
if len(healthCheckNodePorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
|
||||
}
|
||||
}
|
||||
|
@ -1640,10 +1640,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update service healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||
}
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
|
||||
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user