proxy/iptables: fix all-vs-ready endpoints a bit

Filter the allEndpoints list into readyEndpoints sooner, and set
"hasEndpoints" based (mostly) on readyEndpoints, not allEndpoints (so
that, eg, we correctly generate REJECT rules for services with no
_functioning_ endpoints, even if they have unusable terminating
endpoints).

Also, write out the endpoint chains at the top of the loop when we
iterate the endpoints for the first time, rather than copying some of
the data to another set of variables and then writing them out later.
And don't write out endpoint chains that won't be used

Also, generate affinity rules only for readyEndpoints rather than
allEndpoints, so affinity gets broken correctly when an endpoint
becomes unready.
This commit is contained in:
Dan Winship
2021-11-02 11:34:38 -04:00
parent 3679639cf1
commit 229ae58520
2 changed files with 217 additions and 127 deletions

View File

@@ -955,12 +955,8 @@ func (proxier *Proxier) syncProxyRules() {
// you should always do one of the below:
// slice = slice[:0] // and then append to it
// slice = append(slice[:0], ...)
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
readyEndpoints := make([]*endpointsInfo, 0)
readyEndpointChains := make([]utiliptables.Chain, 0)
localReadyEndpointChains := make([]utiliptables.Chain, 0)
localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0)
localEndpointChains := make([]utiliptables.Chain, 0)
// To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line.
@@ -1002,7 +998,82 @@ func (proxier *Proxier) syncProxyRules() {
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
hasEndpoints := len(allEndpoints) > 0
// Scan the endpoints list to see what we have. "hasEndpoints" will be true
// if there are any usable endpoints for this service anywhere in the cluster.
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range allEndpoints {
if ep.IsReady() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if ep.IsServing() && ep.IsTerminating() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
}
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints
// Generate the per-endpoint chains.
readyEndpointChains = readyEndpointChains[:0]
localEndpointChains = localEndpointChains[:0]
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
endpointChain := epInfo.endpointChain(svcNameString, protocol)
endpointInUse := false
if epInfo.Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
endpointInUse = true
}
if svc.NodeLocalExternal() && epInfo.IsLocal {
if useTerminatingEndpoints {
if epInfo.Serving && epInfo.Terminating {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
} else if epInfo.Ready {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
}
if !endpointInUse {
continue
}
// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
utilproxy.WriteBytesLine(proxier.natChains, chain)
} else {
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
utilproxy.WriteLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
utilproxy.WriteLine(proxier.natRules, args...)
}
svcChain := svcInfo.servicePortChainName
if hasEndpoints {
@@ -1319,35 +1390,9 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
// Generate the per-endpoint chains. We do this in multiple passes so we
// can group rules together.
// These two slices parallel each other - keep in sync
endpoints = endpoints[:0]
endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
endpoints = append(endpoints, epInfo)
endpointChain = epInfo.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
utilproxy.WriteBytesLine(proxier.natChains, chain)
} else {
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
}
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
for _, endpointChain := range readyEndpointChains {
args = append(args[:0],
"-A", string(svcChain),
)
@@ -1361,30 +1406,7 @@ func (proxier *Proxier) syncProxyRules() {
}
}
// Firstly, categorize each endpoint into three buckets:
// 1. all endpoints that are ready and NOT terminating.
// 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local
// 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local
readyEndpointChains = readyEndpointChains[:0]
readyEndpoints := readyEndpoints[:0]
localReadyEndpointChains := localReadyEndpointChains[:0]
localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0]
for i, endpointChain := range endpointChains {
if endpoints[i].Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
readyEndpoints = append(readyEndpoints, endpoints[i])
}
if svc.NodeLocalExternal() && endpoints[i].IsLocal {
if endpoints[i].Ready {
localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
} else if endpoints[i].Serving && endpoints[i].Terminating {
localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
}
}
}
// Now write loadbalancing & DNAT rules.
// Now write loadbalancing rules
numReadyEndpoints := len(readyEndpointChains)
for i, endpointChain := range readyEndpointChains {
// Balancing rules in the per-service chain.
@@ -1402,25 +1424,6 @@ func (proxier *Proxier) syncProxyRules() {
utilproxy.WriteLine(proxier.natRules, args...)
}
// Every endpoint gets a chain, regardless of its state. This is required later since we may
// want to jump to endpoint chains that are terminating.
for i, endpointChain := range endpointChains {
// Rules in the per-endpoint chain.
args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT.
utilproxy.WriteLine(proxier.natRules, append(args,
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(endpoints[i].IP())),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
utilproxy.WriteLine(proxier.natRules, args...)
}
// The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.NodeLocalExternal() {
continue
@@ -1449,12 +1452,6 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)
// Prefer local ready endpoint chains, but fall back to ready terminating if none exist
localEndpointChains := localReadyEndpointChains
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) && len(localEndpointChains) == 0 {
localEndpointChains = localServingTerminatingEndpointChains
}
numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints