
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

**Split KUBE-SERVICES chain to re-shrink the INPUT chain**

**What this PR does / why we need it**:

#43972 added an iptables rule "`-A INPUT -j KUBE-SERVICES`" to make NodePort ICMP rejection work. (Previously the KUBE-SERVICES chain was only run from OUTPUT, not INPUT.) #44547 extended that patch to cover ExternalIP rejection as well.

However, the KUBE-SERVICES chain may have a very large number of ICMP reject rules for plain ClusterIP services (the ones that get run from OUTPUT), and the kernel seems to be much more sensitive to the length of the INPUT chain than to the length of the OUTPUT chain. So a node that worked fine with kube 1.6 (when KUBE-SERVICES was only run from OUTPUT) might fall over with kube 1.7 (with KUBE-SERVICES being run from both INPUT and OUTPUT). (Specifically, a node with about 5000 ClusterIP reject rules that ran fine with OpenShift 3.6 [kube 1.6] slowed almost to a complete halt with OpenShift 3.7 [kube 1.7].)

This PR fixes that by splitting the "new" part of KUBE-SERVICES (the NodePort and ExternalIP reject rules) into a separate KUBE-EXTERNAL-SERVICES chain run from INPUT, and moving KUBE-SERVICES back to being run only from OUTPUT. (So, yes, this assumes you don't have 5000 NodePort/ExternalIP services; if you do, there's not much we can do, since those rules *have* to be run on the INPUT side.) The code that cleans up the "`-A INPUT -j KUBE-SERVICES`" rule is left in even though we no longer generate it, so the rule is removed on upgrade.

**Release note**:

```release-note
Reorganized iptables rules to fix a performance regression on clusters with thousands of services.
```

@kubernetes/sig-network-bugs @kubernetes/rh-networking
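For context, a simplified sketch of the filter-table layout described above. The chain names and jump targets are the ones the PR describes; the reject rules themselves are abbreviated, the service addresses/ports are made-up examples (they mirror values used in the tests below), and the extra match options kube-proxy actually adds (comments, conntrack state, addrtype matches) are omitted:

```
*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
# NodePort/ExternalIP rejects must be seen by traffic delivered to the node itself,
# so they have to run from INPUT; there should only be a handful of these.
-A INPUT -j KUBE-EXTERNAL-SERVICES
-A KUBE-EXTERNAL-SERVICES -d 50.60.70.81/32 -p tcp --dport 80 -j REJECT
# ClusterIP rejects (potentially thousands of them) only need to run from OUTPUT,
# as in kube 1.6, so the INPUT chain stays short.
-A OUTPUT -j KUBE-SERVICES
-A KUBE-SERVICES -d 10.20.30.41/32 -p tcp --dport 80 -j REJECT
COMMIT
```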
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package iptables

import (
	"bytes"
	"fmt"
	"net"
	"reflect"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/golang/glog"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/proxy"
	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
	"k8s.io/kubernetes/pkg/util/async"
	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
	iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
	"k8s.io/utils/exec"
	fakeexec "k8s.io/utils/exec/testing"
)

func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
	chainLines := utiliptables.GetChainLines(table, save)
	for chain, line := range chainLines {
		if expected, exists := expectedLines[chain]; exists {
			if expected != line {
				t.Errorf("getChainLines expected chain line not present. For chain: %s Expected: %s Got: %s", chain, expected, line)
			}
		} else {
			t.Errorf("getChainLines expected chain not present: %s", chain)
		}
	}
}

func TestReadLinesFromByteBuffer(t *testing.T) {
	testFn := func(byteArray []byte, expected []string) {
		index := 0
		readIndex := 0
		for ; readIndex < len(byteArray); index++ {
			line, n := utiliptables.ReadLine(readIndex, byteArray)
			readIndex = n
			if expected[index] != line {
				t.Errorf("expected:%q, actual:%q", expected[index], line)
			}
		} // for
		if readIndex < len(byteArray) {
			t.Errorf("Byte buffer was only partially read. Buffer length is:%d, readIndex is:%d", len(byteArray), readIndex)
		}
		if index < len(expected) {
			t.Errorf("Not all expected strings were compared. expected array length:%d, matched count:%d", len(expected), index-1)
		}
	}

	byteArray1 := []byte("\n Line 1 \n\n\n L ine4 \nLine 5 \n \n")
	expected1 := []string{"", "Line 1", "", "", "L ine4", "Line 5", ""}
	testFn(byteArray1, expected1)

	byteArray1 = []byte("")
	expected1 = []string{}
	testFn(byteArray1, expected1)

	byteArray1 = []byte("\n\n")
	expected1 = []string{"", ""}
	testFn(byteArray1, expected1)
}

func TestGetChainLines(t *testing.T) {
	iptablesSave := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014
*nat
:PREROUTING ACCEPT [2136997:197881818]
:POSTROUTING ACCEPT [4284525:258542680]
:OUTPUT ACCEPT [5901660:357267963]
-A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
COMMIT
# Completed on Wed Oct 29 14:56:01 2014`
	expected := map[utiliptables.Chain]string{
		utiliptables.ChainPrerouting:  ":PREROUTING ACCEPT [2136997:197881818]",
		utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [4284525:258542680]",
		utiliptables.ChainOutput:      ":OUTPUT ACCEPT [5901660:357267963]",
	}
	checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
}

func TestGetChainLinesMultipleTables(t *testing.T) {
|
|
iptablesSave := `# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
|
|
*nat
|
|
:PREROUTING ACCEPT [2:138]
|
|
:INPUT ACCEPT [0:0]
|
|
:OUTPUT ACCEPT [0:0]
|
|
:POSTROUTING ACCEPT [0:0]
|
|
:DOCKER - [0:0]
|
|
:KUBE-NODEPORT-CONTAINER - [0:0]
|
|
:KUBE-NODEPORT-HOST - [0:0]
|
|
:KUBE-PORTALS-CONTAINER - [0:0]
|
|
:KUBE-PORTALS-HOST - [0:0]
|
|
:KUBE-SVC-1111111111111111 - [0:0]
|
|
:KUBE-SVC-2222222222222222 - [0:0]
|
|
:KUBE-SVC-3333333333333333 - [0:0]
|
|
:KUBE-SVC-4444444444444444 - [0:0]
|
|
:KUBE-SVC-5555555555555555 - [0:0]
|
|
:KUBE-SVC-6666666666666666 - [0:0]
|
|
-A PREROUTING -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-CONTAINER
|
|
-A PREROUTING -m addrtype --dst-type LOCAL -j DOCKER
|
|
-A PREROUTING -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-CONTAINER
|
|
-A OUTPUT -m comment --comment "handle ClusterIPs; NOTE: this must be before the NodePort rules" -j KUBE-PORTALS-HOST
|
|
-A OUTPUT ! -d 127.0.0.0/8 -m addrtype --dst-type LOCAL -j DOCKER
|
|
-A OUTPUT -m addrtype --dst-type LOCAL -m comment --comment "handle service NodePorts; NOTE: this must be the last rule in the chain" -j KUBE-NODEPORT-HOST
|
|
-A POSTROUTING -s 10.246.1.0/24 ! -o cbr0 -j MASQUERADE
|
|
-A POSTROUTING -s 10.0.2.15/32 -d 10.0.2.15/32 -m comment --comment "handle pod connecting to self" -j MASQUERADE
|
|
-A KUBE-PORTALS-CONTAINER -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
|
|
-A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
|
|
-A KUBE-PORTALS-CONTAINER -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
|
|
-A KUBE-PORTALS-HOST -d 10.247.0.1/32 -p tcp -m comment --comment "portal for default/kubernetes:" -m state --state NEW -m tcp --dport 443 -j KUBE-SVC-5555555555555555
|
|
-A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p udp -m comment --comment "portal for kube-system/kube-dns:dns" -m state --state NEW -m udp --dport 53 -j KUBE-SVC-6666666666666666
|
|
-A KUBE-PORTALS-HOST -d 10.247.0.10/32 -p tcp -m comment --comment "portal for kube-system/kube-dns:dns-tcp" -m state --state NEW -m tcp --dport 53 -j KUBE-SVC-2222222222222222
|
|
-A KUBE-SVC-1111111111111111 -p udp -m comment --comment "kube-system/kube-dns:dns" -m recent --set --name KUBE-SVC-1111111111111111 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
|
|
-A KUBE-SVC-2222222222222222 -m comment --comment "kube-system/kube-dns:dns-tcp" -j KUBE-SVC-3333333333333333
|
|
-A KUBE-SVC-3333333333333333 -p tcp -m comment --comment "kube-system/kube-dns:dns-tcp" -m recent --set --name KUBE-SVC-3333333333333333 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.246.1.2:53
|
|
-A KUBE-SVC-4444444444444444 -p tcp -m comment --comment "default/kubernetes:" -m recent --set --name KUBE-SVC-4444444444444444 --mask 255.255.255.255 --rsource -j DNAT --to-destination 10.245.1.2:443
|
|
-A KUBE-SVC-5555555555555555 -m comment --comment "default/kubernetes:" -j KUBE-SVC-4444444444444444
|
|
-A KUBE-SVC-6666666666666666 -m comment --comment "kube-system/kube-dns:dns" -j KUBE-SVC-1111111111111111
|
|
COMMIT
|
|
# Completed on Fri Aug 7 14:47:37 2015
|
|
# Generated by iptables-save v1.4.21 on Fri Aug 7 14:47:37 2015
|
|
*filter
|
|
:INPUT ACCEPT [17514:83115836]
|
|
:FORWARD ACCEPT [0:0]
|
|
:OUTPUT ACCEPT [8909:688225]
|
|
:DOCKER - [0:0]
|
|
-A FORWARD -o cbr0 -j DOCKER
|
|
-A FORWARD -o cbr0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
|
|
-A FORWARD -i cbr0 ! -o cbr0 -j ACCEPT
|
|
-A FORWARD -i cbr0 -o cbr0 -j ACCEPT
|
|
COMMIT
|
|
`
|
|
expected := map[utiliptables.Chain]string{
|
|
utiliptables.ChainPrerouting: ":PREROUTING ACCEPT [2:138]",
|
|
utiliptables.Chain("INPUT"): ":INPUT ACCEPT [0:0]",
|
|
utiliptables.Chain("OUTPUT"): ":OUTPUT ACCEPT [0:0]",
|
|
utiliptables.ChainPostrouting: ":POSTROUTING ACCEPT [0:0]",
|
|
utiliptables.Chain("DOCKER"): ":DOCKER - [0:0]",
|
|
utiliptables.Chain("KUBE-NODEPORT-CONTAINER"): ":KUBE-NODEPORT-CONTAINER - [0:0]",
|
|
utiliptables.Chain("KUBE-NODEPORT-HOST"): ":KUBE-NODEPORT-HOST - [0:0]",
|
|
utiliptables.Chain("KUBE-PORTALS-CONTAINER"): ":KUBE-PORTALS-CONTAINER - [0:0]",
|
|
utiliptables.Chain("KUBE-PORTALS-HOST"): ":KUBE-PORTALS-HOST - [0:0]",
|
|
utiliptables.Chain("KUBE-SVC-1111111111111111"): ":KUBE-SVC-1111111111111111 - [0:0]",
|
|
utiliptables.Chain("KUBE-SVC-2222222222222222"): ":KUBE-SVC-2222222222222222 - [0:0]",
|
|
utiliptables.Chain("KUBE-SVC-3333333333333333"): ":KUBE-SVC-3333333333333333 - [0:0]",
|
|
utiliptables.Chain("KUBE-SVC-4444444444444444"): ":KUBE-SVC-4444444444444444 - [0:0]",
|
|
utiliptables.Chain("KUBE-SVC-5555555555555555"): ":KUBE-SVC-5555555555555555 - [0:0]",
|
|
utiliptables.Chain("KUBE-SVC-6666666666666666"): ":KUBE-SVC-6666666666666666 - [0:0]",
|
|
}
|
|
checkAllLines(t, utiliptables.TableNAT, []byte(iptablesSave), expected)
|
|
}
|
|
|
|
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
|
|
return &serviceInfo{
|
|
sessionAffinityType: api.ServiceAffinityNone, // default
|
|
stickyMaxAgeSeconds: int(api.DefaultClientIPServiceAffinitySeconds), // default
|
|
clusterIP: ip,
|
|
port: port,
|
|
protocol: protocol,
|
|
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
|
}
|
|
}
|
|
|
|
func TestDeleteEndpointConnections(t *testing.T) {
|
|
const (
|
|
UDP = api.ProtocolUDP
|
|
TCP = api.ProtocolTCP
|
|
)
|
|
testCases := []struct {
|
|
description string
|
|
svcName string
|
|
svcIP string
|
|
svcPort int
|
|
protocol api.Protocol
|
|
endpoint string // IP:port endpoint
|
|
epSvcPair proxy.ServiceEndpoint // Will be generated by test
|
|
simulatedErr string
|
|
}{
|
|
{
|
|
description: "V4 UDP",
|
|
svcName: "v4-udp",
|
|
svcIP: "10.96.1.1",
|
|
svcPort: 80,
|
|
protocol: UDP,
|
|
endpoint: "10.240.0.3:80",
|
|
}, {
|
|
description: "V4 TCP",
|
|
svcName: "v4-tcp",
|
|
svcIP: "10.96.2.2",
|
|
svcPort: 80,
|
|
protocol: TCP,
|
|
endpoint: "10.240.0.4:80",
|
|
}, {
|
|
description: "V4 UDP, nothing to delete, benign error",
|
|
svcName: "v4-udp-nothing-to-delete",
|
|
svcIP: "10.96.1.1",
|
|
svcPort: 80,
|
|
protocol: UDP,
|
|
endpoint: "10.240.0.3:80",
|
|
simulatedErr: utilproxy.NoConnectionToDelete,
|
|
}, {
|
|
description: "V4 UDP, unexpected error, should be glogged",
|
|
svcName: "v4-udp-simulated-error",
|
|
svcIP: "10.96.1.1",
|
|
svcPort: 80,
|
|
protocol: UDP,
|
|
endpoint: "10.240.0.3:80",
|
|
simulatedErr: "simulated error",
|
|
}, {
|
|
description: "V6 UDP",
|
|
svcName: "v6-udp",
|
|
svcIP: "fd00:1234::20",
|
|
svcPort: 80,
|
|
protocol: UDP,
|
|
endpoint: "[2001:db8::2]:80",
|
|
}, {
|
|
description: "V6 TCP",
|
|
svcName: "v6-tcp",
|
|
svcIP: "fd00:1234::30",
|
|
svcPort: 80,
|
|
protocol: TCP,
|
|
endpoint: "[2001:db8::3]:80",
|
|
},
|
|
}
|
|
|
|
// Create a service map that has service info entries for all test cases
|
|
// and generate an endpoint service pair for each test case
|
|
serviceMap := make(map[proxy.ServicePortName]proxy.ServicePort)
|
|
for i, tc := range testCases {
|
|
svc := proxy.ServicePortName{
|
|
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
|
|
Port: "p80",
|
|
}
|
|
serviceMap[svc] = newFakeServiceInfo(svc, net.ParseIP(tc.svcIP), 80, tc.protocol, false)
|
|
testCases[i].epSvcPair = proxy.ServiceEndpoint{
|
|
Endpoint: tc.endpoint,
|
|
ServicePortName: svc,
|
|
}
|
|
}
|
|
|
|
// Create a fake executor for the conntrack utility. This should only be
|
|
// invoked for UDP connections, since no conntrack cleanup is needed for TCP
|
|
fcmd := fakeexec.FakeCmd{}
|
|
fexec := fakeexec.FakeExec{
|
|
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
|
}
|
|
execFunc := func(cmd string, args ...string) exec.Cmd {
|
|
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
|
|
}
|
|
for _, tc := range testCases {
|
|
if tc.protocol == UDP {
|
|
var cmdOutput string
|
|
var simErr error
|
|
if tc.simulatedErr == "" {
|
|
cmdOutput = "1 flow entries have been deleted"
|
|
} else {
|
|
simErr = fmt.Errorf(tc.simulatedErr)
|
|
}
|
|
cmdFunc := func() ([]byte, error) { return []byte(cmdOutput), simErr }
|
|
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
|
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
|
}
|
|
}
|
|
|
|
// Create a proxier using the fake conntrack executor and service map
|
|
fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}
|
|
|
|
// Run the test cases
|
|
for _, tc := range testCases {
|
|
priorExecs := fexec.CommandCalls
|
|
priorGlogErrs := glog.Stats.Error.Lines()
|
|
|
|
input := []proxy.ServiceEndpoint{tc.epSvcPair}
|
|
fakeProxier.deleteEndpointConnections(input)
|
|
|
|
// For UDP connections, check the executed conntrack command
|
|
var expExecs int
|
|
if tc.protocol == UDP {
|
|
isIPv6 := func(ip string) bool {
|
|
netIP := net.ParseIP(ip)
|
|
if netIP.To4() == nil {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
endpointIP := utilproxy.IPPart(tc.endpoint)
|
|
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
|
|
if isIPv6(endpointIP) {
|
|
expectCommand += " -f ipv6"
|
|
}
|
|
actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
|
|
if actualCommand != expectCommand {
|
|
t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
|
|
}
|
|
expExecs = 1
|
|
}
|
|
|
|
// Check the number of times conntrack was executed
|
|
execs := fexec.CommandCalls - priorExecs
|
|
if execs != expExecs {
|
|
t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
|
|
}
|
|
|
|
// Check the number of new glog errors
|
|
var expGlogErrs int64
|
|
if tc.simulatedErr != "" && tc.simulatedErr != utilproxy.NoConnectionToDelete {
|
|
expGlogErrs = 1
|
|
}
|
|
glogErrs := glog.Stats.Error.Lines() - priorGlogErrs
|
|
if glogErrs != expGlogErrs {
|
|
t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
|
|
}
|
|
}
|
|
}
|
|
|
|
type fakeClosable struct {
|
|
closed bool
|
|
}
|
|
|
|
func (c *fakeClosable) Close() error {
|
|
c.closed = true
|
|
return nil
|
|
}
|
|
|
|
// fakePortOpener implements portOpener.
|
|
type fakePortOpener struct {
|
|
openPorts []*utilproxy.LocalPort
|
|
}
|
|
|
|
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
|
|
// to lock a local port.
|
|
func (f *fakePortOpener) OpenLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
|
|
f.openPorts = append(f.openPorts, lp)
|
|
return nil, nil
|
|
}
|
|
|
|
type fakeHealthChecker struct {
|
|
services map[types.NamespacedName]uint16
|
|
endpoints map[types.NamespacedName]int
|
|
}
|
|
|
|
func newFakeHealthChecker() *fakeHealthChecker {
|
|
return &fakeHealthChecker{
|
|
services: map[types.NamespacedName]uint16{},
|
|
endpoints: map[types.NamespacedName]int{},
|
|
}
|
|
}
|
|
|
|
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
|
|
fake.services = newServices
|
|
return nil
|
|
}
|
|
|
|
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
|
|
fake.endpoints = newEndpoints
|
|
return nil
|
|
}
|
|
|
|
const testHostname = "test-hostname"
|
|
|
|
func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
|
// TODO: Call NewProxier after refactoring out the goroutine
|
|
// invocation into a Run() method.
|
|
p := &Proxier{
|
|
exec: &fakeexec.FakeExec{},
|
|
serviceMap: make(proxy.ServiceMap),
|
|
serviceChanges: proxy.NewServiceChangeTracker(),
|
|
endpointsMap: make(proxy.EndpointsMap),
|
|
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname),
|
|
iptables: ipt,
|
|
clusterCIDR: "10.0.0.0/24",
|
|
hostname: testHostname,
|
|
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
|
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
|
healthChecker: newFakeHealthChecker(),
|
|
precomputedProbabilities: make([]string, 0, 1001),
|
|
iptablesData: bytes.NewBuffer(nil),
|
|
filterChains: bytes.NewBuffer(nil),
|
|
filterRules: bytes.NewBuffer(nil),
|
|
natChains: bytes.NewBuffer(nil),
|
|
natRules: bytes.NewBuffer(nil),
|
|
}
|
|
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
|
return p
|
|
}
|
|
|
|
func hasSessionAffinityRule(rules []iptablestest.Rule) bool {
|
|
for _, r := range rules {
|
|
if _, ok := r[iptablestest.Recent]; ok {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
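// hasJump reports whether rules contains a jump to destChain; when destIP and/or
// destPort are specified (non-empty / non-zero), the jump rule must also match
// that destination IP and/or port.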
func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {
|
|
destPortStr := strconv.Itoa(destPort)
|
|
match := false
|
|
for _, r := range rules {
|
|
if r[iptablestest.Jump] == destChain {
|
|
match = true
|
|
if destIP != "" {
|
|
if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") {
|
|
return true
|
|
}
|
|
match = false
|
|
}
|
|
if destPort != 0 {
|
|
if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
|
|
return true
|
|
}
|
|
match = false
|
|
}
|
|
}
|
|
}
|
|
return match
|
|
}
|
|
|
|
func TestHasJump(t *testing.T) {
|
|
testCases := map[string]struct {
|
|
rules []iptablestest.Rule
|
|
destChain string
|
|
destIP string
|
|
destPort int
|
|
expected bool
|
|
}{
|
|
"case 1": {
|
|
// Match the 1st rule(both dest IP and dest Port)
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "10.20.30.41/32", "--dport ": "80", "-p ": "tcp", "-j ": "REJECT"},
|
|
{"--dport ": "3001", "-p ": "tcp", "-j ": "KUBE-MARK-MASQ"},
|
|
},
|
|
destChain: "REJECT",
|
|
destIP: "10.20.30.41",
|
|
destPort: 80,
|
|
expected: true,
|
|
},
|
|
"case 2": {
|
|
// Match the 2nd rule(dest Port)
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
|
|
{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
|
|
},
|
|
destChain: "REJECT",
|
|
destIP: "",
|
|
destPort: 3001,
|
|
expected: true,
|
|
},
|
|
"case 3": {
|
|
// Match both dest IP and dest Port
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
|
|
},
|
|
destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
|
|
destIP: "1.2.3.4",
|
|
destPort: 80,
|
|
expected: true,
|
|
},
|
|
"case 4": {
|
|
// Match dest IP but doesn't match dest Port
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
|
|
},
|
|
destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
|
|
destIP: "1.2.3.4",
|
|
destPort: 8080,
|
|
expected: false,
|
|
},
|
|
"case 5": {
|
|
// Match dest Port but doesn't match dest IP
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "1.2.3.4/32", "--dport ": "80", "-p ": "tcp", "-j ": "KUBE-XLB-GF53O3C2HZEXL2XN"},
|
|
},
|
|
destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
|
|
destIP: "10.20.30.40",
|
|
destPort: 80,
|
|
expected: false,
|
|
},
|
|
"case 6": {
|
|
// Match the 2nd rule(dest IP)
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
|
|
{"-d ": "1.2.3.4/32", "-p ": "tcp", "-j ": "REJECT"},
|
|
{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
|
|
},
|
|
destChain: "REJECT",
|
|
destIP: "1.2.3.4",
|
|
destPort: 8080,
|
|
expected: true,
|
|
},
|
|
"case 7": {
|
|
// Match the 2nd rule(dest Port)
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
|
|
{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
|
|
},
|
|
destChain: "REJECT",
|
|
destIP: "1.2.3.4",
|
|
destPort: 3001,
|
|
expected: true,
|
|
},
|
|
"case 8": {
|
|
// Match the 1st rule(dest IP)
|
|
rules: []iptablestest.Rule{
|
|
{"-d ": "10.20.30.41/32", "-p ": "tcp", "-j ": "REJECT"},
|
|
{"--dport ": "3001", "-p ": "tcp", "-j ": "REJECT"},
|
|
},
|
|
destChain: "REJECT",
|
|
destIP: "10.20.30.41",
|
|
destPort: 8080,
|
|
expected: true,
|
|
},
|
|
"case 9": {
|
|
rules: []iptablestest.Rule{
|
|
{"-j ": "KUBE-SEP-LWSOSDSHMKPJHHJV"},
|
|
},
|
|
destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV",
|
|
destIP: "",
|
|
destPort: 0,
|
|
expected: true,
|
|
},
|
|
"case 10": {
|
|
rules: []iptablestest.Rule{
|
|
{"-j ": "KUBE-SEP-FOO"},
|
|
},
|
|
destChain: "KUBE-SEP-BAR",
|
|
destIP: "",
|
|
destPort: 0,
|
|
expected: false,
|
|
},
|
|
}
|
|
|
|
for k, tc := range testCases {
|
|
if got := hasJump(tc.rules, tc.destChain, tc.destIP, tc.destPort); got != tc.expected {
|
|
t.Errorf("%v: expected %v, got %v", k, tc.expected, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
func hasDNAT(rules []iptablestest.Rule, endpoint string) bool {
|
|
for _, r := range rules {
|
|
if r[iptablestest.ToDest] == endpoint {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
|
|
for _, r := range rules {
|
|
t.Logf("%q", r)
|
|
}
|
|
t.Errorf("%v", msg)
|
|
}
|
|
|
|
func TestClusterIPReject(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
}}
|
|
}),
|
|
)
|
|
makeEndpointsMap(fp)
|
|
fp.syncProxyRules()
|
|
|
|
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
|
|
svcRules := ipt.GetRules(svcChain)
|
|
if len(svcRules) != 0 {
|
|
errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcPortName), svcRules, t)
|
|
}
|
|
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
|
|
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcPort) {
|
|
errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
|
|
}
|
|
}
|
|
|
|
func TestClusterIPEndpointsJump(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
}}
|
|
}),
|
|
)
|
|
|
|
epIP := "10.180.0.1"
|
|
makeEndpointsMap(fp,
|
|
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: epIP,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
}},
|
|
}}
|
|
}),
|
|
)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
|
|
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
|
|
epChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)), epStr))
|
|
|
|
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
|
|
if !hasJump(kubeSvcRules, svcChain, svcIP, svcPort) {
|
|
errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
|
|
}
|
|
|
|
svcRules := ipt.GetRules(svcChain)
|
|
if !hasJump(svcRules, epChain, "", 0) {
|
|
errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
|
|
}
|
|
epRules := ipt.GetRules(epChain)
|
|
if !hasDNAT(epRules, epStr) {
|
|
errorf(fmt.Sprintf("Endpoint chain %v lacks DNAT to %v", epChain, epStr), epRules, t)
|
|
}
|
|
}
|
|
|
|
func TestLoadBalancer(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcNodePort := 3001
|
|
svcLBIP := "1.2.3.4"
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.Type = "LoadBalancer"
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
NodePort: int32(svcNodePort),
|
|
}}
|
|
svc.Status.LoadBalancer.Ingress = []api.LoadBalancerIngress{{
|
|
IP: svcLBIP,
|
|
}}
|
|
}),
|
|
)
|
|
|
|
epIP := "10.180.0.1"
|
|
makeEndpointsMap(fp,
|
|
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: epIP,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
}},
|
|
}}
|
|
}),
|
|
)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
proto := strings.ToLower(string(api.ProtocolTCP))
|
|
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
|
|
svcChain := string(servicePortChainName(svcPortName.String(), proto))
|
|
|
|
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
|
|
if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
|
|
errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
|
|
}
|
|
|
|
fwRules := ipt.GetRules(fwChain)
|
|
if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
|
|
errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
|
|
}
|
|
}
|
|
|
|
func TestNodePort(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcNodePort := 3001
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.Type = "NodePort"
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
NodePort: int32(svcNodePort),
|
|
}}
|
|
}),
|
|
)
|
|
|
|
epIP := "10.180.0.1"
|
|
makeEndpointsMap(fp,
|
|
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: epIP,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
}},
|
|
}}
|
|
}),
|
|
)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
proto := strings.ToLower(string(api.ProtocolTCP))
|
|
svcChain := string(servicePortChainName(svcPortName.String(), proto))
|
|
|
|
kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
|
|
if !hasJump(kubeNodePortRules, svcChain, "", svcNodePort) {
|
|
errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
|
|
}
|
|
}
|
|
|
|
func TestExternalIPsReject(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcExternalIPs := "50.60.70.81"
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.Type = "ClusterIP"
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.ExternalIPs = []string{svcExternalIPs}
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
TargetPort: intstr.FromInt(svcPort),
|
|
}}
|
|
}),
|
|
)
|
|
makeEndpointsMap(fp)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
|
|
if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
|
|
errorf(fmt.Sprintf("Failed to a %v rule for externalIP %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
|
|
}
|
|
}
|
|
|
|
func TestNodePortReject(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcNodePort := 3001
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.Type = "NodePort"
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
NodePort: int32(svcNodePort),
|
|
}}
|
|
}),
|
|
)
|
|
makeEndpointsMap(fp)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
kubeSvcRules := ipt.GetRules(string(kubeExternalServicesChain))
|
|
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
|
|
errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcPortName), kubeSvcRules, t)
|
|
}
|
|
}
|
|
|
|
func strPtr(s string) *string {
|
|
return &s
|
|
}
|
|
|
|
func TestOnlyLocalLoadBalancing(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcNodePort := 3001
|
|
svcLBIP := "1.2.3.4"
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
svcSessionAffinityTimeout := int32(10800)
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.Type = "LoadBalancer"
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
NodePort: int32(svcNodePort),
|
|
}}
|
|
svc.Status.LoadBalancer.Ingress = []api.LoadBalancerIngress{{
|
|
IP: svcLBIP,
|
|
}}
|
|
svc.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyTypeLocal
|
|
svc.Spec.SessionAffinity = api.ServiceAffinityClientIP
|
|
svc.Spec.SessionAffinityConfig = &api.SessionAffinityConfig{
|
|
ClientIP: &api.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
|
|
}
|
|
}),
|
|
)
|
|
|
|
epIP1 := "10.180.0.1"
|
|
epIP2 := "10.180.2.1"
|
|
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
|
|
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
|
|
makeEndpointsMap(fp,
|
|
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: epIP1,
|
|
NodeName: nil,
|
|
}, {
|
|
IP: epIP2,
|
|
NodeName: strPtr(testHostname),
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
}},
|
|
}}
|
|
}),
|
|
)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
proto := strings.ToLower(string(api.ProtocolTCP))
|
|
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
|
|
lbChain := string(serviceLBChainName(svcPortName.String(), proto))
|
|
|
|
nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)), epStrLocal))
|
|
localEpChain := string(servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)), epStrNonLocal))
|
|
|
|
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
|
|
if !hasJump(kubeSvcRules, fwChain, svcLBIP, svcPort) {
|
|
errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
|
|
}
|
|
|
|
fwRules := ipt.GetRules(fwChain)
|
|
if !hasJump(fwRules, lbChain, "", 0) {
|
|
errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
|
|
}
|
|
if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
|
|
errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
|
|
}
|
|
|
|
lbRules := ipt.GetRules(lbChain)
|
|
if hasJump(lbRules, nonLocalEpChain, "", 0) {
|
|
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
|
|
}
|
|
if !hasJump(lbRules, localEpChain, "", 0) {
|
|
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrNonLocal), lbRules, t)
|
|
}
|
|
if !hasSessionAffinityRule(lbRules) {
|
|
errorf(fmt.Sprintf("Didn't find session affinity rule from lb chain %v", lbChain), lbRules, t)
|
|
}
|
|
}
|
|
|
|
func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
// set cluster CIDR to empty before test
|
|
fp.clusterCIDR = ""
|
|
onlyLocalNodePorts(t, fp, ipt)
|
|
}
|
|
|
|
func TestOnlyLocalNodePorts(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
onlyLocalNodePorts(t, fp, ipt)
|
|
}
|
|
|
|
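// onlyLocalNodePorts is the shared body of the OnlyLocalNodePorts tests. It expects
// the nodeports chain to masquerade and jump to the per-service LB chain, and the LB
// chain to jump only to the local endpoint chain (plus the service chain, but only
// when a cluster CIDR is configured).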
func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTables) {
|
|
shouldLBTOSVCRuleExist := len(fp.clusterCIDR) > 0
|
|
svcIP := "10.20.30.41"
|
|
svcPort := 80
|
|
svcNodePort := 3001
|
|
svcPortName := proxy.ServicePortName{
|
|
NamespacedName: makeNSN("ns1", "svc1"),
|
|
Port: "p80",
|
|
}
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
|
|
svc.Spec.Type = "NodePort"
|
|
svc.Spec.ClusterIP = svcIP
|
|
svc.Spec.Ports = []api.ServicePort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
Protocol: api.ProtocolTCP,
|
|
NodePort: int32(svcNodePort),
|
|
}}
|
|
svc.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyTypeLocal
|
|
}),
|
|
)
|
|
|
|
epIP1 := "10.180.0.1"
|
|
epIP2 := "10.180.2.1"
|
|
epStrLocal := fmt.Sprintf("%s:%d", epIP1, svcPort)
|
|
epStrNonLocal := fmt.Sprintf("%s:%d", epIP2, svcPort)
|
|
makeEndpointsMap(fp,
|
|
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: epIP1,
|
|
NodeName: nil,
|
|
}, {
|
|
IP: epIP2,
|
|
NodeName: strPtr(testHostname),
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: svcPortName.Port,
|
|
Port: int32(svcPort),
|
|
}},
|
|
}}
|
|
}),
|
|
)
|
|
|
|
fp.syncProxyRules()
|
|
|
|
proto := strings.ToLower(string(api.ProtocolTCP))
|
|
lbChain := string(serviceLBChainName(svcPortName.String(), proto))
|
|
|
|
nonLocalEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrLocal))
|
|
localEpChain := string(servicePortEndpointChainName(svcPortName.String(), proto, epStrNonLocal))
|
|
|
|
kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
|
|
if !hasJump(kubeNodePortRules, lbChain, "", svcNodePort) {
|
|
errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
|
|
}
|
|
|
|
if !hasJump(kubeNodePortRules, string(KubeMarkMasqChain), "", svcNodePort) {
|
|
errorf(fmt.Sprintf("Failed to find jump to %s chain for destination IP %d", KubeMarkMasqChain, svcNodePort), kubeNodePortRules, t)
|
|
}
|
|
|
|
svcChain := string(servicePortChainName(svcPortName.String(), proto))
|
|
lbRules := ipt.GetRules(lbChain)
|
|
if hasJump(lbRules, nonLocalEpChain, "", 0) {
|
|
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, epStrLocal), lbRules, t)
|
|
}
|
|
if hasJump(lbRules, svcChain, "", 0) != shouldLBTOSVCRuleExist {
|
|
prefix := "Did not find "
|
|
if !shouldLBTOSVCRuleExist {
|
|
prefix = "Found "
|
|
}
|
|
errorf(fmt.Sprintf("%s jump from lb chain %v to svc %v", prefix, lbChain, svcChain), lbRules, t)
|
|
}
|
|
if !hasJump(lbRules, localEpChain, "", 0) {
|
|
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, epStrLocal), lbRules, t)
|
|
}
|
|
}
|
|
|
|
func makeTestService(namespace, name string, svcFunc func(*api.Service)) *api.Service {
|
|
svc := &api.Service{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: namespace,
|
|
Annotations: map[string]string{},
|
|
},
|
|
Spec: api.ServiceSpec{},
|
|
Status: api.ServiceStatus{},
|
|
}
|
|
svcFunc(svc)
|
|
return svc
|
|
}
|
|
|
|
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
|
|
svcPort := api.ServicePort{
|
|
Name: name,
|
|
Protocol: protocol,
|
|
Port: port,
|
|
NodePort: nodeport,
|
|
TargetPort: intstr.FromInt(targetPort),
|
|
}
|
|
return append(array, svcPort)
|
|
}
|
|
|
|
func TestBuildServiceMapAddRemove(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
|
|
services := []*api.Service{
|
|
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
|
svc.Spec.ClusterIP = "172.16.55.4"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
|
|
}),
|
|
makeTestService("somewhere-else", "node-port", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeNodePort
|
|
svc.Spec.ClusterIP = "172.16.55.10"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
|
|
}),
|
|
makeTestService("somewhere", "load-balancer", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeLoadBalancer
|
|
svc.Spec.ClusterIP = "172.16.55.11"
|
|
svc.Spec.LoadBalancerIP = "5.6.7.8"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
|
|
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
|
Ingress: []api.LoadBalancerIngress{
|
|
{IP: "10.1.2.4"},
|
|
},
|
|
}
|
|
}),
|
|
makeTestService("somewhere", "only-local-load-balancer", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeLoadBalancer
|
|
svc.Spec.ClusterIP = "172.16.55.12"
|
|
svc.Spec.LoadBalancerIP = "5.6.7.8"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
|
|
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
|
Ingress: []api.LoadBalancerIngress{
|
|
{IP: "10.1.2.3"},
|
|
},
|
|
}
|
|
svc.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyTypeLocal
|
|
svc.Spec.HealthCheckNodePort = 345
|
|
}),
|
|
}
|
|
|
|
for i := range services {
|
|
fp.OnServiceAdd(services[i])
|
|
}
|
|
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 8 {
|
|
t.Errorf("expected service map length 8, got %v", fp.serviceMap)
|
|
}
|
|
|
|
// The only-local-loadbalancer ones get added
|
|
if len(result.HCServiceNodePorts) != 1 {
|
|
t.Errorf("expected 1 healthcheck port, got %v", result.HCServiceNodePorts)
|
|
} else {
|
|
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
|
if port, found := result.HCServiceNodePorts[nsn]; !found || port != 345 {
|
|
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.HCServiceNodePorts)
|
|
}
|
|
}
|
|
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
// Services only added, so nothing stale yet
|
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
|
}
|
|
|
|
// Remove some stuff
|
|
// oneService is a modification of services[0] with removed first port.
|
|
oneService := makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
|
svc.Spec.ClusterIP = "172.16.55.4"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
|
|
})
|
|
|
|
fp.OnServiceUpdate(services[0], oneService)
|
|
fp.OnServiceDelete(services[1])
|
|
fp.OnServiceDelete(services[2])
|
|
fp.OnServiceDelete(services[3])
|
|
|
|
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 1 {
|
|
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
|
|
}
|
|
|
|
if len(result.HCServiceNodePorts) != 0 {
|
|
t.Errorf("expected 0 healthcheck ports, got %v", result.HCServiceNodePorts)
|
|
}
|
|
|
|
// All services but one were deleted. While you'd expect only the ClusterIPs
|
|
// from the three deleted services here, we still have the ClusterIP for
|
|
// the not-deleted service, because one of its ServicePorts was deleted.
|
|
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
|
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
|
|
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
|
|
}
|
|
for _, ip := range expectedStaleUDPServices {
|
|
if !result.UDPStaleClusterIP.Has(ip) {
|
|
t.Errorf("expected stale UDP service service %s", ip)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService("somewhere-else", "headless", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
|
svc.Spec.ClusterIP = api.ClusterIPNone
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
|
|
}),
|
|
makeTestService("somewhere-else", "headless-without-port", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
|
svc.Spec.ClusterIP = api.ClusterIPNone
|
|
}),
|
|
)
|
|
|
|
// Headless service should be ignored
|
|
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 0 {
|
|
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
|
|
}
|
|
|
|
// No proxied services, so no healthchecks
|
|
if len(result.HCServiceNodePorts) != 0 {
|
|
t.Errorf("expected healthcheck ports length 0, got %d", len(result.HCServiceNodePorts))
|
|
}
|
|
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
|
}
|
|
}
|
|
|
|
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
|
|
makeServiceMap(fp,
|
|
makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeExternalName
|
|
svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
|
|
svc.Spec.ExternalName = "foo2.bar.com"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
|
|
}),
|
|
)
|
|
|
|
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 0 {
|
|
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
|
|
}
|
|
// No proxied services, so no healthchecks
|
|
if len(result.HCServiceNodePorts) != 0 {
|
|
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
|
}
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
|
}
|
|
}
|
|
|
|
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|
ipt := iptablestest.NewFake()
|
|
fp := NewFakeProxier(ipt)
|
|
|
|
servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
|
svc.Spec.ClusterIP = "172.16.55.4"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
|
|
})
|
|
servicev2 := makeTestService("somewhere", "some-service", func(svc *api.Service) {
|
|
svc.Spec.Type = api.ServiceTypeLoadBalancer
|
|
svc.Spec.ClusterIP = "172.16.55.4"
|
|
svc.Spec.LoadBalancerIP = "5.6.7.8"
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
|
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
|
|
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
|
Ingress: []api.LoadBalancerIngress{
|
|
{IP: "10.1.2.3"},
|
|
},
|
|
}
|
|
svc.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyTypeLocal
|
|
svc.Spec.HealthCheckNodePort = 345
|
|
})
|
|
|
|
fp.OnServiceAdd(servicev1)
|
|
|
|
result := proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 2 {
|
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
|
}
|
|
if len(result.HCServiceNodePorts) != 0 {
|
|
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
|
}
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
// Services only added, so nothing stale yet
|
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
|
}
|
|
|
|
// Change service to load-balancer
|
|
fp.OnServiceUpdate(servicev1, servicev2)
|
|
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 2 {
|
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
|
}
|
|
if len(result.HCServiceNodePorts) != 1 {
|
|
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
|
}
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
|
}
|
|
|
|
// No change; make sure the service map stays the same and there are
|
|
// no health-check changes
|
|
fp.OnServiceUpdate(servicev2, servicev2)
|
|
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 2 {
|
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
|
}
|
|
if len(result.HCServiceNodePorts) != 1 {
|
|
t.Errorf("expected healthcheck ports length 1, got %v", result.HCServiceNodePorts)
|
|
}
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
|
}
|
|
|
|
// And back to ClusterIP
|
|
fp.OnServiceUpdate(servicev2, servicev1)
|
|
result = proxy.UpdateServiceMap(fp.serviceMap, fp.serviceChanges)
|
|
if len(fp.serviceMap) != 2 {
|
|
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
|
}
|
|
if len(result.HCServiceNodePorts) != 0 {
|
|
t.Errorf("expected healthcheck ports length 0, got %v", result.HCServiceNodePorts)
|
|
}
|
|
if len(result.UDPStaleClusterIP) != 0 {
|
|
// Services only added, so nothing stale yet
|
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
|
}
|
|
}
|
|
|
|
func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *api.Endpoints {
|
|
ept := &api.Endpoints{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: namespace,
|
|
},
|
|
}
|
|
eptFunc(ept)
|
|
return ept
|
|
}
|
|
|
|
func makeEndpointsMap(proxier *Proxier, allEndpoints ...*api.Endpoints) {
|
|
for i := range allEndpoints {
|
|
proxier.OnEndpointsAdd(allEndpoints[i])
|
|
}
|
|
|
|
proxier.mu.Lock()
|
|
defer proxier.mu.Unlock()
|
|
proxier.endpointsSynced = true
|
|
}
|
|
|
|
func makeNSN(namespace, name string) types.NamespacedName {
|
|
return types.NamespacedName{Namespace: namespace, Name: name}
|
|
}
|
|
|
|
func makeServicePortName(ns, name, port string) proxy.ServicePortName {
|
|
return proxy.ServicePortName{
|
|
NamespacedName: makeNSN(ns, name),
|
|
Port: port,
|
|
}
|
|
}
|
|
|
|
func makeServiceMap(proxier *Proxier, allServices ...*api.Service) {
|
|
for i := range allServices {
|
|
proxier.OnServiceAdd(allServices[i])
|
|
}
|
|
|
|
proxier.mu.Lock()
|
|
defer proxier.mu.Unlock()
|
|
proxier.servicesSynced = true
|
|
}
|
|
|
|
func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
|
|
if len(newMap) != len(expected) {
|
|
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
|
|
}
|
|
for x := range expected {
|
|
if len(newMap[x]) != len(expected[x]) {
|
|
t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
|
|
} else {
|
|
for i := range expected[x] {
|
|
newEp, ok := newMap[x][i].(*endpointsInfo)
|
|
if !ok {
|
|
t.Errorf("Failed to cast endpointsInfo")
|
|
continue
|
|
}
|
|
if *newEp != *(expected[x][i]) {
|
|
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func Test_updateEndpointsMap(t *testing.T) {
|
|
var nodeName = testHostname
|
|
|
|
emptyEndpoint := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{}
|
|
}
|
|
unnamedPort := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Port: 11,
|
|
}},
|
|
}}
|
|
}
|
|
unnamedPortLocal := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Port: 11,
|
|
}},
|
|
}}
|
|
}
|
|
namedPortLocal := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}},
|
|
}}
|
|
}
|
|
namedPort := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}},
|
|
}}
|
|
}
|
|
namedPortRenamed := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11-2",
|
|
Port: 11,
|
|
}},
|
|
}}
|
|
}
|
|
namedPortRenumbered := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 22,
|
|
}},
|
|
}}
|
|
}
|
|
namedPortsLocalNoLocal := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}, {
|
|
IP: "1.1.1.2",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}, {
|
|
Name: "p12",
|
|
Port: 12,
|
|
}},
|
|
}}
|
|
}
|
|
multipleSubsets := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}},
|
|
}, {
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.2",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p12",
|
|
Port: 12,
|
|
}},
|
|
}}
|
|
}
|
|
multipleSubsetsWithLocal := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}},
|
|
}, {
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.2",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p12",
|
|
Port: 12,
|
|
}},
|
|
}}
|
|
}
|
|
multipleSubsetsMultiplePortsLocal := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}, {
|
|
Name: "p12",
|
|
Port: 12,
|
|
}},
|
|
}, {
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.3",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p13",
|
|
Port: 13,
|
|
}},
|
|
}}
|
|
}
|
|
multipleSubsetsIPsPorts1 := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}, {
|
|
IP: "1.1.1.2",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}, {
|
|
Name: "p12",
|
|
Port: 12,
|
|
}},
|
|
}, {
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.3",
|
|
}, {
|
|
IP: "1.1.1.4",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p13",
|
|
Port: 13,
|
|
}, {
|
|
Name: "p14",
|
|
Port: 14,
|
|
}},
|
|
}}
|
|
}
|
|
multipleSubsetsIPsPorts2 := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "2.2.2.1",
|
|
}, {
|
|
IP: "2.2.2.2",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p21",
|
|
Port: 21,
|
|
}, {
|
|
Name: "p22",
|
|
Port: 22,
|
|
}},
|
|
}}
|
|
}
|
|
complexBefore1 := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "1.1.1.1",
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p11",
|
|
Port: 11,
|
|
}},
|
|
}}
|
|
}
|
|
complexBefore2 := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "2.2.2.2",
|
|
NodeName: &nodeName,
|
|
}, {
|
|
IP: "2.2.2.22",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p22",
|
|
Port: 22,
|
|
}},
|
|
}, {
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "2.2.2.3",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p23",
|
|
Port: 23,
|
|
}},
|
|
}}
|
|
}
|
|
complexBefore4 := func(ept *api.Endpoints) {
|
|
ept.Subsets = []api.EndpointSubset{{
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "4.4.4.4",
|
|
NodeName: &nodeName,
|
|
}, {
|
|
IP: "4.4.4.5",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p44",
|
|
Port: 44,
|
|
}},
|
|
}, {
|
|
Addresses: []api.EndpointAddress{{
|
|
IP: "4.4.4.6",
|
|
NodeName: &nodeName,
|
|
}},
|
|
Ports: []api.EndpointPort{{
|
|
Name: "p45",
|
|
Port: 45,
|
|
}},
|
|
}}
|
|
}
|
|
	complexAfter1 := func(ept *api.Endpoints) {
		ept.Subsets = []api.EndpointSubset{{
			Addresses: []api.EndpointAddress{{
				IP: "1.1.1.1",
			}, {
				IP: "1.1.1.11",
			}},
			Ports: []api.EndpointPort{{
				Name: "p11",
				Port: 11,
			}},
		}, {
			Addresses: []api.EndpointAddress{{
				IP: "1.1.1.2",
			}},
			Ports: []api.EndpointPort{{
				Name: "p12",
				Port: 12,
			}, {
				Name: "p122",
				Port: 122,
			}},
		}}
	}
	complexAfter3 := func(ept *api.Endpoints) {
		ept.Subsets = []api.EndpointSubset{{
			Addresses: []api.EndpointAddress{{
				IP: "3.3.3.3",
			}},
			Ports: []api.EndpointPort{{
				Name: "p33",
				Port: 33,
			}},
		}}
	}
	complexAfter4 := func(ept *api.Endpoints) {
		ept.Subsets = []api.EndpointSubset{{
			Addresses: []api.EndpointAddress{{
				IP:       "4.4.4.4",
				NodeName: &nodeName,
			}},
			Ports: []api.EndpointPort{{
				Name: "p44",
				Port: 44,
			}},
		}}
	}

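	// Each case below transitions the proxier from previousEndpoints to
	// currentEndpoints and records the expected endpoints map, stale
	// endpoints/service names, and healthcheck counts after the update.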
	testCases := []struct {
		// previousEndpoints and currentEndpoints are used to call appropriate
		// handlers OnEndpoints* (based on whether corresponding values are nil
		// or non-nil) and must be of equal length.
		previousEndpoints         []*api.Endpoints
		currentEndpoints          []*api.Endpoints
		oldEndpoints              map[proxy.ServicePortName][]*endpointsInfo
		expectedResult            map[proxy.ServicePortName][]*endpointsInfo
		expectedStaleEndpoints    []proxy.ServiceEndpoint
		expectedStaleServiceNames map[proxy.ServicePortName]bool
		expectedHealthchecks      map[types.NamespacedName]int
	}{{
		// Case[0]: nothing
		oldEndpoints:              map[proxy.ServicePortName][]*endpointsInfo{},
		expectedResult:            map[proxy.ServicePortName][]*endpointsInfo{},
		expectedStaleEndpoints:    []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[1]: no change, unnamed port
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", unnamedPort),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", unnamedPort),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", ""): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", ""): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedStaleEndpoints:    []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[2]: no change, named port, local
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPortLocal),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPortLocal),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: true},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: true},
			},
		},
		expectedStaleEndpoints:    []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns1", "ep1"): 1,
		},
	}, {
		// Case[3]: no change, multiple subsets
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsets),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsets),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.2:12", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.2:12", isLocal: false},
			},
		},
		expectedStaleEndpoints:    []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[4]: no change, multiple subsets, multiple ports, local
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsetsMultiplePortsLocal),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.1:12", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p13"): {
				{endpoint: "1.1.1.3:13", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.1:12", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p13"): {
				{endpoint: "1.1.1.3:13", isLocal: false},
			},
		},
		expectedStaleEndpoints:    []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns1", "ep1"): 1,
		},
	}, {
		// Case[5]: no change, multiple endpoints, subsets, IPs, and ports
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
			makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsetsIPsPorts1),
			makeTestEndpoints("ns2", "ep2", multipleSubsetsIPsPorts2),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
				{endpoint: "1.1.1.2:11", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.1:12", isLocal: false},
				{endpoint: "1.1.1.2:12", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p13"): {
				{endpoint: "1.1.1.3:13", isLocal: false},
				{endpoint: "1.1.1.4:13", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p14"): {
				{endpoint: "1.1.1.3:14", isLocal: false},
				{endpoint: "1.1.1.4:14", isLocal: true},
			},
			makeServicePortName("ns2", "ep2", "p21"): {
				{endpoint: "2.2.2.1:21", isLocal: false},
				{endpoint: "2.2.2.2:21", isLocal: true},
			},
			makeServicePortName("ns2", "ep2", "p22"): {
				{endpoint: "2.2.2.1:22", isLocal: false},
				{endpoint: "2.2.2.2:22", isLocal: true},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
				{endpoint: "1.1.1.2:11", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.1:12", isLocal: false},
				{endpoint: "1.1.1.2:12", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p13"): {
				{endpoint: "1.1.1.3:13", isLocal: false},
				{endpoint: "1.1.1.4:13", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p14"): {
				{endpoint: "1.1.1.3:14", isLocal: false},
				{endpoint: "1.1.1.4:14", isLocal: true},
			},
			makeServicePortName("ns2", "ep2", "p21"): {
				{endpoint: "2.2.2.1:21", isLocal: false},
				{endpoint: "2.2.2.2:21", isLocal: true},
			},
			makeServicePortName("ns2", "ep2", "p22"): {
				{endpoint: "2.2.2.1:22", isLocal: false},
				{endpoint: "2.2.2.2:22", isLocal: true},
			},
		},
		expectedStaleEndpoints:    []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns1", "ep1"): 2,
			makeNSN("ns2", "ep2"): 1,
		},
	}, {
		// Case[6]: add an Endpoints
		previousEndpoints: []*api.Endpoints{
			nil,
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", ""): {
				{endpoint: "1.1.1.1:11", isLocal: true},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{
			makeServicePortName("ns1", "ep1", ""): true,
		},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns1", "ep1"): 1,
		},
	}, {
		// Case[7]: remove an Endpoints
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", unnamedPortLocal),
		},
		currentEndpoints: []*api.Endpoints{
			nil,
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", ""): {
				{endpoint: "1.1.1.1:11", isLocal: true},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{{
			Endpoint:        "1.1.1.1:11",
			ServicePortName: makeServicePortName("ns1", "ep1", ""),
		}},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[8]: add an IP and port
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPort),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
				{endpoint: "1.1.1.2:11", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.1:12", isLocal: false},
				{endpoint: "1.1.1.2:12", isLocal: true},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{
			makeServicePortName("ns1", "ep1", "p12"): true,
		},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns1", "ep1"): 1,
		},
	}, {
		// Case[9]: remove an IP and port
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPortsLocalNoLocal),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPort),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
				{endpoint: "1.1.1.2:11", isLocal: true},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.1:12", isLocal: false},
				{endpoint: "1.1.1.2:12", isLocal: true},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{{
			Endpoint:        "1.1.1.2:11",
			ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
		}, {
			Endpoint:        "1.1.1.1:12",
			ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
		}, {
			Endpoint:        "1.1.1.2:12",
			ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
		}},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[10]: add a subset
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPort),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsetsWithLocal),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.2:12", isLocal: true},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{
			makeServicePortName("ns1", "ep1", "p12"): true,
		},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns1", "ep1"): 1,
		},
	}, {
		// Case[11]: remove a subset
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", multipleSubsets),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPort),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.2:12", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{{
			Endpoint:        "1.1.1.2:12",
			ServicePortName: makeServicePortName("ns1", "ep1", "p12"),
		}},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[12]: rename a port
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPort),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPortRenamed),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11-2"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{{
			Endpoint:        "1.1.1.1:11",
			ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
		}},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{
			makeServicePortName("ns1", "ep1", "p11-2"): true,
		},
		expectedHealthchecks: map[types.NamespacedName]int{},
	}, {
		// Case[13]: renumber a port
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPort),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", namedPortRenumbered),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:22", isLocal: false},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{{
			Endpoint:        "1.1.1.1:11",
			ServicePortName: makeServicePortName("ns1", "ep1", "p11"),
		}},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
		expectedHealthchecks:      map[types.NamespacedName]int{},
	}, {
		// Case[14]: complex add and remove
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", complexBefore1),
			makeTestEndpoints("ns2", "ep2", complexBefore2),
			nil,
			makeTestEndpoints("ns4", "ep4", complexBefore4),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", complexAfter1),
			nil,
			makeTestEndpoints("ns3", "ep3", complexAfter3),
			makeTestEndpoints("ns4", "ep4", complexAfter4),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
			makeServicePortName("ns2", "ep2", "p22"): {
				{endpoint: "2.2.2.2:22", isLocal: true},
				{endpoint: "2.2.2.22:22", isLocal: true},
			},
			makeServicePortName("ns2", "ep2", "p23"): {
				{endpoint: "2.2.2.3:23", isLocal: true},
			},
			makeServicePortName("ns4", "ep4", "p44"): {
				{endpoint: "4.4.4.4:44", isLocal: true},
				{endpoint: "4.4.4.5:44", isLocal: true},
			},
			makeServicePortName("ns4", "ep4", "p45"): {
				{endpoint: "4.4.4.6:45", isLocal: true},
			},
		},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", "p11"): {
				{endpoint: "1.1.1.1:11", isLocal: false},
				{endpoint: "1.1.1.11:11", isLocal: false},
			},
			makeServicePortName("ns1", "ep1", "p12"): {
				{endpoint: "1.1.1.2:12", isLocal: false},
			},
			makeServicePortName("ns1", "ep1", "p122"): {
				{endpoint: "1.1.1.2:122", isLocal: false},
			},
			makeServicePortName("ns3", "ep3", "p33"): {
				{endpoint: "3.3.3.3:33", isLocal: false},
			},
			makeServicePortName("ns4", "ep4", "p44"): {
				{endpoint: "4.4.4.4:44", isLocal: true},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{{
			Endpoint:        "2.2.2.2:22",
			ServicePortName: makeServicePortName("ns2", "ep2", "p22"),
		}, {
			Endpoint:        "2.2.2.22:22",
			ServicePortName: makeServicePortName("ns2", "ep2", "p22"),
		}, {
			Endpoint:        "2.2.2.3:23",
			ServicePortName: makeServicePortName("ns2", "ep2", "p23"),
		}, {
			Endpoint:        "4.4.4.5:44",
			ServicePortName: makeServicePortName("ns4", "ep4", "p44"),
		}, {
			Endpoint:        "4.4.4.6:45",
			ServicePortName: makeServicePortName("ns4", "ep4", "p45"),
		}},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{
			makeServicePortName("ns1", "ep1", "p12"):  true,
			makeServicePortName("ns1", "ep1", "p122"): true,
			makeServicePortName("ns3", "ep3", "p33"):  true,
		},
		expectedHealthchecks: map[types.NamespacedName]int{
			makeNSN("ns4", "ep4"): 1,
		},
	}, {
		// Case[15]: change from 0 endpoint address to 1 unnamed port
		previousEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", emptyEndpoint),
		},
		currentEndpoints: []*api.Endpoints{
			makeTestEndpoints("ns1", "ep1", unnamedPort),
		},
		oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
		expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
			makeServicePortName("ns1", "ep1", ""): {
				{endpoint: "1.1.1.1:11", isLocal: false},
			},
		},
		expectedStaleEndpoints: []proxy.ServiceEndpoint{},
		expectedStaleServiceNames: map[proxy.ServicePortName]bool{
			makeServicePortName("ns1", "ep1", ""): true,
		},
		expectedHealthchecks: map[types.NamespacedName]int{},
	},
	}

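	// Drive each test case through the proxier: replay the "previous" Endpoints
	// objects and verify the resulting endpoints map, then apply the
	// previous->current transitions and compare the updated map, the stale
	// endpoints/service names, and the healthcheck counts against the
	// expectations recorded in the table above.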
	for tci, tc := range testCases {
		ipt := iptablestest.NewFake()
		fp := NewFakeProxier(ipt)
		fp.hostname = nodeName

		// First check that after adding all previous versions of endpoints,
		// the fp.oldEndpoints is as we expect.
		for i := range tc.previousEndpoints {
			if tc.previousEndpoints[i] != nil {
				fp.OnEndpointsAdd(tc.previousEndpoints[i])
			}
		}
		proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
		compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)

		// Now let's call the appropriate handlers to get to the state we want to be in.
		if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
			t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
			continue
		}

		for i := range tc.previousEndpoints {
			prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i]
			switch {
			case prev == nil:
				fp.OnEndpointsAdd(curr)
			case curr == nil:
				fp.OnEndpointsDelete(prev)
			default:
				fp.OnEndpointsUpdate(prev, curr)
			}
		}
		result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
		newMap := fp.endpointsMap
		compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
		if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
			t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
		}
		for _, x := range tc.expectedStaleEndpoints {
			found := false
			for _, stale := range result.StaleEndpoints {
				if stale == x {
					found = true
					break
				}
			}
			if !found {
				t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
			}
		}
		if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
			t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
		}
		for svcName := range tc.expectedStaleServiceNames {
			found := false
			for _, stale := range result.StaleServiceNames {
				if stale == svcName {
					found = true
				}
			}
			if !found {
				t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
			}
		}
		if !reflect.DeepEqual(result.HCEndpointsLocalIPSize, tc.expectedHealthchecks) {
			t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.HCEndpointsLocalIPSize)
		}
	}
}

// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.