Fix PDB preemption tests.
This commit is contained in:
		| @@ -27,7 +27,6 @@ import ( | ||||
| 	policy "k8s.io/api/policy/v1beta1" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/intstr" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| @@ -601,6 +600,18 @@ func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func addPodConditionReady(pod *v1.Pod) { | ||||
| 	pod.Status = v1.PodStatus{ | ||||
| 		Phase: v1.PodRunning, | ||||
| 		Conditions: []v1.PodCondition{ | ||||
| 			{ | ||||
| 				Type:   v1.PodReady, | ||||
| 				Status: v1.ConditionTrue, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TestPDBInPreemption tests PodDisruptionBudget support in preemption. | ||||
| func TestPDBInPreemption(t *testing.T) { | ||||
| 	// Enable PodPriority feature gate. | ||||
| @@ -610,6 +621,8 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 	defer cleanupTest(t, context) | ||||
| 	cs := context.clientSet | ||||
|  | ||||
| 	initDisruptionController(context) | ||||
|  | ||||
| 	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ | ||||
| 		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI), | ||||
| 		v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI)}, | ||||
| @@ -629,6 +642,7 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 		description         string | ||||
| 		nodes               []*nodeConfig | ||||
| 		pdbs                []*policy.PodDisruptionBudget | ||||
| 		pdbPodNum           []int32 | ||||
| 		existingPods        []*v1.Pod | ||||
| 		pod                 *v1.Pod | ||||
| 		preemptedPodIndexes map[int]struct{} | ||||
| @@ -639,6 +653,7 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 			pdbs: []*policy.PodDisruptionBudget{ | ||||
| 				mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}), | ||||
| 			}, | ||||
| 			pdbPodNum: []int32{2}, | ||||
| 			existingPods: []*v1.Pod{ | ||||
| 				initPausePod(context.clientSet, &pausePodConfig{ | ||||
| 					Name:      "low-pod1", | ||||
| @@ -681,6 +696,7 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 			pdbs: []*policy.PodDisruptionBudget{ | ||||
| 				mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}), | ||||
| 			}, | ||||
| 			pdbPodNum: []int32{1}, | ||||
| 			existingPods: []*v1.Pod{ | ||||
| 				initPausePod(context.clientSet, &pausePodConfig{ | ||||
| 					Name:      "low-pod1", | ||||
| @@ -720,6 +736,7 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 				mkMinAvailablePDB("pdb-1", context.ns.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo1": "bar"}), | ||||
| 				mkMinAvailablePDB("pdb-2", context.ns.Name, types.UID("pdb-2-uid"), 2, map[string]string{"foo2": "bar"}), | ||||
| 			}, | ||||
| 			pdbPodNum: []int32{1, 5}, | ||||
| 			existingPods: []*v1.Pod{ | ||||
| 				initPausePod(context.clientSet, &pausePodConfig{ | ||||
| 					Name:      "low-pod1", | ||||
| @@ -783,38 +800,22 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 				Priority:  &highPriority, | ||||
| 				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ | ||||
| 					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI), | ||||
| 					v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)}, | ||||
| 					v1.ResourceMemory: *resource.NewQuantity(400, resource.BinarySI)}, | ||||
| 				}, | ||||
| 			}), | ||||
| 			preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}}, | ||||
| 			// The third node is chosen because PDB is not violated for node 3 and the victims have lower priority than node-2. | ||||
| 			preemptedPodIndexes: map[int]struct{}{4: {}, 5: {}, 6: {}}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, test := range tests { | ||||
| 		t.Logf("================ Running test: %v\n", test.description) | ||||
| 		for _, nodeConf := range test.nodes { | ||||
| 			_, err := createNode(cs, nodeConf.name, nodeConf.res) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Error creating node %v: %v", nodeConf.name, err) | ||||
| 			} | ||||
| 		} | ||||
| 		// Create PDBs. | ||||
| 		for _, pdb := range test.pdbs { | ||||
| 			_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Failed to create PDB: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		// Wait for PDBs to show up in the scheduler's cache. | ||||
| 		if err := wait.Poll(time.Second, 15*time.Second, func() (bool, error) { | ||||
| 			cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while polling for PDB: %v", err) | ||||
| 				return false, err | ||||
| 			} | ||||
| 			return len(cachedPDBs) == len(test.pdbs), err | ||||
| 		}); err != nil { | ||||
| 			t.Fatalf("Not all PDBs were added to the cache: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		pods := make([]*v1.Pod, len(test.existingPods)) | ||||
| 		var err error | ||||
| @@ -823,7 +824,29 @@ func TestPDBInPreemption(t *testing.T) { | ||||
| 			if pods[i], err = runPausePod(cs, p); err != nil { | ||||
| 				t.Fatalf("Test [%v]: Error running pause pod: %v", test.description, err) | ||||
| 			} | ||||
| 			// Add pod condition ready so that PDB is updated. | ||||
| 			addPodConditionReady(p) | ||||
| 			if _, err := context.clientSet.CoreV1().Pods(context.ns.Name).UpdateStatus(p); err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		} | ||||
| 		// Wait for Pods to be stable in scheduler cache. | ||||
| 		if err := waitCachedPodsStable(context, test.existingPods); err != nil { | ||||
| 			t.Fatalf("Not all pods are stable in the cache: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// Create PDBs. | ||||
| 		for _, pdb := range test.pdbs { | ||||
| 			_, err := context.clientSet.PolicyV1beta1().PodDisruptionBudgets(context.ns.Name).Create(pdb) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Failed to create PDB: %v", err) | ||||
| 			} | ||||
| 		} | ||||
| 		// Wait for PDBs to show up in the scheduler's cache and become stable. | ||||
| 		if err := waitCachedPDBsStable(context, test.pdbs, test.pdbPodNum); err != nil { | ||||
| 			t.Fatalf("Not all pdbs are stable in the cache: %v", err) | ||||
| 		} | ||||
|  | ||||
| 		// Create the "pod". | ||||
| 		preemptor, err := createPausePod(cs, test.pod) | ||||
| 		if err != nil { | ||||
|   | ||||
| @@ -24,9 +24,11 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	policy "k8s.io/api/policy/v1beta1" | ||||
| 	"k8s.io/apimachinery/pkg/api/errors" | ||||
| 	"k8s.io/apimachinery/pkg/api/resource" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||
| 	"k8s.io/apimachinery/pkg/util/uuid" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| @@ -42,6 +44,8 @@ import ( | ||||
| 	"k8s.io/client-go/tools/record" | ||||
| 	"k8s.io/kubernetes/pkg/api/legacyscheme" | ||||
| 	podutil "k8s.io/kubernetes/pkg/api/v1/pod" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	"k8s.io/kubernetes/pkg/controller/disruption" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler" | ||||
| 	_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" | ||||
| @@ -194,6 +198,7 @@ func initTestSchedulerWithOptions( | ||||
| 	// set setPodInformer if provided. | ||||
| 	if setPodInformer { | ||||
| 		go podInformer.Informer().Run(context.schedulerConfig.StopEverything) | ||||
| 		controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) | ||||
| 	} | ||||
|  | ||||
| 	eventBroadcaster := record.NewBroadcaster() | ||||
| @@ -218,6 +223,26 @@ func initTestSchedulerWithOptions( | ||||
| 	return context | ||||
| } | ||||
|  | ||||
| // initDisruptionController initializes and runs a Disruption Controller to properly | ||||
| // update PodDisuptionBudget objects. | ||||
| func initDisruptionController(context *TestContext) *disruption.DisruptionController { | ||||
| 	informers := informers.NewSharedInformerFactory(context.clientSet, 12*time.Hour) | ||||
|  | ||||
| 	dc := disruption.NewDisruptionController( | ||||
| 		informers.Core().V1().Pods(), | ||||
| 		informers.Policy().V1beta1().PodDisruptionBudgets(), | ||||
| 		informers.Core().V1().ReplicationControllers(), | ||||
| 		informers.Extensions().V1beta1().ReplicaSets(), | ||||
| 		informers.Extensions().V1beta1().Deployments(), | ||||
| 		informers.Apps().V1beta1().StatefulSets(), | ||||
| 		context.clientSet) | ||||
|  | ||||
| 	informers.Start(context.schedulerConfig.StopEverything) | ||||
| 	informers.WaitForCacheSync(context.schedulerConfig.StopEverything) | ||||
| 	go dc.Run(context.schedulerConfig.StopEverything) | ||||
| 	return dc | ||||
| } | ||||
|  | ||||
| // initTest initializes a test environment and creates master and scheduler with default | ||||
| // configuration. | ||||
| func initTest(t *testing.T, nsPrefix string) *TestContext { | ||||
| @@ -514,6 +539,59 @@ func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { | ||||
| 	return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) | ||||
| } | ||||
|  | ||||
| // waitCachedPDBsStable waits for PDBs in scheduler cache to have "CurrentHealthy" status equal to | ||||
| // the expected values. | ||||
| func waitCachedPDBsStable(context *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { | ||||
| 	return wait.Poll(time.Second, 60*time.Second, func() (bool, error) { | ||||
| 		cachedPDBs, err := context.scheduler.Config().SchedulerCache.ListPDBs(labels.Everything()) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		if len(cachedPDBs) != len(pdbs) { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		for i, pdb := range pdbs { | ||||
| 			found := false | ||||
| 			for _, cpdb := range cachedPDBs { | ||||
| 				if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace { | ||||
| 					found = true | ||||
| 					if cpdb.Status.CurrentHealthy != pdbPodNum[i] { | ||||
| 						return false, nil | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 			if !found { | ||||
| 				return false, nil | ||||
| 			} | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // waitCachedPodsStable waits until scheduler cache has the given pods. | ||||
| func waitCachedPodsStable(context *TestContext, pods []*v1.Pod) error { | ||||
| 	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { | ||||
| 		cachedPods, err := context.scheduler.Config().SchedulerCache.List(labels.Everything()) | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		if len(pods) != len(cachedPods) { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		for _, p := range pods { | ||||
| 			actualPod, err1 := context.clientSet.CoreV1().Pods(p.Namespace).Get(p.Name, metav1.GetOptions{}) | ||||
| 			if err1 != nil { | ||||
| 				return false, err1 | ||||
| 			} | ||||
| 			cachedPod, err2 := context.scheduler.Config().SchedulerCache.GetPod(actualPod) | ||||
| 			if err2 != nil || cachedPod == nil { | ||||
| 				return false, err2 | ||||
| 			} | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // deletePod deletes the given pod in the given namespace. | ||||
| func deletePod(cs clientset.Interface, podName string, nsName string) error { | ||||
| 	return cs.CoreV1().Pods(nsName).Delete(podName, metav1.NewDeleteOptions(0)) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Bobby (Babak) Salamat
					Bobby (Babak) Salamat