pkg/proxy/nftables: handle traffic to cluster ip
NFTables proxy will now drop traffic directed towards unallocated ClusterIPs and reject traffic directed towards invalid ports of Cluster IPs. Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
@@ -921,7 +921,7 @@ func (s *ProxyServer) Run() error {
|
||||
options.LabelSelector = labelSelector.String()
|
||||
}))
|
||||
|
||||
// Create configs (i.e. Watches for Services and EndpointSlices)
|
||||
// Create configs (i.e. Watches for Services, EndpointSlices and ServiceCIDRs)
|
||||
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
|
||||
// only notify on changes, and the initial update (on process start) may be lost if no handlers
|
||||
// are registered yet.
|
||||
@@ -933,6 +933,11 @@ func (s *ProxyServer) Run() error {
|
||||
endpointSliceConfig.RegisterEventHandler(s.Proxier)
|
||||
go endpointSliceConfig.Run(wait.NeverStop)
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
|
||||
serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
|
||||
serviceCIDRConfig.RegisterEventHandler(s.Proxier)
|
||||
go serviceCIDRConfig.Run(wait.NeverStop)
|
||||
}
|
||||
// This has to start after the calls to NewServiceConfig because that
|
||||
// function must configure its shared informer event handlers first.
|
||||
informerFactory.Start(wait.NeverStop)
|
||||
|
@@ -18,13 +18,17 @@ package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
v1informers "k8s.io/client-go/informers/core/v1"
|
||||
discoveryv1informers "k8s.io/client-go/informers/discovery/v1"
|
||||
networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -371,3 +375,97 @@ func (c *NodeConfig) handleDeleteNode(obj interface{}) {
|
||||
c.eventHandlers[i].OnNodeDelete(node)
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceCIDRHandler is an abstract interface of objects which receive
|
||||
// notifications about ServiceCIDR object changes.
|
||||
type ServiceCIDRHandler interface {
|
||||
// OnServiceCIDRsChanged is called whenever a change is observed
|
||||
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
|
||||
OnServiceCIDRsChanged(cidrs []string)
|
||||
}
|
||||
|
||||
// ServiceCIDRConfig tracks a set of service configurations.
|
||||
type ServiceCIDRConfig struct {
|
||||
listerSynced cache.InformerSynced
|
||||
eventHandlers []ServiceCIDRHandler
|
||||
mu sync.Mutex
|
||||
cidrs sets.Set[string]
|
||||
}
|
||||
|
||||
// NewServiceCIDRConfig creates a new ServiceCIDRConfig.
|
||||
func NewServiceCIDRConfig(serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer, resyncPeriod time.Duration) *ServiceCIDRConfig {
|
||||
result := &ServiceCIDRConfig{
|
||||
listerSynced: serviceCIDRInformer.Informer().HasSynced,
|
||||
cidrs: sets.New[string](),
|
||||
}
|
||||
|
||||
_, _ = serviceCIDRInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
result.handleServiceCIDREvent(nil, obj)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
result.handleServiceCIDREvent(oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
result.handleServiceCIDREvent(obj, nil)
|
||||
},
|
||||
},
|
||||
resyncPeriod,
|
||||
)
|
||||
return result
|
||||
}
|
||||
|
||||
// RegisterEventHandler registers a handler which is called on every ServiceCIDR change.
|
||||
func (c *ServiceCIDRConfig) RegisterEventHandler(handler ServiceCIDRHandler) {
|
||||
c.eventHandlers = append(c.eventHandlers, handler)
|
||||
}
|
||||
|
||||
// Run waits for cache synced and invokes handlers after syncing.
|
||||
func (c *ServiceCIDRConfig) Run(stopCh <-chan struct{}) {
|
||||
klog.InfoS("Starting serviceCIDR config controller")
|
||||
|
||||
if !cache.WaitForNamedCacheSync("serviceCIDR config", stopCh, c.listerSynced) {
|
||||
return
|
||||
}
|
||||
c.handleServiceCIDREvent(nil, nil)
|
||||
}
|
||||
|
||||
// handleServiceCIDREvent is a helper function to handle Add, Update and Delete
|
||||
// events on ServiceCIDR objects and call downstream event handlers.
|
||||
func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) {
|
||||
var oldServiceCIDR, newServiceCIDR *networkingv1alpha1.ServiceCIDR
|
||||
var ok bool
|
||||
|
||||
if oldObj != nil {
|
||||
oldServiceCIDR, ok = oldObj.(*networkingv1alpha1.ServiceCIDR)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if newObj != nil {
|
||||
newServiceCIDR, ok = newObj.(*networkingv1alpha1.ServiceCIDR)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if oldServiceCIDR != nil {
|
||||
c.cidrs.Delete(oldServiceCIDR.Spec.CIDRs...)
|
||||
}
|
||||
|
||||
if newServiceCIDR != nil {
|
||||
c.cidrs.Insert(newServiceCIDR.Spec.CIDRs...)
|
||||
}
|
||||
|
||||
for i := range c.eventHandlers {
|
||||
klog.V(4).InfoS("Calling handler.OnServiceCIDRsChanged")
|
||||
c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList())
|
||||
}
|
||||
}
|
||||
|
@@ -673,6 +673,10 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
|
||||
func (proxier *Proxier) OnNodeSynced() {
|
||||
}
|
||||
|
||||
// OnServiceCIDRsChanged is called whenever a change is observed
|
||||
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
|
||||
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
|
||||
|
||||
// portProtoHash takes the ServicePortName and protocol for a service
|
||||
// returns the associated 16 character hash. This is computed by hashing (sha256)
|
||||
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
|
||||
|
@@ -892,6 +892,10 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
|
||||
func (proxier *Proxier) OnNodeSynced() {
|
||||
}
|
||||
|
||||
// OnServiceCIDRsChanged is called whenever a change is observed
|
||||
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
|
||||
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
|
||||
|
||||
// This is where all of the ipvs calls happen.
|
||||
func (proxier *Proxier) syncProxyRules() {
|
||||
proxier.mu.Lock()
|
||||
|
@@ -53,6 +53,7 @@ func (*FakeProxier) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice)
|
||||
func (*FakeProxier) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {}
|
||||
func (*FakeProxier) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {}
|
||||
func (*FakeProxier) OnEndpointSlicesSynced() {}
|
||||
func (*FakeProxier) OnServiceCIDRsChanged(_ []string) {}
|
||||
|
||||
func NewHollowProxy(
|
||||
nodeName string,
|
||||
|
@@ -158,3 +158,10 @@ func (proxier *metaProxier) OnNodeSynced() {
|
||||
proxier.ipv4Proxier.OnNodeSynced()
|
||||
proxier.ipv6Proxier.OnNodeSynced()
|
||||
}
|
||||
|
||||
// OnServiceCIDRsChanged is called whenever a change is observed
|
||||
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
|
||||
func (proxier *metaProxier) OnServiceCIDRsChanged(cidrs []string) {
|
||||
proxier.ipv4Proxier.OnServiceCIDRsChanged(cidrs)
|
||||
proxier.ipv6Proxier.OnServiceCIDRsChanged(cidrs)
|
||||
}
|
||||
|
@@ -51,7 +51,7 @@ the forward path.
|
||||
|
||||
## kube-proxy's use of nftables hooks
|
||||
|
||||
Kube-proxy uses nftables for four things:
|
||||
Kube-proxy uses nftables for seven things:
|
||||
|
||||
- Using DNAT to rewrite traffic from service IPs (cluster IPs, external IPs, load balancer
|
||||
IP, and NodePorts on node IPs) to the corresponding endpoint IPs.
|
||||
@@ -65,6 +65,10 @@ Kube-proxy uses nftables for four things:
|
||||
|
||||
- Rejecting packets for services with no local or remote endpoints.
|
||||
|
||||
- Dropping packets to ClusterIPs which are not yet allocated.
|
||||
|
||||
- Rejecting packets to undefined ports of ClusterIPs.
|
||||
|
||||
This is implemented as follows:
|
||||
|
||||
- We do the DNAT for inbound traffic in `prerouting`: this covers traffic coming from
|
||||
@@ -101,3 +105,9 @@ This is implemented as follows:
|
||||
network IP, because masquerading is about ensuring that the packet eventually gets
|
||||
routed back to the host network namespace on this node, so if it's never getting
|
||||
routed away from there, there's nothing to do.)
|
||||
|
||||
- We install a `reject` rule for ClusterIPs matching `@cluster-ips` set and a `drop`
|
||||
rule for ClusterIPs belonging to any of the ServiceCIDRs in `forward` and `output` hook, with a
|
||||
higher (i.e. less urgent) priority than the DNAT chains making sure all valid
|
||||
traffic directed for ClusterIPs is already DNATed. Drop rule will only
|
||||
be installed if `MultiCIDRServiceAllocator` feature is enabled.
|
@@ -22,6 +22,7 @@ package nftables
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sort"
|
||||
@@ -198,11 +199,13 @@ func (tracer *nftablesTracer) addressMatches(ipStr string, wantMatch bool, ruleA
|
||||
return match == wantMatch
|
||||
}
|
||||
|
||||
func (tracer *nftablesTracer) noneAddressesMatch(ipStr, ruleAddress string) bool {
|
||||
func (tracer *nftablesTracer) addressMatchesSet(ipStr string, wantMatch bool, ruleAddress string) bool {
|
||||
ruleAddress = strings.ReplaceAll(ruleAddress, " ", "")
|
||||
addresses := strings.Split(ruleAddress, ",")
|
||||
var match bool
|
||||
for _, address := range addresses {
|
||||
if tracer.addressMatches(ipStr, true, address) {
|
||||
match = tracer.addressMatches(ipStr, true, address)
|
||||
if match != wantMatch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -266,7 +269,7 @@ func (tracer *nftablesTracer) matchDestPort(elements []*knftables.Element, proto
|
||||
// match verdictRegexp.
|
||||
|
||||
var destAddrRegexp = regexp.MustCompile(`^ip6* daddr (!= )?(\S+)`)
|
||||
var destAddrLookupRegexp = regexp.MustCompile(`^ip6* daddr != \{([^}]*)\}`)
|
||||
var destAddrLookupRegexp = regexp.MustCompile(`^ip6* daddr (!= )?\{([^}]*)\}`)
|
||||
var destAddrLocalRegexp = regexp.MustCompile(`^fib daddr type local`)
|
||||
var destPortRegexp = regexp.MustCompile(`^(tcp|udp|sctp) dport (\d+)`)
|
||||
var destIPOnlyLookupRegexp = regexp.MustCompile(`^ip6* daddr @(\S+)`)
|
||||
@@ -278,7 +281,7 @@ var destDispatchRegexp = regexp.MustCompile(`^ip6* daddr \. meta l4proto \. th d
|
||||
var destPortDispatchRegexp = regexp.MustCompile(`^meta l4proto \. th dport vmap @(\S+)$`)
|
||||
|
||||
var sourceAddrRegexp = regexp.MustCompile(`^ip6* saddr (!= )?(\S+)`)
|
||||
var sourceAddrLookupRegexp = regexp.MustCompile(`^ip6* saddr != \{([^}]*)\}`)
|
||||
var sourceAddrLookupRegexp = regexp.MustCompile(`^ip6* saddr (!= )?\{([^}]*)\}`)
|
||||
var sourceAddrLocalRegexp = regexp.MustCompile(`^fib saddr type local`)
|
||||
|
||||
var endpointVMAPRegexp = regexp.MustCompile(`^numgen random mod \d+ vmap \{(.*)\}$`)
|
||||
@@ -398,11 +401,12 @@ func (tracer *nftablesTracer) runChain(chname, sourceIP, protocol, destIP, destP
|
||||
}
|
||||
|
||||
case destAddrLookupRegexp.MatchString(rule):
|
||||
// `^ip6* daddr != \{([^}]*)\}`
|
||||
// `^ip6* daddr (!= )?\{([^}]*)\}`
|
||||
// Tests whether destIP doesn't match an anonymous set.
|
||||
match := destAddrLookupRegexp.FindStringSubmatch(rule)
|
||||
rule = strings.TrimPrefix(rule, match[0])
|
||||
if !tracer.noneAddressesMatch(destIP, match[1]) {
|
||||
wantMatch, set := match[1] != "!= ", match[2]
|
||||
if !tracer.addressMatchesSet(destIP, wantMatch, set) {
|
||||
rule = ""
|
||||
break
|
||||
}
|
||||
@@ -440,11 +444,12 @@ func (tracer *nftablesTracer) runChain(chname, sourceIP, protocol, destIP, destP
|
||||
}
|
||||
|
||||
case sourceAddrLookupRegexp.MatchString(rule):
|
||||
// `^ip6* saddr != \{([^}]*)\}`
|
||||
// `^ip6* saddr (!= )?\{([^}]*)\}`
|
||||
// Tests whether sourceIP doesn't match an anonymous set.
|
||||
match := sourceAddrLookupRegexp.FindStringSubmatch(rule)
|
||||
rule = strings.TrimPrefix(rule, match[0])
|
||||
if !tracer.noneAddressesMatch(sourceIP, match[1]) {
|
||||
wantMatch, set := match[1] != "!= ", match[2]
|
||||
if !tracer.addressMatchesSet(sourceIP, wantMatch, set) {
|
||||
rule = ""
|
||||
break
|
||||
}
|
||||
@@ -565,6 +570,7 @@ func (tracer *nftablesTracer) runChain(chname, sourceIP, protocol, destIP, destP
|
||||
// destinations (a comma-separated list of IPs, or one of the special targets "ACCEPT",
|
||||
// "DROP", or "REJECT"), and whether the packet would be masqueraded.
|
||||
func tracePacket(t *testing.T, nft *knftables.Fake, sourceIP, protocol, destIP, destPort string, nodeIPs []string) ([]string, string, bool) {
|
||||
var err error
|
||||
tracer := newNFTablesTracer(t, nft, nodeIPs)
|
||||
|
||||
// filter-prerouting goes first, then nat-prerouting if not terminated.
|
||||
@@ -575,7 +581,10 @@ func tracePacket(t *testing.T, nft *knftables.Fake, sourceIP, protocol, destIP,
|
||||
// After the prerouting rules run, pending DNATs are processed (which would affect
|
||||
// the destination IP that later rules match against).
|
||||
if len(tracer.outputs) != 0 {
|
||||
destIP = strings.Split(tracer.outputs[0], ":")[0]
|
||||
destIP, _, err = net.SplitHostPort(tracer.outputs[0])
|
||||
if err != nil {
|
||||
t.Errorf("failed to parse host port '%s': %s", tracer.outputs[0], err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Run filter-forward, skip filter-input as it ought to be fully redundant with the filter-forward chain.
|
||||
|
@@ -55,6 +55,7 @@ import (
|
||||
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
netutils "k8s.io/utils/net"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
@@ -63,6 +64,16 @@ const (
|
||||
// so they don't need any "kube-" or "kube-proxy-" prefix of their own.
|
||||
kubeProxyTable = "kube-proxy"
|
||||
|
||||
// base chains
|
||||
filterPreroutingChain = "filter-prerouting"
|
||||
filterInputChain = "filter-input"
|
||||
filterForwardChain = "filter-forward"
|
||||
filterOutputChain = "filter-output"
|
||||
filterOutputPostDNATChain = "filter-output-post-dnat"
|
||||
natPreroutingChain = "nat-prerouting"
|
||||
natOutputChain = "nat-output"
|
||||
natPostroutingChain = "nat-postrouting"
|
||||
|
||||
// service dispatch
|
||||
servicesChain = "services"
|
||||
serviceIPsMap = "service-ips"
|
||||
@@ -71,12 +82,18 @@ const (
|
||||
// set of IPs that accept NodePort traffic
|
||||
nodePortIPsSet = "nodeport-ips"
|
||||
|
||||
// set of active ClusterIPs.
|
||||
clusterIPsSet = "cluster-ips"
|
||||
|
||||
// handling for services with no endpoints
|
||||
endpointsCheckChain = "endpoints-check"
|
||||
noEndpointServicesMap = "no-endpoint-services"
|
||||
noEndpointNodePortsMap = "no-endpoint-nodeports"
|
||||
rejectChain = "reject-chain"
|
||||
|
||||
// handling traffic to unallocated ClusterIPs and undefined ports of ClusterIPs
|
||||
clusterIPsCheckChain = "cluster-ips-check"
|
||||
|
||||
// LoadBalancerSourceRanges handling
|
||||
firewallIPsMap = "firewall-ips"
|
||||
firewallCheckChain = "firewall-check"
|
||||
@@ -179,6 +196,10 @@ type Proxier struct {
|
||||
|
||||
// staleChains contains information about chains to be deleted later
|
||||
staleChains map[string]time.Time
|
||||
|
||||
// serviceCIDRs is a comma separated list of ServiceCIDRs belonging to the IPFamily
|
||||
// which proxier is operating on, can be directly consumed by knftables.
|
||||
serviceCIDRs string
|
||||
}
|
||||
|
||||
// Proxier implements proxy.Provider
|
||||
@@ -309,13 +330,14 @@ type nftablesBaseChain struct {
|
||||
var nftablesBaseChains = []nftablesBaseChain{
|
||||
// We want our filtering rules to operate on pre-DNAT dest IPs, so our filter
|
||||
// chains have to run before DNAT.
|
||||
{"filter-prerouting", knftables.FilterType, knftables.PreroutingHook, knftables.DNATPriority + "-10"},
|
||||
{"filter-input", knftables.FilterType, knftables.InputHook, knftables.DNATPriority + "-10"},
|
||||
{"filter-forward", knftables.FilterType, knftables.ForwardHook, knftables.DNATPriority + "-10"},
|
||||
{"filter-output", knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "-10"},
|
||||
{"nat-prerouting", knftables.NATType, knftables.PreroutingHook, knftables.DNATPriority},
|
||||
{"nat-output", knftables.NATType, knftables.OutputHook, knftables.DNATPriority},
|
||||
{"nat-postrouting", knftables.NATType, knftables.PostroutingHook, knftables.SNATPriority},
|
||||
{filterPreroutingChain, knftables.FilterType, knftables.PreroutingHook, knftables.DNATPriority + "-10"},
|
||||
{filterInputChain, knftables.FilterType, knftables.InputHook, knftables.DNATPriority + "-10"},
|
||||
{filterForwardChain, knftables.FilterType, knftables.ForwardHook, knftables.DNATPriority + "-10"},
|
||||
{filterOutputChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "-10"},
|
||||
{filterOutputPostDNATChain, knftables.FilterType, knftables.OutputHook, knftables.DNATPriority + "+10"},
|
||||
{natPreroutingChain, knftables.NATType, knftables.PreroutingHook, knftables.DNATPriority},
|
||||
{natOutputChain, knftables.NATType, knftables.OutputHook, knftables.DNATPriority},
|
||||
{natPostroutingChain, knftables.NATType, knftables.PostroutingHook, knftables.SNATPriority},
|
||||
}
|
||||
|
||||
// nftablesJumpChains lists our top-level "regular chains" that are jumped to directly
|
||||
@@ -331,16 +353,19 @@ var nftablesJumpChains = []nftablesJumpChain{
|
||||
// We can't jump to endpointsCheckChain from filter-prerouting like
|
||||
// firewallCheckChain because reject action is only valid in chains using the
|
||||
// input, forward or output hooks.
|
||||
{endpointsCheckChain, "filter-input", "ct state new"},
|
||||
{endpointsCheckChain, "filter-forward", "ct state new"},
|
||||
{endpointsCheckChain, "filter-output", "ct state new"},
|
||||
{endpointsCheckChain, filterInputChain, "ct state new"},
|
||||
{endpointsCheckChain, filterForwardChain, "ct state new"},
|
||||
{endpointsCheckChain, filterOutputChain, "ct state new"},
|
||||
|
||||
{firewallCheckChain, "filter-prerouting", "ct state new"},
|
||||
{firewallCheckChain, "filter-output", "ct state new"},
|
||||
{firewallCheckChain, filterPreroutingChain, "ct state new"},
|
||||
{firewallCheckChain, filterOutputChain, "ct state new"},
|
||||
|
||||
{servicesChain, "nat-output", ""},
|
||||
{servicesChain, "nat-prerouting", ""},
|
||||
{masqueradingChain, "nat-postrouting", ""},
|
||||
{servicesChain, natOutputChain, ""},
|
||||
{servicesChain, natPreroutingChain, ""},
|
||||
{masqueradingChain, natPostroutingChain, ""},
|
||||
|
||||
{clusterIPsCheckChain, filterForwardChain, "ct state new"},
|
||||
{clusterIPsCheckChain, filterOutputPostDNATChain, "ct state new"},
|
||||
}
|
||||
|
||||
// ensureChain adds commands to tx to ensure that chain exists and doesn't contain
|
||||
@@ -399,7 +424,7 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
|
||||
}
|
||||
|
||||
// Ensure all of our other "top-level" chains exist
|
||||
for _, chain := range []string{servicesChain, masqueradingChain, markMasqChain} {
|
||||
for _, chain := range []string{servicesChain, clusterIPsCheckChain, masqueradingChain, markMasqChain} {
|
||||
ensureChain(chain, tx, createdChains)
|
||||
}
|
||||
|
||||
@@ -429,6 +454,34 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
|
||||
Rule: "masquerade fully-random",
|
||||
})
|
||||
|
||||
// add cluster-ips set.
|
||||
tx.Add(&knftables.Set{
|
||||
Name: clusterIPsSet,
|
||||
Type: ipvX_addr,
|
||||
Comment: ptr.To("Active ClusterIPs"),
|
||||
})
|
||||
|
||||
// reject traffic to invalid ports of ClusterIPs.
|
||||
tx.Add(&knftables.Rule{
|
||||
Chain: clusterIPsCheckChain,
|
||||
Rule: knftables.Concat(
|
||||
ipX, "daddr", "@", clusterIPsSet, "reject",
|
||||
),
|
||||
Comment: ptr.To("Reject traffic to invalid ports of ClusterIPs"),
|
||||
})
|
||||
|
||||
// drop traffic to unallocated ClusterIPs.
|
||||
if len(proxier.serviceCIDRs) > 0 {
|
||||
tx.Add(&knftables.Rule{
|
||||
Chain: clusterIPsCheckChain,
|
||||
Rule: knftables.Concat(
|
||||
ipX, "daddr", "{", proxier.serviceCIDRs, "}",
|
||||
"drop",
|
||||
),
|
||||
Comment: ptr.To("Drop traffic to unallocated ClusterIPs"),
|
||||
})
|
||||
}
|
||||
|
||||
// Fill in nodeport-ips set if needed (or delete it if not). (We do "add+delete"
|
||||
// rather than just "delete" when we want to ensure the set doesn't exist, because
|
||||
// doing just "delete" would return an error if the set didn't exist.)
|
||||
@@ -764,6 +817,26 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
|
||||
func (proxier *Proxier) OnNodeSynced() {
|
||||
}
|
||||
|
||||
// OnServiceCIDRsChanged is called whenever a change is observed
|
||||
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
|
||||
func (proxier *Proxier) OnServiceCIDRsChanged(cidrs []string) {
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
|
||||
cidrsForProxier := make([]string, 0)
|
||||
for _, cidr := range cidrs {
|
||||
isIPv4CIDR := netutils.IsIPv4CIDRString(cidr)
|
||||
if proxier.ipFamily == v1.IPv4Protocol && isIPv4CIDR {
|
||||
cidrsForProxier = append(cidrsForProxier, cidr)
|
||||
}
|
||||
|
||||
if proxier.ipFamily == v1.IPv6Protocol && !isIPv4CIDR {
|
||||
cidrsForProxier = append(cidrsForProxier, cidr)
|
||||
}
|
||||
}
|
||||
proxier.serviceCIDRs = strings.Join(cidrsForProxier, ",")
|
||||
}
|
||||
|
||||
const (
|
||||
// Maximum length for one of our chain name prefixes, including the trailing
|
||||
// hyphen.
|
||||
@@ -955,6 +1028,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// We currently fully-rebuild our sets and maps on each resync
|
||||
tx.Flush(&knftables.Set{
|
||||
Name: clusterIPsSet,
|
||||
})
|
||||
tx.Flush(&knftables.Map{
|
||||
Name: firewallIPsMap,
|
||||
})
|
||||
@@ -1095,6 +1171,10 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Capture the clusterIP.
|
||||
tx.Add(&knftables.Element{
|
||||
Set: clusterIPsSet,
|
||||
Key: []string{svcInfo.ClusterIP().String()},
|
||||
})
|
||||
if hasInternalEndpoints {
|
||||
tx.Add(&knftables.Element{
|
||||
Map: serviceIPsMap,
|
||||
|
@@ -30,6 +30,8 @@ import (
|
||||
|
||||
"github.com/danwinship/knftables"
|
||||
"github.com/lithammer/dedent"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -43,9 +45,8 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/conntrack"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
|
||||
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
|
||||
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
@@ -284,9 +285,11 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
|
||||
// invocation into a Run() method.
|
||||
nftablesFamily := knftables.IPv4Family
|
||||
podCIDR := "10.0.0.0/8"
|
||||
serviceCIDRs := "172.30.0.0/16"
|
||||
if ipFamily == v1.IPv6Protocol {
|
||||
nftablesFamily = knftables.IPv6Family
|
||||
podCIDR = "fd00:10::/64"
|
||||
serviceCIDRs = "fd00:10:96::/112"
|
||||
}
|
||||
detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR(podCIDR)
|
||||
|
||||
@@ -325,6 +328,7 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
|
||||
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil),
|
||||
networkInterfacer: networkInterfacer,
|
||||
staleChains: make(map[string]time.Time),
|
||||
serviceCIDRs: serviceCIDRs,
|
||||
}
|
||||
p.setInitialized(true)
|
||||
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
||||
@@ -510,11 +514,14 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add rule ip kube-proxy filter-prerouting ct state new jump firewall-check
|
||||
add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; }
|
||||
add rule ip kube-proxy filter-forward ct state new jump endpoints-check
|
||||
add rule ip kube-proxy filter-forward ct state new jump cluster-ips-check
|
||||
add chain ip kube-proxy filter-input { type filter hook input priority -110 ; }
|
||||
add rule ip kube-proxy filter-input ct state new jump endpoints-check
|
||||
add chain ip kube-proxy filter-output { type filter hook output priority -110 ; }
|
||||
add rule ip kube-proxy filter-output ct state new jump endpoints-check
|
||||
add rule ip kube-proxy filter-output ct state new jump firewall-check
|
||||
add chain ip kube-proxy filter-output-post-dnat { type filter hook output priority -90 ; }
|
||||
add rule ip kube-proxy filter-output-post-dnat ct state new jump cluster-ips-check
|
||||
add chain ip kube-proxy nat-output { type nat hook output priority -100 ; }
|
||||
add rule ip kube-proxy nat-output jump services
|
||||
add chain ip kube-proxy nat-postrouting { type nat hook postrouting priority 100 ; }
|
||||
@@ -522,6 +529,11 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; }
|
||||
add rule ip kube-proxy nat-prerouting jump services
|
||||
|
||||
add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
|
||||
add chain ip kube-proxy cluster-ips-check
|
||||
add rule ip kube-proxy cluster-ips-check ip daddr @cluster-ips reject comment "Reject traffic to invalid ports of ClusterIPs"
|
||||
add rule ip kube-proxy cluster-ips-check ip daddr { 172.30.0.0/16 } drop comment "Drop traffic to unallocated ClusterIPs"
|
||||
|
||||
add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
|
||||
add chain ip kube-proxy firewall-check
|
||||
add rule ip kube-proxy firewall-check ip daddr . meta l4proto . th dport vmap @firewall-ips
|
||||
@@ -550,6 +562,7 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add rule ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 ip saddr 10.180.0.1 jump mark-for-masquerade
|
||||
add rule ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 meta l4proto tcp dnat to 10.180.0.1:80
|
||||
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
|
||||
# svc2
|
||||
@@ -564,6 +577,7 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add rule ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 ip saddr 10.180.0.2 jump mark-for-masquerade
|
||||
add rule ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 meta l4proto tcp dnat to 10.180.0.2:80
|
||||
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.42 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 80 : goto service-42NFTM6N-ns2/svc2/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 192.168.99.22 . tcp . 80 : goto external-42NFTM6N-ns2/svc2/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 1.2.3.4 . tcp . 80 : goto external-42NFTM6N-ns2/svc2/tcp/p80 }
|
||||
@@ -584,6 +598,7 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add rule ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 ip saddr 10.180.0.3 jump mark-for-masquerade
|
||||
add rule ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 meta l4proto tcp dnat to 10.180.0.3:80
|
||||
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
add element ip kube-proxy service-nodeports { tcp . 3003 : goto external-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
|
||||
@@ -601,6 +616,7 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add rule ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 ip saddr 10.180.0.4 jump mark-for-masquerade
|
||||
add rule ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 meta l4proto tcp dnat to 10.180.0.4:80
|
||||
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 192.168.99.33 . tcp . 80 : goto external-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
|
||||
@@ -622,12 +638,14 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add chain ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80
|
||||
add rule ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 ip saddr != { 203.0.113.0/25 } drop
|
||||
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.45 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.45 . tcp . 80 : goto service-HVFWP5L3-ns5/svc5/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 5.6.7.8 . tcp . 80 : goto external-HVFWP5L3-ns5/svc5/tcp/p80 }
|
||||
add element ip kube-proxy service-nodeports { tcp . 3002 : goto external-HVFWP5L3-ns5/svc5/tcp/p80 }
|
||||
add element ip kube-proxy firewall-ips { 5.6.7.8 . tcp . 80 comment "ns5/svc5:p80" : goto firewall-HVFWP5L3-ns5/svc5/tcp/p80 }
|
||||
|
||||
# svc6
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.46 }
|
||||
add element ip kube-proxy no-endpoint-services { 172.30.0.46 . tcp . 80 comment "ns6/svc6:p80" : goto reject-chain }
|
||||
`)
|
||||
|
||||
@@ -869,14 +887,21 @@ func TestClusterIPGeneral(t *testing.T) {
|
||||
protocol: v1.ProtocolUDP,
|
||||
destIP: "172.30.0.42",
|
||||
destPort: 80,
|
||||
output: "",
|
||||
output: "REJECT",
|
||||
},
|
||||
{
|
||||
name: "svc1 does not accept svc2's ports",
|
||||
sourceIP: "10.180.0.2",
|
||||
destIP: "172.30.0.41",
|
||||
destPort: 443,
|
||||
output: "",
|
||||
output: "REJECT",
|
||||
},
|
||||
{
|
||||
name: "packet to unallocated cluster ip",
|
||||
sourceIP: "10.180.0.2",
|
||||
destIP: "172.30.0.50",
|
||||
destPort: 80,
|
||||
output: "DROP",
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -4242,11 +4267,13 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
baseRules := dedent.Dedent(`
|
||||
add table ip kube-proxy { comment "rules for kube-proxy" ; }
|
||||
|
||||
add chain ip kube-proxy cluster-ips-check
|
||||
add chain ip kube-proxy endpoints-check
|
||||
add chain ip kube-proxy filter-prerouting { type filter hook prerouting priority -110 ; }
|
||||
add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; }
|
||||
add chain ip kube-proxy filter-input { type filter hook input priority -110 ; }
|
||||
add chain ip kube-proxy filter-output { type filter hook output priority -110 ; }
|
||||
add chain ip kube-proxy filter-output-post-dnat { type filter hook output priority -90 ; }
|
||||
add chain ip kube-proxy firewall-check
|
||||
add chain ip kube-proxy mark-for-masquerade
|
||||
add chain ip kube-proxy masquerading
|
||||
@@ -4256,13 +4283,17 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
add chain ip kube-proxy reject-chain { comment "helper for @no-endpoint-services / @no-endpoint-nodeports" ; }
|
||||
add chain ip kube-proxy services
|
||||
|
||||
add rule ip kube-proxy cluster-ips-check ip daddr @cluster-ips reject comment "Reject traffic to invalid ports of ClusterIPs"
|
||||
add rule ip kube-proxy cluster-ips-check ip daddr { 172.30.0.0/16 } drop comment "Drop traffic to unallocated ClusterIPs"
|
||||
add rule ip kube-proxy endpoints-check ip daddr . meta l4proto . th dport vmap @no-endpoint-services
|
||||
add rule ip kube-proxy endpoints-check fib daddr type local ip daddr != 127.0.0.0/8 meta l4proto . th dport vmap @no-endpoint-nodeports
|
||||
add rule ip kube-proxy filter-prerouting ct state new jump firewall-check
|
||||
add rule ip kube-proxy filter-forward ct state new jump endpoints-check
|
||||
add rule ip kube-proxy filter-forward ct state new jump cluster-ips-check
|
||||
add rule ip kube-proxy filter-input ct state new jump endpoints-check
|
||||
add rule ip kube-proxy filter-output ct state new jump endpoints-check
|
||||
add rule ip kube-proxy filter-output ct state new jump firewall-check
|
||||
add rule ip kube-proxy filter-output-post-dnat ct state new jump cluster-ips-check
|
||||
add rule ip kube-proxy firewall-check ip daddr . meta l4proto . th dport vmap @firewall-ips
|
||||
add rule ip kube-proxy mark-for-masquerade mark set mark or 0x4000
|
||||
add rule ip kube-proxy masquerading mark and 0x4000 == 0 return
|
||||
@@ -4275,6 +4306,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
add rule ip kube-proxy services ip daddr . meta l4proto . th dport vmap @service-ips
|
||||
add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 meta l4proto . th dport vmap @service-nodeports
|
||||
|
||||
add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
|
||||
|
||||
add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
|
||||
add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
|
||||
add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
|
||||
@@ -4343,6 +4376,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.syncProxyRules()
|
||||
|
||||
expected := baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.42 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
|
||||
|
||||
@@ -4392,6 +4427,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.syncProxyRules()
|
||||
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.42 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
@@ -4423,6 +4461,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.OnServiceDelete(svc2)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
|
||||
@@ -4449,6 +4489,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
ageStaleChains()
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
|
||||
@@ -4482,6 +4524,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
|
||||
@@ -4518,6 +4563,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
@@ -4553,6 +4601,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
|
||||
// The old endpoint chain (for 10.0.3.1) will not be deleted yet.
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
@@ -4591,6 +4642,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.syncProxyRules()
|
||||
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
@@ -4627,6 +4681,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.OnEndpointSliceUpdate(eps3update2, eps3update3)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy no-endpoint-services { 172.30.0.43 . tcp . 80 comment "ns3/svc3:p80" : goto reject-chain }
|
||||
add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
@@ -4659,6 +4716,9 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.OnEndpointSliceUpdate(eps3update3, eps3update2)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.41 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.43 }
|
||||
add element ip kube-proxy cluster-ips { 172.30.0.44 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.43 . tcp . 80 : goto service-4AT6LBPK-ns3/svc3/tcp/p80 }
|
||||
add element ip kube-proxy service-ips { 172.30.0.44 . tcp . 80 : goto service-LAUZTJTB-ns4/svc4/tcp/p80 }
|
||||
@@ -5131,3 +5191,21 @@ func Test_servicePortEndpointChainNameBase(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxier_OnServiceCIDRsChanged(t *testing.T) {
|
||||
var proxier *Proxier
|
||||
|
||||
proxier = &Proxier{ipFamily: v1.IPv4Protocol}
|
||||
proxier.OnServiceCIDRsChanged([]string{"172.30.0.0/16", "fd00:10:96::/112"})
|
||||
assert.Equal(t, proxier.serviceCIDRs, "172.30.0.0/16")
|
||||
|
||||
proxier.OnServiceCIDRsChanged([]string{"172.30.0.0/16", "172.50.0.0/16", "fd00:10:96::/112", "fd00:172:30::/112"})
|
||||
assert.Equal(t, proxier.serviceCIDRs, "172.30.0.0/16,172.50.0.0/16")
|
||||
|
||||
proxier = &Proxier{ipFamily: v1.IPv6Protocol}
|
||||
proxier.OnServiceCIDRsChanged([]string{"172.30.0.0/16", "fd00:10:96::/112"})
|
||||
assert.Equal(t, proxier.serviceCIDRs, "fd00:10:96::/112")
|
||||
|
||||
proxier.OnServiceCIDRsChanged([]string{"172.30.0.0/16", "172.50.0.0/16", "fd00:10:96::/112", "fd00:172:30::/112"})
|
||||
assert.Equal(t, proxier.serviceCIDRs, "fd00:10:96::/112,fd00:172:30::/112")
|
||||
}
|
||||
|
@@ -29,6 +29,7 @@ type Provider interface {
|
||||
config.EndpointSliceHandler
|
||||
config.ServiceHandler
|
||||
config.NodeHandler
|
||||
config.ServiceCIDRHandler
|
||||
|
||||
// Sync immediately synchronizes the Provider's current state to proxy rules.
|
||||
Sync()
|
||||
|
@@ -1009,6 +1009,10 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
|
||||
proxier.syncProxyRules()
|
||||
}
|
||||
|
||||
// OnServiceCIDRsChanged is called whenever a change is observed
|
||||
// in any of the ServiceCIDRs, and provides complete list of service cidrs.
|
||||
func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
|
||||
|
||||
func (proxier *Proxier) cleanupAllPolicies() {
|
||||
for svcName, svc := range proxier.svcPortMap {
|
||||
svcInfo, ok := svc.(*serviceInfo)
|
||||
|
@@ -534,6 +534,10 @@ func ClusterRoles() []rbacv1.ClusterRole {
|
||||
|
||||
eventsRule(),
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
|
||||
nodeProxierRules = append(nodeProxierRules, rbacv1helpers.NewRule("list", "watch").Groups(networkingGroup).Resources("servicecidrs").RuleOrDie())
|
||||
}
|
||||
|
||||
nodeProxierRules = append(nodeProxierRules, rbacv1helpers.NewRule("list", "watch").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie())
|
||||
roles = append(roles, rbacv1.ClusterRole{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "system:node-proxier"},
|
||||
|
Reference in New Issue
Block a user