Proxier unittests

This commit is contained in:
bprashanth
2016-09-26 19:48:21 -07:00
parent 93f9b54cab
commit 06cbb36a1f
3 changed files with 354 additions and 29 deletions

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
)
func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
@@ -256,12 +257,13 @@ func TestExecConntrackTool(t *testing.T) {
}
}
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, protocol api.Protocol) *serviceInfo {
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
return &serviceInfo{
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeSeconds: 180, // TODO: paramaterize this in the API.
clusterIP: ip,
protocol: protocol,
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeSeconds: 180, // TODO: paramaterize this in the API.
clusterIP: ip,
protocol: protocol,
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
}
}
@@ -285,8 +287,8 @@ func TestDeleteEndpointConnections(t *testing.T) {
serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: ""}
svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: ""}
serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), api.ProtocolUDP)
serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), api.ProtocolTCP)
serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), api.ProtocolUDP, false)
serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), api.ProtocolTCP, false)
fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}
@@ -473,4 +475,262 @@ func TestRevertPorts(t *testing.T) {
}
// TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces.
// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*localPort
}
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) {
f.openPorts = append(f.openPorts, lp)
return nil, nil
}
func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method.
return &Proxier{
exec: &exec.FakeExec{},
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
iptables: ipt,
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
clusterCIDR: "10.0.0.0/24",
haveReceivedEndpointsUpdate: true,
haveReceivedServiceUpdate: true,
hostname: "test-hostname",
portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}},
}
}
func hasJump(rules []iptablestest.Rule, destChain, destIP, destPort string) bool {
for _, r := range rules {
if r[iptablestest.Jump] == destChain {
if destIP != "" {
return strings.Contains(r[iptablestest.Destination], destIP)
}
if destPort != "" {
return strings.Contains(r[iptablestest.DPort], destPort)
}
return true
}
}
return false
}
func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
for _, r := range rules {
if r[iptablestest.ToDest] == endpoint {
return true
}
}
return false
}
func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
for _, r := range rules {
t.Logf("%v", r)
}
t.Errorf("%v", msg)
}
func TestClusterIPReject(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: ""}
fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, api.ProtocolTCP, false)
fp.syncProxyRules()
svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
svcRules := ipt.GetRules(svcChain)
if len(svcRules) != 0 {
errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcName), svcRules, t)
}
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), "") {
errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcName), kubeSvcRules, t)
}
}
func TestClusterIPEndpointsJump(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, api.ProtocolTCP, true)
ep := "10.180.0.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{ep, false}}
fp.syncProxyRules()
svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
epChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), ep))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, svcChain, svcIP.String(), "") {
errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
}
svcRules := ipt.GetRules(svcChain)
if !hasJump(svcRules, epChain, "", "") {
errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
}
epRules := ipt.GetRules(epChain)
if !hasDNAT(epRules, ep) {
errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, ep), epRules, t)
}
}
func typeLoadBalancer(svcInfo *serviceInfo) *serviceInfo {
svcInfo.nodePort = 3001
svcInfo.loadBalancerStatus = api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{{IP: "1.2.3.4"}},
}
return svcInfo
}
func TestLoadBalancer(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svcInfo := newFakeServiceInfo(svc, svcIP, api.ProtocolTCP, false)
fp.serviceMap[svc] = typeLoadBalancer(svcInfo)
ep1 := "10.180.0.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}}
fp.syncProxyRules()
proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svc, proto))
svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
//lbChain := string(serviceLBChainName(svc, proto))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "") {
errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
}
fwRules := ipt.GetRules(fwChain)
if !hasJump(fwRules, svcChain, "", "") || !hasJump(fwRules, string(KubeMarkMasqChain), "", "") {
errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
}
}
func TestNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svcInfo := newFakeServiceInfo(svc, svcIP, api.ProtocolTCP, false)
svcInfo.nodePort = 3001
fp.serviceMap[svc] = svcInfo
ep1 := "10.180.0.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}}
fp.syncProxyRules()
proto := strings.ToLower(string(api.ProtocolTCP))
svcChain := string(servicePortChainName(svc, strings.ToLower(proto)))
kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
if !hasJump(kubeNodePortRules, svcChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) {
errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
}
}
func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svcInfo := newFakeServiceInfo(svc, svcIP, api.ProtocolTCP, true)
fp.serviceMap[svc] = typeLoadBalancer(svcInfo)
nonLocalEp := "10.180.0.1:80"
localEp := "10.180.2.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}}
fp.syncProxyRules()
proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svc, proto))
lbChain := string(serviceLBChainName(svc, proto))
nonLocalEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), nonLocalEp))
localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "") {
errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
}
fwRules := ipt.GetRules(fwChain)
if !hasJump(fwRules, lbChain, "", "") {
errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
}
if hasJump(fwRules, string(KubeMarkMasqChain), "", "") {
errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
}
lbRules := ipt.GetRules(lbChain)
if hasJump(lbRules, nonLocalEpChain, "", "") {
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t)
}
if !hasJump(lbRules, localEpChain, "", "") {
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t)
}
}
func TestOnlyLocalNodePorts(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svcInfo := newFakeServiceInfo(svc, svcIP, api.ProtocolTCP, true)
svcInfo.nodePort = 3001
fp.serviceMap[svc] = svcInfo
nonLocalEp := "10.180.0.1:80"
localEp := "10.180.2.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}}
fp.syncProxyRules()
proto := strings.ToLower(string(api.ProtocolTCP))
lbChain := string(serviceLBChainName(svc, proto))
nonLocalEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), nonLocalEp))
localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp))
kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
if !hasJump(kubeNodePortRules, lbChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) {
errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
}
lbRules := ipt.GetRules(lbChain)
if hasJump(lbRules, nonLocalEpChain, "", "") {
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t)
}
if !hasJump(lbRules, localEpChain, "", "") {
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t)
}
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.