/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package network

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/onsi/ginkgo/v2"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilnet "k8s.io/apimachinery/pkg/util/net"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	coreclientset "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	netutils "k8s.io/utils/net"
)

const (
	// EndpointHTTPPort is an endpoint HTTP port for testing.
	EndpointHTTPPort = 8083
	// EndpointUDPPort is an endpoint UDP port for testing.
	EndpointUDPPort = 8081
	// EndpointSCTPPort is an endpoint SCTP port for testing.
	EndpointSCTPPort = 8082
	// testContainerHTTPPort is the test container http port.
	testContainerHTTPPort = 9080
	// ClusterHTTPPort is a cluster HTTP port for testing.
	ClusterHTTPPort = 80
	// ClusterUDPPort is a cluster UDP port for testing.
	ClusterUDPPort = 90
	// ClusterSCTPPort is a cluster SCTP port for testing.
	ClusterSCTPPort            = 95
	testPodName                = "test-container-pod"
	hostTestPodName            = "host-test-container-pod"
	nodePortServiceName        = "node-port-service"
	sessionAffinityServiceName = "session-affinity-service"
	// wait time between poll attempts of a Service vip and/or nodePort.
	// coupled with testTries to produce a net timeout value.
	hitEndpointRetryDelay = 2 * time.Second
	// Number of retries to hit a given set of endpoints. Needs to be high
	// because we verify iptables statistical rr loadbalancing.
	testTries = 30
	// Maximum number of pods in a test, to make test work in large clusters.
	maxNetProxyPodsCount = 10
	// SessionAffinityChecks is the number of checks to hit a given set of endpoints when session affinity is enabled.
	SessionAffinityChecks = 10
	// RegexIPv4 is a regex to match IPv4 addresses
	RegexIPv4 = "(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)"
	// RegexIPv6 is a regex to match IPv6 addresses
	RegexIPv6                 = "(?:(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){6})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:::(?:(?:(?:[0-9a-fA-F]{1,4})):){5})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){4})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,1}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){3})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,2}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:(?:[0-9a-fA-F]{1,4})):){2})(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,3}(?:(?:[0-9a-fA-F]{1,4})))?::(?:(?:[0-9a-fA-F]{1,4})):)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,4}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9]))\\.){3}(?:(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])))))))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,5}(?:(?:[0-9a-fA-F]{1,4})))?::)(?:(?:[0-9a-fA-F]{1,4})))|(?:(?:(?:(?:(?:(?:[0-9a-fA-F]{1,4})):){0,6}(?:(?:[0-9a-fA-F]{1,4})))?::))))"
	resizeNodeReadyTimeout    = 2 * time.Minute
	resizeNodeNotReadyTimeout = 2 * time.Minute
	// netexec dial commands
	// the destination will echo its hostname.
	echoHostname = "hostname"
)

// Option is used to configure the NetworkingTestConfig object
type Option func(*NetworkingTestConfig)

// EnableSCTP listens on SCTP ports on the endpoints
func EnableSCTP(config *NetworkingTestConfig) {
	config.SCTPEnabled = true
}

// EnableDualStack creates Dual Stack services
func EnableDualStack(config *NetworkingTestConfig) {
	config.DualStackEnabled = true
}

// UseHostNetwork runs the test container with HostNetwork=true.
func UseHostNetwork(config *NetworkingTestConfig) {
	config.HostNetwork = true
}

// EndpointsUseHostNetwork runs the endpoint pods with HostNetwork=true.
func EndpointsUseHostNetwork(config *NetworkingTestConfig) {
	config.EndpointsHostNetwork = true
}

// PreferExternalAddresses prefers node External Addresses for the tests
func PreferExternalAddresses(config *NetworkingTestConfig) {
	config.PreferExternalAddresses = true
}

// NewNetworkingTestConfig creates and sets up a new test config helper.
func NewNetworkingTestConfig(ctx context.Context, f *framework.Framework, setters ...Option) *NetworkingTestConfig {
	// default options
	config := &NetworkingTestConfig{
		f:         f,
		Namespace: f.Namespace.Name,
	}
	for _, setter := range setters {
		setter(config)
	}
	ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
	config.setup(ctx, getServiceSelector())
	return config
}
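
// Illustrative usage sketch: a typical networking e2e test builds the config with
// functional options and then dials the created service from the test container.
// The specific options, protocol and target below are example choices, not
// requirements of this helper:
//
//	config := NewNetworkingTestConfig(ctx, f, EnableSCTP, PreferExternalAddresses)
//	err := config.DialFromTestContainer(ctx, "http", config.ClusterIP, ClusterHTTPPort,
//		config.MaxTries, 0, config.EndpointHostnames())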

// NewCoreNetworkingTestConfig creates and sets up a new test config helper for Node E2E.
func NewCoreNetworkingTestConfig(ctx context.Context, f *framework.Framework, hostNetwork bool) *NetworkingTestConfig {
	// default options
	config := &NetworkingTestConfig{
		f:           f,
		Namespace:   f.Namespace.Name,
		HostNetwork: hostNetwork,
	}
	ginkgo.By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
	config.setupCore(ctx, getServiceSelector())
	return config
}

func getServiceSelector() map[string]string {
	ginkgo.By("creating a selector")
	selectorName := "selector-" + string(uuid.NewUUID())
	serviceSelector := map[string]string{
		selectorName: "true",
	}
	return serviceSelector
}

// NetworkingTestConfig is a convenience class around some utility methods
// for testing kubeproxy/networking/services/endpoints.
type NetworkingTestConfig struct {
	// TestContainerPod is a test pod running the netexec image. It is capable
	// of executing tcp/udp requests against ip:port.
	TestContainerPod *v1.Pod
	// HostTestContainerPod is a pod running using the hostexec image.
	HostTestContainerPod *v1.Pod
	// if the HostTestContainerPod is running with HostNetwork=true.
	HostNetwork bool
	// if the endpoint pods are running with HostNetwork=true.
	EndpointsHostNetwork bool
	// if the test pods are listening on sctp port. We need this as sctp tests
	// are marked as disruptive as they may load the sctp module.
	SCTPEnabled bool
	// DualStackEnabled enables dual stack on services
	DualStackEnabled bool
	// EndpointPods are the pods belonging to the Service created by this
	// test config. Each invocation of `setup` creates a service with
	// 1 pod per node running the netexecImage.
	EndpointPods []*v1.Pod
	f            *framework.Framework
	podClient    *e2epod.PodClient
	// NodePortService is a Service with Type=NodePort spanning over all
	// endpointPods.
	NodePortService *v1.Service
	// SessionAffinityService is a Service with SessionAffinity=ClientIP
	// spanning over all endpointPods.
	SessionAffinityService *v1.Service
	// Nodes is a list of nodes in the cluster.
	Nodes []v1.Node
	// MaxTries is the number of retries tolerated for tests run against
	// endpoints and services created by this config.
	MaxTries int
	// The ClusterIP of the Service created by this test config.
	ClusterIP string
	// The SecondaryClusterIP of the Service created by this test config.
	SecondaryClusterIP string
	// NodeIP is an ExternalIP if the node has one,
	// or an InternalIP if not, for use in nodePort testing.
	NodeIP string
	// SecondaryNodeIP is an ExternalIP of the secondary IP family if the node has one,
	// or an InternalIP if not, for use in nodePort testing.
	SecondaryNodeIP string
	// The http/udp/sctp nodePorts of the Service.
	NodeHTTPPort int
	NodeUDPPort  int
	NodeSCTPPort int
	// The kubernetes namespace within which all resources for this
	// config are created
	Namespace string
	// Whether to prefer node External Addresses for the tests
	PreferExternalAddresses bool
}

// NetexecDialResponse represents the response returned by the `netexec` subcommand of `agnhost`
type NetexecDialResponse struct {
	Responses []string `json:"responses"`
	Errors    []string `json:"errors"`
}

// DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container. Returns an error to be handled by the caller.
func (config *NetworkingTestConfig) DialFromEndpointContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
	return config.DialFromContainer(ctx, protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
}

// DialFromTestContainer executes a curl via kubectl exec in a test container. Returns an error to be handled by the caller.
func (config *NetworkingTestConfig) DialFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
	return config.DialFromContainer(ctx, protocol, echoHostname, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedEps)
}

// DialEchoFromTestContainer executes a curl via kubectl exec in a test container. The response is expected to match the echoMessage. Returns an error to be handled by the caller.
func (config *NetworkingTestConfig) DialEchoFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, echoMessage string) error {
	expectedResponse := sets.NewString()
	expectedResponse.Insert(echoMessage)
	var dialCommand string

	// NOTE(claudiub): netexec /dialCommand will send a request to the given targetIP and targetPort as follows:
	// for HTTP: it will send a request to: http://targetIP:targetPort/dialCommand
	// for UDP: it will send targetCommand as a message. The consumer receives the data message and looks for
	// a few starting strings, including echo, and treats it accordingly.
	if protocol == "http" {
		dialCommand = fmt.Sprintf("echo?msg=%s", echoMessage)
	} else {
		dialCommand = fmt.Sprintf("echo%%20%s", echoMessage)
	}
	return config.DialFromContainer(ctx, protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, maxTries, minTries, expectedResponse)
}

// diagnoseMissingEndpoints prints debug information about the endpoints that
// are NOT in the given list of foundEndpoints. These are the endpoints we
// expected a response from.
func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) {
	for _, e := range config.EndpointPods {
		if foundEndpoints.Has(e.Name) {
			continue
		}
		framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
		desc, _ := e2ekubectl.RunKubectl(
			e.Namespace, "describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
		framework.Logf("%s", desc)
	}
}

// EndpointHostnames returns a set of hostnames for existing endpoints.
func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
	expectedEps := sets.NewString()
	for _, p := range config.EndpointPods {
		if config.EndpointsHostNetwork {
			expectedEps.Insert(p.Spec.NodeSelector["kubernetes.io/hostname"])
		} else {
			expectedEps.Insert(p.Name)
		}
	}
	return expectedEps
}

func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
	// The current versions of curl included in CentOS and RHEL distros
	// misinterpret square brackets around IPv6 as globbing, so use the -g
	// argument to disable globbing to handle the IPv6 case.
	return fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
		ipPort,
		dialCmd,
		protocol,
		targetIP,
		targetPort)
}
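
// For illustration, with made-up example addresses, a call such as
//
//	makeCURLDialCommand("10.0.0.5:9080", "hostname", "udp", "10.96.0.10", 90)
//
// produces a command along the lines of
//
//	curl -g -q -s 'http://10.0.0.5:9080/dial?request=hostname&protocol=udp&host=10.96.0.10&port=90&tries=1'
//
// which the agnhost netexec container in the source pod translates into a UDP dial
// to the given host:port.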

// DialFromContainer executes a curl via kubectl exec in a test container,
// which might then translate to a tcp or udp request based on the protocol
// argument in the url.
//   - minTries is the minimum number of curl attempts required before declaring
//     success. Set to 0 if you'd like to return as soon as all endpoints respond
//     at least once.
//   - maxTries is the maximum number of curl attempts. If this many attempts pass
//     and we don't see all expected endpoints, the test fails.
//   - containerIP is the source Pod IP that will dial the given dialCommand using the given protocol.
//   - dialCommand is the command that will be sent to the targetIP using the given protocol.
//     the dialCommand should be formatted properly for the protocol (http: URL path+parameters,
//     udp: command%20parameters, where parameters are optional)
//   - expectedResponses is the unordered set of responses to wait for. The responses are based on
//     the dialCommand; for example, for the dialCommand "hostname", the expectedResponses
//     should contain the hostnames reported by each pod in the service through /hostName.
//
// maxTries == minTries will confirm that we see the expected endpoints and no
// more for maxTries. Use this if you want to eg: fail a readiness check on a
// pod and confirm it doesn't show up as an endpoint.
// Returns nil if no error, or error message if failed after trying maxTries.
func (config *NetworkingTestConfig) DialFromContainer(ctx context.Context, protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) error {
	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)

	responses := sets.NewString()

	for i := 0; i < maxTries; i++ {
		resp, err := config.GetResponseFromContainer(ctx, protocol, dialCommand, containerIP, targetIP, containerHTTPPort, targetPort)
		if err != nil {
			// A failure to kubectl exec counts as a try, not a hard fail.
			// Also note that we will keep failing for maxTries in tests where
			// we confirm unreachability.
			framework.Logf("GetResponseFromContainer: %s", err)
			continue
		}
		for _, response := range resp.Responses {
			trimmed := strings.TrimSpace(response)
			if trimmed != "" {
				responses.Insert(trimmed)
			}
		}
		if responses.Difference(expectedResponses).Len() > 0 {
			returnMsg := fmt.Errorf("received unexpected responses... \nAttempt %d\nCommand %v\nretrieved %v\nexpected %v", i, cmd, responses, expectedResponses)
			// TODO(aojea) Remove once issues.k8s.io/123760 is solved
			// Dump the nodes network routes and addresses for troubleshooting #123760
			framework.Logf("encountered error during dial (%v)", returnMsg)
			hostExec := storageutils.NewHostExec(config.f)
			ginkgo.DeferCleanup(hostExec.Cleanup)
			cmd := `echo "IP routes: " && ip route && echo "IP addresses:" && ip addr && echo "Open sockets: " && ss -anp --socket=tcp`
			for _, node := range config.Nodes {
				result, err := hostExec.IssueCommandWithResult(ctx, cmd, &node)
				if err != nil {
					framework.Logf("error occurred while executing command %s on node: %v", cmd, err)
					continue
				}
				framework.Logf("Dump network information for node %s:\n%s", node.Name, result)
			}
			// Dump the node iptables rules and conntrack flows for troubleshooting #123760
			podList, _ := config.f.ClientSet.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
				LabelSelector: "k8s-app=kube-proxy",
			})
			for _, pod := range podList.Items {
				// dump only for the node running test-container-pod
				if pod.Status.HostIP == config.TestContainerPod.Status.HostIP {
					output, _, _ := e2epod.ExecWithOptions(config.f, e2epod.ExecOptions{
						Namespace:          "kube-system",
						PodName:            pod.Name,
						ContainerName:      "kube-proxy",
						Command:            []string{"sh", "-c", fmt.Sprintf(`echo "IPTables Dump: " && iptables-save | grep "%s/%s:http" && echo "Conntrack flows: " && conntrack -Ln -p tcp | grep %d`, config.Namespace, config.NodePortService.Name, EndpointHTTPPort)},
						Stdin:              nil,
						CaptureStdout:      true,
						CaptureStderr:      true,
						PreserveWhitespace: false,
					})
					framework.Logf("Dump iptables and conntrack flows\n%s", output)
					break
				}
			}
			return returnMsg
		}

		framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses))

		// Check against i+1 so we exit if minTries == maxTries.
		if (responses.Equal(expectedResponses) || responses.Len() == 0 && expectedResponses.Len() == 0) && i+1 >= minTries {
			framework.Logf("reached %v after %v/%v tries", targetIP, i, maxTries)
			return nil
		}
		// TODO: get rid of this delay #36281
		time.Sleep(hitEndpointRetryDelay)
	}
	if dialCommand == echoHostname {
		config.diagnoseMissingEndpoints(responses)
	}
	returnMsg := fmt.Errorf("did not find expected responses... \nTries %d\nCommand %v\nretrieved %v\nexpected %v", maxTries, cmd, responses, expectedResponses)
	framework.Logf("encountered error during dial (%v)", returnMsg)
	return returnMsg

}

// GetEndpointsFromTestContainer executes a curl via kubectl exec in a test container.
func (config *NetworkingTestConfig) GetEndpointsFromTestContainer(ctx context.Context, protocol, targetIP string, targetPort, tries int) (sets.String, error) {
	return config.GetEndpointsFromContainer(ctx, protocol, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort, tries)
}

// GetEndpointsFromContainer executes a curl via kubectl exec in a test container,
// which might then translate to a tcp or udp request based on the protocol argument
// in the url. It returns all different endpoints from multiple retries.
//   - tries is the number of curl attempts. If this many attempts pass and
//     we don't see any endpoints, the test fails.
func (config *NetworkingTestConfig) GetEndpointsFromContainer(ctx context.Context, protocol, containerIP, targetIP string, containerHTTPPort, targetPort, tries int) (sets.String, error) {
	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
	cmd := makeCURLDialCommand(ipPort, "hostName", protocol, targetIP, targetPort)

	eps := sets.NewString()

	for i := 0; i < tries; i++ {
		stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
		if err != nil {
			// A failure to kubectl exec counts as a try, not a hard fail.
			// Also note that we will keep failing for maxTries in tests where
			// we confirm unreachability.
			framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
		} else {
			podInfo := fmt.Sprintf("name: %v, namespace: %v, hostIp: %v, podIp: %v, conditions: %v", config.TestContainerPod.Name, config.TestContainerPod.Namespace, config.TestContainerPod.Status.HostIP, config.TestContainerPod.Status.PodIP, config.TestContainerPod.Status.Conditions)
			framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in Pod { %#v }", tries, i, stdout, stderr, podInfo)

			var output NetexecDialResponse
			if err := json.Unmarshal([]byte(stdout), &output); err != nil {
				framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
					cmd, config.TestContainerPod.Name, stdout, err)
				continue
			}

			for _, hostName := range output.Responses {
				trimmed := strings.TrimSpace(hostName)
				if trimmed != "" {
					eps.Insert(trimmed)
				}
			}
			// TODO: get rid of this delay #36281
			time.Sleep(hitEndpointRetryDelay)
		}
	}
	return eps, nil
}

// GetResponseFromContainer executes a curl via kubectl exec in a container.
func (config *NetworkingTestConfig) GetResponseFromContainer(ctx context.Context, protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort int) (NetexecDialResponse, error) {
	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)

	stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
	if err != nil {
		return NetexecDialResponse{}, fmt.Errorf("failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
	}

	var output NetexecDialResponse
	if err := json.Unmarshal([]byte(stdout), &output); err != nil {
		return NetexecDialResponse{}, fmt.Errorf("failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
			cmd, config.TestContainerPod.Name, stdout, err)
	}
	return output, nil
}

// GetResponseFromTestContainer executes a curl via kubectl exec in a test container.
func (config *NetworkingTestConfig) GetResponseFromTestContainer(ctx context.Context, protocol, dialCommand, targetIP string, targetPort int) (NetexecDialResponse, error) {
	return config.GetResponseFromContainer(ctx, protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort)
}

// GetHTTPCodeFromTestContainer executes a curl via kubectl exec in a test container and returns the status code.
func (config *NetworkingTestConfig) GetHTTPCodeFromTestContainer(ctx context.Context, path, targetIP string, targetPort int) (int, error) {
	cmd := fmt.Sprintf("curl -g -q -s -o /dev/null -w %%{http_code} http://%s%s",
		net.JoinHostPort(targetIP, strconv.Itoa(targetPort)),
		path)
	stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.TestContainerPod.Name, cmd)
	// We only care about the status code reported by curl,
	// and want to return any other errors, such as a failure to execute the command in the Pod.
	// If curl failed to connect to host, it would exit with code 7, which makes `ExecShellInPodWithFullOutput`
	// return a non-nil error and output "000" to stdout.
	if err != nil && len(stdout) == 0 {
		return 0, fmt.Errorf("failed to execute %q: %v, stderr: %q", cmd, err, stderr)
	}
	code, err := strconv.Atoi(stdout)
	if err != nil {
		return 0, fmt.Errorf("failed to parse status code returned by healthz endpoint: %w, code: %s", err, stdout)
	}
	return code, nil
}
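
// Illustrative usage sketch (the /healthz path and expectation are assumptions,
// not requirements of this helper):
//
//	code, err := config.GetHTTPCodeFromTestContainer(ctx, "/healthz", config.ClusterIP, ClusterHTTPPort)
//	if err == nil && code == http.StatusOK {
//		// the target is serving
//	}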

// DialFromNode executes a tcp/udp curl/nc request based on protocol via kubectl exec
// in a test container running with host networking.
//   - minTries is the minimum number of curl/nc attempts required before declaring
//     success. If 0, then we return as soon as all endpoints succeed.
//   - There is no logical change to test results if failures happen AFTER endpoints have succeeded,
//     hence over-padding minTries will NOT reverse a successful result and is thus not very useful yet
//     (See the TODO about checking probability, which isn't implemented yet).
//   - maxTries is the maximum number of curl/echo attempts before an error is returned.  The
//     smaller this number is, the less 'slack' there is for declaring success.
//   - if maxTries is less than the number of expectedEps, this test is guaranteed to return an error, because all endpoints won't be hit.
//   - maxTries == minTries will return as soon as all endpoints succeed (or fail once maxTries is reached without
//     success on all endpoints).
//     In general it's prudent to have a high enough level of minTries to guarantee that all pods get a fair chance at receiving traffic.
func (config *NetworkingTestConfig) DialFromNode(ctx context.Context, protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) error {
	var cmd string
	if protocol == "udp" {
		cmd = fmt.Sprintf("echo hostName | nc -w 1 -u %s %d", targetIP, targetPort)
	} else {
		ipPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
		// The current versions of curl included in CentOS and RHEL distros
		// misinterpret square brackets around IPv6 as globbing, so use the -g
		// argument to disable globbing to handle the IPv6 case.
		cmd = fmt.Sprintf("curl -g -q -s --max-time 15 --connect-timeout 1 http://%s/hostName", ipPort)
	}

	// TODO: This simply tells us that we can reach the endpoints. Check that
	// the probability of hitting a specific endpoint is roughly the same as
	// hitting any other.
	eps := sets.NewString()

	filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd)
	framework.Logf("Going to poll %v on port %v at least %v times, with a maximum of %v tries before failing", targetIP, targetPort, minTries, maxTries)
	for i := 0; i < maxTries; i++ {
		stdout, stderr, err := e2epod.ExecShellInPodWithFullOutput(ctx, config.f, config.HostTestContainerPod.Name, filterCmd)
		if err != nil || len(stderr) > 0 {
			// A failure to exec command counts as a try, not a hard fail.
			// Also note that we will keep failing for maxTries in tests where
			// we confirm unreachability.
			framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
		} else {
			trimmed := strings.TrimSpace(stdout)
			if trimmed != "" {
				eps.Insert(trimmed)
			}
		}

		// Check against i+1 so we exit if minTries == maxTries.
		if eps.Equal(expectedEps) && i+1 >= minTries {
			framework.Logf("Found all %d expected endpoints: %+v", eps.Len(), eps.List())
			return nil
		}

		framework.Logf("Waiting for %+v endpoints (expected=%+v, actual=%+v)", expectedEps.Difference(eps).List(), expectedEps.List(), eps.List())

		// TODO: get rid of this delay #36281
		time.Sleep(hitEndpointRetryDelay)
	}

	config.diagnoseMissingEndpoints(eps)
	return fmt.Errorf("failed to find expected endpoints, \ntries %d\nCommand %v\nretrieved %v\nexpected %v", maxTries, cmd, eps, expectedEps)
}

// GetSelfURL executes a curl against the given path via kubectl exec into a
// test container running with host networking, and fails if the output
// doesn't match the expected string.
func (config *NetworkingTestConfig) GetSelfURL(ctx context.Context, port int32, path string, expected string) {
	cmd := fmt.Sprintf("curl -i -q -s --connect-timeout 1 http://localhost:%d%s", port, path)
	ginkgo.By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
	config.executeCurlCmd(ctx, cmd, expected)
}

// GetSelfURLStatusCode executes a curl against the given path via kubectl exec into a
// test container running with host networking, and fails if the returned status
// code doesn't match the expected string.
func (config *NetworkingTestConfig) GetSelfURLStatusCode(ctx context.Context, port int32, path string, expected string) {
	// check status code
	cmd := fmt.Sprintf("curl -o /dev/null -i -q -s -w %%{http_code} --connect-timeout 1 http://localhost:%d%s", port, path)
	ginkgo.By(fmt.Sprintf("Checking status code against http://localhost:%d%s", port, path))
	config.executeCurlCmd(ctx, cmd, expected)
}

func (config *NetworkingTestConfig) executeCurlCmd(ctx context.Context, cmd string, expected string) {
	// These are arbitrary timeouts. The curl command should pass on first try,
	// unless remote server is starved/bootstrapping/restarting etc.
	const retryInterval = 1 * time.Second
	const retryTimeout = 30 * time.Second
	podName := config.HostTestContainerPod.Name
	var msg string
	if pollErr := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) {
		stdout, err := e2epodoutput.RunHostCmd(config.Namespace, podName, cmd)
		if err != nil {
			msg = fmt.Sprintf("failed executing cmd %v in %v/%v: %v", cmd, config.Namespace, podName, err)
			framework.Logf("%s", msg)
			return false, nil
		}
		if !strings.Contains(stdout, expected) {
			msg = fmt.Sprintf("successfully executed %v in %v/%v, but output '%v' doesn't contain expected string '%v'", cmd, config.Namespace, podName, stdout, expected)
			framework.Logf("%s", msg)
			return false, nil
		}
		return true, nil
	}); pollErr != nil {
		framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", config.Namespace, podName)
		desc, _ := e2ekubectl.RunKubectl(
			config.Namespace, "describe", "pod", podName, fmt.Sprintf("--namespace=%v", config.Namespace))
		framework.Logf("%s", desc)
		framework.Failf("Timed out in %v: %v", retryTimeout, msg)
	}
}

func (config *NetworkingTestConfig) createNetShellPodSpec(podName, hostname string) *v1.Pod {
	netexecArgs := []string{
		"netexec",
		fmt.Sprintf("--http-port=%d", EndpointHTTPPort),
		fmt.Sprintf("--udp-port=%d", EndpointUDPPort),
	}
	// In case of hostnetwork endpoints, we want to bind the udp listener to specific ip addresses.
	// In order to cover legacy AND dualstack, we pass both the host ip and the two pod ips. Agnhost
	// removes duplicates and so this will listen on both addresses (or on the single existing one).
	if config.EndpointsHostNetwork {
		netexecArgs = append(netexecArgs, "--udp-listen-addresses=$(HOST_IP),$(POD_IPS)")
	}

	probe := &v1.Probe{
		InitialDelaySeconds: 10,
		TimeoutSeconds:      30,
		PeriodSeconds:       10,
		SuccessThreshold:    1,
		FailureThreshold:    3,
		ProbeHandler: v1.ProbeHandler{
			HTTPGet: &v1.HTTPGetAction{
				Path: "/healthz",
				Port: intstr.IntOrString{IntVal: EndpointHTTPPort},
			},
		},
	}
	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: config.Namespace,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:            "webserver",
					Image:           imageutils.GetE2EImage(imageutils.Agnhost),
					ImagePullPolicy: v1.PullIfNotPresent,
					Args:            netexecArgs,
					Ports: []v1.ContainerPort{
						{
							Name:          "http",
							ContainerPort: EndpointHTTPPort,
						},
						{
							Name:          "udp",
							ContainerPort: EndpointUDPPort,
							Protocol:      v1.ProtocolUDP,
						},
					},
					LivenessProbe:  probe,
					ReadinessProbe: probe,
				},
			},
			NodeSelector: map[string]string{
				"kubernetes.io/hostname": hostname,
			},
		},
	}
	// we want sctp to be optional as it will load the sctp kernel module
	if config.SCTPEnabled {
		pod.Spec.Containers[0].Args = append(pod.Spec.Containers[0].Args, fmt.Sprintf("--sctp-port=%d", EndpointSCTPPort))
		pod.Spec.Containers[0].Ports = append(pod.Spec.Containers[0].Ports, v1.ContainerPort{
			Name:          "sctp",
			ContainerPort: EndpointSCTPPort,
			Protocol:      v1.ProtocolSCTP,
		})
	}

	if config.EndpointsHostNetwork {
		pod.Spec.Containers[0].Env = []v1.EnvVar{
			{
				Name: "HOST_IP",
				ValueFrom: &v1.EnvVarSource{
					FieldRef: &v1.ObjectFieldSelector{
						FieldPath: "status.hostIP",
					},
				},
			},
			{
				Name: "POD_IPS",
				ValueFrom: &v1.EnvVarSource{
					FieldRef: &v1.ObjectFieldSelector{
						FieldPath: "status.podIPs",
					},
				},
			},
		}
	}
	return pod
}

func (config *NetworkingTestConfig) createTestPodSpec() *v1.Pod {
	pod := &v1.Pod{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      testPodName,
			Namespace: config.Namespace,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:            "webserver",
					Image:           imageutils.GetE2EImage(imageutils.Agnhost),
					ImagePullPolicy: v1.PullIfNotPresent,
					Args: []string{
						"netexec",
						fmt.Sprintf("--http-port=%d", testContainerHTTPPort),
					},
					Ports: []v1.ContainerPort{
						{
							Name:          "http",
							ContainerPort: testContainerHTTPPort,
						},
					},
				},
			},
		},
	}
	return pod
}

func (config *NetworkingTestConfig) createNodePortServiceSpec(svcName string, selector map[string]string, enableSessionAffinity bool) *v1.Service {
	sessionAffinity := v1.ServiceAffinityNone
	if enableSessionAffinity {
		sessionAffinity = v1.ServiceAffinityClientIP
	}
	res := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: svcName,
		},
		Spec: v1.ServiceSpec{
			Type: v1.ServiceTypeNodePort,
			Ports: []v1.ServicePort{
				{Port: ClusterHTTPPort, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(EndpointHTTPPort)},
				{Port: ClusterUDPPort, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt32(EndpointUDPPort)},
			},
			Selector:        selector,
			SessionAffinity: sessionAffinity,
		},
	}

	if config.SCTPEnabled {
		res.Spec.Ports = append(res.Spec.Ports, v1.ServicePort{Port: ClusterSCTPPort, Name: "sctp", Protocol: v1.ProtocolSCTP, TargetPort: intstr.FromInt32(EndpointSCTPPort)})
	}
	if config.DualStackEnabled {
		requireDual := v1.IPFamilyPolicyRequireDualStack
		res.Spec.IPFamilyPolicy = &requireDual
	}
	return res
}

func (config *NetworkingTestConfig) createNodePortService(ctx context.Context, selector map[string]string) {
	config.NodePortService = config.CreateService(ctx, config.createNodePortServiceSpec(nodePortServiceName, selector, false))
}

func (config *NetworkingTestConfig) createSessionAffinityService(ctx context.Context, selector map[string]string) {
	config.SessionAffinityService = config.CreateService(ctx, config.createNodePortServiceSpec(sessionAffinityServiceName, selector, true))
}

// DeleteNodePortService deletes NodePort service.
func (config *NetworkingTestConfig) DeleteNodePortService(ctx context.Context) {
	err := config.getServiceClient().Delete(ctx, config.NodePortService.Name, metav1.DeleteOptions{})
	framework.ExpectNoError(err, "error while deleting NodePortService: %v", err)
	time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
}

func (config *NetworkingTestConfig) createTestPods(ctx context.Context) {
	testContainerPod := config.createTestPodSpec()
	hostTestContainerPod := e2epod.NewExecPodSpec(config.Namespace, hostTestPodName, config.HostNetwork)

	config.createPod(ctx, testContainerPod)
	if config.HostNetwork {
		config.createPod(ctx, hostTestContainerPod)
	}

	framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, config.f.ClientSet, testContainerPod.Name, config.f.Namespace.Name))

	var err error
	config.TestContainerPod, err = config.getPodClient().Get(ctx, testContainerPod.Name, metav1.GetOptions{})
	if err != nil {
		framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
	}

	if config.HostNetwork {
		framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, config.f.ClientSet, hostTestContainerPod.Name, config.f.Namespace.Name))
		config.HostTestContainerPod, err = config.getPodClient().Get(ctx, hostTestContainerPod.Name, metav1.GetOptions{})
		if err != nil {
			framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
		}
	}
}

// CreateService creates the provided service in config.Namespace and returns created service
func (config *NetworkingTestConfig) CreateService(ctx context.Context, serviceSpec *v1.Service) *v1.Service {
	_, err := config.getServiceClient().Create(ctx, serviceSpec, metav1.CreateOptions{})
	framework.ExpectNoError(err, fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))

	err = WaitForService(ctx, config.f.ClientSet, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
	framework.ExpectNoError(err, fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))

	createdService, err := config.getServiceClient().Get(ctx, serviceSpec.Name, metav1.GetOptions{})
	framework.ExpectNoError(err, fmt.Sprintf("Failed to get %s service: %v", serviceSpec.Name, err))

	return createdService
}

// setupCore sets up the pods and core test config
// mainly for simplified node e2e setup
func (config *NetworkingTestConfig) setupCore(ctx context.Context, selector map[string]string) {
	ginkgo.By("Creating the service pods in kubernetes")
	podName := "netserver"
	config.EndpointPods = config.createNetProxyPods(ctx, podName, selector)

	ginkgo.By("Creating test pods")
	config.createTestPods(ctx)

	epCount := len(config.EndpointPods)

	// Note that this is not O(n^2) in practice, because epCount SHOULD be < 10. In cases where epCount is > 10, this would be prohibitively large.
	// Check maxNetProxyPodsCount for details.
	config.MaxTries = epCount*epCount + testTries
	framework.Logf("Setting MaxTries for pod polling to %v for networking test based on endpoint count %v", config.MaxTries, epCount)
}

// setup includes setupCore and also sets up services
func (config *NetworkingTestConfig) setup(ctx context.Context, selector map[string]string) {
	config.setupCore(ctx, selector)

	ginkgo.By("Getting node addresses")
	framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, config.f.ClientSet, 10*time.Minute))
	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, config.f.ClientSet)
	framework.ExpectNoError(err)

	e2eskipper.SkipUnlessNodeCountIsAtLeast(2)
	config.Nodes = nodeList.Items

	ginkgo.By("Creating the service on top of the pods in kubernetes")
	config.createNodePortService(ctx, selector)
	config.createSessionAffinityService(ctx, selector)

	for _, p := range config.NodePortService.Spec.Ports {
		switch p.Protocol {
		case v1.ProtocolUDP:
			config.NodeUDPPort = int(p.NodePort)
		case v1.ProtocolTCP:
			config.NodeHTTPPort = int(p.NodePort)
		case v1.ProtocolSCTP:
			config.NodeSCTPPort = int(p.NodePort)
		default:
			continue
		}
	}

	// obtain the ClusterIP
	config.ClusterIP = config.NodePortService.Spec.ClusterIP
	if config.DualStackEnabled {
		config.SecondaryClusterIP = config.NodePortService.Spec.ClusterIPs[1]
	}

	// Obtain the primary IP family of the Cluster based on the first ClusterIP
	// TODO: Eventually we should just be getting these from Spec.IPFamilies
	// but for now that would only work if the feature gate is enabled.
	family := v1.IPv4Protocol
	secondaryFamily := v1.IPv6Protocol
	if netutils.IsIPv6String(config.ClusterIP) {
		family = v1.IPv6Protocol
		secondaryFamily = v1.IPv4Protocol
	}
	if config.PreferExternalAddresses {
		// Get Node IPs from the cluster, ExternalIPs take precedence
		config.NodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeExternalIP, family)
	}
	if config.NodeIP == "" {
		config.NodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeInternalIP, family)
	}
	if config.DualStackEnabled {
		if config.PreferExternalAddresses {
			config.SecondaryNodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeExternalIP, secondaryFamily)
		}
		if config.SecondaryNodeIP == "" {
			config.SecondaryNodeIP = e2enode.FirstAddressByTypeAndFamily(nodeList, v1.NodeInternalIP, secondaryFamily)
		}
	}

	ginkgo.By("Waiting for NodePort service to expose endpoint")
	err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
	framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", nodePortServiceName, config.Namespace)
	ginkgo.By("Waiting for Session Affinity service to expose endpoint")
	err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, sessionAffinityServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
	framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", sessionAffinityServiceName, config.Namespace)
}

func (config *NetworkingTestConfig) createNetProxyPods(ctx context.Context, podName string, selector map[string]string) []*v1.Pod {
	framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, config.f.ClientSet, 10*time.Minute))
	nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, config.f.ClientSet, maxNetProxyPodsCount)
	framework.ExpectNoError(err)
	nodes := nodeList.Items

	// create pods, one for each node
	createdPods := make([]*v1.Pod, 0, len(nodes))
	for i, n := range nodes {
		podName := fmt.Sprintf("%s-%d", podName, i)
		hostname, _ := n.Labels["kubernetes.io/hostname"]
		pod := config.createNetShellPodSpec(podName, hostname)
		pod.ObjectMeta.Labels = selector
		pod.Spec.HostNetwork = config.EndpointsHostNetwork

		// NOTE(claudiub): In order to use HostNetwork on Windows, we need to use Privileged Containers.
		if pod.Spec.HostNetwork && framework.NodeOSDistroIs("windows") {
			e2epod.WithWindowsHostProcess(pod, "")
		}
		createdPod := config.createPod(ctx, pod)
		createdPods = append(createdPods, createdPod)
	}

	// wait until all of them are up
	runningPods := make([]*v1.Pod, 0, len(nodes))
	for _, p := range createdPods {
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, config.f.ClientSet, p.Name, config.f.Namespace.Name, framework.PodStartTimeout))
		rp, err := config.getPodClient().Get(ctx, p.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		runningPods = append(runningPods, rp)
	}

	return runningPods
}

// DeleteNetProxyPod deletes the first endpoint pod and waits for it to be removed.
func (config *NetworkingTestConfig) DeleteNetProxyPod(ctx context.Context) {
	pod := config.EndpointPods[0]
	framework.ExpectNoError(config.getPodClient().Delete(ctx, pod.Name, metav1.DeleteOptions{}))
	config.EndpointPods = config.EndpointPods[1:]
	// wait for the pod to be deleted.
	err := e2epod.WaitForPodNotFoundInNamespace(ctx, config.f.ClientSet, pod.Name, config.Namespace, config.f.Timeouts.PodDelete)
	if err != nil {
		framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
	}
	// wait for the endpoint to be removed.
	err = framework.WaitForServiceEndpointsNum(ctx, config.f.ClientSet, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
	if err != nil {
		framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
	}
	// wait for kube-proxy to catch up with the pod being deleted.
	time.Sleep(5 * time.Second)
}

func (config *NetworkingTestConfig) createPod(ctx context.Context, pod *v1.Pod) *v1.Pod {
	return config.getPodClient().Create(ctx, pod)
}

func (config *NetworkingTestConfig) getPodClient() *e2epod.PodClient {
	if config.podClient == nil {
		config.podClient = e2epod.NewPodClient(config.f)
	}
	return config.podClient
}

func (config *NetworkingTestConfig) getServiceClient() coreclientset.ServiceInterface {
	return config.f.ClientSet.CoreV1().Services(config.Namespace)
}

// HTTPPokeParams is a struct for HTTP poke parameters.
 | 
						|
type HTTPPokeParams struct {
 | 
						|
	Timeout        time.Duration // default = 10 secs
 | 
						|
	ExpectCode     int           // default = 200
 | 
						|
	BodyContains   string
 | 
						|
	RetriableCodes []int
 | 
						|
	EnableHTTPS    bool
 | 
						|
}
 | 
						|
 | 
						|
// HTTPPokeResult is a struct for HTTP poke result.
 | 
						|
type HTTPPokeResult struct {
 | 
						|
	Status HTTPPokeStatus
 | 
						|
	Code   int    // HTTP code: 0 if the connection was not made
 | 
						|
	Error  error  // if there was any error
 | 
						|
	Body   []byte // if code != 0
 | 
						|
}
 | 
						|
 | 
						|
// HTTPPokeStatus is a string representing the HTTP poke status.
type HTTPPokeStatus string

const (
	// HTTPSuccess means the HTTP poke succeeded.
	HTTPSuccess HTTPPokeStatus = "Success"
	// HTTPError means the HTTP poke failed with an unknown error.
	HTTPError HTTPPokeStatus = "UnknownError"
	// HTTPTimeout means the HTTP poke timed out.
	HTTPTimeout HTTPPokeStatus = "TimedOut"
	// HTTPRefused means the connection was refused.
	HTTPRefused HTTPPokeStatus = "ConnectionRefused"
	// HTTPRetryCode means the response had a retriable status code.
	HTTPRetryCode HTTPPokeStatus = "RetryCode"
	// HTTPWrongCode means the response had an unexpected status code.
	HTTPWrongCode HTTPPokeStatus = "WrongCode"
	// HTTPBadResponse means the response body did not contain the expected substring.
	HTTPBadResponse HTTPPokeStatus = "BadResponse"
	// Any time we add new errors, we should audit all callers of this.
)
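
// Illustrative sketch (an assumption for documentation purposes, not framework
// code): a polling condition that treats only HTTPSuccess as a pass and keeps
// retrying on statuses that are typically transient while a Service is being
// reprogrammed. The helper name and the host/port/path values are placeholders.
//
//	func pokeReady(host string, port int) (bool, error) {
//		result := PokeHTTP(host, port, "/hostname", nil)
//		switch result.Status {
//		case HTTPSuccess:
//			return true, nil
//		case HTTPTimeout, HTTPRefused, HTTPRetryCode:
//			return false, nil // transient; poll again
//		default:
//			return false, result.Error // give up immediately
//		}
//	}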

// PokeHTTP tries to connect to a host on a port for a given URL path.  Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result code will be zero in case of any failure to connect, or non-zero
// if the HTTP transaction completed (even if the other test params make this a
// failure).
//
// The result error will be populated for any status other than Success.
//
// The result body will be populated if the HTTP transaction was completed, even
// if the other test params make this a failure.
func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
	// Set default params.
	if params == nil {
		params = &HTTPPokeParams{}
	}

	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	var url string
	if params.EnableHTTPS {
		url = fmt.Sprintf("https://%s%s", hostPort, path)
	} else {
		url = fmt.Sprintf("http://%s%s", hostPort, path)
	}

	ret := HTTPPokeResult{}

	// Sanity check inputs, because it has happened.  These are the only things
	// that should hard fail the test - they are basically ASSERT()s.
	if host == "" {
		framework.Failf("Got empty host for HTTP poke (%s)", url)
		return ret
	}
	if port == 0 {
		framework.Failf("Got port==0 for HTTP poke (%s)", url)
		return ret
	}

	if params.ExpectCode == 0 {
		params.ExpectCode = http.StatusOK
	}

	if params.Timeout == 0 {
		params.Timeout = 10 * time.Second
	}

	framework.Logf("Poking %q", url)

	resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
	if err != nil {
		ret.Error = err
		neterr, ok := err.(net.Error)
		if ok && neterr.Timeout() {
			ret.Status = HTTPTimeout
		} else if strings.Contains(err.Error(), "connection refused") {
			ret.Status = HTTPRefused
		} else {
			ret.Status = HTTPError
		}
		framework.Logf("Poke(%q): %v", url, err)
		return ret
	}

	ret.Code = resp.StatusCode

	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		ret.Status = HTTPError
		ret.Error = fmt.Errorf("error reading HTTP body: %w", err)
		framework.Logf("Poke(%q): %v", url, ret.Error)
		return ret
	}
	ret.Body = make([]byte, len(body))
	copy(ret.Body, body)

	if resp.StatusCode != params.ExpectCode {
		for _, code := range params.RetriableCodes {
			if resp.StatusCode == code {
				ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
				ret.Status = HTTPRetryCode
				framework.Logf("Poke(%q): %v", url, ret.Error)
				return ret
			}
		}
		ret.Status = HTTPWrongCode
		ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
		framework.Logf("Poke(%q): %v", url, ret.Error)
		return ret
	}

	if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
		ret.Status = HTTPBadResponse
		ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
		framework.Logf("Poke(%q): %v", url, ret.Error)
		return ret
	}

	ret.Status = HTTPSuccess
	framework.Logf("Poke(%q): success", url)
	return ret
}
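
// Illustrative usage sketch (an assumption, not framework code): poll a
// NodePort with PokeHTTP until it answers 200 with the expected body, treating
// 503 as retriable. The node IP, port, path, and body substring below are
// placeholder values.
//
//	params := &HTTPPokeParams{
//		Timeout:        5 * time.Second,
//		ExpectCode:     http.StatusOK,
//		BodyContains:   "hello",
//		RetriableCodes: []int{http.StatusServiceUnavailable},
//	}
//	err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true,
//		func(ctx context.Context) (bool, error) {
//			result := PokeHTTP("203.0.113.10", 30080, "/echo?msg=hello", params)
//			return result.Status == HTTPSuccess, nil
//		})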

// httpGetNoConnectionPoolTimeout does an HTTP GET, but does not reuse TCP connections.
// Reusing connections would mask problems where an iptables rule has changed but we don't see it.
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
	tr := utilnet.SetTransportDefaults(&http.Transport{
		DisableKeepAlives: true,
		TLSClientConfig:   &tls.Config{InsecureSkipVerify: true},
	})
	client := &http.Client{
		Transport: tr,
		Timeout:   timeout,
	}

	return client.Get(url)
}

// TestUnderTemporaryNetworkFailure blocks outgoing network traffic on 'node'. Then runs testFunc and returns its status.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node so it will work only for some
// environments.
func TestUnderTemporaryNetworkFailure(ctx context.Context, c clientset.Interface, ns string, node *v1.Node, testFunc func(ctx context.Context)) {
	host, err := e2enode.GetSSHExternalIP(node)
	if err != nil {
		framework.Failf("Error getting node external ip : %v", err)
	}
	controlPlaneAddresses := framework.GetControlPlaneAddresses(ctx, c)
	ginkgo.By(fmt.Sprintf("block network traffic from node %s to the control plane", node.Name))
	defer func() {
		// This code will execute even if setting the iptables rule failed.
		// It is on purpose because we may have an error even if the new rule
		// had been inserted. (yes, we could look at the error code and ssh error
		// separately, but I prefer to stay on the safe side).
		ginkgo.By(fmt.Sprintf("Unblock network traffic from node %s to the control plane", node.Name))
		for _, instanceAddress := range controlPlaneAddresses {
			UnblockNetwork(ctx, host, instanceAddress)
		}
	}()

	framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
	if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, true, resizeNodeReadyTimeout) {
		framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
	}
	for _, instanceAddress := range controlPlaneAddresses {
		BlockNetwork(ctx, host, instanceAddress)
	}

	framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
	if !e2enode.WaitConditionToBe(ctx, c, node.Name, v1.NodeReady, false, resizeNodeNotReadyTimeout) {
		framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
	}

	testFunc(ctx)
	// network traffic is unblocked in a deferred function
}
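
// Illustrative usage sketch (an assumption, not framework code): run a check
// while the chosen node is partitioned from the control plane. The node
// selection helper and the closure body are placeholders for whatever the
// calling test actually verifies.
//
//	node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
//	framework.ExpectNoError(err)
//	TestUnderTemporaryNetworkFailure(ctx, f.ClientSet, f.Namespace.Name, node,
//		func(ctx context.Context) {
//			// e.g. verify that pods on the node keep serving traffic,
//			// or that the Node object eventually reports NotReady.
//		})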

// BlockNetwork blocks network between the given from value and the given to value.
// The following helper functions can block/unblock network from a source
// host to a destination host by manipulating iptables rules.
// This function assumes it can ssh to the source host.
//
// Caution:
// Pass IP addresses rather than hostnames. Using hostnames will cause iptables to
// do a DNS lookup to resolve the name to an IP address, which will
// slow down the test and cause it to fail if DNS is absent or broken.
//
// Suggested usage pattern:
//
//	func foo() {
//		...
//		defer UnblockNetwork(from, to)
//		BlockNetwork(from, to)
//		...
//	}
func BlockNetwork(ctx context.Context, from string, to string) {
	framework.Logf("block network traffic from %s to %s", from, to)
	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
	dropCmd := fmt.Sprintf("sudo iptables --insert %s", iptablesRule)
	if result, err := e2essh.SSH(ctx, dropCmd, from, framework.TestContext.Provider); result.Code != 0 || err != nil {
		e2essh.LogResult(result)
		framework.Failf("Unexpected error: %v", err)
	}
}

// UnblockNetwork unblocks network between the given from value and the given to value.
func UnblockNetwork(ctx context.Context, from string, to string) {
	framework.Logf("Unblock network traffic from %s to %s", from, to)
	iptablesRule := fmt.Sprintf("OUTPUT --destination %s --jump REJECT", to)
	undropCmd := fmt.Sprintf("sudo iptables --delete %s", iptablesRule)
	// The undrop command may fail if the rule has never been created.
	// In such a case we just lose 30 seconds, but the cluster is healthy.
	// But if the rule had been created and removing it failed, the node is broken and
	// not coming back. Subsequent tests will run on fewer nodes (some of the tests
	// may fail). Manual intervention is required in such a case (recreating the
	// cluster solves the problem too).
	err := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*30, false, func(ctx context.Context) (bool, error) {
		result, err := e2essh.SSH(ctx, undropCmd, from, framework.TestContext.Provider)
		if result.Code == 0 && err == nil {
			return true, nil
		}
		e2essh.LogResult(result)
		if err != nil {
			framework.Logf("Unexpected error: %v", err)
		}
		return false, nil
	})
	if err != nil {
		framework.Failf("Failed to remove the iptables REJECT rule. Manual intervention is "+
			"required on host %s: remove rule %s, if it exists", from, iptablesRule)
	}
}

// WaitForService waits until the service appears (exist == true), or disappears (exist == false)
func WaitForService(ctx context.Context, c clientset.Interface, namespace, name string, exist bool, interval, timeout time.Duration) error {
	err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
		_, err := c.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
		switch {
		case err == nil:
			framework.Logf("Service %s in namespace %s found.", name, namespace)
			return exist, nil
		case apierrors.IsNotFound(err):
			framework.Logf("Service %s in namespace %s disappeared.", name, namespace)
			return !exist, nil
		case err != nil:
			framework.Logf("Non-retryable failure while getting service.")
			return false, err
		default:
			framework.Logf("Get service %s in namespace %s failed: %v", name, namespace, err)
			return false, nil
		}
	})
	if err != nil {
		stateMsg := map[bool]string{true: "to appear", false: "to disappear"}
		return fmt.Errorf("error waiting for service %s/%s %s: %w", namespace, name, stateMsg[exist], err)
	}
	return nil
}
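
// Illustrative usage sketch (an assumption, not framework code): wait for a
// Service to exist before exercising it, then wait for it to be cleaned up
// after the test deletes it. The service name, intervals, and timeouts are
// placeholder values.
//
//	err := WaitForService(ctx, f.ClientSet, f.Namespace.Name, "node-port-service", true, 2*time.Second, 2*time.Minute)
//	framework.ExpectNoError(err, "service never appeared")
//	// ... delete the service ...
//	err = WaitForService(ctx, f.ClientSet, f.Namespace.Name, "node-port-service", false, 2*time.Second, 2*time.Minute)
//	framework.ExpectNoError(err, "service never disappeared")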