e2e test for cluster-autoscaler draining node
@@ -28,7 +28,11 @@ import (
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/apimachinery/pkg/util/wait"
+	policy "k8s.io/client-go/pkg/apis/policy/v1beta1"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -45,9 +49,13 @@ const (
 	resizeTimeout    = 5 * time.Minute
 	scaleUpTimeout   = 5 * time.Minute
 	scaleDownTimeout = 15 * time.Minute
+	podTimeout       = 2 * time.Minute
 
 	gkeEndpoint      = "https://test-container.sandbox.googleapis.com"
 	gkeUpdateTimeout = 15 * time.Minute
+
+	disabledTaint             = "DisabledForAutoscalingTest"
+	newNodesForScaledownTests = 2
 )
 
 var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
@@ -95,6 +103,11 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
 		By(fmt.Sprintf("Restoring initial size of the cluster"))
 		setMigSizes(originalSizes)
 		framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount, scaleDownTimeout))
+		nodes, err := c.Core().Nodes().List(metav1.ListOptions{})
+		framework.ExpectNoError(err)
+		for _, n := range nodes.Items {
+			framework.ExpectNoError(makeNodeSchedulable(c, &n))
+		}
 	})
 
 	It("shouldn't increase cluster size if pending pod is too large [Feature:ClusterSizeAutoscalingScaleUp]", func() {
@@ -279,17 +292,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
 	})
 
 	It("should correctly scale down after a node is not needed [Feature:ClusterSizeAutoscalingScaleDown]", func() {
-		By("Manually increase cluster size")
-		increasedSize := 0
-		newSizes := make(map[string]int)
-		for key, val := range originalSizes {
-			newSizes[key] = val + 2
-			increasedSize += val + 2
-		}
-		setMigSizes(newSizes)
-		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
-			func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
-
+		increasedSize := manuallyIncreaseClusterSize(f, originalSizes)
 		By("Some node should be removed")
 		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
 			func(size int) bool { return size < increasedSize }, scaleDownTimeout))
@@ -298,16 +301,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
 	It("should correctly scale down after a node is not needed when there is non autoscaled pool[Feature:ClusterSizeAutoscalingScaleDown]", func() {
 		framework.SkipUnlessProviderIs("gke")
 
-		By("Manually increase cluster size")
-		increasedSize := 0
-		newSizes := make(map[string]int)
-		for key, val := range originalSizes {
-			newSizes[key] = val + 2
-			increasedSize += val + 2
-		}
-		setMigSizes(newSizes)
-		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
-			func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
+		increasedSize := manuallyIncreaseClusterSize(f, originalSizes)
 
 		const extraPoolName = "extra-pool"
 		addNodePool(extraPoolName, "n1-standard-1", 3)
@@ -324,8 +318,62 @@ var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
 		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
 			func(size int) bool { return size < increasedSize+3 }, scaleDownTimeout+10*time.Minute))
 	})
+
+	It("should be able to scale down when rescheduling a pod is required and pdb allows for it[Feature:ClusterSizeAutoscalingScaleDown]", func() {
+		runDrainTest(f, originalSizes, 1, func(increasedSize int) {
+			By("Some node should be removed")
+			framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
+				func(size int) bool { return size < increasedSize }, scaleDownTimeout))
+		})
+	})
+
+	It("shouldn't be able to scale down when rescheduling a pod is required, but pdb doesn't allow drain[Feature:ClusterSizeAutoscalingScaleDown]", func() {
+		runDrainTest(f, originalSizes, 0, func(increasedSize int) {
+			By("No nodes should be removed")
+			time.Sleep(scaleDownTimeout)
+			nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
+			Expect(len(nodes.Items)).Should(Equal(increasedSize))
+		})
+	})
+
 })
 
+func runDrainTest(f *framework.Framework, migSizes map[string]int, pdbSize int, verifyFunction func(int)) {
+	increasedSize := manuallyIncreaseClusterSize(f, migSizes)
+
+	nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
+		"spec.unschedulable": "false",
+	}.AsSelector().String()})
+	framework.ExpectNoError(err)
+	namespace := f.Namespace.Name
+	numPods := len(nodes.Items)
+	testId := string(uuid.NewUUID()) // So that we can label and find pods
+	labelMap := map[string]string{"test_id": testId}
+	framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, "reschedulable-pods", labelMap))
+
+	defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "reschedulable-pods")
+
+	By("Create a PodDisruptionBudget")
+	pdb := &policy.PodDisruptionBudget{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test_pdb",
+			Namespace: namespace,
+		},
+		Spec: policy.PodDisruptionBudgetSpec{
+			Selector:     &metav1.LabelSelector{MatchLabels: labelMap},
+			MinAvailable: intstr.FromInt(numPods - pdbSize),
+		},
+	}
+	_, err = f.StagingClient.Policy().PodDisruptionBudgets(namespace).Create(pdb)
+
+	defer func() {
+		f.StagingClient.Policy().PodDisruptionBudgets(namespace).Delete(pdb.Name, &metav1.DeleteOptions{})
+	}()
+
+	framework.ExpectNoError(err)
+	verifyFunction(increasedSize)
+}
+
 func getGKEClusterUrl() string {
 	out, err := exec.Command("gcloud", "auth", "print-access-token").Output()
 	framework.ExpectNoError(err)
@@ -605,3 +653,128 @@ func setMigSizes(sizes map[string]int) {
 		}
 	}
 }
