Merge pull request #122768 from danwinship/conntrack-cleanup

conntrack cleanup cleanup
This commit is contained in:
Kubernetes Prow Robot
2024-01-16 01:15:29 +01:00
committed by GitHub
11 changed files with 656 additions and 953 deletions

View File

@@ -25,24 +25,24 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/utils/exec"
netutils "k8s.io/utils/net"
)
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap,
func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap,
serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointsUpdateResult)
deleteStaleServiceConntrackEntries(ct, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult)
}
// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
// may create "black hole" entries for that IP+port. When the service gets endpoints we
// need to delete those entries so further traffic doesn't get dropped.
func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.New[int]()
isIPv6 := false
// merge newly active services gathered from endpointsUpdateResult
// a UDP service that changes from 0 to non-0 endpoints is newly active.
@@ -59,19 +59,20 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
conntrackCleanupServiceNodePorts.Insert(nodePort)
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
}
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := ClearEntriesForIP(exec, svcIP, v1.ProtocolUDP); err != nil {
if err := ct.ClearEntriesForIP(svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := ClearEntriesForPort(exec, nodePort, isIPv6, v1.ProtocolUDP)
err := ct.ClearEntriesForPort(nodePort, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
}
@@ -81,30 +82,30 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
var err error
if nodePort != 0 {
err = ClearEntriesForPortNAT(exec, endpointIP, nodePort, v1.ProtocolUDP)
err = ct.ClearEntriesForPortNAT(endpointIP, nodePort, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = ClearEntriesForNAT(exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
err = ct.ClearEntriesForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPs() {
err := ClearEntriesForNAT(exec, extIP.String(), endpointIP, v1.ProtocolUDP)
err := ct.ClearEntriesForNAT(extIP.String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerVIPs() {
err := ClearEntriesForNAT(exec, lbIP.String(), endpointIP, v1.ProtocolUDP)
err := ct.ClearEntriesForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}

View File

@@ -0,0 +1,263 @@
//go:build linux
// +build linux
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package conntrack
import (
"net"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/proxy"
)
const (
testClusterIP = "172.30.1.1"
testExternalIP = "192.168.99.100"
testLoadBalancerIP = "1.2.3.4"
testEndpointIP = "10.240.0.4"
testPort = 53
testNodePort = 5353
testEndpointPort = "5300"
)
func TestCleanStaleEntries(t *testing.T) {
// We need to construct a proxy.ServicePortMap to pass to CleanStaleEntries.
// ServicePortMap is just map[string]proxy.ServicePort, but there are no public
// constructors for any implementation of proxy.ServicePort, so we have to either
// provide our own implementation of that interface, or else use a
// proxy.ServiceChangeTracker to construct them and fill in the map for us.
sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "cleanup-test",
Namespace: "test",
},
Spec: v1.ServiceSpec{
ClusterIP: testClusterIP,
ExternalIPs: []string{testExternalIP},
Ports: []v1.ServicePort{
{
Name: "dns-tcp",
Port: testPort,
Protocol: v1.ProtocolTCP,
},
{
Name: "dns-udp",
Port: testPort,
NodePort: testNodePort,
Protocol: v1.ProtocolUDP,
},
},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{{
IP: testLoadBalancerIP,
}},
},
},
}
sct.Update(nil, svc)
svcPortMap := make(proxy.ServicePortMap)
_ = svcPortMap.Update(sct)
// (At this point we are done with sct, and in particular, we don't use sct to
// construct UpdateServiceMapResults, because pkg/proxy already has its own tests
// for that. Also, svcPortMap is read-only from this point on.)
tcpPortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: svc.Namespace,
Name: svc.Name,
},
Port: svc.Spec.Ports[0].Name,
Protocol: svc.Spec.Ports[0].Protocol,
}
udpPortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: svc.Namespace,
Name: svc.Name,
},
Port: svc.Spec.Ports[1].Name,
Protocol: svc.Spec.Ports[1].Protocol,
}
unknownPortName := udpPortName
unknownPortName.Namespace = "unknown"
// Sanity-check to make sure we constructed the map correctly
if len(svcPortMap) != 2 {
t.Fatalf("expected svcPortMap to have 2 entries, got %+v", svcPortMap)
}
servicePort := svcPortMap[tcpPortName]
if servicePort == nil || servicePort.String() != "172.30.1.1:53/TCP" {
t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/TCP\", got %q", tcpPortName.String(), servicePort.String())
}
servicePort = svcPortMap[udpPortName]
if servicePort == nil || servicePort.String() != "172.30.1.1:53/UDP" {
t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/UDP\", got %q", udpPortName.String(), servicePort.String())
}
testCases := []struct {
description string
serviceUpdates proxy.UpdateServiceMapResult
endpointsUpdates proxy.UpdateEndpointsMapResult
result FakeInterface
}{
{
description: "DeletedUDPClusterIPs clears entries for given clusterIPs (only)",
serviceUpdates: proxy.UpdateServiceMapResult{
// Note: this isn't testClusterIP; it's the IP of some
// unknown (because deleted) service.
DeletedUDPClusterIPs: sets.New("172.30.99.99"),
},
endpointsUpdates: proxy.UpdateEndpointsMapResult{},
result: FakeInterface{
ClearedIPs: sets.New("172.30.99.99"),
ClearedPorts: sets.New[int](),
ClearedNATs: map[string]string{},
ClearedPortNATs: map[int]string{},
},
},
{
description: "DeletedUDPEndpoints clears NAT entries for all IPs and NodePorts",
serviceUpdates: proxy.UpdateServiceMapResult{
DeletedUDPClusterIPs: sets.New[string](),
},
endpointsUpdates: proxy.UpdateEndpointsMapResult{
DeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: net.JoinHostPort(testEndpointIP, testEndpointPort),
ServicePortName: udpPortName,
}},
},
result: FakeInterface{
ClearedIPs: sets.New[string](),
ClearedPorts: sets.New[int](),
ClearedNATs: map[string]string{
testClusterIP: testEndpointIP,
testExternalIP: testEndpointIP,
testLoadBalancerIP: testEndpointIP,
},
ClearedPortNATs: map[int]string{
testNodePort: testEndpointIP,
},
},
},
{
description: "NewlyActiveUDPServices clears entries for all IPs and NodePorts",
serviceUpdates: proxy.UpdateServiceMapResult{
DeletedUDPClusterIPs: sets.New[string](),
},
endpointsUpdates: proxy.UpdateEndpointsMapResult{
DeletedUDPEndpoints: []proxy.ServiceEndpoint{},
NewlyActiveUDPServices: []proxy.ServicePortName{
udpPortName,
},
},
result: FakeInterface{
ClearedIPs: sets.New(testClusterIP, testExternalIP, testLoadBalancerIP),
ClearedPorts: sets.New(testNodePort),
ClearedNATs: map[string]string{},
ClearedPortNATs: map[int]string{},
},
},
{
description: "DeletedUDPEndpoints for unknown Service has no effect",
serviceUpdates: proxy.UpdateServiceMapResult{
DeletedUDPClusterIPs: sets.New[string](),
},
endpointsUpdates: proxy.UpdateEndpointsMapResult{
DeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.240.0.4:80",
ServicePortName: unknownPortName,
}},
NewlyActiveUDPServices: []proxy.ServicePortName{},
},
result: FakeInterface{
ClearedIPs: sets.New[string](),
ClearedPorts: sets.New[int](),
ClearedNATs: map[string]string{},
ClearedPortNATs: map[int]string{},
},
},
{
description: "NewlyActiveUDPServices for unknown Service has no effect",
serviceUpdates: proxy.UpdateServiceMapResult{
DeletedUDPClusterIPs: sets.New[string](),
},
endpointsUpdates: proxy.UpdateEndpointsMapResult{
DeletedUDPEndpoints: []proxy.ServiceEndpoint{},
NewlyActiveUDPServices: []proxy.ServicePortName{
unknownPortName,
},
},
result: FakeInterface{
ClearedIPs: sets.New[string](),
ClearedPorts: sets.New[int](),
ClearedNATs: map[string]string{},
ClearedPortNATs: map[int]string{},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
fake := NewFake()
CleanStaleEntries(fake, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates)
if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) {
t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs)
}
if !fake.ClearedPorts.Equal(tc.result.ClearedPorts) {
t.Errorf("Expected ClearedPorts=%v, got %v", tc.result.ClearedPorts, fake.ClearedPorts)
}
if !reflect.DeepEqual(fake.ClearedNATs, tc.result.ClearedNATs) {
t.Errorf("Expected ClearedNATs=%v, got %v", tc.result.ClearedNATs, fake.ClearedNATs)
}
if !reflect.DeepEqual(fake.ClearedPortNATs, tc.result.ClearedPortNATs) {
t.Errorf("Expected ClearedPortNATs=%v, got %v", tc.result.ClearedPortNATs, fake.ClearedPortNATs)
}
})
}
}

