add test for udp connection flush
This commit is contained in:
		| @@ -71,8 +71,6 @@ const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" | ||||
| // TODO(thockin): Remove this for v1.3 or v1.4. | ||||
| const oldIptablesMasqueradeMark = "0x4d415351" | ||||
|  | ||||
| const noConnectionToDelete = "0 flow entries have been deleted" | ||||
|  | ||||
| // IptablesVersioner can query the current iptables version. | ||||
| type IptablesVersioner interface { | ||||
| 	// returns "X.Y.Z" | ||||
| @@ -439,20 +437,20 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	staleUDPService := sets.NewString() | ||||
| 	staleUDPServices := sets.NewString() | ||||
| 	// Remove services missing from the update. | ||||
| 	for name := range proxier.serviceMap { | ||||
| 		if !activeServices[name] { | ||||
| 			glog.V(1).Infof("Removing service %q", name) | ||||
| 			if proxier.serviceMap[name].protocol == api.ProtocolUDP { | ||||
| 				staleUDPService.Insert(proxier.serviceMap[name].clusterIP.String()) | ||||
| 				staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String()) | ||||
| 			} | ||||
| 			delete(proxier.serviceMap, name) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	proxier.syncProxyRules() | ||||
| 	proxier.deleteServiceConnection(staleUDPService.List()) | ||||
| 	proxier.deleteServiceConnections(staleUDPServices.List()) | ||||
|  | ||||
| } | ||||
|  | ||||
| @@ -519,7 +517,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { | ||||
| 	} | ||||
|  | ||||
| 	proxier.syncProxyRules() | ||||
| 	proxier.deleteEndpointConnection(staleConnections) | ||||
| 	proxier.deleteEndpointConnections(staleConnections) | ||||
| } | ||||
|  | ||||
| // used in OnEndpointsUpdate | ||||
| @@ -585,17 +583,19 @@ type endpointServicePair struct { | ||||
| 	servicePortName proxy.ServicePortName | ||||
| } | ||||
|  | ||||
| const noConnectionToDelete = "0 flow entries have been deleted" | ||||
|  | ||||
| // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we | ||||
| // risk sending more traffic to it, all of which will be lost (because UDP). | ||||
| // This assumes the proxier mutex is held | ||||
| func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServicePair]bool) { | ||||
| func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { | ||||
| 	for epSvcPair := range connectionMap { | ||||
| 		if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { | ||||
| 			endpointIP := strings.Split(epSvcPair.endpoint, ":")[0] | ||||
| 			glog.V(2).Infof("Deleting connection to service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) | ||||
| 			glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) | ||||
| 			err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") | ||||
| 			if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { | ||||
| 				// TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. | ||||
| 				// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. | ||||
| 				// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it | ||||
| 				// is expensive to baby sit all udp connections to kubernetes services. | ||||
| 				glog.Errorf("conntrack return with error: %v", err) | ||||
| @@ -604,13 +604,13 @@ func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServi | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // deleteServiceConnection use conntrack-tool to delete udp connection specified by service ip | ||||
| func (proxier *Proxier) deleteServiceConnection(svcIPs []string) { | ||||
| // deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip | ||||
| func (proxier *Proxier) deleteServiceConnections(svcIPs []string) { | ||||
| 	for _, ip := range svcIPs { | ||||
| 		glog.V(2).Infof("Deleting udp connection to service IP %s", ip) | ||||
| 		glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip) | ||||
| 		err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp") | ||||
| 		if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { | ||||
| 			// TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. | ||||
| 			// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. | ||||
| 			// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it | ||||
| 			// is expensive to baby sit all udp connections to kubernetes services. | ||||
| 			glog.Errorf("conntrack return with error: %v", err) | ||||
| @@ -626,7 +626,7 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error { | ||||
| 	} | ||||
| 	output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Conntrack command returns: %s, error message: %s", string(output), err) | ||||
| 		return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -19,7 +19,14 @@ package iptables | ||||
| import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	"fmt" | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/proxy" | ||||
| 	"k8s.io/kubernetes/pkg/types" | ||||
| 	"k8s.io/kubernetes/pkg/util/exec" | ||||
| 	utiliptables "k8s.io/kubernetes/pkg/util/iptables" | ||||
| 	"net" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) { | ||||
| @@ -156,4 +163,215 @@ func TestGetChainLinesMultipleTables(t *testing.T) { | ||||
| 	checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) | ||||
| } | ||||
|  | ||||
| func TestGetRemovedEndpoints(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		currentEndpoints []string | ||||
| 		newEndpoints     []string | ||||
| 		removedEndpoints []string | ||||
| 	}{ | ||||
| 		{ | ||||
| 			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 			removedEndpoints: []string{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}, | ||||
| 			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 			removedEndpoints: []string{"10.0.2.3:80"}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			currentEndpoints: []string{}, | ||||
| 			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 			removedEndpoints: []string{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 			newEndpoints:     []string{}, | ||||
| 			removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"}, | ||||
| 			newEndpoints:     []string{"10.0.2.1:80", "10.0.2.2:80"}, | ||||
| 			removedEndpoints: []string{"10.0.2.2:443"}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for i := range testCases { | ||||
| 		res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints) | ||||
| 		if !slicesEquiv(res, testCases[i].removedEndpoints) { | ||||
| 			t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestExecConntrackTool(t *testing.T) { | ||||
| 	fcmd := exec.FakeCmd{ | ||||
| 		CombinedOutputScript: []exec.FakeCombinedOutputAction{ | ||||
| 			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, | ||||
| 			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, | ||||
| 			func() ([]byte, error) { | ||||
| 				return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	fexec := exec.FakeExec{ | ||||
| 		CommandScript: []exec.FakeCommandAction{ | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 		}, | ||||
| 		LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, | ||||
| 	} | ||||
|  | ||||
| 	fakeProxier := Proxier{exec: &fexec} | ||||
|  | ||||
| 	testCases := [][]string{ | ||||
| 		{"-L", "-p", "udp"}, | ||||
| 		{"-D", "-p", "udp", "-d", "10.0.240.1"}, | ||||
| 		{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"}, | ||||
| 	} | ||||
|  | ||||
| 	expectErr := []bool{false, false, true} | ||||
|  | ||||
| 	for i := range testCases { | ||||
| 		err := fakeProxier.execConntrackTool(testCases[i]...) | ||||
|  | ||||
| 		if expectErr[i] { | ||||
| 			if err == nil { | ||||
| 				t.Errorf("expected err, got %v", err) | ||||
| 			} | ||||
| 		} else { | ||||
| 			if err != nil { | ||||
| 				t.Errorf("expected success, got %v", err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ") | ||||
| 		expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " ")) | ||||
|  | ||||
| 		if execCmd != expectCmd { | ||||
| 			t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, protocol api.Protocol) *serviceInfo { | ||||
| 	return &serviceInfo{ | ||||
| 		sessionAffinityType: api.ServiceAffinityNone, // default | ||||
| 		stickyMaxAgeSeconds: 180,                     // TODO: paramaterize this in the API. | ||||
| 		clusterIP:           ip, | ||||
| 		protocol:            protocol, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeleteEndpointConnections(t *testing.T) { | ||||
| 	fcmd := exec.FakeCmd{ | ||||
| 		CombinedOutputScript: []exec.FakeCombinedOutputAction{ | ||||
| 			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, | ||||
| 			func() ([]byte, error) { | ||||
| 				return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	fexec := exec.FakeExec{ | ||||
| 		CommandScript: []exec.FakeCommandAction{ | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 		}, | ||||
| 		LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, | ||||
| 	} | ||||
|  | ||||
| 	serviceMap := make(map[proxy.ServicePortName]*serviceInfo) | ||||
| 	svc1 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, ""} | ||||
| 	svc2 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc2"}, ""} | ||||
| 	serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), api.ProtocolUDP) | ||||
| 	serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), api.ProtocolTCP) | ||||
|  | ||||
| 	fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap} | ||||
|  | ||||
| 	testCases := []endpointServicePair{ | ||||
| 		{ | ||||
| 			endpoint:        "10.240.0.3:80", | ||||
| 			servicePortName: svc1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			endpoint:        "10.240.0.4:80", | ||||
| 			servicePortName: svc1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			endpoint:        "10.240.0.5:80", | ||||
| 			servicePortName: svc2, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	expectCommandExecCount := 0 | ||||
| 	for i := range testCases { | ||||
| 		input := map[endpointServicePair]bool{testCases[i]: true} | ||||
| 		fakeProxier.deleteEndpointConnections(input) | ||||
| 		svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName] | ||||
| 		if svcInfo.protocol == api.ProtocolUDP { | ||||
| 			svcIp := svcInfo.clusterIP.String() | ||||
| 			endpointIp := strings.Split(testCases[i].endpoint, ":")[0] | ||||
| 			expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp) | ||||
| 			execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ") | ||||
| 			if expectCommand != execCommand { | ||||
| 				t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) | ||||
| 			} | ||||
| 			expectCommandExecCount += 1 | ||||
| 		} | ||||
|  | ||||
| 		if expectCommandExecCount != fexec.CommandCalls { | ||||
| 			t.Errorf("Exepect comand executed %d times, but got %d", expectCommandExecCount, fexec.CommandCalls) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeleteServiceConnections(t *testing.T) { | ||||
| 	fcmd := exec.FakeCmd{ | ||||
| 		CombinedOutputScript: []exec.FakeCombinedOutputAction{ | ||||
| 			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, | ||||
| 			func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil }, | ||||
| 			func() ([]byte, error) { | ||||
| 				return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.") | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	fexec := exec.FakeExec{ | ||||
| 		CommandScript: []exec.FakeCommandAction{ | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 			func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, | ||||
| 		}, | ||||
| 		LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, | ||||
| 	} | ||||
|  | ||||
| 	fakeProxier := Proxier{exec: &fexec} | ||||
|  | ||||
| 	testCases := [][]string{ | ||||
| 		{ | ||||
| 			"10.240.0.3", | ||||
| 			"10.240.0.5", | ||||
| 		}, | ||||
| 		{ | ||||
| 			"10.240.0.4", | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	svcCount := 0 | ||||
| 	for i := range testCases { | ||||
| 		fakeProxier.deleteServiceConnections(testCases[i]) | ||||
| 		for _, ip := range testCases[i] { | ||||
| 			expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip) | ||||
| 			execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") | ||||
| 			if expectCommand != execCommand { | ||||
| 				t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand) | ||||
| 			} | ||||
| 			svcCount += 1 | ||||
| 		} | ||||
| 		if svcCount != fexec.CommandCalls { | ||||
| 			t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Minhan Xia
					Minhan Xia