+
+func makeNodeUnschedulable(c clientset.Interface, node *v1.Node) error {
+	By(fmt.Sprintf("Taint node %s", node.Name))
+	freshNode, err := c.Core().Nodes().Get(node.Name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	for _, taint := range freshNode.Spec.Taints {
+		if taint.Key == disabledTaint {
+			return nil
+		}
+	}
+	freshNode.Spec.Taints = append(freshNode.Spec.Taints, v1.Taint{
+		Key:    disabledTaint,
+		Value:  "DisabledForTest",
+		Effect: v1.TaintEffectNoSchedule,
+	})
+	_, err = c.Core().Nodes().Update(freshNode)
+	return err
+}
+
+func makeNodeSchedulable(c clientset.Interface, node *v1.Node) error {
+	By(fmt.Sprintf("Remove taint from node %s", node.Name))
+	freshNode, err := c.Core().Nodes().Get(node.Name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	newTaints := make([]v1.Taint, 0)
+	for _, taint := range freshNode.Spec.Taints {
+		if taint.Key != disabledTaint {
+			newTaints = append(newTaints, taint)
+		}
+	}
+
+	if len(newTaints) != len(freshNode.Spec.Taints) {
+		freshNode.Spec.Taints = newTaints
+		_, err = c.Core().Nodes().Update(freshNode)
+		return err
+	}
+	return nil
+}
+
+// Creat an RC running a single pod on each node without adding any constraint forcing such
+// pod distribution. This is meant to create a bunch of underutilized (but not unused) nodes
+// with pods that can be rescheduled on different nodes.
+// This is achieved using the following method:
+// 1. disable scheduling on each node
+// 2. create an empty RC
+// 3. for each node:
+// 3a. enable scheduling on that node
+// 3b. increase number of replicas in RC by 1
+func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, id string, labels map[string]string) error {
+	By("Run a pod on each node")
+	for _, node := range nodes {
+		err := makeNodeUnschedulable(f.ClientSet, &node)
+
+		defer func(n v1.Node) {
+			makeNodeSchedulable(f.ClientSet, &n)
+		}(node)
+
+		if err != nil {
+			return err
+		}
+	}
+	config := &testutils.RCConfig{
+		Client:         f.ClientSet,
+		InternalClient: f.InternalClientset,
+		Name:           id,
+		Namespace:      f.Namespace.Name,
+		Timeout:        defaultTimeout,
+		Image:          framework.GetPauseImageName(f.ClientSet),
+		Replicas:       0,
+		Labels:         labels,
+	}
+	err := framework.RunRC(*config)
+	if err != nil {
+		return err
+	}
+	rc, err := f.ClientSet.Core().ReplicationControllers(f.Namespace.Name).Get(id, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	for i, node := range nodes {
+		err = makeNodeSchedulable(f.ClientSet, &node)
+		if err != nil {
+			return err
+		}
+		*rc.Spec.Replicas = int32(i + 1)
+		rc, err = f.ClientSet.Core().ReplicationControllers(f.Namespace.Name).Update(rc)
+		if err != nil {
+			return err
+		}
+		err = wait.PollImmediate(5*time.Second, podTimeout, func() (bool, error) {
+			rc, err = f.ClientSet.Core().ReplicationControllers(f.Namespace.Name).Get(id, metav1.GetOptions{})
+			if err != nil || rc.Status.ReadyReplicas < int32(i+1) {
+				return false, nil
+			}
+			return true, nil
+		})
+		if err != nil {
+			return fmt.Errorf("failed to coerce RC into spawning a pod on node %s within timeout", node.Name)
+		}
+		err = makeNodeUnschedulable(f.ClientSet, &node)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Increase cluster size by newNodesForScaledownTests to create some unused nodes
+// that can be later removed by cluster autoscaler.
+func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[string]int) int {
+	By("Manually increase cluster size")
+	increasedSize := 0
+	newSizes := make(map[string]int)
+	for key, val := range originalSizes {
+		newSizes[key] = val + newNodesForScaledownTests
+		increasedSize += val + newNodesForScaledownTests
+	}
+	setMigSizes(newSizes)
+	framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
+		func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
+	return increasedSize
+}