Merge pull request #52844 from aleksandra-malinowska/autoscaling-test-fix-5
Automatic merge from submit-queue (batch tested with PRs 52843, 52710, 52821, 52844). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Improve retrying logic when checking CA status. This should reduce the flake rate in the cluster size autoscaling test suite.
This commit is contained in:
		| @@ -62,6 +62,7 @@ const ( | ||||
| 	rcCreationRetryDelay   = 20 * time.Second | ||||
| 	makeSchedulableTimeout = 10 * time.Minute | ||||
| 	makeSchedulableDelay   = 20 * time.Second | ||||
| 	freshStatusLimit       = 20 * time.Second | ||||
|  | ||||
| 	gkeEndpoint      = "https://test-container.sandbox.googleapis.com" | ||||
| 	gkeUpdateTimeout = 15 * time.Minute | ||||
| @@ -73,6 +74,7 @@ const ( | ||||
|  | ||||
| 	caNoScaleUpStatus      = "NoActivity" | ||||
| 	caOngoingScaleUpStatus = "InProgress" | ||||
| 	timestampFormat        = "2006-01-02 15:04:05 -0700 MST" | ||||
| ) | ||||
|  | ||||
| var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() { | ||||
| @@ -192,15 +194,21 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() { | ||||
| 		}) | ||||
|  | ||||
| 	It("shouldn't trigger additional scale-ups during processing scale-up [Feature:ClusterSizeAutoscalingScaleUp]", func() { | ||||
| 		status, err := getScaleUpStatus(c) | ||||
| 		// Wait for the situation to stabilize - CA should be running and have up-to-date node readiness info. | ||||
| 		status, err := waitForScaleUpStatus(c, func(s *scaleUpStatus) bool { | ||||
| 			return s.ready == s.target && s.ready <= nodeCount | ||||
| 		}, scaleUpTriggerTimeout) | ||||
| 		framework.ExpectNoError(err) | ||||
|  | ||||
| 		unmanagedNodes := nodeCount - status.ready | ||||
|  | ||||
| 		By("Schedule more pods than can fit and wait for cluster to scale-up") | ||||
| 		ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second) | ||||
| 		defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation") | ||||
|  | ||||
| 		status, err = waitForScaleUpStatus(c, caOngoingScaleUpStatus, scaleUpTriggerTimeout) | ||||
| 		status, err = waitForScaleUpStatus(c, func(s *scaleUpStatus) bool { | ||||
| 			return s.status == caOngoingScaleUpStatus | ||||
| 		}, scaleUpTriggerTimeout) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		target := status.target | ||||
| 		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c)) | ||||
| @@ -211,6 +219,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() { | ||||
| 		if status.target != target { | ||||
| 			glog.Warningf("Final number of nodes (%v) does not match initial scale-up target (%v).", status.target, target) | ||||
| 		} | ||||
| 		Expect(status.timestamp.Add(freshStatusLimit).Before(time.Now())).Should(Equal(false)) | ||||
| 		Expect(status.status).Should(Equal(caNoScaleUpStatus)) | ||||
| 		Expect(status.ready).Should(Equal(status.target)) | ||||
| 		Expect(len(framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items)).Should(Equal(status.target + unmanagedNodes)) | ||||
| @@ -1342,9 +1351,30 @@ func getClusterwideStatus(c clientset.Interface) (string, error) { | ||||
| } | ||||
|  | ||||
| type scaleUpStatus struct { | ||||
| 	status string | ||||
| 	ready  int | ||||
| 	target int | ||||
| 	status    string | ||||
| 	ready     int | ||||
| 	target    int | ||||
| 	timestamp time.Time | ||||
| } | ||||
|  | ||||
| // Try to get timestamp from status. | ||||
| // Status configmap is not parsing-friendly, so evil regexpery follows. | ||||
| func getStatusTimestamp(status string) (time.Time, error) { | ||||
| 	timestampMatcher, err := regexp.Compile("Cluster-autoscaler status at \\s*([0-9\\-]+ [0-9]+:[0-9]+:[0-9]+\\.[0-9]+ \\+[0-9]+ [A-Za-z]+):") | ||||
| 	if err != nil { | ||||
| 		return time.Time{}, err | ||||
| 	} | ||||
|  | ||||
| 	timestampMatches := timestampMatcher.FindAllStringSubmatch(status, -1) | ||||
| 	if len(timestampMatches) < 1 { | ||||
| 		return time.Time{}, fmt.Errorf("Failed to parse CA status timestamp, raw status: %v", status) | ||||
| 	} | ||||
|  | ||||
| 	timestamp, err := time.Parse(timestampFormat, timestampMatches[0][1]) | ||||
| 	if err != nil { | ||||
| 		return time.Time{}, err | ||||
| 	} | ||||
| 	return timestamp, nil | ||||
| } | ||||
|  | ||||
| // Try to get scaleup statuses of all node groups. | ||||
| @@ -1358,6 +1388,12 @@ func getScaleUpStatus(c clientset.Interface) (*scaleUpStatus, error) { | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("Status information not found in configmap") | ||||
| 	} | ||||
|  | ||||
| 	timestamp, err := getStatusTimestamp(status) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	matcher, err := regexp.Compile("s*ScaleUp:\\s*([A-Za-z]+)\\s*\\(ready=([0-9]+)\\s*cloudProviderTarget=([0-9]+)\\s*\\)") | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -1366,10 +1402,12 @@ func getScaleUpStatus(c clientset.Interface) (*scaleUpStatus, error) { | ||||
| 	if len(matches) < 1 { | ||||
| 		return nil, fmt.Errorf("Failed to parse CA status configmap, raw status: %v", status) | ||||
| 	} | ||||
|  | ||||
| 	result := scaleUpStatus{ | ||||
| 		status: caNoScaleUpStatus, | ||||
| 		ready:  0, | ||||
| 		target: 0, | ||||
| 		status:    caNoScaleUpStatus, | ||||
| 		ready:     0, | ||||
| 		target:    0, | ||||
| 		timestamp: timestamp, | ||||
| 	} | ||||
| 	for _, match := range matches { | ||||
| 		if match[1] == caOngoingScaleUpStatus { | ||||
| @@ -1390,17 +1428,25 @@ func getScaleUpStatus(c clientset.Interface) (*scaleUpStatus, error) { | ||||
| 	return &result, nil | ||||
| } | ||||
|  | ||||
| func waitForScaleUpStatus(c clientset.Interface, expected string, timeout time.Duration) (*scaleUpStatus, error) { | ||||
| 	for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) { | ||||
| 		status, err := getScaleUpStatus(c) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| func waitForScaleUpStatus(c clientset.Interface, cond func(s *scaleUpStatus) bool, timeout time.Duration) (*scaleUpStatus, error) { | ||||
| 	var finalErr error | ||||
| 	var status *scaleUpStatus | ||||
| 	err := wait.PollImmediate(5*time.Second, timeout, func() (bool, error) { | ||||
| 		status, finalErr = getScaleUpStatus(c) | ||||
| 		if finalErr != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		if status.status == expected { | ||||
| 			return status, nil | ||||
| 		if status.timestamp.Add(freshStatusLimit).Before(time.Now()) { | ||||
| 			// stale status | ||||
| 			finalErr = fmt.Errorf("Status too old") | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return cond(status), nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		err = fmt.Errorf("Failed to find expected scale up status: %v, last status: %v, final err: %v", err, status, finalErr) | ||||
| 	} | ||||
| 	return nil, fmt.Errorf("ScaleUp status did not reach expected value: %v", expected) | ||||
| 	return status, err | ||||
| } | ||||
|  | ||||
| // This is a temporary fix to allow CA to migrate some kube-system pods | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue