diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0c5cc6891d1..0df7cc5a7d4 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -46,6 +46,7 @@ go_test( "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", + "//vendor:github.com/davecgh/go-spew/spew", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 8d2127af322..5b27d6644f6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -595,47 +595,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Update endpoints for services. for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] - - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortInfo{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - isLocal: addr.NodeName != nil && *addr.NodeName == proxier.hostname, - } - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) - } - } - } - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - svcPortToInfoMap[svcPort] = portsToEndpoints[portname] - curEndpoints := proxier.endpointsMap[svcPort] - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - // Flatten the list of current endpoint infos to just a list of ips as strings - curEndpointIPs := flattenEndpointsInfo(curEndpoints) - if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) { - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - // Gather stale connections to removed endpoints - removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints) - for _, ep := range removedEndpoints { - staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true - } - } - // Once the set operations using the list of ips are complete, build the list of endpoint infos - newEndpointsMap[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) - activeEndpoints[svcPort] = true - } + accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, + &svcPortToInfoMap, &staleConnections, &activeEndpoints) } // Check stale connections against endpoints missing from the update. + // TODO: we should really only mark a connection stale if the proto was UDP + // and the (ip, port, proto) was removed from the endpoints. for svcPort := range proxier.endpointsMap { if !activeEndpoints[svcPort] { glog.V(2).Infof("Removing endpoints for %q", svcPort) @@ -668,6 +633,57 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.deleteEndpointConnections(staleConnections) } +// Gather information about all the endpoint state for a given api.Endpoints. +// This can not detect all stale connections, so the caller should also check +// for entries that were totally removed. +func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, + curEndpoints map[proxy.ServicePortName][]*endpointsInfo, + newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, + svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, + staleConnections *map[endpointServicePair]bool, + activeEndpoints *map[proxy.ServicePortName]bool) { + + // We need to build a map of portname -> all ip:ports for that + // portname. Explode Endpoints.Subsets[*] into this structure. + portsToEndpoints := map[string][]hostPortInfo{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + hostPortObject := hostPortInfo{ + host: addr.IP, + port: int(port.Port), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) + } + } + } + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, + Port: portname, + } + (*svcPortToInfoMap)[svcPort] = portsToEndpoints[portname] + newEPList := flattenValidEndpoints(portsToEndpoints[portname]) + // Flatten the list of current endpoint infos to just a list of ips as strings + curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort]) + if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) { + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) + // Gather stale connections to removed endpoints + removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList) + for _, ep := range removedEndpoints { + (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true + } + } + // Once the set operations using the list of ips are complete, build the list of endpoint infos + (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) + (*activeEndpoints)[svcPort] = true + } +} + // updateHealthCheckEntries - send the new set of local endpoints to the health checker func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 472432b8313..e763b95d36f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -19,6 +19,8 @@ package iptables import ( "testing" + "github.com/davecgh/go-spew/spew" + "fmt" "net" "strings" @@ -1172,4 +1174,311 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } } +// This is a coarse test, but it offers some modicum of confidence as the code is evolved. +func Test_accumulateEndpointsMap(t *testing.T) { + testCases := []struct { + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo + expectedActive []proxy.ServicePortName + expectedStale []endpointServicePair + }{{ + // Case[0]: nothing + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedActive: []proxy.ServicePortName{}, + expectedStale: []endpointServicePair{}, + }, { + // Case[1]: no changes, unnamed port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[2]: no changes, named port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "port", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "port")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[3]: new port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): {}, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[4]: remove port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedActive: []proxy.ServicePortName{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, + }, { + // Case[5]: new IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }, { + Name: "p2", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + makeServicePortName("ns1", "ep1", "p2"), + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[6]: remove IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:11", + servicePortName: makeServicePortName("ns1", "ep1", "p1"), + }}, + }, { + // Case[7]: rename port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p2", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p2"), + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[8]: renumber port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:22", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p1"), + }}, + }} + + for tci, tc := range testCases { + // outputs + newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} + svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} + staleConnections := map[endpointServicePair]bool{} + activeEndpoints := map[proxy.ServicePortName]bool{} + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, + &newEndpoints, &svcPortToInfoMap, &staleConnections, &activeEndpoints) + + if len(newEndpoints) != len(tc.expectedNew) { + t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) + } + for x := range tc.expectedNew { + if len(newEndpoints[x]) != len(tc.expectedNew[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedNew[x]), x, len(newEndpoints[x])) + } else { + for i := range newEndpoints[x] { + if *(newEndpoints[x][i]) != *(tc.expectedNew[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedNew[x][i], *(newEndpoints[x][i])) + } + } + } + } + if len(activeEndpoints) != len(tc.expectedActive) { + t.Errorf("[%d] expected %d active, got %d: %v", tci, len(tc.expectedActive), len(activeEndpoints), activeEndpoints) + } + for _, x := range tc.expectedActive { + if activeEndpoints[x] != true { + t.Errorf("[%d] expected active[%v], but didn't find it: %v", tci, x, activeEndpoints) + } + } + if len(staleConnections) != len(tc.expectedStale) { + t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) + } + for _, x := range tc.expectedStale { + if staleConnections[x] != true { + t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, staleConnections) + } + } + } +} + +func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) api.Endpoints { + ept := api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + eptFunc(&ept) + return ept +} + +func makeServicePortName(ns, name, port string) proxy.ServicePortName { + return proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: ns, + Name: name, + }, + Port: port, + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.