View File

@@ -30,10 +30,38 @@ import (
utilnet "k8s.io/utils/net"
)
// Utilities for dealing with conntrack
// Interface for dealing with conntrack
type Interface interface {
// ClearEntriesForIP deletes conntrack entries for connections of the given
// protocol, to the given IP.
ClearEntriesForIP(ip string, protocol v1.Protocol) error
// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found
const NoConnectionToDelete = "0 flow entries have been deleted"
// ClearEntriesForPort deletes conntrack entries for connections of the given
// protocol and IP family, to the given port.
ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error
// ClearEntriesForNAT deletes conntrack entries for connections of the given
// protocol, which had been DNATted from origin to dest.
ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error
// ClearEntriesForPortNAT deletes conntrack entries for connections of the given
// protocol, which had been DNATted from the given port (on any IP) to dest.
ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error
}
// execCT implements Interface by execing the conntrack tool
type execCT struct {
execer exec.Interface
}
var _ Interface = &execCT{}
func NewExec(execer exec.Interface) Interface {
return &execCT{execer: execer}
}
// noConnectionToDelete is the error string returned by conntrack when no matching connections are found
const noConnectionToDelete = "0 flow entries have been deleted"
func protoStr(proto v1.Protocol) string {
return strings.ToLower(string(proto))
@@ -46,12 +74,26 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
return parameters
}
// ClearEntriesForIP uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IP
func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error {
// exec executes the conntrack tool using the given parameters
func (ct *execCT) exec(parameters ...string) error {
conntrackPath, err := ct.execer.LookPath("conntrack")
if err != nil {
return fmt.Errorf("error looking for path of conntrack: %v", err)
}
klog.V(4).InfoS("Clearing conntrack entries", "parameters", parameters)
output, err := ct.execer.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err)
}
klog.V(4).InfoS("Conntrack entries deleted", "output", string(output))
return nil
}
// ClearEntriesForIP is part of Interface
func (ct *execCT) ClearEntriesForIP(ip string, protocol v1.Protocol) error {
parameters := parametersWithFamily(utilnet.IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol))
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
err := ct.exec(parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby-sit all udp connections to kubernetes services.
@@ -60,46 +102,25 @@ func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) e
return nil
}
// Exec executes the conntrack tool using the given parameters
func Exec(execer exec.Interface, parameters ...string) error {
conntrackPath, err := execer.LookPath("conntrack")
if err != nil {
return fmt.Errorf("error looking for path of conntrack: %v", err)
}
klog.V(4).InfoS("Clearing conntrack entries", "parameters", parameters)
output, err := execer.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err)
}
klog.V(4).InfoS("Conntrack entries deleted", "output", string(output))
return nil
}
// ClearEntriesForPort uses the conntrack tool to delete the conntrack entries
// for connections specified by the port.
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
// The solution is clearing the conntrack. Known issues:
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func ClearEntriesForPort(execer exec.Interface, port int, isIPv6 bool, protocol v1.Protocol) error {
// ClearEntriesForPort is part of Interface
func (ct *execCT) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error {
if port <= 0 {
return fmt.Errorf("wrong port number. The port number must be greater than zero")
}
parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port))
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
err := ct.exec(parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
}
return nil
}
// ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries
// for connections specified by the {origin, dest} IP pair.
func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error {
// ClearEntriesForNAT is part of Interface
func (ct *execCT) ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error {
parameters := parametersWithFamily(utilnet.IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest,
"-p", protoStr(protocol))
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
err := ct.exec(parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
@@ -108,17 +129,14 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.
return nil
}
// ClearEntriesForPortNAT uses the conntrack tool to delete the conntrack entries
// for connections specified by the {dest IP, port} pair.
// Known issue:
// https://github.com/kubernetes/kubernetes/issues/59368
func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error {
// ClearEntriesForPortNAT is part of Interface
func (ct *execCT) ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error {
if port <= 0 {
return fmt.Errorf("wrong port number. The port number must be greater than zero")
}
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
err := ct.exec(parameters...)
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
}
return nil

View File

@@ -27,47 +27,66 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
utilnet "k8s.io/utils/net"
)
func familyParamStr(isIPv6 bool) string {
if isIPv6 {
return " -f ipv6"
}
return ""
var success = func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }
var nothingToDelete = func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
}
func TestExecConntrackTool(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
},
type testCT struct {
execCT
fcmd *fakeexec.FakeCmd
}
func makeCT(result fakeexec.FakeAction) *testCT {
fcmd := &fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{result},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := [][]string{
{"-L", "-p", "udp"},
{"-D", "-p", "udp", "-d", "10.0.240.1"},
{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
return &testCT{execCT{fexec}, fcmd}
}
// Gets the command that ct executed. (If it didn't execute any commands, this will
// return "".)
func (ct *testCT) getExecutedCommand() string {
// FakeExec panics if you try to run more commands than you set it up for. So the
// only possibilities here are that we ran 1 command or we ran 0.
if ct.execer.(*fakeexec.FakeExec).CommandCalls != 1 {
return ""
}
return strings.Join(ct.fcmd.CombinedOutputLog[0], " ")
}
func TestExec(t *testing.T) {
testCases := []struct {
args []string
result fakeexec.FakeAction
expectErr bool
}{
{
args: []string{"-D", "-p", "udp", "-d", "10.0.240.1"},
result: success,
expectErr: false,
},
{
args: []string{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
result: nothingToDelete,
expectErr: true,
},
}
expectErr := []bool{false, false, true}
for i := range testCases {
err := Exec(fexec, testCases[i]...)
if expectErr[i] {
for _, tc := range testCases {
ct := makeCT(tc.result)
err := ct.exec(tc.args...)
if tc.expectErr {
if err == nil {
t.Errorf("expected err, got %v", err)
}
@@ -77,204 +96,177 @@ func TestExecConntrackTool(t *testing.T) {
}
}
execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))
execCmd := ct.getExecutedCommand()
expectCmd := "conntrack " + strings.Join(tc.args, " ")
if execCmd != expectCmd {
t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd)
}
}
}
func TestClearUDPConntrackForIP(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
func TestClearEntriesForIP(t *testing.T) {
testCases := []struct {
name string
ip string
expectCommand string
}{
{"IPv4 success", "10.240.0.3"},
{"IPv4 success", "10.240.0.5"},
{"IPv4 simulated error", "10.240.0.4"},
{"IPv6 success", "2001:db8::10"},
{
name: "IPv4",
ip: "10.240.0.3",
expectCommand: "conntrack -D --orig-dst 10.240.0.3 -p udp",
},
{
name: "IPv6",
ip: "2001:db8::10",
expectCommand: "conntrack -D --orig-dst 2001:db8::10 -p udp -f ipv6",
},
}
svcCount := 0
for _, tc := range testCases {
if err := ClearEntriesForIP(fexec, tc.ip, v1.ProtocolUDP); err != nil {
t.Errorf("%s test case:, Unexpected error: %v", tc.name, err)
ct := makeCT(success)
if err := ct.ClearEntriesForIP(tc.ip, v1.ProtocolUDP); err != nil {
t.Errorf("%s/success: Unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(utilnet.IsIPv6String(tc.ip))
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
execCommand := ct.getExecutedCommand()
if tc.expectCommand != execCommand {
t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand)
}
ct = makeCT(nothingToDelete)
if err := ct.ClearEntriesForIP(tc.ip, v1.ProtocolUDP); err != nil {
t.Errorf("%s/nothing to delete: Unexpected error: %v", tc.name, err)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
func TestClearUDPConntrackForPort(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
func TestClearEntriesForPort(t *testing.T) {
testCases := []struct {
name string
port int
isIPv6 bool
expectCommand string
}{
{"IPv4, no error", 8080, false},
{"IPv4, simulated error", 9090, false},
{"IPv6, no error", 6666, true},
{
name: "IPv4",
port: 8080,
isIPv6: false,
expectCommand: "conntrack -D -p udp --dport 8080",
},
{
name: "IPv6",
port: 6666,
isIPv6: true,
expectCommand: "conntrack -D -p udp --dport 6666 -f ipv6",
},
}
svcCount := 0
for _, tc := range testCases {
err := ClearEntriesForPort(fexec, tc.port, tc.isIPv6, v1.ProtocolUDP)
ct := makeCT(success)
err := ct.ClearEntriesForPort(tc.port, tc.isIPv6, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s test case: Unexpected error: %v", tc.name, err)
t.Errorf("%s/success: Unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d", tc.port) + familyParamStr(tc.isIPv6)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
execCommand := ct.getExecutedCommand()
if tc.expectCommand != execCommand {
t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand)
}
ct = makeCT(nothingToDelete)
err = ct.ClearEntriesForPort(tc.port, tc.isIPv6, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s/nothing to delete: Unexpected error: %v", tc.name, err)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
func TestDeleteUDPConnections(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
func TestClearEntriesForNAT(t *testing.T) {
testCases := []struct {
name string
origin string
dest string
expectCommand string
}{
{
name: "IPv4 success",
name: "IPv4",
origin: "1.2.3.4",
dest: "10.20.30.40",
expectCommand: "conntrack -D --orig-dst 1.2.3.4 --dst-nat 10.20.30.40 -p udp",
},
{
name: "IPv4 simulated failure",
origin: "2.3.4.5",
dest: "20.30.40.50",
},
{
name: "IPv6 success",
name: "IPv6",
origin: "fd00::600d:f00d",
dest: "2001:db8::5",
expectCommand: "conntrack -D --orig-dst fd00::600d:f00d --dst-nat 2001:db8::5 -p udp -f ipv6",
},
}
svcCount := 0
for i, tc := range testCases {
err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP)
for _, tc := range testCases {
ct := makeCT(success)
err := ct.ClearEntriesForNAT(tc.origin, tc.dest, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
t.Errorf("%s/success: unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
execCommand := ct.getExecutedCommand()
if tc.expectCommand != execCommand {
t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand)
}
ct = makeCT(nothingToDelete)
err = ct.ClearEntriesForNAT(tc.origin, tc.dest, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s/nothing to delete: unexpected error: %v", tc.name, err)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
func TestClearUDPConntrackForPortNAT(t *testing.T) {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
func TestClearEntriesForPortNAT(t *testing.T) {
testCases := []struct {
name string
port int
dest string
expectCommand string
}{
{
name: "IPv4 success",
name: "IPv4",
port: 30211,
dest: "1.2.3.4",
expectCommand: "conntrack -D -p udp --dport 30211 --dst-nat 1.2.3.4",
},
{
name: "IPv6",
port: 30212,
dest: "2600:5200::7800",
expectCommand: "conntrack -D -p udp --dport 30212 --dst-nat 2600:5200::7800 -f ipv6",
},
}
svcCount := 0
for i, tc := range testCases {
err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP)
for _, tc := range testCases {
ct := makeCT(success)
err := ct.ClearEntriesForPortNAT(tc.dest, tc.port, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
t.Errorf("%s/success: unexpected error: %v", tc.name, err)
}
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
execCommand := ct.getExecutedCommand()
if tc.expectCommand != execCommand {
t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand)
}
ct = makeCT(nothingToDelete)
err = ct.ClearEntriesForPortNAT(tc.dest, tc.port, v1.ProtocolUDP)
if err != nil {
t.Errorf("%s/nothing to delete: unexpected error: %v", tc.name, err)
}
svcCount++
}
if svcCount != fexec.CommandCalls {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}

View File

@@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package conntrack
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
)
// FakeInterface implements Interface by just recording entries that have been cleared.
type FakeInterface struct {
ClearedIPs sets.Set[string]
ClearedPorts sets.Set[int]
ClearedNATs map[string]string // origin -> dest
ClearedPortNATs map[int]string // port -> dest
}
var _ Interface = &FakeInterface{}
// NewFake creates a new FakeInterface
func NewFake() *FakeInterface {
fake := &FakeInterface{}
fake.Reset()
return fake
}
// Reset clears fake's sets/maps
func (fake *FakeInterface) Reset() {
fake.ClearedIPs = sets.New[string]()
fake.ClearedPorts = sets.New[int]()
fake.ClearedNATs = make(map[string]string)
fake.ClearedPortNATs = make(map[int]string)
}
// ClearEntriesForIP is part of Interface
func (fake *FakeInterface) ClearEntriesForIP(ip string, protocol v1.Protocol) error {
if protocol != v1.ProtocolUDP {
return fmt.Errorf("FakeInterface currently only supports UDP")
}
fake.ClearedIPs.Insert(ip)
return nil
}
// ClearEntriesForPort is part of Interface
func (fake *FakeInterface) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error {
if protocol != v1.ProtocolUDP {
return fmt.Errorf("FakeInterface currently only supports UDP")
}
fake.ClearedPorts.Insert(port)
return nil
}
// ClearEntriesForNAT is part of Interface
func (fake *FakeInterface) ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error {
if protocol != v1.ProtocolUDP {
return fmt.Errorf("FakeInterface currently only supports UDP")
}
if previous, exists := fake.ClearedNATs[origin]; exists && previous != dest {
return fmt.Errorf("ClearEntriesForNAT called with same origin (%s), different destination (%s / %s)", origin, previous, dest)
}
fake.ClearedNATs[origin] = dest
return nil
}
// ClearEntriesForPortNAT is part of Interface
func (fake *FakeInterface) ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error {
if protocol != v1.ProtocolUDP {
return fmt.Errorf("FakeInterface currently only supports UDP")
}
if previous, exists := fake.ClearedPortNATs[port]; exists && previous != dest {
return fmt.Errorf("ClearEntriesForPortNAT called with same port (%d), different destination (%s / %s)", port, previous, dest)
}
fake.ClearedPortNATs[port] = dest
return nil
}

View File

@@ -170,7 +170,7 @@ type Proxier struct {
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
conntrack conntrack.Interface
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
@@ -283,7 +283,7 @@ func NewProxier(ipFamily v1.IPFamily,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
conntrack: conntrack.NewExec(exec),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
@@ -1538,7 +1538,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {

View File

@@ -58,221 +58,10 @@ import (
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
func TestDeleteEndpointConnections(t *testing.T) {
const (
UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP
SCTP = v1.ProtocolSCTP
)
testCases := []struct {
description string
svcName string
svcIP string
svcPort int32
protocol v1.Protocol
endpoint string // IP:port endpoint
simulatedErr string
}{
{
description: "V4 UDP",
svcName: "v4-udp",
svcIP: "172.30.1.1",
svcPort: 80,
protocol: UDP,
endpoint: "10.240.0.3:80",
},
{
description: "V4 TCP",
svcName: "v4-tcp",
svcIP: "172.30.2.2",
svcPort: 80,
protocol: TCP,
endpoint: "10.240.0.4:80",
},
{
description: "V4 SCTP",
svcName: "v4-sctp",
svcIP: "172.30.3.3",
svcPort: 80,
protocol: SCTP,
endpoint: "10.240.0.5:80",
},
{
description: "V4 UDP, nothing to delete, benign error",
svcName: "v4-udp-nothing-to-delete",
svcIP: "172.30.4.4",
svcPort: 80,
protocol: UDP,
endpoint: "10.240.0.6:80",
simulatedErr: conntrack.NoConnectionToDelete,
},
{
description: "V4 UDP, unexpected error, should be glogged",
svcName: "v4-udp-simulated-error",
svcIP: "172.30.5.5",
svcPort: 80,
protocol: UDP,
endpoint: "10.240.0.7:80",
simulatedErr: "simulated error",
},
{
description: "V6 UDP",
svcName: "v6-udp",
svcIP: "fd00:1234::20",
svcPort: 80,
protocol: UDP,
endpoint: "[2001:db8::2]:80",
},
{
description: "V6 TCP",
svcName: "v6-tcp",
svcIP: "fd00:1234::30",
svcPort: 80,
protocol: TCP,
endpoint: "[2001:db8::3]:80",
},
{
description: "V6 SCTP",
svcName: "v6-sctp",
svcIP: "fd00:1234::40",
svcPort: 80,
protocol: SCTP,
endpoint: "[2001:db8::4]:80",
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
priorGlogErrs := klog.Stats.Error.Lines()
// Create a fake executor for the conntrack utility.
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
if tc.protocol == UDP {
cmdOutput := "1 flow entries have been deleted"
var simErr error
// First call outputs cmdOutput and succeeds
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Second call may succeed or fail
if tc.simulatedErr != "" {
cmdOutput = ""
simErr = fmt.Errorf(tc.simulatedErr)
}
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
endpointIP := proxyutil.IPPart(tc.endpoint)
isIPv6 := netutils.IsIPv6String(endpointIP)
var ipt utiliptables.Interface
if isIPv6 {
ipt = iptablestest.NewIPv6Fake()
} else {
ipt = iptablestest.NewFake()
}
fp := NewFakeProxier(ipt)
fp.exec = fexec
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) {
if isIPv6 {
eps.AddressType = discovery.AddressTypeIPv6
} else {
eps.AddressType = discovery.AddressTypeIPv4
}
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{endpointIP},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("p80"),
Port: ptr.To[int32](80),
Protocol: ptr.To(tc.protocol),
}}
})
// Add and then remove the endpoint slice
fp.OnEndpointSliceAdd(slice)
fp.syncProxyRules()
fp.OnEndpointSliceDelete(slice)
fp.syncProxyRules()
// Check the executed conntrack command
if tc.protocol == UDP {
if fexec.CommandCalls != 2 {
t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls)
}
// First clear conntrack entries for the clusterIP when the
// endpoint is first added.
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP)
if isIPv6 {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
// Then clear conntrack entries for the endpoint when it is
// deleted.
expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
if isIPv6 {
expectCommand += " -f ipv6"
}
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
} else if fexec.CommandCalls != 0 {
t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls)
}
// Check the number of new glog errors
var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1
}
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
if glogErrs != expGlogErrs {
t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs)
}
})
}
}
// Conventions for tests using NewFakeProxier:
//
// Pod IPs: 10.0.0.0/8
@@ -324,7 +113,6 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
networkInterfacer.AddInterfaceAddr(&itf1, addrs1)
p := &Proxier{
exec: &fakeexec.FakeExec{},
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
@@ -332,6 +120,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
needFullSync: true,
iptables: ipt,
masqueradeMark: "0x4000",
conntrack: conntrack.NewFake(),
localDetector: detectLocal,
hostname: testHostname,
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
@@ -2126,22 +1915,23 @@ func TestClusterIPGeneral(t *testing.T) {
TargetPort: intstr.FromInt32(8443),
},
{
// Of course this should really be UDP, but if we
// create a service with UDP ports, the Proxier will
// try to do conntrack cleanup and we'd have to set
// the FakeExec up to be able to deal with that...
Name: "dns-sctp",
Name: "dns-udp",
Port: 53,
Protocol: v1.ProtocolSCTP,
Protocol: v1.ProtocolUDP,
},
{
Name: "dns-tcp",
Port: 53,
Protocol: v1.ProtocolTCP,
// We use TargetPort on TCP but not SCTP to help
// disambiguate the output.
// We use TargetPort on TCP but not UDP/SCTP to
// help disambiguate the output.
TargetPort: intstr.FromInt32(5353),
},
{
Name: "dns-sctp",
Port: 53,
Protocol: v1.ProtocolSCTP,
},
}
}),
)
@@ -2183,15 +1973,20 @@ func TestClusterIPGeneral(t *testing.T) {
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Name: ptr.To("dns-udp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
Protocol: ptr.To(v1.ProtocolUDP),
},
{
Name: ptr.To("dns-tcp"),
Port: ptr.To[int32](5353),
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
},
}
}),
)
@@ -2232,7 +2027,7 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (TCP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolTCP,
destIP: "172.30.0.42",
@@ -2241,7 +2036,16 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (SCTP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (UDP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolUDP,
destIP: "172.30.0.42",
destPort: 53,
output: "10.180.0.1:53, 10.180.2.1:53",
masq: false,
},
{
name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolSCTP,
destIP: "172.30.0.42",
@@ -4337,139 +4141,6 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
}
}
func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
cmdOutput := "1 flow entries have been deleted"
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }
// Delete ClusterIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete ExternalIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete LoadBalancerIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete NodePort entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.exec = fexec
svcIP := "172.30.0.41"
extIP := "192.168.99.11"
lbIngressIP := "1.2.3.4"
svcPort := 80
nodePort := 31201
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolUDP,
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.ClusterIP = svcIP
svc.Spec.ExternalIPs = []string{extIP}
svc.Spec.Type = "LoadBalancer"
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolUDP,
NodePort: int32(nodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: lbIngressIP,
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 0 {
t.Fatalf("Created service without endpoints must not clear conntrack entries")
}
epIP := "10.180.0.1"
populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Serving: ptr.To(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolUDP),
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 0 {
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
}
populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Serving: ptr.To(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolUDP),
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 4 {
t.Fatalf("Updated UDP service with new endpoints must clear UDP entries 4 times: ClusterIP, NodePort, ExternalIP and LB")
}
// the order is not guaranteed so we have to compare the strings in any order
expectedCommands := []string{
// Delete ClusterIP Conntrack entries
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))),
// Delete ExternalIP Conntrack entries
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", extIP, strings.ToLower(string((v1.ProtocolUDP)))),
// Delete LoadBalancerIP Conntrack entries
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", lbIngressIP, strings.ToLower(string((v1.ProtocolUDP)))),
// Delete NodePort Conntrack entrie
fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort),
}
actualCommands := []string{
strings.Join(fcmd.CombinedOutputLog[0], " "),
strings.Join(fcmd.CombinedOutputLog[1], " "),
strings.Join(fcmd.CombinedOutputLog[2], " "),
strings.Join(fcmd.CombinedOutputLog[3], " "),
}
sort.Strings(expectedCommands)
sort.Strings(actualCommands)
if !reflect.DeepEqual(expectedCommands, actualCommands) {
t.Errorf("Expected commands: %v, but executed %v", expectedCommands, actualCommands)
}
}
func TestProxierMetricsIptablesTotalRules(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)

View File

@@ -256,7 +256,7 @@ type Proxier struct {
iptables utiliptables.Interface
ipvs utilipvs.Interface
ipset utilipset.Interface
exec utilexec.Interface
conntrack conntrack.Interface
masqueradeAll bool
masqueradeMark string
localDetector proxyutiliptables.LocalTrafficDetector
@@ -433,7 +433,7 @@ func NewProxier(ipFamily v1.IPFamily,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
conntrack: conntrack.NewExec(exec),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
@@ -1482,7 +1482,7 @@ func (proxier *Proxier) syncProxyRules() {
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed

View File

@@ -41,6 +41,7 @@ import (
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
ipsettest "k8s.io/kubernetes/pkg/proxy/ipvs/ipset/testing"
@@ -54,8 +55,6 @@ import (
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@@ -131,26 +130,12 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol)
netlinkHandle.SetLocalAddresses("eth0", nodeIPs...)
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("dummy device have been created"), nil, nil },
func() ([]byte, []byte, error) { return []byte(""), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
// initialize ipsetList with all sets we needed
ipsetList := make(map[string]*IPSet)
for _, is := range ipsetInfo {
ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
}
p := &Proxier{
exec: fexec,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
@@ -159,6 +144,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
conntrack: conntrack.NewFake(),
strictARP: false,
localDetector: proxyutiliptables.NewNoOpLocalDetector(),
hostname: testHostname,

View File

@@ -162,7 +162,7 @@ type Proxier struct {
nftables knftables.Interface
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
conntrack conntrack.Interface
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
@@ -236,7 +236,7 @@ func NewProxier(ipFamily v1.IPFamily,
nftables: nft,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: utilexec.New(),
conntrack: conntrack.NewExec(utilexec.New()),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
@@ -1548,7 +1548,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {

View File

@@ -23,8 +23,6 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"testing"
"time"
@@ -39,7 +37,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
@@ -50,213 +47,10 @@ import (
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
func TestDeleteEndpointConnections(t *testing.T) {
const (
UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP
SCTP = v1.ProtocolSCTP
)
testCases := []struct {
description string
svcName string
svcIP string
svcPort int32
protocol v1.Protocol
endpoint string // IP:port endpoint
simulatedErr string
}{
{
description: "V4 UDP",
svcName: "v4-udp",
svcIP: "172.30.1.1",
svcPort: 80,
protocol: UDP,
endpoint: "10.240.0.3:80",
},
{
description: "V4 TCP",
svcName: "v4-tcp",
svcIP: "172.30.2.2",
svcPort: 80,
protocol: TCP,
endpoint: "10.240.0.4:80",
},
{
description: "V4 SCTP",
svcName: "v4-sctp",
svcIP: "172.30.3.3",
svcPort: 80,
protocol: SCTP,
endpoint: "10.240.0.5:80",
},
{
description: "V4 UDP, nothing to delete, benign error",
svcName: "v4-udp-nothing-to-delete",
svcIP: "172.30.4.4",
svcPort: 80,
protocol: UDP,
endpoint: "10.240.0.6:80",
simulatedErr: conntrack.NoConnectionToDelete,
},
{
description: "V4 UDP, unexpected error, should be glogged",
svcName: "v4-udp-simulated-error",
svcIP: "172.30.5.5",
svcPort: 80,
protocol: UDP,
endpoint: "10.240.0.7:80",
simulatedErr: "simulated error",
},
{
description: "V6 UDP",
svcName: "v6-udp",
svcIP: "fd00:1234::20",
svcPort: 80,
protocol: UDP,
endpoint: "[2001:db8::2]:80",
},
{
description: "V6 TCP",
svcName: "v6-tcp",
svcIP: "fd00:1234::30",
svcPort: 80,
protocol: TCP,
endpoint: "[2001:db8::3]:80",
},
{
description: "V6 SCTP",
svcName: "v6-sctp",
svcIP: "fd00:1234::40",
svcPort: 80,
protocol: SCTP,
endpoint: "[2001:db8::4]:80",
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
priorGlogErrs := klog.Stats.Error.Lines()
// Create a fake executor for the conntrack utility.
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
if tc.protocol == UDP {
cmdOutput := "1 flow entries have been deleted"
var simErr error
// First call outputs cmdOutput and succeeds
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Second call may succeed or fail
if tc.simulatedErr != "" {
cmdOutput = ""
simErr = fmt.Errorf(tc.simulatedErr)
}
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
endpointIP := proxyutil.IPPart(tc.endpoint)
_, fp := NewFakeProxier(proxyutil.GetIPFamilyFromIP(netutils.ParseIPSloppy(endpointIP)))
fp.exec = fexec
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) {
if fp.ipFamily == v1.IPv6Protocol {
eps.AddressType = discovery.AddressTypeIPv6
} else {
eps.AddressType = discovery.AddressTypeIPv4
}
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{endpointIP},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("p80"),
Port: ptr.To[int32](80),
Protocol: ptr.To(tc.protocol),
}}
})
// Add and then remove the endpoint slice
fp.OnEndpointSliceAdd(slice)
fp.syncProxyRules()
fp.OnEndpointSliceDelete(slice)
fp.syncProxyRules()
// Check the executed conntrack command
if tc.protocol == UDP {
if fexec.CommandCalls != 2 {
t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls)
}
// First clear conntrack entries for the clusterIP when the
// endpoint is first added.
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP)
if fp.ipFamily == v1.IPv6Protocol {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
// Then clear conntrack entries for the endpoint when it is
// deleted.
expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
if fp.ipFamily == v1.IPv6Protocol {
expectCommand += " -f ipv6"
}
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
} else if fexec.CommandCalls != 0 {
t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls)
}
// Check the number of new glog errors
var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1
}
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
if glogErrs != expGlogErrs {
t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs)
}
})
}
}
// Conventions for tests using NewFakeProxier:
//
// Pod IPs: 10.0.0.0/8
@@ -311,13 +105,13 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
p := &Proxier{
ipFamily: ipFamily,
exec: &fakeexec.FakeExec{},
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil),
nftables: nft,
masqueradeMark: "0x4000",
conntrack: conntrack.NewFake(),
localDetector: detectLocal,
hostname: testHostname,
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
@@ -740,22 +534,23 @@ func TestClusterIPGeneral(t *testing.T) {
TargetPort: intstr.FromInt32(8443),
},
{
// Of course this should really be UDP, but if we
// create a service with UDP ports, the Proxier will
// try to do conntrack cleanup and we'd have to set
// the FakeExec up to be able to deal with that...
Name: "dns-sctp",
Name: "dns-udp",
Port: 53,
Protocol: v1.ProtocolSCTP,
Protocol: v1.ProtocolUDP,
},
{
Name: "dns-tcp",
Port: 53,
Protocol: v1.ProtocolTCP,
// We use TargetPort on TCP but not SCTP to help
// disambiguate the output.
// We use TargetPort on TCP but not UDP/SCTP to
// help disambiguate the output.
TargetPort: intstr.FromInt32(5353),
},
{
Name: "dns-sctp",
Port: 53,
Protocol: v1.ProtocolSCTP,
},
}
}),
)
@@ -797,15 +592,20 @@ func TestClusterIPGeneral(t *testing.T) {
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Name: ptr.To("dns-udp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
Protocol: ptr.To(v1.ProtocolUDP),
},
{
Name: ptr.To("dns-tcp"),
Port: ptr.To[int32](5353),
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
},
}
}),
)
@@ -846,7 +646,7 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (TCP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolTCP,
destIP: "172.30.0.42",
@@ -855,7 +655,16 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (SCTP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolUDP,
destIP: "172.30.0.42",
destPort: 53,
output: "10.180.0.1:53, 10.180.2.1:53",
masq: false,
},
{
name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolSCTP,
destIP: "172.30.0.42",
@@ -2737,138 +2546,6 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
}
}
func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
cmdOutput := "1 flow entries have been deleted"
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }
// Delete ClusterIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete ExternalIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete LoadBalancerIP entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Delete NodePort entries
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
_, fp := NewFakeProxier(v1.IPv4Protocol)
fp.exec = fexec
svcIP := "172.30.0.41"
extIP := "192.168.99.11"
lbIngressIP := "1.2.3.4"
svcPort := 80
nodePort := 31201
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolUDP,
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.ClusterIP = svcIP
svc.Spec.ExternalIPs = []string{extIP}
svc.Spec.Type = "LoadBalancer"
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolUDP,
NodePort: int32(nodePort),
}}
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
IP: lbIngressIP,
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 0 {
t.Fatalf("Created service without endpoints must not clear conntrack entries")
}
epIP := "10.180.0.1"
populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Serving: ptr.To(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolUDP),
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 0 {
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
}
populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Serving: ptr.To(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To(svcPortName.Port),
Port: ptr.To(int32(svcPort)),
Protocol: ptr.To(v1.ProtocolUDP),
}}
}),
)
fp.syncProxyRules()
if fexec.CommandCalls != 4 {
t.Fatalf("Updated UDP service with new endpoints must clear UDP entries 4 times: ClusterIP, NodePort, ExternalIP and LB")
}
// the order is not guaranteed so we have to compare the strings in any order
expectedCommands := []string{
// Delete ClusterIP Conntrack entries
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))),
// Delete ExternalIP Conntrack entries
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", extIP, strings.ToLower(string((v1.ProtocolUDP)))),
// Delete LoadBalancerIP Conntrack entries
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", lbIngressIP, strings.ToLower(string((v1.ProtocolUDP)))),
// Delete NodePort Conntrack entrie
fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort),
}
actualCommands := []string{
strings.Join(fcmd.CombinedOutputLog[0], " "),
strings.Join(fcmd.CombinedOutputLog[1], " "),
strings.Join(fcmd.CombinedOutputLog[2], " "),
strings.Join(fcmd.CombinedOutputLog[3], " "),
}
sort.Strings(expectedCommands)
sort.Strings(actualCommands)
if !reflect.DeepEqual(expectedCommands, actualCommands) {
t.Errorf("Expected commands: %v, but executed %v", expectedCommands, actualCommands)
}
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
// This test ensures that the iptables proxier supports translating Endpoints to