Merge pull request #53555 from leblancd/v6_del_endpoint_proxier
Automatic merge from submit-queue (batch tested with PRs 55988, 53555, 55858). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add IPv6 and negative UT test cases for proxier's deleteEndpointConnections This change adds IPv6 and negative UT test cases for the proxier's deleteEndpointConnections. Changes include: - Add IPv6 UT test cases to TestDeleteEndpointConnections. - Add negative UT test case to TestDeleteEndpointConnections for handling case where no connections need clearing (benign error). - Add negative UT test case to test unexpected error. - Reorganize UT in TestDeleteEndpointConnections so that the fake command executor's command and scripted responses are generated on the fly based on the test case table (rather than using a fixed set of commands/responses that will need to be updated every time test cases are added/deleted). - Create the proxier service map in real time, based on the test case table (rather than using a fixed service map that will need to be updated every time test cases are added/deleted). fixes #53554 **What this PR does / why we need it**: This change adds IPv6 and negative UT test cases for the proxier's deleteEndpointConnections. **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #53554 **Special notes for your reviewer**: **Release note**: ```release-note NONE ```
This commit is contained in:
commit
02ca5cac01
@ -49,6 +49,7 @@ go_test(
|
||||
"//pkg/util/iptables:go_default_library",
|
||||
"//pkg/util/iptables/testing:go_default_library",
|
||||
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"fmt"
|
||||
"net"
|
||||
@ -89,7 +90,7 @@ func TestReadLinesFromByteBuffer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetChainLines(t *testing.T) {
|
||||
iptables_save := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
|
||||
iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
|
||||
*nat
|
||||
:PREROUTING ACCEPT [2136997:197881818]
|
||||
:POSTROUTING ACCEPT [4284525:258542680]
|
||||
@ -102,11 +103,11 @@ func TestGetChainLines(t *testing.T) {
|
||||
utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]",
|
||||
utiliptables.ChainOutput: ":OUTPUT ACCEPT [5901660:357267963]",
|
||||
}
|
||||
checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected)
|
||||
checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
|
||||
}
|
||||
|
||||
func TestGetChainLinesMultipleTables(t *testing.T) {
|
||||
iptables_save := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
|
||||
iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
|
||||
*nat
|
||||
:PREROUTING ACCEPT [2:138]
|
||||
:INPUT ACCEPT [0:0]
|
||||
@ -174,7 +175,7 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
|
||||
utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]",
|
||||
utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]",
|
||||
}
|
||||
checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected)
|
||||
checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
|
||||
}
|
||||
|
||||
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
|
||||
@ -189,67 +190,153 @@ func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, prot
|
||||
}
|
||||
|
||||
func TestDeleteEndpointConnections(t *testing.T) {
|
||||
fcmd := fakeexec.FakeCmd{
|
||||
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
|
||||
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
|
||||
func() ([]byte, error) {
|
||||
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
|
||||
},
|
||||
const (
|
||||
UDP = api.ProtocolUDP
|
||||
TCP = api.ProtocolTCP
|
||||
)
|
||||
testCases := []struct {
|
||||
description string
|
||||
svcName string
|
||||
svcIP string
|
||||
svcPort int
|
||||
protocol api.Protocol
|
||||
endpoint string // IP:port endpoint
|
||||
epSvcPair endpointServicePair // Will be generated by test
|
||||
simulatedErr string
|
||||
}{
|
||||
{
|
||||
description: "V4 UDP",
|
||||
svcName: "v4-udp",
|
||||
svcIP: "10.96.1.1",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.3:80",
|
||||
}, {
|
||||
description: "V4 TCP",
|
||||
svcName: "v4-tcp",
|
||||
svcIP: "10.96.2.2",
|
||||
svcPort: 80,
|
||||
protocol: TCP,
|
||||
endpoint: "10.240.0.4:80",
|
||||
}, {
|
||||
description: "V4 UDP, nothing to delete, benign error",
|
||||
svcName: "v4-udp-nothing-to-delete",
|
||||
svcIP: "10.96.1.1",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.3:80",
|
||||
simulatedErr: utilproxy.NoConnectionToDelete,
|
||||
}, {
|
||||
description: "V4 UDP, unexpected error, should be glogged",
|
||||
svcName: "v4-udp-simulated-error",
|
||||
svcIP: "10.96.1.1",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.3:80",
|
||||
simulatedErr: "simulated error",
|
||||
}, {
|
||||
description: "V6 UDP",
|
||||
svcName: "v6-udp",
|
||||
svcIP: "fd00:1234::20",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "[2001:db8::2]:80",
|
||||
}, {
|
||||
description: "V6 TCP",
|
||||
svcName: "v6-tcp",
|
||||
svcIP: "fd00:1234::30",
|
||||
svcPort: 80,
|
||||
protocol: TCP,
|
||||
endpoint: "[2001:db8::3]:80",
|
||||
},
|
||||
}
|
||||
|
||||
// Create a service map that has service info entries for all test cases
|
||||
// and generate an endpoint service pair for each test case
|
||||
serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
|
||||
for i, tc := range testCases {
|
||||
svc := proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
|
||||
Port: "p80",
|
||||
}
|
||||
serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false)
|
||||
testCases[i].epSvcPair = endpointServicePair{
|
||||
endpoint: tc.endpoint,
|
||||
servicePortName: svc,
|
||||
}
|
||||
}
|
||||
|
||||
// Create a fake executor for the conntrack utility. This should only be
|
||||
// invoked for UDP connections, since no conntrack cleanup is needed for TCP
|
||||
fcmd := fakeexec.FakeCmd{}
|
||||
fexec := fakeexec.FakeExec{
|
||||
CommandScript: []fakeexec.FakeCommandAction{
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
},
|
||||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
|
||||
serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
|
||||
svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "p80"}
|
||||
svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "p80"}
|
||||
serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), 80, api.ProtocolUDP, false)
|
||||
serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), 80, api.ProtocolTCP, false)
|
||||
|
||||
fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}
|
||||
|
||||
testCases := []endpointServicePair{
|
||||
{
|
||||
endpoint: "10.240.0.3:80",
|
||||
servicePortName: svc1,
|
||||
},
|
||||
{
|
||||
endpoint: "10.240.0.4:80",
|
||||
servicePortName: svc1,
|
||||
},
|
||||
{
|
||||
endpoint: "10.240.0.5:80",
|
||||
servicePortName: svc2,
|
||||
},
|
||||
{
|
||||
endpoint: "[fd00:1::5]:8080",
|
||||
servicePortName: svc2,
|
||||
},
|
||||
execFunc := func(cmd string, args ...string) exec.Cmd {
|
||||
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
if tc.protocol == UDP {
|
||||
var cmdOutput string
|
||||
var simErr error
|
||||
if tc.simulatedErr == "" {
|
||||
cmdOutput = "1 flow entries have been deleted"
|
||||
} else {
|
||||
simErr = fmt.Errorf(tc.simulatedErr)
|
||||
}
|
||||
cmdFunc := func() ([]byte, error) { return []byte(cmdOutput), simErr }
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
}
|
||||
}
|
||||
|
||||
expectCommandExecCount := 0
|
||||
for i := range testCases {
|
||||
input := map[endpointServicePair]bool{testCases[i]: true}
|
||||
// Create a proxier using the fake conntrack executor and service map
|
||||
fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}
|
||||
|
||||
// Run the test cases
|
||||
for _, tc := range testCases {
|
||||
priorExecs := fexec.CommandCalls
|
||||
priorGlogErrs := glog.Stats.Error.Lines()
|
||||
|
||||
input := map[endpointServicePair]bool{tc.epSvcPair: true}
|
||||
fakeProxier.deleteEndpointConnections(input)
|
||||
svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName]
|
||||
if svcInfo.protocol == api.ProtocolUDP {
|
||||
svcIp := svcInfo.clusterIP.String()
|
||||
endpointIp := utilproxy.IPPart(testCases[i].endpoint)
|
||||
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp)
|
||||
execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ")
|
||||
if expectCommand != execCommand {
|
||||
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
|
||||
|
||||
// For UDP connections, check the executed conntrack command
|
||||
var expExecs int
|
||||
if tc.protocol == UDP {
|
||||
isIPv6 := func(ip string) bool {
|
||||
netIP := net.ParseIP(ip)
|
||||
if netIP.To4() == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
expectCommandExecCount += 1
|
||||
endpointIP := utilproxy.IPPart(tc.endpoint)
|
||||
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
|
||||
if isIPv6(endpointIP) {
|
||||
expectCommand += " -f ipv6"
|
||||
}
|
||||
actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
|
||||
if actualCommand != expectCommand {
|
||||
t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
|
||||
}
|
||||
expExecs = 1
|
||||
}
|
||||
|
||||
if expectCommandExecCount != fexec.CommandCalls {
|
||||
t.Errorf("Exepect comand executed %d times, but got %d", expectCommandExecCount, fexec.CommandCalls)
|
||||
// Check the number of times conntrack was executed
|
||||
execs := fexec.CommandCalls - priorExecs
|
||||
if execs != expExecs {
|
||||
t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
|
||||
}
|
||||
|
||||
// Check the number of new glog errors
|
||||
var expGlogErrs int64
|
||||
if tc.simulatedErr != "" && tc.simulatedErr != utilproxy.NoConnectionToDelete {
|
||||
expGlogErrs = 1
|
||||
}
|
||||
glogErrs := glog.Stats.Error.Lines() - priorGlogErrs
|
||||
if glogErrs != expGlogErrs {
|
||||
t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
// Utilities for dealing with conntrack
|
||||
|
||||
const noConnectionToDelete = "0 flow entries have been deleted"
|
||||
const NoConnectionToDelete = "0 flow entries have been deleted"
|
||||
|
||||
func IsIPv6(netIP net.IP) bool {
|
||||
return netIP != nil && netIP.To4() == nil
|
||||
@ -50,7 +50,7 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
|
||||
func ClearUDPConntrackForIP(execer exec.Interface, ip string) error {
|
||||
parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", "udp")
|
||||
err := ExecConntrackTool(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby-sit all udp connections to kubernetes services.
|
||||
@ -84,7 +84,7 @@ func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) erro
|
||||
}
|
||||
parameters := parametersWithFamily(isIPv6, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
|
||||
err := ExecConntrackTool(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
|
||||
}
|
||||
return nil
|
||||
@ -95,7 +95,7 @@ func ClearUDPConntrackForPort(execer exec.Interface, port int, isIPv6 bool) erro
|
||||
func ClearUDPConntrackForPeers(execer exec.Interface, origin, dest string) error {
|
||||
parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", "udp")
|
||||
err := ExecConntrackTool(execer, parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby sit all udp connections to kubernetes services.
|
||||
|
Loading…
Reference in New Issue
Block a user