Merge pull request #121916 from carlory/issue-121892
E2E - Sig-autoscaling: Refactor the Autoscaling utils
This commit is contained in:
		| @@ -178,7 +178,8 @@ var _ = SIGDescribe(feature.HPA, framework.WithSerial(), framework.WithSlow(), " | ||||
| 			gomega.Expect(timeWaited).To(gomega.BeNumerically(">", waitDeadline), "waited %s, wanted to wait more than %s", timeWaited, waitDeadline) | ||||
|  | ||||
| 			ginkgo.By("verifying number of replicas") | ||||
| 			replicas := rc.GetReplicas(ctx) | ||||
| 			replicas, err := rc.GetReplicas(ctx) | ||||
| 			framework.ExpectNoError(err) | ||||
| 			gomega.Expect(replicas).To(gomega.BeNumerically("==", initPods), "had %s replicas, still have %s replicas after time deadline", initPods, replicas) | ||||
| 		}) | ||||
|  | ||||
| @@ -214,7 +215,8 @@ var _ = SIGDescribe(feature.HPA, framework.WithSerial(), framework.WithSlow(), " | ||||
| 			gomega.Expect(timeWaited).To(gomega.BeNumerically(">", waitDeadline), "waited %s, wanted to wait more than %s", timeWaited, waitDeadline) | ||||
|  | ||||
| 			ginkgo.By("verifying number of replicas") | ||||
| 			replicas := rc.GetReplicas(ctx) | ||||
| 			replicas, err := rc.GetReplicas(ctx) | ||||
| 			framework.ExpectNoError(err) | ||||
| 			gomega.Expect(replicas).To(gomega.BeNumerically("==", initPods), "had %s replicas, still have %s replicas after time deadline", initPods, replicas) | ||||
| 		}) | ||||
|  | ||||
|   | ||||
| @@ -29,14 +29,12 @@ import ( | ||||
| 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||||
| 	crdclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" | ||||
| 	"k8s.io/apiextensions-apiserver/test/integration/fixtures" | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	"k8s.io/apimachinery/pkg/api/meta" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||
| 	"k8s.io/apimachinery/pkg/util/intstr" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/client-go/dynamic" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	scaleclient "k8s.io/client-go/scale" | ||||
| @@ -66,10 +64,6 @@ const ( | ||||
| 	timeoutRC                       = 120 * time.Second | ||||
| 	startServiceTimeout             = time.Minute | ||||
| 	startServiceInterval            = 5 * time.Second | ||||
| 	rcIsNil                         = "ERROR: replicationController = nil" | ||||
| 	deploymentIsNil                 = "ERROR: deployment = nil" | ||||
| 	rsIsNil                         = "ERROR: replicaset = nil" | ||||
| 	crdIsNil                        = "ERROR: CRD = nil" | ||||
| 	invalidKind                     = "ERROR: invalid workload kind for resource consumer" | ||||
| 	customMetricName                = "QPS" | ||||
| 	serviceInitializationTimeout    = 2 * time.Minute | ||||
| @@ -355,9 +349,11 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric(ctx context.Context) { | ||||
| } | ||||
|  | ||||
| func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicores int) { | ||||
| 	err := wait.PollUntilContextTimeout(ctx, serviceInitializationInterval, serviceInitializationTimeout, true, func(ctx context.Context) (bool, error) { | ||||
| 	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error { | ||||
| 		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post()) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		req := proxyRequest.Namespace(rc.nsName). | ||||
| 			Name(rc.controllerName). | ||||
| 			Suffix("ConsumeCPU"). | ||||
| @@ -368,10 +364,10 @@ func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicore | ||||
| 		_, err = req.DoRaw(ctx) | ||||
| 		if err != nil { | ||||
| 			framework.Logf("ConsumeCPU failure: %v", err) | ||||
| 			return false, nil | ||||
| 			return err | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 		return nil | ||||
| 	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed()) | ||||
|  | ||||
| 	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout | ||||
| 	// which is a side-effect to context cancelling from the cleanup task. | ||||
| @@ -384,9 +380,11 @@ func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicore | ||||
|  | ||||
| // sendConsumeMemRequest sends POST request for memory consumption | ||||
| func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes int) { | ||||
| 	err := wait.PollUntilContextTimeout(ctx, serviceInitializationInterval, serviceInitializationTimeout, true, func(ctx context.Context) (bool, error) { | ||||
| 	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error { | ||||
| 		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post()) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		req := proxyRequest.Namespace(rc.nsName). | ||||
| 			Name(rc.controllerName). | ||||
| 			Suffix("ConsumeMem"). | ||||
| @@ -397,10 +395,10 @@ func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes | ||||
| 		_, err = req.DoRaw(ctx) | ||||
| 		if err != nil { | ||||
| 			framework.Logf("ConsumeMem failure: %v", err) | ||||
| 			return false, nil | ||||
| 			return err | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 		return nil | ||||
| 	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed()) | ||||
|  | ||||
| 	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout | ||||
| 	// which is a side-effect to context cancelling from the cleanup task. | ||||
| @@ -413,9 +411,11 @@ func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes | ||||
|  | ||||
| // sendConsumeCustomMetric sends POST request for custom metric consumption | ||||
| func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta int) { | ||||
| 	err := wait.PollUntilContextTimeout(ctx, serviceInitializationInterval, serviceInitializationTimeout, true, func(ctx context.Context) (bool, error) { | ||||
| 	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error { | ||||
| 		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post()) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		req := proxyRequest.Namespace(rc.nsName). | ||||
| 			Name(rc.controllerName). | ||||
| 			Suffix("BumpMetric"). | ||||
| @@ -427,10 +427,10 @@ func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta i | ||||
| 		_, err = req.DoRaw(ctx) | ||||
| 		if err != nil { | ||||
| 			framework.Logf("ConsumeCustomMetric failure: %v", err) | ||||
| 			return false, nil | ||||
| 			return err | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 		return nil | ||||
| 	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed()) | ||||
|  | ||||
| 	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout | ||||
| 	// which is a side-effect to context cancelling from the cleanup task. | ||||
| @@ -442,50 +442,54 @@ func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta i | ||||
| } | ||||
|  | ||||
| // GetReplicas get the replicas | ||||
| func (rc *ResourceConsumer) GetReplicas(ctx context.Context) int { | ||||
| func (rc *ResourceConsumer) GetReplicas(ctx context.Context) (int, error) { | ||||
| 	switch rc.kind { | ||||
| 	case KindRC: | ||||
| 		replicationController, err := rc.clientSet.CoreV1().ReplicationControllers(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if replicationController == nil { | ||||
| 			framework.Failf(rcIsNil) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		return int(replicationController.Status.ReadyReplicas) | ||||
| 		return int(replicationController.Status.ReadyReplicas), nil | ||||
| 	case KindDeployment: | ||||
| 		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if deployment == nil { | ||||
| 			framework.Failf(deploymentIsNil) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		return int(deployment.Status.ReadyReplicas) | ||||
| 		return int(deployment.Status.ReadyReplicas), nil | ||||
| 	case KindReplicaSet: | ||||
| 		rs, err := rc.clientSet.AppsV1().ReplicaSets(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if rs == nil { | ||||
| 			framework.Failf(rsIsNil) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		return int(rs.Status.ReadyReplicas) | ||||
| 		return int(rs.Status.ReadyReplicas), nil | ||||
| 	case KindCRD: | ||||
| 		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if deployment == nil { | ||||
| 			framework.Failf(deploymentIsNil) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		deploymentReplicas := int64(deployment.Status.ReadyReplicas) | ||||
|  | ||||
| 		scale, err := rc.scaleClient.Scales(rc.nsName).Get(ctx, schema.GroupResource{Group: crdGroup, Resource: crdNamePlural}, rc.name, metav1.GetOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		crdInstance, err := rc.resourceClient.Get(ctx, rc.name, metav1.GetOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		// Update custom resource's status.replicas with child Deployment's current number of ready replicas. | ||||
| 		framework.ExpectNoError(unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas")) | ||||
| 		err = unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas") | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		_, err = rc.resourceClient.Update(ctx, crdInstance, metav1.UpdateOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		return int(scale.Spec.Replicas) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		return int(scale.Spec.Replicas), nil | ||||
| 	default: | ||||
| 		framework.Failf(invalidKind) | ||||
| 		return 0, fmt.Errorf(invalidKind) | ||||
| 	} | ||||
| 	return 0 | ||||
| } | ||||
|  | ||||
| // GetHpa get the corresponding horizontalPodAutoscaler object | ||||
| @@ -496,20 +500,21 @@ func (rc *ResourceConsumer) GetHpa(ctx context.Context, name string) (*autoscali | ||||
| // WaitForReplicas wait for the desired replicas | ||||
| func (rc *ResourceConsumer) WaitForReplicas(ctx context.Context, desiredReplicas int, duration time.Duration) { | ||||
| 	interval := 20 * time.Second | ||||
| 	err := wait.PollUntilContextTimeout(ctx, interval, duration, true, func(ctx context.Context) (bool, error) { | ||||
| 		replicas := rc.GetReplicas(ctx) | ||||
| 		framework.Logf("waiting for %d replicas (current: %d)", desiredReplicas, replicas) | ||||
| 		return replicas == desiredReplicas, nil // Expected number of replicas found. Exit. | ||||
| 	}) | ||||
| 	err := framework.Gomega().Eventually(ctx, framework.HandleRetry(rc.GetReplicas)). | ||||
| 		WithTimeout(duration). | ||||
| 		WithPolling(interval). | ||||
| 		Should(gomega.Equal(desiredReplicas)) | ||||
|  | ||||
| 	framework.ExpectNoErrorWithOffset(1, err, "timeout waiting %v for %d replicas", duration, desiredReplicas) | ||||
| } | ||||
|  | ||||
| // EnsureDesiredReplicasInRange ensure the replicas is in a desired range | ||||
| func (rc *ResourceConsumer) EnsureDesiredReplicasInRange(ctx context.Context, minDesiredReplicas, maxDesiredReplicas int, duration time.Duration, hpaName string) { | ||||
| 	interval := 10 * time.Second | ||||
| 	desiredReplicasErr := framework.Gomega().Consistently(ctx, func(ctx context.Context) int { | ||||
| 		return rc.GetReplicas(ctx) | ||||
| 	}).WithTimeout(duration).WithPolling(interval).Should(gomega.And(gomega.BeNumerically(">=", minDesiredReplicas), gomega.BeNumerically("<=", maxDesiredReplicas))) | ||||
| 	desiredReplicasErr := framework.Gomega().Consistently(ctx, framework.HandleRetry(rc.GetReplicas)). | ||||
| 		WithTimeout(duration). | ||||
| 		WithPolling(interval). | ||||
| 		Should(gomega.And(gomega.BeNumerically(">=", minDesiredReplicas), gomega.BeNumerically("<=", maxDesiredReplicas))) | ||||
|  | ||||
| 	// dump HPA for debugging | ||||
| 	as, err := rc.GetHpa(ctx, hpaName) | ||||
| @@ -593,7 +598,7 @@ func runServiceAndSidecarForResourceConsumer(ctx context.Context, c clientset.In | ||||
| 	_, err := createService(ctx, c, sidecarName, ns, serviceAnnotations, serviceSelectors, port, sidecarTargetPort) | ||||
| 	framework.ExpectNoError(err) | ||||
|  | ||||
| 	ginkgo.By(fmt.Sprintf("Running controller for sidecar")) | ||||
| 	ginkgo.By("Running controller for sidecar") | ||||
| 	controllerName := sidecarName + "-ctrl" | ||||
| 	_, err = createService(ctx, c, controllerName, ns, map[string]string{}, map[string]string{"name": controllerName}, port, targetPort) | ||||
| 	framework.ExpectNoError(err) | ||||
| @@ -954,32 +959,24 @@ func CreateCustomResourceDefinition(ctx context.Context, c crdclientset.Interfac | ||||
| 		crd, err = c.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crdSchema, metav1.CreateOptions{}) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		// Wait until just created CRD appears in discovery. | ||||
| 		err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) { | ||||
| 			return ExistsInDiscovery(crd, c, "v1") | ||||
| 		}) | ||||
| 		err = framework.Gomega().Eventually(ctx, framework.RetryNotFound(framework.HandleRetry(func(ctx context.Context) (*metav1.APIResourceList, error) { | ||||
| 			return c.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + "v1") | ||||
| 		}))).Should(framework.MakeMatcher(func(actual *metav1.APIResourceList) (func() string, error) { | ||||
| 			for _, g := range actual.APIResources { | ||||
| 				if g.Name == crd.Spec.Names.Plural { | ||||
| 					return nil, nil | ||||
| 				} | ||||
| 			} | ||||
| 			return func() string { | ||||
| 				return fmt.Sprintf("CRD %s not found in discovery", crd.Spec.Names.Plural) | ||||
| 			}, nil | ||||
| 		})) | ||||
| 		framework.ExpectNoError(err) | ||||
| 		ginkgo.By(fmt.Sprintf("Successfully created Custom Resource Definition: %v", crd)) | ||||
| 	} | ||||
| 	return crd | ||||
| } | ||||
|  | ||||
| func ExistsInDiscovery(crd *apiextensionsv1.CustomResourceDefinition, apiExtensionsClient crdclientset.Interface, version string) (bool, error) { | ||||
| 	groupResource, err := apiExtensionsClient.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + version) | ||||
| 	if err != nil { | ||||
| 		// Ignore 404 errors as it means the resources doesn't exist | ||||
| 		if apierrors.IsNotFound(err) { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		return false, err | ||||
| 	} | ||||
| 	for _, g := range groupResource.APIResources { | ||||
| 		if g.Name == crd.Spec.Names.Plural { | ||||
| 			return true, nil | ||||
| 		} | ||||
| 	} | ||||
| 	return false, nil | ||||
| } | ||||
|  | ||||
| func CreateCustomSubresourceInstance(ctx context.Context, namespace, name string, client dynamic.ResourceInterface, definition *apiextensionsv1.CustomResourceDefinition) (*unstructured.Unstructured, error) { | ||||
| 	instance := &unstructured.Unstructured{ | ||||
| 		Object: map[string]interface{}{ | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot