From 9507af3c79a8ecb90e6912e5f7d80de7f87385af Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 1 Feb 2017 12:47:29 -0800 Subject: [PATCH] Refactor OnEndpointsUpdate for testing This is a weird function, but I didn't want to change any semantics until the tests are in place. Testing exposed one bug where stale connections of renamed ports were not marked stale. There are other things that seem wrong here, more will follow. --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 94 +++++---- pkg/proxy/iptables/proxier_test.go | 309 +++++++++++++++++++++++++++++ 3 files changed, 365 insertions(+), 39 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0c5cc6891d1..0df7cc5a7d4 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -46,6 +46,7 @@ go_test( "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", + "//vendor:github.com/davecgh/go-spew/spew", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 8d2127af322..5b27d6644f6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -595,47 +595,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Update endpoints for services. for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] - - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortInfo{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - isLocal: addr.NodeName != nil && *addr.NodeName == proxier.hostname, - } - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) - } - } - } - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - svcPortToInfoMap[svcPort] = portsToEndpoints[portname] - curEndpoints := proxier.endpointsMap[svcPort] - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - // Flatten the list of current endpoint infos to just a list of ips as strings - curEndpointIPs := flattenEndpointsInfo(curEndpoints) - if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) { - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - // Gather stale connections to removed endpoints - removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints) - for _, ep := range removedEndpoints { - staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true - } - } - // Once the set operations using the list of ips are complete, build the list of endpoint infos - newEndpointsMap[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) - activeEndpoints[svcPort] = true - } + accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, + &svcPortToInfoMap, &staleConnections, &activeEndpoints) } // Check stale connections against endpoints missing from the update. + // TODO: we should really only mark a connection stale if the proto was UDP + // and the (ip, port, proto) was removed from the endpoints. for svcPort := range proxier.endpointsMap { if !activeEndpoints[svcPort] { glog.V(2).Infof("Removing endpoints for %q", svcPort) @@ -668,6 +633,57 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.deleteEndpointConnections(staleConnections) } +// Gather information about all the endpoint state for a given api.Endpoints. +// This can not detect all stale connections, so the caller should also check +// for entries that were totally removed. +func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, + curEndpoints map[proxy.ServicePortName][]*endpointsInfo, + newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, + svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, + staleConnections *map[endpointServicePair]bool, + activeEndpoints *map[proxy.ServicePortName]bool) { + + // We need to build a map of portname -> all ip:ports for that + // portname. Explode Endpoints.Subsets[*] into this structure. + portsToEndpoints := map[string][]hostPortInfo{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + hostPortObject := hostPortInfo{ + host: addr.IP, + port: int(port.Port), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) + } + } + } + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, + Port: portname, + } + (*svcPortToInfoMap)[svcPort] = portsToEndpoints[portname] + newEPList := flattenValidEndpoints(portsToEndpoints[portname]) + // Flatten the list of current endpoint infos to just a list of ips as strings + curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort]) + if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) { + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) + // Gather stale connections to removed endpoints + removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList) + for _, ep := range removedEndpoints { + (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true + } + } + // Once the set operations using the list of ips are complete, build the list of endpoint infos + (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) + (*activeEndpoints)[svcPort] = true + } +} + // updateHealthCheckEntries - send the new set of local endpoints to the health checker func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 472432b8313..e763b95d36f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -19,6 +19,8 @@ package iptables import ( "testing" + "github.com/davecgh/go-spew/spew" + "fmt" "net" "strings" @@ -1172,4 +1174,311 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } } +// This is a coarse test, but it offers some modicum of confidence as the code is evolved. +func Test_accumulateEndpointsMap(t *testing.T) { + testCases := []struct { + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo + expectedActive []proxy.ServicePortName + expectedStale []endpointServicePair + }{{ + // Case[0]: nothing + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedActive: []proxy.ServicePortName{}, + expectedStale: []endpointServicePair{}, + }, { + // Case[1]: no changes, unnamed port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[2]: no changes, named port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "port", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "port")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[3]: new port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): {}, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[4]: remove port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedActive: []proxy.ServicePortName{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, + }, { + // Case[5]: new IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }, { + Name: "p2", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + makeServicePortName("ns1", "ep1", "p2"), + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[6]: remove IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:11", + servicePortName: makeServicePortName("ns1", "ep1", "p1"), + }}, + }, { + // Case[7]: rename port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p2", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p2"), + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[8]: renumber port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:22", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p1"), + }}, + }} + + for tci, tc := range testCases { + // outputs + newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} + svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} + staleConnections := map[endpointServicePair]bool{} + activeEndpoints := map[proxy.ServicePortName]bool{} + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, + &newEndpoints, &svcPortToInfoMap, &staleConnections, &activeEndpoints) + + if len(newEndpoints) != len(tc.expectedNew) { + t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) + } + for x := range tc.expectedNew { + if len(newEndpoints[x]) != len(tc.expectedNew[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedNew[x]), x, len(newEndpoints[x])) + } else { + for i := range newEndpoints[x] { + if *(newEndpoints[x][i]) != *(tc.expectedNew[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedNew[x][i], *(newEndpoints[x][i])) + } + } + } + } + if len(activeEndpoints) != len(tc.expectedActive) { + t.Errorf("[%d] expected %d active, got %d: %v", tci, len(tc.expectedActive), len(activeEndpoints), activeEndpoints) + } + for _, x := range tc.expectedActive { + if activeEndpoints[x] != true { + t.Errorf("[%d] expected active[%v], but didn't find it: %v", tci, x, activeEndpoints) + } + } + if len(staleConnections) != len(tc.expectedStale) { + t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) + } + for _, x := range tc.expectedStale { + if staleConnections[x] != true { + t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, staleConnections) + } + } + } +} + +func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) api.Endpoints { + ept := api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + eptFunc(&ept) + return ept +} + +func makeServicePortName(ns, name, port string) proxy.ServicePortName { + return proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: ns, + Name: name, + }, + Port: port, + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.