fix(e2e): access nodes via test container in LB network tests

Signed-off-by: knight42 <anonymousknight96@gmail.com>
knight42 committed 2020-05-29 12:00:40 +08:00
parent bcdb3c568e
commit 1b9f11c9a9
4 changed files with 250 additions and 86 deletions

View File

@@ -17,7 +17,6 @@ limitations under the License.
package network
import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -615,6 +614,32 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err
return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
}
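// testHTTPHealthCheckNodePortFromTestContainer probes /healthz on the given host and port
// from inside the test container, polling until it has observed `threshold` results that
// match expectSucceed, and returns an error if the timeout expires first.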
func testHTTPHealthCheckNodePortFromTestContainer(config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, expectSucceed bool, threshold int) error {
count := 0
pollFn := func() (bool, error) {
statusCode, err := config.GetHTTPCodeFromTestContainer(
"/healthz",
host,
port)
if err != nil {
framework.Logf("Got error reading status code from http://%s:%d/healthz via test container: %v", host, port, err)
return false, nil
}
framework.Logf("Got status code from http://%s:%d/healthz via test container: %d", host, port, statusCode)
success := statusCode == 200
if (success && expectSucceed) ||
(!success && !expectSucceed) {
count++
}
return count >= threshold, nil
}
err := wait.PollImmediate(time.Second, timeout, pollFn)
if err != nil {
return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v/healthz, got %d", threshold, expectSucceed, host, port, count)
}
return nil
}
// Does an HTTP GET, but does not reuse TCP connections
// This masks problems where the iptables rule has changed, but we don't see it
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
@@ -683,6 +708,23 @@ func waitForApiserverUp(c clientset.Interface) error {
return fmt.Errorf("waiting for apiserver timed out")
}
// getEndpointNodesWithInternalIP returns a map from the name of each node hosting an
// endpoint of the Service to that node's internal IP.
func getEndpointNodesWithInternalIP(jig *e2eservice.TestJig) (map[string]string, error) {
nodesWithIPs, err := jig.GetEndpointNodesWithIP(v1.NodeInternalIP)
if err != nil {
return nil, err
}
endpointsNodeMap := make(map[string]string, len(nodesWithIPs))
for nodeName, internalIPs := range nodesWithIPs {
if len(internalIPs) < 1 {
return nil, fmt.Errorf("no internal ip found for node %s", nodeName)
}
endpointsNodeMap[nodeName] = internalIPs[0]
}
return endpointsNodeMap, nil
}
var _ = SIGDescribe("Services", func() {
f := framework.NewDefaultFramework("services")
@@ -2885,11 +2927,18 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
framework.ExpectNoError(err)
// Make sure we didn't leak the health check node port.
threshold := 2
nodes, err := jig.GetEndpointNodes()
const threshold = 2
nodes, err := getEndpointNodesWithInternalIP(jig)
framework.ExpectNoError(err)
for _, ips := range nodes {
err := TestHTTPHealthCheckNodePort(ips[0], healthCheckNodePort, "/healthz", e2eservice.KubeProxyEndpointLagTimeout, false, threshold)
config := e2enetwork.NewNetworkingTestConfig(f, false, false)
for _, internalIP := range nodes {
err := testHTTPHealthCheckNodePortFromTestContainer(
config,
internalIP,
healthCheckNodePort,
e2eservice.KubeProxyLagTimeout,
false,
threshold)
framework.ExpectNoError(err)
}
err = cs.CoreV1().Services(svc.Namespace).Delete(context.TODO(), svc.Name, metav1.DeleteOptions{})
@@ -2923,17 +2972,20 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
}()
tcpNodePort := int(svc.Spec.Ports[0].NodePort)
endpointsNodeMap, err := jig.GetEndpointNodes()
framework.ExpectNoError(err)
path := "/clientip"
for nodeName, nodeIPs := range endpointsNodeMap {
nodeIP := nodeIPs[0]
ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v%v%v", nodeName, nodeIP, tcpNodePort, path))
content := GetHTTPContent(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, path)
clientIP := content.String()
framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
if strings.HasPrefix(clientIP, "10.") {
endpointsNodeMap, err := getEndpointNodesWithInternalIP(jig)
framework.ExpectNoError(err)
dialCmd := "clientip"
config := e2enetwork.NewNetworkingTestConfig(f, false, false)
for nodeName, nodeIP := range endpointsNodeMap {
ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd))
clientIP, err := GetHTTPContentFromTestContainer(config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
framework.ExpectNoError(err)
framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP)
// the clientIP returned by agnhost includes the port
if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) {
framework.Failf("Source IP was NOT preserved")
}
}
@@ -2970,13 +3022,13 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
framework.Failf("Service HealthCheck NodePort was not allocated")
}
ips := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
svcTCPPort := int(svc.Spec.Ports[0].Port)
threshold := 2
path := "/healthz"
const threshold = 2
config := e2enetwork.NewNetworkingTestConfig(f, false, false)
for i := 0; i < len(nodes.Items); i++ {
endpointNodeName := nodes.Items[i].Name
@@ -2995,15 +3047,21 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
// HealthCheck should pass only on the node where num(endpoints) > 0
// All other nodes should fail the healthcheck on the service healthCheckNodePort
for n, publicIP := range ips {
for n, internalIP := range ips {
// Make sure the loadbalancer picked up the health check change.
// Confirm traffic can reach backend through LB before checking healthcheck nodeport.
e2eservice.TestReachableHTTP(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout)
expectedSuccess := nodes.Items[n].Name == endpointNodeName
port := strconv.Itoa(healthCheckNodePort)
ipPort := net.JoinHostPort(publicIP, port)
framework.Logf("Health checking %s, http://%s%s, expectedSuccess %v", nodes.Items[n].Name, ipPort, path, expectedSuccess)
err := TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, e2eservice.KubeProxyEndpointLagTimeout, expectedSuccess, threshold)
ipPort := net.JoinHostPort(internalIP, port)
framework.Logf("Health checking %s, http://%s/healthz, expectedSuccess %v", nodes.Items[n].Name, ipPort, expectedSuccess)
err := testHTTPHealthCheckNodePortFromTestContainer(
config,
internalIP,
healthCheckNodePort,
e2eservice.KubeProxyEndpointLagTimeout,
expectedSuccess,
threshold)
framework.ExpectNoError(err)
}
framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(f.ClientSet, namespace, serviceName))
@@ -3069,8 +3127,7 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
}
})
// TODO: Get rid of [DisabledForLargeClusters] tag when issue #90047 is fixed.
ginkgo.It("should handle updates to ExternalTrafficPolicy field [DisabledForLargeClusters]", func() {
ginkgo.It("should handle updates to ExternalTrafficPolicy field", func() {
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
@@ -3103,42 +3160,71 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
framework.Failf("Service HealthCheck NodePort still present")
}
endpointNodeMap, err := jig.GetEndpointNodes()
epNodes, err := jig.ListNodesWithEndpoint()
framework.ExpectNoError(err)
noEndpointNodeMap := map[string][]string{}
for _, n := range nodes.Items {
if _, ok := endpointNodeMap[n.Name]; ok {
continue
// endpointNodeMap maps the name of each node with an endpoint to its internal IP;
// it is assumed that only a single node has the endpoint
endpointNodeMap := make(map[string]string)
// noEndpointNodeMap maps the name of each node without an endpoint to its internal IP
noEndpointNodeMap := make(map[string]string)
for _, node := range epNodes {
ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", node.Name)
}
noEndpointNodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
endpointNodeMap[node.Name] = ips[0]
}
for _, n := range nodes.Items {
ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", n.Name)
}
if _, ok := endpointNodeMap[n.Name]; !ok {
noEndpointNodeMap[n.Name] = ips[0]
}
}
framework.ExpectNotEqual(len(endpointNodeMap), 0)
framework.ExpectNotEqual(len(noEndpointNodeMap), 0)
svcTCPPort := int(svc.Spec.Ports[0].Port)
svcNodePort := int(svc.Spec.Ports[0].NodePort)
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
path := "/clientip"
dialCmd := "clientip"
config := e2enetwork.NewNetworkingTestConfig(f, false, false)
ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
for nodeName, nodeIPs := range noEndpointNodeMap {
ginkgo.By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], svcNodePort, path))
GetHTTPContent(nodeIPs[0], svcNodePort, e2eservice.KubeProxyLagTimeout, path)
for nodeName, nodeIP := range noEndpointNodeMap {
ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
_, err := GetHTTPContentFromTestContainer(config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
}
for nodeName, nodeIPs := range endpointNodeMap {
ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
var body bytes.Buffer
pollfn := func() (bool, error) {
result := e2enetwork.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil)
if result.Code == 0 {
for nodeName, nodeIP := range endpointNodeMap {
ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
var body string
pollFn := func() (bool, error) {
// we expect connection failure here, but not other errors
resp, err := config.GetResponseFromTestContainer(
"http",
"healthz",
nodeIP,
healthCheckNodePort)
if err != nil {
return false, nil
}
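// a dial failure reported by the test container means the health check port is no
// longer reachable, which is what we expect once ESIPP has been turned off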
if len(resp.Errors) > 0 {
return true, nil
}
body.Reset()
body.Write(result.Body)
if len(resp.Responses) > 0 {
body = resp.Responses[0]
}
return false, nil
}
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollfn); pollErr != nil {
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
nodeName, healthCheckNodePort, body.String())
nodeName, healthCheckNodePort, body)
}
}

View File

@@ -53,6 +53,23 @@ func GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
return body
}
// GetHTTPContentFromTestContainer issues an HTTP request for dialCmd against host:port from within a test container and returns the response body.
func GetHTTPContentFromTestContainer(config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, dialCmd string) (string, error) {
var body string
pollFn := func() (bool, error) {
resp, err := config.GetResponseFromTestContainer("http", dialCmd, host, port)
if err != nil || len(resp.Errors) > 0 || len(resp.Responses) == 0 {
return false, nil
}
body = resp.Responses[0]
return true, nil
}
if pollErr := wait.PollImmediate(framework.Poll, timeout, pollFn); pollErr != nil {
return "", pollErr
}
return body, nil
}
// DescribeSvc logs the output of kubectl describe svc for the given namespace
func DescribeSvc(ns string) {
framework.Logf("\nOutput of kubectl describe svc:\n")