flush conntrack entry for udp service when # of backend changes from 0 to non-0
This commit is contained in:
parent
96d8ab725b
commit
25ac521f88
@ -755,16 +755,16 @@ func (proxier *Proxier) OnEndpointsSynced() {
|
|||||||
func updateEndpointsMap(
|
func updateEndpointsMap(
|
||||||
endpointsMap proxyEndpointsMap,
|
endpointsMap proxyEndpointsMap,
|
||||||
changes *endpointsChangeMap,
|
changes *endpointsChangeMap,
|
||||||
hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
|
hostname string) (hcEndpoints map[types.NamespacedName]int, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
|
||||||
staleSet = make(map[endpointServicePair]bool)
|
staleEndpoints = make(map[endpointServicePair]bool)
|
||||||
|
staleServiceNames = make(map[proxy.ServicePortName]bool)
|
||||||
func() {
|
func() {
|
||||||
changes.lock.Lock()
|
changes.lock.Lock()
|
||||||
defer changes.lock.Unlock()
|
defer changes.lock.Unlock()
|
||||||
for _, change := range changes.items {
|
for _, change := range changes.items {
|
||||||
endpointsMap.unmerge(change.previous)
|
endpointsMap.unmerge(change.previous)
|
||||||
endpointsMap.merge(change.current)
|
endpointsMap.merge(change.current)
|
||||||
detectStaleConnections(change.previous, change.current, staleSet)
|
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
||||||
}
|
}
|
||||||
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
||||||
}()
|
}()
|
||||||
@ -781,12 +781,11 @@ func updateEndpointsMap(
|
|||||||
hcEndpoints[nsn] = len(ips)
|
hcEndpoints[nsn] = len(ips)
|
||||||
}
|
}
|
||||||
|
|
||||||
return hcEndpoints, staleSet
|
return hcEndpoints, staleEndpoints, staleServiceNames
|
||||||
}
|
}
|
||||||
|
|
||||||
// <staleEndpoints> are modified by this function with detected stale
|
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
|
||||||
// connections.
|
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
|
||||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
|
|
||||||
for svcPortName, epList := range oldEndpointsMap {
|
for svcPortName, epList := range oldEndpointsMap {
|
||||||
for _, ep := range epList {
|
for _, ep := range epList {
|
||||||
stale := true
|
stale := true
|
||||||
@ -802,6 +801,13 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for svcPortName, epList := range newEndpointsMap {
|
||||||
|
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
|
||||||
|
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
|
||||||
|
staleServiceNames[svcPortName] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
|
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
|
||||||
@ -980,14 +986,23 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var staleServices sets.String
|
||||||
// We assume that if this was called, we really want to sync them,
|
// We assume that if this was called, we really want to sync them,
|
||||||
// even if nothing changed in the meantime. In other words, callers are
|
// even if nothing changed in the meantime. In other words, callers are
|
||||||
// responsible for detecting no-op changes and not calling this function.
|
// responsible for detecting no-op changes and not calling this function.
|
||||||
hcServices, staleServices := updateServiceMap(
|
hcServices, staleServices := updateServiceMap(
|
||||||
proxier.serviceMap, &proxier.serviceChanges)
|
proxier.serviceMap, &proxier.serviceChanges)
|
||||||
hcEndpoints, staleEndpoints := updateEndpointsMap(
|
hcEndpoints, staleEndpoints, staleServiceNames := updateEndpointsMap(
|
||||||
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
|
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
|
||||||
|
|
||||||
|
// merge stale services gathered from updateEndpointsMap
|
||||||
|
for svcPortName := range staleServiceNames {
|
||||||
|
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
|
||||||
|
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
|
||||||
|
staleServices.Insert(svcInfo.clusterIP.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("Syncing iptables rules")
|
glog.V(3).Infof("Syncing iptables rules")
|
||||||
|
|
||||||
// Create and link the kube services chain.
|
// Create and link the kube services chain.
|
||||||
|
Loading…
Reference in New Issue
Block a user