Merge pull request #22573 from freehan/udpproxy
Automatic merge from submit-queue.

Flush conntrack state for removed/changed UDP services. Fixes #19029.
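In short, kube-proxy's iptables proxier now records the cluster IPs of removed or changed UDP services and the endpoints dropped from UDP services, and after syncing the iptables rules it deletes the matching conntrack entries so stale UDP flows are no longer black-holed. The flush itself is an invocation of the conntrack CLI; the sketch below illustrates the call shape using plain os/exec rather than the k8s.io/kubernetes/pkg/util/exec wrapper the actual change uses for testability, and the function name here is illustrative, not part of the diff:

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// flushUDPServiceConntrack is an illustrative stand-in for the proxier's new
// execConntrackTool/deleteServiceConnections pair: it deletes conntrack entries
// whose original destination is the given service IP, restricted to UDP.
func flushUDPServiceConntrack(serviceIP string) error {
	path, err := exec.LookPath("conntrack")
	if err != nil {
		return fmt.Errorf("conntrack binary not found: %v", err)
	}
	out, err := exec.Command(path, "-D", "--orig-dst", serviceIP, "-p", "udp").CombinedOutput()
	// conntrack -D exits non-zero when nothing matched; like the proxier, treat
	// the "0 flow entries have been deleted" message as a successful no-op.
	if err != nil && !strings.Contains(string(out), "0 flow entries have been deleted") {
		return fmt.Errorf("conntrack -D failed: %v (output: %q)", err, string(out))
	}
	return nil
}

func main() {
	// Requires the conntrack binary and sufficient privileges; the service IP is made up.
	if err := flushUDPServiceConntrack("10.0.0.10"); err != nil {
		fmt.Println(err)
	}
}
```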
@@ -103,28 +103,28 @@ kube::build::get_docker_wrapped_binaries() {
         kube-apiserver,busybox
         kube-controller-manager,busybox
         kube-scheduler,busybox
-        kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v2
+        kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v3
       );;
     "arm")
       local targets=(
         kube-apiserver,armel/busybox
         kube-controller-manager,armel/busybox
         kube-scheduler,armel/busybox
-        kube-proxy,gcr.io/google_containers/debian-iptables-arm:v2
+        kube-proxy,gcr.io/google_containers/debian-iptables-arm:v3
       );;
     "arm64")
       local targets=(
         kube-apiserver,aarch64/busybox
         kube-controller-manager,aarch64/busybox
         kube-scheduler,aarch64/busybox
-        kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v2
+        kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v3
       );;
     "ppc64le")
       local targets=(
         kube-apiserver,ppc64le/busybox
         kube-controller-manager,ppc64le/busybox
         kube-scheduler,ppc64le/busybox
-        kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v2
+        kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v3
       );;
   esac

@@ -22,4 +22,5 @@ CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/
 # cleanup has no effect.
 RUN DEBIAN_FRONTEND=noninteractive apt-get update \
     && DEBIAN_FRONTEND=noninteractive apt-get install -y iptables \
+    && DEBIAN_FRONTEND=noninteractive apt-get install -y conntrack \
     && rm -rf /var/lib/apt/lists/*
@@ -16,7 +16,7 @@

 REGISTRY?="gcr.io/google_containers"
 IMAGE=debian-iptables
-TAG=v2
+TAG=v3
 ARCH?=amd64
 TEMP_DIR:=$(shell mktemp -d)

@@ -41,6 +41,7 @@ import (
 	"k8s.io/kubernetes/pkg/types"
 	utilexec "k8s.io/kubernetes/pkg/util/exec"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/slice"
 	utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
 )
@@ -160,6 +161,7 @@ type Proxier struct {
 	iptables       utiliptables.Interface
 	masqueradeAll  bool
 	masqueradeMark string
+	exec           utilexec.Interface
 }

 type localPort struct {
@@ -220,6 +222,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
 		iptables:       ipt,
 		masqueradeAll:  masqueradeAll,
 		masqueradeMark: masqueradeMark,
+		exec:           exec,
 	}, nil
 }

@@ -434,15 +437,21 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
 		}
 	}

+	staleUDPServices := sets.NewString()
 	// Remove services missing from the update.
 	for name := range proxier.serviceMap {
 		if !activeServices[name] {
 			glog.V(1).Infof("Removing service %q", name)
+			if proxier.serviceMap[name].protocol == api.ProtocolUDP {
+				staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String())
+			}
 			delete(proxier.serviceMap, name)
 		}
 	}

 	proxier.syncProxyRules()
+	proxier.deleteServiceConnections(staleUDPServices.List())
+
 }

 // OnEndpointsUpdate takes in a slice of updated endpoints.
@@ -457,6 +466,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
 	proxier.haveReceivedEndpointsUpdate = true

 	activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
+	staleConnections := make(map[endpointServicePair]bool)

 	// Update endpoints for services.
 	for i := range allEndpoints {
@@ -480,7 +490,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
 			svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
 			curEndpoints := proxier.endpointsMap[svcPort]
 			newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
+
 			if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
+				removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints)
+				for _, ep := range removedEndpoints {
+					staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
+				}
 				glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
 				proxier.endpointsMap[svcPort] = newEndpoints
 			}
@@ -491,12 +506,18 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
 	// Remove endpoints missing from the update.
 	for name := range proxier.endpointsMap {
 		if !activeEndpoints[name] {
+			// Record the endpoints of the inactive service as stale connections.
+			for _, ep := range proxier.endpointsMap[name] {
+				staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true
+			}
+
 			glog.V(2).Infof("Removing endpoints for %q", name)
 			delete(proxier.endpointsMap, name)
 		}
 	}

 	proxier.syncProxyRules()
+	proxier.deleteEndpointConnections(staleConnections)
 }

 // used in OnEndpointsUpdate
@@ -552,6 +573,64 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
 	return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
 }

+// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints
+func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string {
+	return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List()
+}
+
+type endpointServicePair struct {
+	endpoint        string
+	servicePortName proxy.ServicePortName
+}
+
+const noConnectionToDelete = "0 flow entries have been deleted"
+
+// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
+// risk sending more traffic to it, all of which will be lost (because UDP).
+// This assumes the proxier mutex is held.
+func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
+	for epSvcPair := range connectionMap {
+		if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
+			endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
+			glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
+			err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
+			if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
+				// TODO: Better handling for deletion failure. When a failure occurs, stale UDP connections may not get flushed.
+				// These stale UDP connections will black-hole traffic. Make this a best-effort operation for now, since it
+				// is expensive to babysit all UDP connections to kubernetes services.
+				glog.Errorf("conntrack returned error: %v", err)
+			}
+		}
+	}
+}
+
+// deleteServiceConnections uses the conntrack tool to delete the UDP connections specified by the given service IPs.
+func (proxier *Proxier) deleteServiceConnections(svcIPs []string) {
+	for _, ip := range svcIPs {
+		glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
+		err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
+		if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
+			// TODO: Better handling for deletion failure. When a failure occurs, stale UDP connections may not get flushed.
+			// These stale UDP connections will black-hole traffic. Make this a best-effort operation for now, since it
+			// is expensive to babysit all UDP connections to kubernetes services.
+			glog.Errorf("conntrack returned error: %v", err)
+		}
+	}
+}
+
+// execConntrackTool executes the conntrack tool using the given parameters.
+func (proxier *Proxier) execConntrackTool(parameters ...string) error {
+	conntrackPath, err := proxier.exec.LookPath("conntrack")
+	if err != nil {
+		return fmt.Errorf("Error looking for path of conntrack: %v", err)
+	}
+	output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err)
+	}
+	return nil
+}
+
 // This is where all of the iptables-save/restore calls happen.
 // The only other iptables rules are those that are setup in iptablesInit()
 // assumes proxier.mu is held
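The stale-endpoint detection above is a plain set difference: anything present in the current endpoint list but absent from the new list is treated as removed and becomes a candidate for a conntrack flush. A minimal standalone sketch of that semantics, reusing the same in-tree sets package the proxier imports (an illustration, not part of the diff):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/sets"
)

// removedEndpoints mirrors getRemovedEndpoints above: endpoints present in the
// current list but missing from the updated list are returned, sorted.
func removedEndpoints(current, updated []string) []string {
	return sets.NewString(current...).Difference(sets.NewString(updated...)).List()
}

func main() {
	current := []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}
	updated := []string{"10.0.2.1:80", "10.0.2.2:80"}
	// Prints [10.0.2.3:80]; in OnEndpointsUpdate each such endpoint becomes a
	// staleConnections entry keyed by (endpoint, service port) and is flushed
	// from conntrack after syncProxyRules.
	fmt.Println(removedEndpoints(current, updated))
}
```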
@@ -19,7 +19,14 @@ package iptables
 import (
 	"testing"

+	"fmt"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/proxy"
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util/exec"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
+	"net"
+	"strings"
 )

 func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
@@ -156,4 +163,215 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
 	checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected)
 }

+func TestGetRemovedEndpoints(t *testing.T) {
+	testCases := []struct {
+		currentEndpoints []string
+		newEndpoints     []string
+		removedEndpoints []string
+	}{
+		{
+			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
+			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"},
+			removedEndpoints: []string{},
+		},
+		{
+			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"},
+			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"},
+			removedEndpoints: []string{"10.0.2.3:80"},
+		},
+		{
+			currentEndpoints: []string{},
+			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"},
+			removedEndpoints: []string{},
+		},
+		{
+			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
+			newEndpoints:     []string{},
+			removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
+		},
+		{
+			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"},
+			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"},
+			removedEndpoints: []string{"10.0.2.2:443"},
+		},
+	}
+
+	for i := range testCases {
+		res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints)
+		if !slicesEquiv(res, testCases[i].removedEndpoints) {
+			t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res)
+		}
+	}
+}
+
+func TestExecConntrackTool(t *testing.T) {
+	fcmd := exec.FakeCmd{
+		CombinedOutputScript: []exec.FakeCombinedOutputAction{
+			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
+			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
+			func() ([]byte, error) {
+				return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
+			},
+		},
+	}
+	fexec := exec.FakeExec{
+		CommandScript: []exec.FakeCommandAction{
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+		},
+		LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
+	}
+
+	fakeProxier := Proxier{exec: &fexec}
+
+	testCases := [][]string{
+		{"-L", "-p", "udp"},
+		{"-D", "-p", "udp", "-d", "10.0.240.1"},
+		{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
+	}
+
+	expectErr := []bool{false, false, true}
+
+	for i := range testCases {
+		err := fakeProxier.execConntrackTool(testCases[i]...)
+
+		if expectErr[i] {
+			if err == nil {
+				t.Errorf("expected err, got %v", err)
+			}
+		} else {
+			if err != nil {
+				t.Errorf("expected success, got %v", err)
+			}
+		}
+
+		execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
+		expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))

+		if execCmd != expectCmd {
+			t.Errorf("expected command: %s, but got: %s", expectCmd, execCmd)
+		}
+	}
+}
+
+func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, protocol api.Protocol) *serviceInfo {
+	return &serviceInfo{
+		sessionAffinityType: api.ServiceAffinityNone, // default
+		stickyMaxAgeSeconds: 180,                     // TODO: parameterize this in the API.
+		clusterIP:           ip,
+		protocol:            protocol,
+	}
+}
+
+func TestDeleteEndpointConnections(t *testing.T) {
+	fcmd := exec.FakeCmd{
+		CombinedOutputScript: []exec.FakeCombinedOutputAction{
+			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
+			func() ([]byte, error) {
+				return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
+			},
+		},
+	}
+	fexec := exec.FakeExec{
+		CommandScript: []exec.FakeCommandAction{
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+		},
+		LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
+	}
+
+	serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
+	svc1 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, ""}
+	svc2 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc2"}, ""}
+	serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), api.ProtocolUDP)
+	serviceMap[svc2] = newFakeServiceInfo(svc2, net.IPv4(10, 20, 30, 41), api.ProtocolTCP)
+
+	fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}
+
+	testCases := []endpointServicePair{
+		{
+			endpoint:        "10.240.0.3:80",
+			servicePortName: svc1,
+		},
+		{
+			endpoint:        "10.240.0.4:80",
+			servicePortName: svc1,
+		},
+		{
+			endpoint:        "10.240.0.5:80",
+			servicePortName: svc2,
+		},
+	}
+
+	expectCommandExecCount := 0
+	for i := range testCases {
+		input := map[endpointServicePair]bool{testCases[i]: true}
+		fakeProxier.deleteEndpointConnections(input)
+		svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName]
+		if svcInfo.protocol == api.ProtocolUDP {
+			svcIp := svcInfo.clusterIP.String()
+			endpointIp := strings.Split(testCases[i].endpoint, ":")[0]
+			expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp)
+			execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ")
+			if expectCommand != execCommand {
+				t.Errorf("Expected command: %s, but executed %s", expectCommand, execCommand)
+			}
+			expectCommandExecCount += 1
+		}
+
+		if expectCommandExecCount != fexec.CommandCalls {
+			t.Errorf("Expected command to be executed %d times, but got %d", expectCommandExecCount, fexec.CommandCalls)
+		}
+	}
+}
+
+func TestDeleteServiceConnections(t *testing.T) {
+	fcmd := exec.FakeCmd{
+		CombinedOutputScript: []exec.FakeCombinedOutputAction{
+			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
+			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
+			func() ([]byte, error) {
+				return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
+			},
+		},
+	}
+	fexec := exec.FakeExec{
+		CommandScript: []exec.FakeCommandAction{
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
+		},
+		LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
+	}
+
+	fakeProxier := Proxier{exec: &fexec}
+
+	testCases := [][]string{
+		{
+			"10.240.0.3",
+			"10.240.0.5",
+		},
+		{
+			"10.240.0.4",
+		},
+	}
+
+	svcCount := 0
+	for i := range testCases {
+		fakeProxier.deleteServiceConnections(testCases[i])
+		for _, ip := range testCases[i] {
+			expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
+			execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
+			if expectCommand != execCommand {
+				t.Errorf("Expected command: %s, but executed %s", expectCommand, execCommand)
+			}
+			svcCount += 1
+		}
+		if svcCount != fexec.CommandCalls {
+			t.Errorf("Expected command to be executed %d times, but got %d", svcCount, fexec.CommandCalls)
+		}
+	}
+}
+
 // TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces.