Port NAT rules to nftables (and backend is now functional)

This commit is contained in:
Dan Winship
2023-05-19 08:38:50 -04:00
parent 0c5c620b4f
commit ef1347b06d
4 changed files with 615 additions and 287 deletions

View File

@@ -51,8 +51,6 @@ import (
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utiliptablestesting "k8s.io/kubernetes/pkg/util/iptables/testing"
utilexec "k8s.io/utils/exec"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
@@ -117,14 +115,17 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro
type endpointInfo struct {
*proxy.BaseEndpointInfo
chainName string
chainName string
affinitySetName string
}
// returns a new proxy.Endpoint which abstracts an endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String())
return &endpointInfo{
BaseEndpointInfo: baseInfo,
chainName: servicePortEndpointChainNamePrefix + servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
chainName: servicePortEndpointChainNamePrefix + chainNameBase,
affinitySetName: servicePortEndpointAffinityNamePrefix + chainNameBase,
}
}
@@ -154,7 +155,6 @@ type Proxier struct {
syncPeriod time.Duration
// These are effectively const and do not need the mutex to be held.
iptables utiliptables.Interface
nftables knftables.Interface
masqueradeAll bool
masqueradeMark string
@@ -167,15 +167,6 @@ type Proxier struct {
serviceHealthServer healthcheck.ServiceHealthServer
healthzServer *healthcheck.ProxierHealthServer
// Since converting probabilities (floats) to strings is expensive
// and we are using only probabilities in the format of 1/n, we are
// precomputing some number of those and cache for future reuse.
precomputedProbabilities []string
// The following buffers are used to reuse memory and avoid allocations
// that are significantly impacting performance.
natRules proxyutil.LineBuffer
// conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal
conntrackTCPLiberal bool
@@ -244,29 +235,26 @@ func NewProxier(ipFamily v1.IPFamily,
}
proxier := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
syncPeriod: syncPeriod,
iptables: utiliptablestesting.NewFake(),
nftables: nft,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: utilexec.New(),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
precomputedProbabilities: make([]string, 0, 1001),
natRules: proxyutil.NewLineBuffer(),
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal,
staleChains: make(map[string]time.Time),
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
syncPeriod: syncPeriod,
nftables: nft,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: utilexec.New(),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal,
staleChains: make(map[string]time.Time),
}
burstSyncs := 2
@@ -470,28 +458,6 @@ func CleanupLeftovers() bool {
return encounteredError
}
// computeProbability returns the probability 1/n rendered as a
// fixed-point decimal string with ten digits after the decimal point,
// the form consumed by iptables "--probability" arguments.
func computeProbability(n int) string {
	inverse := 1.0 / float64(n)
	return fmt.Sprintf("%0.10f", inverse)
}
// precomputeProbabilities grows the proxier.precomputedProbabilities
// cache so that entries for 1..numberOfPrecomputed are all populated.
// Index 0 is seeded with a sentinel string, since a probability of
// 1/0 is never meaningful and valid lookups start at index 1.
//
// This assumes proxier.mu is held
func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
// Seed slot 0 with a sentinel on first use so that index i maps to 1/i.
if len(proxier.precomputedProbabilities) == 0 {
proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>")
}
// Append only the entries not yet cached, formatting each 1/i once.
for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ {
proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i))
}
}
// probability returns the cached string form of the probability 1/n,
// extending the precomputed cache on demand when n is beyond its
// current length.
//
// This assumes proxier.mu is held
func (proxier *Proxier) probability(n int) string {
// Cache miss: grow the precomputed table up through index n.
if n >= len(proxier.precomputedProbabilities) {
proxier.precomputeProbabilities(n)
}
return proxier.precomputedProbabilities[n]
}
// Sync is called to synchronize the proxier state to nftables as soon as possible.
func (proxier *Proxier) Sync() {
if proxier.healthzServer != nil {
@@ -679,6 +645,7 @@ const (
serviceFirewallChainNamePrefix = "firewall-"
serviceExternalChainNamePrefix = "external-"
servicePortEndpointChainNamePrefix = "endpoint-"
servicePortEndpointAffinityNamePrefix = "affinity-"
)
// hashAndTruncate prefixes name with a hash of itself and then truncates to
@@ -772,6 +739,10 @@ func isServiceChainName(chainString string) bool {
return strings.Contains(chainString, "/")
}
func isAffinitySetName(set string) bool {
return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix)
}
// This is where all of the nftables calls happen.
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
@@ -842,16 +813,15 @@ func (proxier *Proxier) syncProxyRules() {
// We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6
ipX := "ip"
ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value
if proxier.ipFamily == v1.IPv6Protocol {
ipX = "ip6"
ipvX_addr = "ipv6_addr"
}
// Reset all buffers used later.
// This is to avoid memory reallocations and thus improve performance.
proxier.natRules.Reset()
// Accumulate service/endpoint chains to keep.
// Accumulate service/endpoint chains and affinity sets to keep.
activeChains := sets.New[string]()
activeAffinitySets := sets.New[string]()
// Compute total number of endpoint chains across all services
// to get a sense of how big the cluster is.
@@ -990,13 +960,14 @@ func (proxier *Proxier) syncProxyRules() {
// Capture the clusterIP.
if hasInternalEndpoints {
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(internalTrafficChain))
tx.Add(&knftables.Rule{
Chain: kubeServicesChain,
Rule: knftables.Concat(
ipX, "daddr", svcInfo.ClusterIP(),
protocol, "dport", svcInfo.Port(),
"goto", internalTrafficChain,
),
})
} else {
// No endpoints.
tx.Add(&knftables.Rule{
@@ -1015,13 +986,14 @@ func (proxier *Proxier) syncProxyRules() {
if hasEndpoints {
// Send traffic bound for external IPs to the "external
// destinations" chain.
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", externalIP,
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(externalTrafficChain))
tx.Add(&knftables.Rule{
Chain: kubeServicesChain,
Rule: knftables.Concat(
ipX, "daddr", externalIP,
protocol, "dport", svcInfo.Port(),
"goto", externalTrafficChain,
),
})
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
@@ -1042,14 +1014,14 @@ func (proxier *Proxier) syncProxyRules() {
// Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
if hasEndpoints {
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", lbip,
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(loadBalancerTrafficChain))
tx.Add(&knftables.Rule{
Chain: kubeServicesChain,
Rule: knftables.Concat(
ipX, "daddr", lbip,
protocol, "dport", svcInfo.Port(),
"goto", loadBalancerTrafficChain,
),
})
}
if usesFWChain {
comment := fmt.Sprintf("%s traffic not accepted by %s", svcPortNameString, svcInfo.firewallChainName)
@@ -1087,12 +1059,13 @@ func (proxier *Proxier) syncProxyRules() {
// Jump to the external destination chain. For better or for
// worse, nodeports are not subject to loadBalancerSourceRanges,
// and we can't change that.
proxier.natRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcPortNameString,
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort()),
"-j", string(externalTrafficChain))
tx.Add(&knftables.Rule{
Chain: kubeNodePortsChain,
Rule: knftables.Concat(
protocol, "dport", svcInfo.NodePort(),
"goto", externalTrafficChain,
),
})
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
@@ -1113,27 +1086,29 @@ func (proxier *Proxier) syncProxyRules() {
// Set up internal traffic handling.
if hasInternalEndpoints {
if proxier.masqueradeAll {
proxier.natRules.Write(
"-A", string(internalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(kubeMarkMasqChain))
tx.Add(&knftables.Rule{
Chain: internalTrafficChain,
Rule: knftables.Concat(
ipX, "daddr", svcInfo.ClusterIP(),
protocol, "dport", svcInfo.Port(),
"jump", kubeMarkMasqChain,
),
})
} else if proxier.localDetector.IsImplemented() {
// This masquerades off-cluster traffic to a service VIP. The
// idea is that you can establish a static route for your
// Service range, routing to any node, and that node will
// bridge into the Service for you. Since that might bounce
// off-node, we masquerade here.
proxier.natRules.Write(
"-A", string(internalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
proxier.localDetector.IfNotLocal(),
"-j", string(kubeMarkMasqChain))
tx.Add(&knftables.Rule{
Chain: internalTrafficChain,
Rule: knftables.Concat(
ipX, "daddr", svcInfo.ClusterIP(),
protocol, "dport", svcInfo.Port(),
proxier.localDetector.IfNotLocalNFT(),
"jump", kubeMarkMasqChain,
),
})
}
}
@@ -1145,10 +1120,12 @@ func (proxier *Proxier) syncProxyRules() {
if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade,
// in case we cross nodes.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
"-j", string(kubeMarkMasqChain))
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"jump", kubeMarkMasqChain,
),
})
} else {
// If we are only using same-node endpoints, we can retain the
// source IP in most cases.
@@ -1158,37 +1135,49 @@ func (proxier *Proxier) syncProxyRules() {
// traffic as a special-case. It is subject to neither
// form of traffic policy, which simulates going up-and-out
// to an external load-balancer and coming back in.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
proxier.localDetector.IfLocal(),
"-j", string(clusterPolicyChain))
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
proxier.localDetector.IfLocalNFT(),
"goto", clusterPolicyChain,
),
Comment: ptr.To("short-circuit pod traffic"),
})
}
// Locally originated traffic (not a pod, but the host node)
// still needs masquerade because the LBIP itself is a local
// address, so that will be the chosen source IP.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(kubeMarkMasqChain))
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"fib", "saddr", "type", "local",
"jump", kubeMarkMasqChain,
),
Comment: ptr.To("masquerade local traffic"),
})
// Redirect all src-type=LOCAL -> external destination to the
// policy=cluster chain. This allows traffic originating
// from the host to be redirected to the service correctly.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(clusterPolicyChain))
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"fib", "saddr", "type", "local",
"goto", clusterPolicyChain,
),
Comment: ptr.To("short-circuit local traffic"),
})
}
// Anything else falls thru to the appropriate policy chain.
if hasExternalEndpoints {
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-j", string(externalPolicyChain))
tx.Add(&knftables.Rule{
Chain: externalTrafficChain,
Rule: knftables.Concat(
"goto", externalPolicyChain,
),
})
}
}
@@ -1203,12 +1192,13 @@ func (proxier *Proxier) syncProxyRules() {
// firewall filter based on each source range
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() {
proxier.natRules.Write(
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
"-s", src,
"-j", string(externalTrafficChain),
)
tx.Add(&knftables.Rule{
Chain: fwChain,
Rule: knftables.Concat(
ipX, "saddr", src,
"goto", externalTrafficChain,
),
})
_, cidr, err := netutils.ParseCIDRSloppy(src)
if err != nil {
klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
@@ -1223,36 +1213,75 @@ func (proxier *Proxier) syncProxyRules() {
// need the following rules to allow requests from this node.
if allowFromNode {
for _, lbip := range svcInfo.LoadBalancerVIPStrings() {
proxier.natRules.Write(
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
"-s", lbip,
"-j", string(externalTrafficChain),
)
tx.Add(&knftables.Rule{
Chain: fwChain,
Rule: knftables.Concat(
ipX, "saddr", lbip,
"goto", externalTrafficChain,
),
})
}
}
// If the packet was able to reach the end of firewall chain,
// then it did not get DNATed, so it will match the
// corresponding KUBE-PROXY-FIREWALL rule.
proxier.natRules.Write(
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString),
)
tx.Add(&knftables.Rule{
Chain: fwChain,
Rule: "continue",
Comment: ptr.To("other traffic will be dropped by firewall"),
})
}
// If Cluster policy is in use, create rules jumping from
// clusterPolicyChain to the clusterEndpoints
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
// Generate the per-endpoint affinity sets
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
// Create a set to store current affinity mappings. As
// with the iptables backend, endpoint affinity is
// recorded for connections from a particular source IP
// (without regard to source port) to a particular
// ServicePort (without regard to which service IP was
// used to reach the service). This may be changed in the
// future.
tx.Add(&knftables.Set{
Name: epInfo.affinitySetName,
Type: ipvX_addr,
Flags: []knftables.SetFlag{
// The nft docs say "dynamic" is only
// needed for sets containing stateful
// objects (eg counters), but (at least on
// RHEL8) if we create the set without
// "dynamic", it later gets mutated to
// have it, and then the next attempt to
// tx.Add() it here fails because it looks
// like we're trying to change the flags.
knftables.DynamicFlag,
knftables.TimeoutFlag,
},
Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second),
})
activeAffinitySets.Insert(epInfo.affinitySetName)
}
}
// If Cluster policy is in use, create the chain and create rules jumping
// from clusterPolicyChain to the clusterEndpoints
if usesClusterPolicyChain {
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
}
// If Local policy is in use, create rules jumping from localPolicyChain
// to the localEndpoints
if usesLocalPolicyChain {
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
}
// Generate the per-endpoint chains.
// Generate the per-endpoint chains and affinity sets
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
@@ -1263,44 +1292,56 @@ func (proxier *Proxier) syncProxyRules() {
endpointChain := epInfo.chainName
// Handle traffic that loops back to the originator with SNAT.
proxier.natRules.Write(
"-A", string(endpointChain),
"-m", "comment", "--comment", svcPortNameString,
"-s", epInfo.IP(),
"-j", string(kubeMarkMasqChain),
)
commentAndAffinityArgs := []string{"-m", "comment", "--comment", svcPortNameString}
tx.Add(&knftables.Rule{
Chain: endpointChain,
Rule: knftables.Concat(
ipX, "saddr", epInfo.IP(),
"jump", kubeMarkMasqChain,
),
})
// Handle session affinity
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
commentAndAffinityArgs = append(commentAndAffinityArgs, "-m", "recent", "--name", string(endpointChain), "--set")
tx.Add(&knftables.Rule{
Chain: endpointChain,
Rule: knftables.Concat(
"update", "@", epInfo.affinitySetName,
"{", ipX, "saddr", "}",
),
})
}
// DNAT to final destination.
proxier.natRules.Write(
"-A", string(endpointChain),
commentAndAffinityArgs,
"-m", protocol, "-p", protocol,
"-j", "DNAT", "--to-destination", epInfo.String(),
)
tx.Add(&knftables.Rule{
Chain: endpointChain,
Rule: knftables.Concat(
"meta l4proto", protocol,
"dnat to", epInfo.String(),
),
})
}
}
// Finally, tail-call to the nodePorts chain. This needs to be after all
// other service portal rules.
if proxier.nodePortAddresses.MatchAll() {
isIPv6 := proxier.ipFamily == v1.IPv6Protocol
destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"}
// Block localhost nodePorts
if isIPv6 {
destinations = append(destinations, "!", "-d", "::1/128")
var noLocalhost string
if proxier.ipFamily == v1.IPv6Protocol {
noLocalhost = "ip6 daddr != ::1"
} else {
destinations = append(destinations, "!", "-d", "127.0.0.0/8")
noLocalhost = "ip daddr != 127.0.0.0/8"
}
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
destinations,
"-j", string(kubeNodePortsChain))
tx.Add(&knftables.Rule{
Chain: kubeServicesChain,
Rule: knftables.Concat(
"fib daddr type local",
noLocalhost,
"jump", kubeNodePortsChain,
),
Comment: ptr.To("kubernetes service nodeports; NOTE: this must be the last rule in this chain"),
})
} else {
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
@@ -1313,11 +1354,14 @@ func (proxier *Proxier) syncProxyRules() {
}
// create nodeport rules for each IP one by one
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
"-d", ip.String(),
"-j", string(kubeNodePortsChain))
tx.Add(&knftables.Rule{
Chain: kubeServicesChain,
Rule: knftables.Concat(
ipX, "daddr", ip,
"jump", kubeNodePortsChain,
),
Comment: ptr.To("kubernetes service nodeports; NOTE: this must be the last rule in this chain"),
})
}
}
@@ -1341,13 +1385,24 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
}
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines()))
// OTOH, we can immediately delete any stale affinity sets
existingSets, err := proxier.nftables.List(context.TODO(), "sets")
if err == nil {
for _, set := range existingSets {
if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
tx.Delete(&knftables.Set{
Name: set,
})
}
}
} else if !knftables.IsNotFound(err) {
klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
}
// Sync rules.
klog.V(2).InfoS("Reloading service nftables data",
"numServices", len(proxier.svcPortMap),
"numEndpoints", totalEndpoints,
"numNATRules", proxier.natRules.Lines(),
)
// FIXME
@@ -1390,51 +1445,50 @@ func (proxier *Proxier) syncProxyRules() {
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain string, endpoints []proxy.Endpoint) {
func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
ipX := "ip"
if proxier.ipFamily == v1.IPv6Protocol {
ipX = "ip6"
}
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
continue
}
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
proxier.natRules.Write(
"-A", string(svcChain),
"-m", "comment", "--comment", comment,
"-m", "recent", "--name", string(epInfo.chainName),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(epInfo.chainName),
)
tx.Add(&knftables.Rule{
Chain: svcChain,
Rule: knftables.Concat(
ipX, "saddr", "@", epInfo.affinitySetName,
"goto", epInfo.chainName,
),
})
}
}
// Now write loadbalancing rules.
numEndpoints := len(endpoints)
// Now write loadbalancing rule
var elements []string
for i, ep := range endpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
continue
}
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())
if i < (numEndpoints - 1) {
// Each rule is a probabilistic match.
proxier.natRules.Write(
"-A", string(svcChain),
"-m", "comment", "--comment", comment,
"-m", "statistic",
"--mode", "random",
"--probability", proxier.probability(numEndpoints-i),
"-j", string(epInfo.chainName),
)
} else {
// The final (or only if n == 1) rule is a guaranteed match.
proxier.natRules.Write(
"-A", string(svcChain),
"-m", "comment", "--comment", comment,
"-j", string(epInfo.chainName),
)
elements = append(elements,
strconv.Itoa(i), ":", "goto", epInfo.chainName,
)
if i != len(endpoints)-1 {
elements = append(elements, ",")
}
}
tx.Add(&knftables.Rule{
Chain: svcChain,
Rule: knftables.Concat(
"numgen random mod", len(endpoints), "vmap",
"{", elements, "}",
),
})
}