Simplify nftables/proxier.go by removing partial syncing
The equivalent optimization will be done differently in nftables.
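Note: the pattern being removed is the partial-sync optimization inherited from the iptables proxier — track a needFullSync flag, and on a partial sync regenerate chains only for services whose Service or EndpointSlice objects have pending changes, writing the unchanged ones to a discard buffer so they drop out of the restore input. Below is a minimal, self-contained sketch of that idea (hypothetical names and types, not the upstream code):

package main

import (
	"fmt"
	"strings"
)

// lineBuffer is a stand-in for the proxy's rule buffer abstraction.
type lineBuffer interface {
	Write(line string)
}

// realBuffer accumulates rules that will actually be fed to restore.
type realBuffer struct{ sb strings.Builder }

func (b *realBuffer) Write(line string) { b.sb.WriteString(line + "\n") }

// discardBuffer throws rules away (the "skipped" buffer).
type discardBuffer struct{}

func (discardBuffer) Write(string) {}

// syncRules walks every service, but on a partial sync it routes output for
// unchanged services to a discard buffer so their chains are not re-written.
func syncRules(services []string, changed map[string]bool, tryPartialSync bool) string {
	out := &realBuffer{}
	for _, svc := range services {
		var dst lineBuffer = out
		if tryPartialSync && !changed[svc] {
			dst = discardBuffer{} // unchanged service: omit from restore input
		}
		dst.Write("-A KUBE-SERVICES ... " + svc)
	}
	return out.sb.String()
}

func main() {
	svcs := []string{"default/a", "default/b"}
	// Only default/b changed, so a partial sync emits rules for it alone.
	fmt.Print(syncRules(svcs, map[string]bool{"default/b": true}, true))
}

In the diff below, tryPartialSync, PendingChanges(), and the skippedNatChains/skippedNatRules buffers play these roles.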
@@ -36,7 +36,6 @@ import (
     v1 "k8s.io/api/core/v1"
     discovery "k8s.io/api/discovery/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/events"
     utilsysctl "k8s.io/component-helpers/node/util/sysctl"
@@ -153,7 +152,6 @@ type Proxier struct {
     // updating iptables with some partial data after kube-proxy restart.
     endpointSlicesSynced bool
     servicesSynced bool
-    needFullSync bool
     initialized int32
     syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
     syncPeriod time.Duration
@@ -265,7 +263,6 @@ func NewProxier(ipFamily v1.IPFamily,
         serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
         endpointsMap: make(proxy.EndpointsMap),
         endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
-        needFullSync: true,
         syncPeriod: syncPeriod,
         iptables: ipt,
         masqueradeAll: masqueradeAll,
@@ -298,7 +295,7 @@ func NewProxier(ipFamily v1.IPFamily,
     proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

     go ipt.Monitor(kubeProxyCanaryChain, []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
-        proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
+        proxier.syncProxyRules, syncPeriod, wait.NeverStop)

     if ipt.HasRandomFully() {
         klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
@@ -606,7 +603,6 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
     for k, v := range node.Labels {
         proxier.nodeLabels[k] = v
     }
-    proxier.needFullSync = true
     proxier.mu.Unlock()
     klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)

@@ -631,7 +627,6 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
     for k, v := range node.Labels {
         proxier.nodeLabels[k] = v
     }
-    proxier.needFullSync = true
     proxier.mu.Unlock()
     klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)

@@ -649,7 +644,6 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {

     proxier.mu.Lock()
     proxier.nodeLabels = nil
-    proxier.needFullSync = true
     proxier.mu.Unlock()

     proxier.Sync()
@@ -731,17 +725,6 @@ func isServiceChainName(chainString string) bool {
     return false
 }

-// Called by the iptables.Monitor, and in response to topology changes; this calls
-// syncProxyRules() and tells it to resync all services, regardless of whether the
-// Service or Endpoints/EndpointSlice objects themselves have changed
-func (proxier *Proxier) forceSyncProxyRules() {
-    proxier.mu.Lock()
-    proxier.needFullSync = true
-    proxier.mu.Unlock()
-
-    proxier.syncProxyRules()
-}
-
 // This is where all of the iptables-save/restore calls happen.
 // The only other iptables rules are those that are setup in iptablesInit()
 // This assumes proxier.mu is NOT held
@@ -755,27 +738,13 @@ func (proxier *Proxier) syncProxyRules() {
         return
     }

-    // The value of proxier.needFullSync may change before the defer funcs run, so
-    // we need to keep track of whether it was set at the *start* of the sync.
-    tryPartialSync := !proxier.needFullSync
-
     // Keep track of how long syncs take.
     start := time.Now()
     defer func() {
         metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
-        if tryPartialSync {
-            metrics.SyncPartialProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
-        } else {
-            metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
-        }
         klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
     }()

-    var serviceChanged, endpointsChanged sets.Set[string]
-    if tryPartialSync {
-        serviceChanged = proxier.serviceChanges.PendingChanges()
-        endpointsChanged = proxier.endpointsChanges.PendingChanges()
-    }
     serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
     endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

@@ -786,42 +755,25 @@ func (proxier *Proxier) syncProxyRules() {
         if !success {
             klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
             proxier.syncRunner.RetryAfter(proxier.syncPeriod)
-            if tryPartialSync {
-                metrics.IptablesPartialRestoreFailuresTotal.Inc()
-            }
-            // proxier.serviceChanges and proxier.endpointChanges have already
-            // been flushed, so we've lost the state needed to be able to do
-            // a partial sync.
-            proxier.needFullSync = true
         }
     }()

-    if !tryPartialSync {
-        // Ensure that our jump rules (eg from PREROUTING to KUBE-SERVICES) exist.
-        // We can't do this as part of the iptables-restore because we don't want
-        // to specify/replace *all* of the rules in PREROUTING, etc.
-        //
-        // We need to create these rules when kube-proxy first starts, and we need
-        // to recreate them if the utiliptables Monitor detects that iptables has
-        // been flushed. In both of those cases, the code will force a full sync.
-        // In all other cases, it ought to be safe to assume that the rules
-        // already exist, so we'll skip this step when doing a partial sync, to
-        // save us from having to invoke /sbin/iptables 20 times on each sync
-        // (which will be very slow on hosts with lots of iptables rules).
-        for _, jump := range append(iptablesJumpChains, iptablesKubeletJumpChains...) {
-            if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
-                klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
-                return
-            }
-            args := jump.extraArgs
-            if jump.comment != "" {
-                args = append(args, "-m", "comment", "--comment", jump.comment)
-            }
-            args = append(args, "-j", string(jump.dstChain))
-            if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
-                klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
-                return
-            }
+    // Ensure that our jump rules (eg from PREROUTING to KUBE-SERVICES) exist.
+    // We can't do this as part of the iptables-restore because we don't want
+    // to specify/replace *all* of the rules in PREROUTING, etc.
+    for _, jump := range append(iptablesJumpChains, iptablesKubeletJumpChains...) {
+        if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
+            klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
+            return
+        }
+        args := jump.extraArgs
+        if jump.comment != "" {
+            args = append(args, "-m", "comment", "--comment", jump.comment)
+        }
+        args = append(args, "-j", string(jump.dstChain))
+        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
+            klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
+            return
         }
     }

@@ -836,9 +788,6 @@ func (proxier *Proxier) syncProxyRules() {
     proxier.natChains.Reset()
     proxier.natRules.Reset()

-    skippedNatChains := proxyutil.NewDiscardLineBuffer()
-    skippedNatRules := proxyutil.NewDiscardLineBuffer()
-
     // Write chain lines for all the "top-level" chains we'll be filling in
     for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} {
         proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
@@ -1047,13 +996,9 @@ func (proxier *Proxier) syncProxyRules() {
             }
         }

-        filterRules := proxier.filterRules
-        natChains := proxier.natChains
-        natRules := proxier.natRules
-
         // Capture the clusterIP.
         if hasInternalEndpoints {
-            natRules.Write(
+            proxier.natRules.Write(
                 "-A", string(kubeServicesChain),
                 "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
                 "-m", protocol, "-p", protocol,
@@ -1062,7 +1007,7 @@ func (proxier *Proxier) syncProxyRules() {
                 "-j", string(internalTrafficChain))
         } else {
             // No endpoints.
-            filterRules.Write(
+            proxier.filterRules.Write(
                 "-A", string(kubeServicesChain),
                 "-m", "comment", "--comment", internalTrafficFilterComment,
                 "-m", protocol, "-p", protocol,
@@ -1077,7 +1022,7 @@ func (proxier *Proxier) syncProxyRules() {
             if hasEndpoints {
                 // Send traffic bound for external IPs to the "external
                 // destinations" chain.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(kubeServicesChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
                     "-m", protocol, "-p", protocol,
@@ -1089,7 +1034,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // Either no endpoints at all (REJECT) or no endpoints for
                 // external traffic (DROP anything that didn't get
                 // short-circuited by the EXT chain.)
-                filterRules.Write(
+                proxier.filterRules.Write(
                     "-A", string(kubeExternalServicesChain),
                     "-m", "comment", "--comment", externalTrafficFilterComment,
                     "-m", protocol, "-p", protocol,
@@ -1103,7 +1048,7 @@ func (proxier *Proxier) syncProxyRules() {
         // Capture load-balancer ingress.
         for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
             if hasEndpoints {
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(kubeServicesChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
                     "-m", protocol, "-p", protocol,
@@ -1113,7 +1058,7 @@ func (proxier *Proxier) syncProxyRules() {

             }
             if usesFWChain {
-                filterRules.Write(
+                proxier.filterRules.Write(
                     "-A", string(kubeProxyFirewallChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName),
                     "-m", protocol, "-p", protocol,
@@ -1127,7 +1072,7 @@ func (proxier *Proxier) syncProxyRules() {
             // external traffic (DROP anything that didn't get short-circuited
             // by the EXT chain.)
             for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
-                filterRules.Write(
+                proxier.filterRules.Write(
                     "-A", string(kubeExternalServicesChain),
                     "-m", "comment", "--comment", externalTrafficFilterComment,
                     "-m", protocol, "-p", protocol,
@@ -1144,7 +1089,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // Jump to the external destination chain. For better or for
                 // worse, nodeports are not subect to loadBalancerSourceRanges,
                 // and we can't change that.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(kubeNodePortsChain),
                     "-m", "comment", "--comment", svcPortNameString,
                     "-m", protocol, "-p", protocol,
@@ -1155,7 +1100,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // Either no endpoints at all (REJECT) or no endpoints for
                 // external traffic (DROP anything that didn't get
                 // short-circuited by the EXT chain.)
-                filterRules.Write(
+                proxier.filterRules.Write(
                     "-A", string(kubeExternalServicesChain),
                     "-m", "comment", "--comment", externalTrafficFilterComment,
                     "-m", "addrtype", "--dst-type", "LOCAL",
@@ -1170,7 +1115,7 @@ func (proxier *Proxier) syncProxyRules() {
         if svcInfo.HealthCheckNodePort() != 0 {
             // no matter if node has local endpoints, healthCheckNodePorts
             // need to add a rule to accept the incoming connection
-            filterRules.Write(
+            proxier.filterRules.Write(
                 "-A", string(kubeNodePortsChain),
                 "-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString),
                 "-m", "tcp", "-p", "tcp",
@@ -1179,16 +1124,6 @@ func (proxier *Proxier) syncProxyRules() {
             )
         }

-        // If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
-        // then we can omit them from the restore input. (We have already marked
-        // them in activeNATChains, so they won't get deleted.) However, we have
-        // to still figure out how many chains we _would_ have written to make the
-        // metrics come out right, so we just compute them and throw them away.
-        if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
-            natChains = skippedNatChains
-            natRules = skippedNatRules
-        }
-
         // Set up internal traffic handling.
         if hasInternalEndpoints {
             args = append(args[:0],
@@ -1198,7 +1133,7 @@ func (proxier *Proxier) syncProxyRules() {
                 "--dport", strconv.Itoa(svcInfo.Port()),
             )
             if proxier.masqueradeAll {
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(internalTrafficChain),
                     args,
                     "-j", string(kubeMarkMasqChain))
@@ -1208,7 +1143,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // Service range, routing to any node, and that node will
                 // bridge into the Service for you. Since that might bounce
                 // off-node, we masquerade here.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(internalTrafficChain),
                     args,
                     proxier.localDetector.IfNotLocal(),
@@ -1221,12 +1156,12 @@ func (proxier *Proxier) syncProxyRules() {
         // jump to externalTrafficChain, which will handle some special cases and
         // then jump to externalPolicyChain.
         if usesExternalTrafficChain {
-            natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
+            proxier.natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))

             if !svcInfo.ExternalPolicyLocal() {
                 // If we are using non-local endpoints we need to masquerade,
                 // in case we cross nodes.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(externalTrafficChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
                     "-j", string(kubeMarkMasqChain))
@@ -1239,7 +1174,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // traffic as a special-case. It is subject to neither
                 // form of traffic policy, which simulates going up-and-out
                 // to an external load-balancer and coming back in.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(externalTrafficChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
                     proxier.localDetector.IfLocal(),
@@ -1249,7 +1184,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // Locally originated traffic (not a pod, but the host node)
                 // still needs masquerade because the LBIP itself is a local
                 // address, so that will be the chosen source IP.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(externalTrafficChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
                     "-m", "addrtype", "--src-type", "LOCAL",
@@ -1258,7 +1193,7 @@ func (proxier *Proxier) syncProxyRules() {
                 // Redirect all src-type=LOCAL -> external destination to the
                 // policy=cluster chain. This allows traffic originating
                 // from the host to be redirected to the service correctly.
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(externalTrafficChain),
                     "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
                     "-m", "addrtype", "--src-type", "LOCAL",
@@ -1267,7 +1202,7 @@ func (proxier *Proxier) syncProxyRules() {

             // Anything else falls thru to the appropriate policy chain.
             if hasExternalEndpoints {
-                natRules.Write(
+                proxier.natRules.Write(
                     "-A", string(externalTrafficChain),
                     "-j", string(externalPolicyChain))
             }
@@ -1275,7 +1210,7 @@ func (proxier *Proxier) syncProxyRules() {

         // Set up firewall chain, if needed
         if usesFWChain {
-            natChains.Write(utiliptables.MakeChainLine(fwChain))
+            proxier.natChains.Write(utiliptables.MakeChainLine(fwChain))

             // The service firewall rules are created based on the
             // loadBalancerSourceRanges field. This only works for VIP-like
@@ -1290,7 +1225,7 @@ func (proxier *Proxier) syncProxyRules() {
             // firewall filter based on each source range
             allowFromNode := false
             for _, src := range svcInfo.LoadBalancerSourceRanges() {
-                natRules.Write(args, "-s", src, "-j", string(externalTrafficChain))
+                proxier.natRules.Write(args, "-s", src, "-j", string(externalTrafficChain))
                 _, cidr, err := netutils.ParseCIDRSloppy(src)
                 if err != nil {
                     klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
@@ -1305,7 +1240,7 @@ func (proxier *Proxier) syncProxyRules() {
             // need the following rules to allow requests from this node.
             if allowFromNode {
                 for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
-                    natRules.Write(
+                    proxier.natRules.Write(
                         args,
                         "-s", lbip,
                         "-j", string(externalTrafficChain))
@@ -1314,7 +1249,7 @@ func (proxier *Proxier) syncProxyRules() {
             // If the packet was able to reach the end of firewall chain,
             // then it did not get DNATed, so it will match the
             // corresponding KUBE-PROXY-FIREWALL rule.
-            natRules.Write(
+            proxier.natRules.Write(
                 "-A", string(fwChain),
                 "-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString),
             )
@@ -1323,15 +1258,15 @@ func (proxier *Proxier) syncProxyRules() {
         // If Cluster policy is in use, create the chain and create rules jumping
         // from clusterPolicyChain to the clusterEndpoints
         if usesClusterPolicyChain {
-            natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
-            proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
+            proxier.natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
+            proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
         }

         // If Local policy is in use, create the chain and create rules jumping
         // from localPolicyChain to the localEndpoints
         if usesLocalPolicyChain {
-            natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
-            proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
+            proxier.natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
+            proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
         }

         // Generate the per-endpoint chains.
@@ -1345,13 +1280,13 @@ func (proxier *Proxier) syncProxyRules() {
             endpointChain := epInfo.ChainName

             // Create the endpoint chain
-            natChains.Write(utiliptables.MakeChainLine(endpointChain))
+            proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain))
             activeNATChains[endpointChain] = true

             args = append(args[:0], "-A", string(endpointChain))
             args = append(args, "-m", "comment", "--comment", svcPortNameString)
             // Handle traffic that loops back to the originator with SNAT.
-            natRules.Write(
+            proxier.natRules.Write(
                 args,
                 "-s", epInfo.IP(),
                 "-j", string(kubeMarkMasqChain))
@@ -1361,7 +1296,7 @@ func (proxier *Proxier) syncProxyRules() {
             }
             // DNAT to final destination.
             args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.String())
-            natRules.Write(args)
+            proxier.natRules.Write(args)
         }
     }

@@ -1468,9 +1403,7 @@ func (proxier *Proxier) syncProxyRules() {
     )

     metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
-    metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
-    metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() + skippedNatRules.Lines() - deletedChains))
-    metrics.IptablesRulesLastSync.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains))
+    metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains))

     // Sync rules.
     proxier.iptablesData.Reset()
@@ -1506,7 +1439,6 @@ func (proxier *Proxier) syncProxyRules() {
         return
     }
     success = true
-    proxier.needFullSync = false

     for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
         for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
@@ -1537,7 +1469,7 @@ func (proxier *Proxier) syncProxyRules() {
     conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
 }

-func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
+func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
     // First write session affinity rules, if applicable.
     if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
         for _, ep := range endpoints {
@@ -1556,7 +1488,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
                 "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
                 "-j", string(epInfo.ChainName),
             )
-            natRules.Write(args)
+            proxier.natRules.Write(args)
         }
     }

@@ -1579,6 +1511,6 @@ func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffe
                 "--probability", proxier.probability(numEndpoints-i))
         }
         // The final (or only if n == 1) rule is a guaranteed match.
-        natRules.Write(args, "-j", string(epInfo.ChainName))
+        proxier.natRules.Write(args, "-j", string(epInfo.ChainName))
     }
 }