Prep to move guts of OnEnpointsUpdate to sync

This makes it more obvious that they run together and makes the upcoming
rate-limited syncs easier.

Also make test use ints for ports, so it's easier to see when a port is
a literal value vs a name.
This commit is contained in:
Tim Hockin 2017-02-03 18:03:45 -08:00
parent cddda17d42
commit 7046c7efcb
2 changed files with 168 additions and 80 deletions

View File

@ -196,7 +196,7 @@ type Proxier struct {
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event
throttle flowcontrol.RateLimiter
// These are effectively const and do not need the mutex to be held.
@ -593,16 +593,15 @@ func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*en
// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnEndpointsUpdate took %v for %d endpoints", time.Since(start), len(allEndpoints))
}()
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true
if proxier.allEndpoints == nil {
glog.V(2).Info("Received first Endpoints update")
}
proxier.allEndpoints = allEndpoints
newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker)
// TODO: once service has made this same transform, move this into proxier.syncProxyRules()
newMap, staleConnections := updateEndpoints(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker)
if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
proxier.endpointsMap = newMap
proxier.syncProxyRules()
@ -874,7 +873,7 @@ func (proxier *Proxier) syncProxyRules() {
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
if proxier.allEndpoints == nil || !proxier.haveReceivedServiceUpdate {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
@ -923,6 +922,10 @@ func (proxier *Proxier) syncProxyRules() {
}
}
//
// Below this point we will not return until we try to write the iptables rules.
//
// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingFilterChains := make(map[utiliptables.Chain]string)

View File

@ -17,6 +17,7 @@ limitations under the License.
package iptables
import (
"strconv"
"testing"
"github.com/davecgh/go-spew/spew"
@ -292,8 +293,8 @@ func TestDeleteEndpointConnections(t *testing.T) {
}
serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "80"}
svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "80"}
svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "p80"}
svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "p80"}
serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), 80, api.ProtocolUDP, false)
serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), 80, api.ProtocolTCP, false)
@ -498,6 +499,8 @@ type fakeHealthChecker struct{}
func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {}
const testHostname = "test-hostname"
func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method.
@ -505,30 +508,30 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
exec: &exec.FakeExec{},
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
iptables: ipt,
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
clusterCIDR: "10.0.0.0/24",
haveReceivedEndpointsUpdate: true,
allEndpoints: []api.Endpoints{},
haveReceivedServiceUpdate: true,
hostname: "test-hostname",
hostname: testHostname,
portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}},
healthChecker: fakeHealthChecker{},
}
}
func hasJump(rules []iptablestest.Rule, destChain, destIP, destPort string) bool {
func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool {
destPortStr := strconv.Itoa(destPort)
match := false
for _, r := range rules {
if r[iptablestest.Jump] == destChain {
match = true
if destIP != "" {
if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPort) || r[iptablestest.DPort] == "") {
if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") {
return true
}
match = false
}
if destPort != "" {
if strings.Contains(r[iptablestest.DPort], destPort) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
if destPort != 0 {
if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") {
return true
}
match = false
@ -543,7 +546,7 @@ func TestHasJump(t *testing.T) {
rules []iptablestest.Rule
destChain string
destIP string
destPort string
destPort int
expected bool
}{
"case 1": {
@ -554,7 +557,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "REJECT",
destIP: "10.20.30.41",
destPort: "80",
destPort: 80,
expected: true,
},
"case 2": {
@ -565,7 +568,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "REJECT",
destIP: "",
destPort: "3001",
destPort: 3001,
expected: true,
},
"case 3": {
@ -575,7 +578,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
destIP: "1.2.3.4",
destPort: "80",
destPort: 80,
expected: true,
},
"case 4": {
@ -585,7 +588,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
destIP: "1.2.3.4",
destPort: "8080",
destPort: 8080,
expected: false,
},
"case 5": {
@ -595,7 +598,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "KUBE-XLB-GF53O3C2HZEXL2XN",
destIP: "10.20.30.40",
destPort: "80",
destPort: 80,
expected: false,
},
"case 6": {
@ -607,7 +610,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "REJECT",
destIP: "1.2.3.4",
destPort: "8080",
destPort: 8080,
expected: true,
},
"case 7": {
@ -618,7 +621,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "REJECT",
destIP: "1.2.3.4",
destPort: "3001",
destPort: 3001,
expected: true,
},
"case 8": {
@ -629,7 +632,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "REJECT",
destIP: "10.20.30.41",
destPort: "8080",
destPort: 8080,
expected: true,
},
"case 9": {
@ -638,7 +641,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV",
destIP: "",
destPort: "",
destPort: 0,
expected: true,
},
"case 10": {
@ -647,7 +650,7 @@ func TestHasJump(t *testing.T) {
},
destChain: "KUBE-SEP-BAR",
destIP: "",
destPort: "",
destPort: 0,
expected: false,
},
}
@ -681,7 +684,7 @@ func TestClusterIPReject(t *testing.T) {
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
fp.syncProxyRules()
@ -691,7 +694,7 @@ func TestClusterIPReject(t *testing.T) {
errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcName), svcRules, t)
}
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), "80") {
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), 80) {
errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcName), kubeSvcRules, t)
}
}
@ -702,23 +705,37 @@ func TestClusterIPEndpointsJump(t *testing.T) {
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true)
ep := "10.180.0.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{ep, false}}
ip := "10.180.0.1"
port := 80
ep := fmt.Sprintf("%s:%d", ip, port)
allEndpoints := []api.Endpoints{
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: ip,
}},
Ports: []api.EndpointPort{{
Name: "p80",
Port: int32(port),
}},
}}
}),
}
fp.syncProxyRules()
fp.OnEndpointsUpdate(allEndpoints)
svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
epChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), ep))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, svcChain, svcIP.String(), "80") {
if !hasJump(kubeSvcRules, svcChain, svcIP.String(), 80) {
errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t)
}
svcRules := ipt.GetRules(svcChain)
if !hasJump(svcRules, epChain, "", "") {
if !hasJump(svcRules, epChain, "", 0) {
errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t)
}
epRules := ipt.GetRules(epChain)
@ -741,12 +758,25 @@ func TestLoadBalancer(t *testing.T) {
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
fp.serviceMap[svc] = typeLoadBalancer(svcInfo)
ep1 := "10.180.0.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}}
ip := "10.180.0.1"
port := 80
fp.allEndpoints = []api.Endpoints{
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: ip,
}},
Ports: []api.EndpointPort{{
Name: "p80",
Port: int32(port),
}},
}}
}),
}
fp.syncProxyRules()
@ -756,12 +786,12 @@ func TestLoadBalancer(t *testing.T) {
//lbChain := string(serviceLBChainName(svc, proto))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "80") {
if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 80) {
errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
}
fwRules := ipt.GetRules(fwChain)
if !hasJump(fwRules, svcChain, "", "") || !hasJump(fwRules, string(KubeMarkMasqChain), "", "") {
if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t)
}
}
@ -772,13 +802,26 @@ func TestNodePort(t *testing.T) {
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false)
svcInfo.nodePort = 3001
fp.serviceMap[svc] = svcInfo
ep1 := "10.180.0.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}}
ip := "10.180.0.1"
port := 80
fp.allEndpoints = []api.Endpoints{
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: ip,
}},
Ports: []api.EndpointPort{{
Name: "p80",
Port: int32(port),
}},
}}
}),
}
fp.syncProxyRules()
@ -786,26 +829,49 @@ func TestNodePort(t *testing.T) {
svcChain := string(servicePortChainName(svc, strings.ToLower(proto)))
kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
if !hasJump(kubeNodePortRules, svcChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) {
if !hasJump(kubeNodePortRules, svcChain, "", svcInfo.nodePort) {
errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t)
}
}
func strPtr(s string) *string {
return &s
}
func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true)
fp.serviceMap[svc] = typeLoadBalancer(svcInfo)
nonLocalEp := "10.180.0.1:80"
localEp := "10.180.2.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}}
ip1 := "10.180.0.1"
ip2 := "10.180.2.1"
port := 80
nonLocalEp := fmt.Sprintf("%s:%d", ip1, port)
localEp := fmt.Sprintf("%s:%d", ip2, port)
allEndpoints := []api.Endpoints{
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: ip1,
NodeName: nil,
}, {
IP: ip2,
NodeName: strPtr(testHostname),
}},
Ports: []api.EndpointPort{{
Name: "p80",
Port: int32(port),
}},
}}
}),
}
fp.syncProxyRules()
fp.OnEndpointsUpdate(allEndpoints)
proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svc, proto))
@ -815,24 +881,24 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp))
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "") {
if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 0) {
errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t)
}
fwRules := ipt.GetRules(fwChain)
if !hasJump(fwRules, lbChain, "", "") {
if !hasJump(fwRules, lbChain, "", 0) {
errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t)
}
if hasJump(fwRules, string(KubeMarkMasqChain), "", "") {
if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) {
errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t)
}
lbRules := ipt.GetRules(lbChain)
if hasJump(lbRules, nonLocalEpChain, "", "") {
if hasJump(lbRules, nonLocalEpChain, "", 0) {
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t)
}
if !hasJump(lbRules, localEpChain, "", "") {
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t)
if !hasJump(lbRules, localEpChain, "", 0) {
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, localEp), lbRules, t)
}
}
@ -855,16 +921,35 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
svcName := "svc1"
svcIP := net.IPv4(10, 20, 30, 41)
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"}
svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"}
svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true)
svcInfo.nodePort = 3001
fp.serviceMap[svc] = svcInfo
nonLocalEp := "10.180.0.1:80"
localEp := "10.180.2.1:80"
fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}}
ip1 := "10.180.0.1"
ip2 := "10.180.2.1"
port := 80
nonLocalEp := fmt.Sprintf("%s:%d", ip1, port)
localEp := fmt.Sprintf("%s:%d", ip2, port)
allEndpoints := []api.Endpoints{
makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) {
ept.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{
IP: ip1,
NodeName: nil,
}, {
IP: ip2,
NodeName: strPtr(testHostname),
}},
Ports: []api.EndpointPort{{
Name: "p80",
Port: int32(port),
}},
}}
}),
}
fp.syncProxyRules()
fp.OnEndpointsUpdate(allEndpoints)
proto := strings.ToLower(string(api.ProtocolTCP))
lbChain := string(serviceLBChainName(svc, proto))
@ -873,23 +958,23 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp))
kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain))
if !hasJump(kubeNodePortRules, lbChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) {
if !hasJump(kubeNodePortRules, lbChain, "", svcInfo.nodePort) {
errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t)
}
svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP))))
lbRules := ipt.GetRules(lbChain)
if hasJump(lbRules, nonLocalEpChain, "", "") {
if hasJump(lbRules, nonLocalEpChain, "", 0) {
errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t)
}
if hasJump(lbRules, svcChain, "", "") != shouldLBTOSVCRuleExist {
if hasJump(lbRules, svcChain, "", 0) != shouldLBTOSVCRuleExist {
prefix := "Did not find "
if !shouldLBTOSVCRuleExist {
prefix = "Found "
}
errorf(fmt.Sprintf("%s jump from lb chain %v to svc %v", prefix, lbChain, svcChain), lbRules, t)
}
if !hasJump(lbRules, localEpChain, "", "") {
if !hasJump(lbRules, localEpChain, "", 0) {
errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t)
}
}