e2e/network_policy: using expected==observed as condition for polling in probeConnectivity function
This commit is contained in:
		| @@ -34,15 +34,10 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| // defaultPollIntervalSeconds [seconds] is the default value for which the Prober will wait before attempting next attempt. | // defaultPollIntervalSeconds [seconds] is the default value for which the Prober will wait before attempting next attempt. | ||||||
| const defaultPollIntervalSeconds = 2 | const defaultPollIntervalSeconds = 1 | ||||||
|  |  | ||||||
| // defaultPollTimeoutSeconds [seconds] is the default timeout when polling on probes. | // defaultPollTimeoutSeconds [seconds] is the default timeout when polling on probes. | ||||||
| // using this value leads to a minimum of 2 attempts for every probe | const defaultPollTimeoutSeconds = 10 | ||||||
| const defaultPollTimeoutSeconds = 1 |  | ||||||
|  |  | ||||||
| // maxPollTimeoutSeconds [seconds] is the max timeout when polling on probes, this should only be used when expect a |  | ||||||
| // successful probe; use defaultPollTimeout otherwise |  | ||||||
| const maxPollTimeoutSeconds = 10 |  | ||||||
|  |  | ||||||
| // TestPod represents an actual running pod. For each Pod defined by the model, | // TestPod represents an actual running pod. For each Pod defined by the model, | ||||||
| // there will be a corresponding TestPod. TestPod includes some runtime info | // there will be a corresponding TestPod. TestPod includes some runtime info | ||||||
| @@ -185,18 +180,57 @@ func (k *kubeManager) probeConnectivity(args *probeConnectivityArgs) (bool, stri | |||||||
|  |  | ||||||
| 	attempt := 0 | 	attempt := 0 | ||||||
|  |  | ||||||
| 	err := wait.PollImmediate(time.Duration(args.pollIntervalSeconds)*time.Second, time.Duration(args.pollTimeoutSeconds)*time.Second, | 	// NOTE: The return value of this function[probeConnectivity] should be true if the probe is successful and false otherwise. | ||||||
| 		func() (bool, error) { |  | ||||||
| 			stdout, stderr, err := k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd) | 	// probeError will be the return value of this function[probeConnectivity] call. | ||||||
| 			if err != nil { | 	var probeError error | ||||||
| 				framework.Logf("retrying probe #%d :: %s/%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", attempt+1, args.nsFrom, args.podFrom, args.addrTo, err, stdout, stderr) | 	var stderr string | ||||||
|  |  | ||||||
|  | 	// Instead of re-running the job on connectivity failure, the following conditionFunc when passed to PollImmediate, reruns | ||||||
|  | 	// the job when the observed value don't match the expected value, so we don't rely on return value of PollImmediate, we | ||||||
|  | 	// simply discard it and use probeError, defined outside scope of conditionFunc, for returning the result of probeConnectivity. | ||||||
|  | 	conditionFunc := func() (bool, error) { | ||||||
|  | 		_, stderr, probeError = k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd) | ||||||
|  | 		// retry should only occur if expected and observed value don't match. | ||||||
|  | 		if args.expectConnectivity { | ||||||
|  | 			if probeError != nil { | ||||||
|  | 				// since we expect connectivity here, we fail the condition for PollImmediate to reattempt the probe. | ||||||
|  | 				// this happens in the cases where network is congested, we don't have any policy rejecting traffic | ||||||
|  | 				// from "podFrom" to "podTo" and probes from "podFrom" to "podTo" are failing. | ||||||
|  | 				framework.Logf("probe #%d :: connectivity expected :: %s/%s -> %s :: stderr - %s", | ||||||
|  | 					attempt+1, args.nsFrom, args.podFrom, args.addrTo, stderr, | ||||||
|  | 				) | ||||||
|  | 				attempt++ | ||||||
|  | 				return false, nil | ||||||
|  | 			} else { | ||||||
|  | 				// we got the expected results, exit immediately. | ||||||
|  | 				return true, nil | ||||||
|  | 			} | ||||||
|  | 		} else { | ||||||
|  | 			if probeError != nil { | ||||||
|  | 				// we got the expected results, exit immediately. | ||||||
|  | 				return true, nil | ||||||
|  | 			} else { | ||||||
|  | 				// since we don't expect connectivity here, we fail the condition for PollImmediate to reattempt the probe. | ||||||
|  | 				// this happens in the cases where we have policy rejecting traffic from "podFrom" to "podTo", but CNI takes | ||||||
|  | 				// time to implement the policy and probe from "podFrom" to "podTo" was successful in that window. | ||||||
|  | 				framework.Logf(" probe #%d :: connectivity not expected :: %s/%s -> %s", | ||||||
|  | 					attempt+1, args.nsFrom, args.podFrom, args.addrTo, | ||||||
|  | 				) | ||||||
| 				attempt++ | 				attempt++ | ||||||
| 				return false, nil | 				return false, nil | ||||||
| 			} | 			} | ||||||
| 			return true, nil | 		} | ||||||
| 		}) | 	} | ||||||
|  |  | ||||||
| 	if err != nil { | 	// ignore the result of PollImmediate, we are only concerned with probeError. | ||||||
|  | 	_ = wait.PollImmediate( | ||||||
|  | 		time.Duration(args.pollIntervalSeconds)*time.Second, | ||||||
|  | 		time.Duration(args.pollTimeoutSeconds)*time.Second, | ||||||
|  | 		conditionFunc, | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	if probeError != nil { | ||||||
| 		return false, commandDebugString, nil | 		return false, commandDebugString, nil | ||||||
| 	} | 	} | ||||||
| 	return true, commandDebugString, nil | 	return true, commandDebugString, nil | ||||||
| @@ -309,10 +343,10 @@ func getPollIntervalSeconds() int { | |||||||
| 	return defaultPollIntervalSeconds | 	return defaultPollIntervalSeconds | ||||||
| } | } | ||||||
|  |  | ||||||
| // getPollTimeout returns the timeout for polling on probes. | // getPollTimeout returns the timeout for polling on probes, and takes windows heuristics into account, where requests can take longer sometimes. | ||||||
| func getPollTimeoutSeconds(useMaxPollTimout bool) int { | func getPollTimeoutSeconds() int { | ||||||
| 	if useMaxPollTimout { | 	if framework.NodeOSDistroIs("windows") { | ||||||
| 		return maxPollTimeoutSeconds | 		return defaultPollTimeoutSeconds * 2 | ||||||
| 	} | 	} | ||||||
| 	return defaultPollTimeoutSeconds | 	return defaultPollTimeoutSeconds | ||||||
| } | } | ||||||
|   | |||||||
| @@ -33,6 +33,7 @@ type probeConnectivityArgs struct { | |||||||
| 	addrTo              string | 	addrTo              string | ||||||
| 	protocol            v1.Protocol | 	protocol            v1.Protocol | ||||||
| 	toPort              int | 	toPort              int | ||||||
|  | 	expectConnectivity  bool | ||||||
| 	timeoutSeconds      int | 	timeoutSeconds      int | ||||||
| 	pollIntervalSeconds int | 	pollIntervalSeconds int | ||||||
| 	pollTimeoutSeconds  int | 	pollTimeoutSeconds  int | ||||||
| @@ -51,7 +52,7 @@ type ProbeJob struct { | |||||||
| 	ToPort             int | 	ToPort             int | ||||||
| 	ToPodDNSDomain     string | 	ToPodDNSDomain     string | ||||||
| 	Protocol           v1.Protocol | 	Protocol           v1.Protocol | ||||||
| 	UseMaxPollTimeout bool | 	ExpectConnectivity bool | ||||||
| } | } | ||||||
|  |  | ||||||
| // ProbeJobResults packages the data for the results of a pod->pod connectivity probe | // ProbeJobResults packages the data for the results of a pod->pod connectivity probe | ||||||
| @@ -72,18 +73,17 @@ func ProbePodToPodConnectivity(prober Prober, allPods []TestPod, dnsDomain strin | |||||||
| 	} | 	} | ||||||
| 	for _, podFrom := range allPods { | 	for _, podFrom := range allPods { | ||||||
| 		for _, podTo := range allPods { | 		for _, podTo := range allPods { | ||||||
| 			useMaxPollTimeout := false | 			// set connectivity expectation for the probe job, this allows to retry probe when observed value | ||||||
| 			// we only want to use max poll timeout for the probes where we expect connectivity from "podFrom" to "podTo". | 			// don't match expected value. | ||||||
| 			if testCase.Reachability.Expected.Get(podFrom.PodString().String(), podTo.PodString().String()) { | 			expectConnectivity := testCase.Reachability.Expected.Get(podFrom.PodString().String(), podTo.PodString().String()) | ||||||
| 				useMaxPollTimeout = true |  | ||||||
| 			} |  | ||||||
| 			jobs <- &ProbeJob{ | 			jobs <- &ProbeJob{ | ||||||
| 				PodFrom:            podFrom, | 				PodFrom:            podFrom, | ||||||
| 				PodTo:              podTo, | 				PodTo:              podTo, | ||||||
| 				ToPort:             testCase.ToPort, | 				ToPort:             testCase.ToPort, | ||||||
| 				ToPodDNSDomain:     dnsDomain, | 				ToPodDNSDomain:     dnsDomain, | ||||||
| 				Protocol:           testCase.Protocol, | 				Protocol:           testCase.Protocol, | ||||||
| 				UseMaxPollTimeout: useMaxPollTimeout, | 				ExpectConnectivity: expectConnectivity, | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -136,9 +136,10 @@ func probeWorker(prober Prober, jobs <-chan *ProbeJob, results chan<- *ProbeJobR | |||||||
| 			addrTo:              job.PodTo.ServiceIP, | 			addrTo:              job.PodTo.ServiceIP, | ||||||
| 			protocol:            job.Protocol, | 			protocol:            job.Protocol, | ||||||
| 			toPort:              job.ToPort, | 			toPort:              job.ToPort, | ||||||
|  | 			expectConnectivity:  job.ExpectConnectivity, | ||||||
| 			timeoutSeconds:      getProbeTimeoutSeconds(), | 			timeoutSeconds:      getProbeTimeoutSeconds(), | ||||||
| 			pollIntervalSeconds: getPollIntervalSeconds(), | 			pollIntervalSeconds: getPollIntervalSeconds(), | ||||||
| 			pollTimeoutSeconds:  getPollTimeoutSeconds(job.UseMaxPollTimeout), | 			pollTimeoutSeconds:  getPollTimeoutSeconds(), | ||||||
| 		}) | 		}) | ||||||
| 		result := &ProbeJobResults{ | 		result := &ProbeJobResults{ | ||||||
| 			Job:         job, | 			Job:         job, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Daman
					Daman