diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index b1cfcd32fd8..821b14da8a3 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -20,7 +20,6 @@ go_library( "//pkg/proxy/healthcheck:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", - "//pkg/util/slice:go_default_library", "//pkg/util/sysctl:go_default_library", "//pkg/util/version:go_default_library", "//vendor:github.com/davecgh/go-spew/spew", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 58f443ad6ae..6208dc00128 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -48,7 +48,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/healthcheck" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" - "k8s.io/kubernetes/pkg/util/slice" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilversion "k8s.io/kubernetes/pkg/util/version" ) @@ -556,15 +555,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { proxier.deleteServiceConnections(staleUDPServices.List()) } -// Generate a list of ip strings from the list of endpoint infos -func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { - var endpointIPs []string - for _, ep := range endPoints { - endpointIPs = append(endpointIPs, ep.endpoint) - } - return endpointIPs -} - // Reconstruct the list of endpoint infos from the endpointIP list // Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos // from the full []hostPortInfo slice. @@ -614,28 +604,34 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, - healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { + healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) { // return values newMap = make(map[proxy.ServicePortName][]*endpointsInfo) - stale = make(map[endpointServicePair]bool) + staleSet = make(map[endpointServicePair]bool) // local svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) // Update endpoints for services. for i := range allEndpoints { - accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap, &stale) + accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP // and the (ip, port, proto) was removed from the endpoints. - for svcPort := range curMap { - if _, found := newMap[svcPort]; !found { - glog.V(3).Infof("Removing endpoints for %q", svcPort) - // record endpoints of unactive service to stale connections - for _, ep := range curMap[svcPort] { - stale[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + for svcPort, epList := range curMap { + for _, ep := range epList { + stale := true + for i := range newMap[svcPort] { + if *newMap[svcPort][i] == *ep { + stale = false + break + } + } + if stale { + glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint) + staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true } } } @@ -652,7 +648,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) } - return newMap, stale + return newMap, staleSet } // Gather information about all the endpoint state for a given api.Endpoints. @@ -667,8 +663,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, - svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, - staleConnections *map[endpointServicePair]bool) { + svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. @@ -694,16 +689,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, // Decompose the lists of endpoints into details of what was changed for the caller. for svcPort, hostPortInfos := range *svcPortToInfoMap { newEPList := flattenValidEndpoints(hostPortInfos) - // Flatten the list of current endpoint infos to just a list of ips as strings - curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort]) - if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) { - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - // Gather stale connections to removed endpoints - removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList) - for _, ep := range removedEndpoints { - (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true - } - } glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) // Once the set operations using the list of ips are complete, build the list of endpoint infos (*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList) @@ -738,17 +723,6 @@ func isValidEndpoint(hpp *hostPortInfo) bool { return hpp.host != "" && hpp.port > 0 } -// Tests whether two slices are equivalent. This sorts both slices in-place. -func slicesEquiv(lhs, rhs []string) bool { - if len(lhs) != len(rhs) { - return false - } - if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) { - return true - } - return false -} - func flattenValidEndpoints(endpoints []hostPortInfo) []string { // Convert Endpoint objects into strings for easier use later. var result []string @@ -803,11 +777,6 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } -// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints -func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string { - return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List() -} - type endpointServicePair struct { endpoint string servicePortName proxy.ServicePortName diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index b46c8b0e499..7fbc27c1d42 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -172,47 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) } -func TestGetRemovedEndpoints(t *testing.T) { - testCases := []struct { - currentEndpoints []string - newEndpoints []string - removedEndpoints []string - }{ - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{"10.0.2.3:80"}, - }, - { - currentEndpoints: []string{}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - newEndpoints: []string{}, - removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{"10.0.2.2:443"}, - }, - } - - for i := range testCases { - res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints) - if !slicesEquiv(res, testCases[i].removedEndpoints) { - t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res) - } - } -} - func TestExecConntrackTool(t *testing.T) { fcmd := exec.FakeCmd{ CombinedOutputScript: []exec.FakeCombinedOutputAction{ @@ -1268,16 +1227,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // This is a coarse test, but it offers some modicum of confidence as the code is evolved. func Test_accumulateEndpointsMap(t *testing.T) { testCases := []struct { - newEndpoints api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedNew map[proxy.ServicePortName][]*endpointsInfo - expectedStale []endpointServicePair + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo }{{ // Case[0]: nothing - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{}, + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, }, { // Case[1]: no changes, unnamed port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1303,7 +1260,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[2]: no changes, named port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1329,7 +1285,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[3]: new port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1352,7 +1307,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[4]: remove port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), @@ -1361,8 +1315,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{ /* can't detect this one */ }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, }, { // Case[5]: new IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1398,7 +1351,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"2.2.2.2:22", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[6]: remove IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1429,10 +1381,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{{ - endpoint: "2.2.2.2:11", - servicePortName: makeServicePortName("ns1", "ep1", "p1"), - }}, }, { // Case[7]: rename port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1458,7 +1406,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{ /* can't detect this one */ }, }, { // Case[8]: renumber port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1484,19 +1431,13 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:22", false}, }, }, - expectedStale: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", "p1"), - }}, }} for tci, tc := range testCases { // outputs newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} - staleConnections := map[endpointServicePair]bool{} - accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, - &newEndpoints, &svcPortToInfoMap, &staleConnections) + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -1512,14 +1453,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { } } } - if len(staleConnections) != len(tc.expectedStale) { - t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) - } - for _, x := range tc.expectedStale { - if staleConnections[x] != true { - t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, staleConnections) - } - } } }