Merge pull request #119046 from kerthcet/fix/handle-unschedule-plugins
Fix fitError in Permit plugin not handled perfectly
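The gist of the fix: when a Permit plugin rejected a pod, schedulingCycle used to return the plugin's raw status, so the pod's failure event skipped the usual fit-error path and read differently from Filter rejections. The status is now wrapped in a framework.FitError, which (a) renders the familiar "0/1 nodes are available: ..." message and (b) records the rejecting plugin in Diagnosis.UnschedulablePlugins so handleSchedulingFailure can requeue the pod only on cluster events that plugin has registered interest in. Condensed from the schedule_one.go hunk below, with explanatory comments added:

	// Permit rejected the pod: wrap the plugin status in a FitError rather
	// than returning it raw, so the failure reports like any other fit failure.
	if runPermitStatus.IsUnschedulable() {
		fitErr := &framework.FitError{
			NumAllNodes: 1, // only the suggested host was in play at Permit time
			Pod:         pod,
			Diagnosis: framework.Diagnosis{
				NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
				// FailedPlugin was stamped on the status by RunPermitPlugins (first hunk below).
				UnschedulablePlugins: sets.New(runPermitStatus.FailedPlugin()),
			},
		}
		// Keep the original status code; the message now comes from fitErr.
		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo,
			framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
	}
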
pkg/scheduler/framework/runtime/framework.go
@@ -1341,8 +1341,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
 		if !status.IsSuccess() {
 			if status.IsUnschedulable() {
 				logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
-				status.SetFailedPlugin(pl.Name())
-				return status
+				return status.WithFailedPlugin(pl.Name())
 			}
 			if status.IsWait() {
 				// Not allowed to be greater than maxTimeout.

pkg/scheduler/framework/runtime/framework_test.go
@@ -2669,7 +2669,6 @@ func TestPermitPlugins(t *testing.T) {
 			}
 
 			status := f.RunPermitPlugins(ctx, nil, pod, "")
-
 			if !reflect.DeepEqual(status, tt.want) {
 				t.Errorf("wrong status code. got %v, want %v", status, tt.want)
 			}

pkg/scheduler/framework/types.go
@@ -343,6 +343,7 @@ func (f *FitError) Error() string {
 	if postFilterMsg != "" {
 		reasonMsg += fmt.Sprintf(SeparatorFormat, postFilterMsg)
 	}
+
 	return reasonMsg
 }
 

pkg/scheduler/framework/types_test.go
@@ -1447,6 +1447,17 @@ func TestFitError_Error(t *testing.T) {
 			},
 			wantReasonMsg: "0/3 nodes are available: 1 Node(s) failed Filter plugin FalseFilter-2, 2 Node(s) failed Filter plugin FalseFilter-1. Error running PostFilter plugin FailedPostFilter.",
 		},
+		{
+			name:        "failed to Permit on node",
+			numAllNodes: 1,
+			diagnosis: Diagnosis{
+				NodeToStatusMap: NodeToStatusMap{
+					// There should be only one node here.
+					"node1": NewStatus(Unschedulable, "Node failed Permit plugin Permit-1"),
+				},
+			},
+			wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.",
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {

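For context, FitError.Error() tallies identical per-node reasons and prints them behind a "0/N nodes are available" prefix, which is why the single Permit rejection above renders as "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.". A rough, self-contained sketch of that aggregation (not the exact implementation; the SeparatorFormat and PostFilter message handling are omitted):

	package main

	import (
		"fmt"
		"sort"
		"strings"
	)

	// reasonMessage mimics how a FitError-style message is assembled:
	// count how many nodes reported each reason, then join the counts.
	func reasonMessage(numAllNodes int, nodeReasons map[string][]string) string {
		counts := map[string]int{}
		for _, reasons := range nodeReasons {
			for _, r := range reasons {
				counts[r]++
			}
		}
		parts := make([]string, 0, len(counts))
		for r, n := range counts {
			parts = append(parts, fmt.Sprintf("%v %v", n, r))
		}
		sort.Strings(parts) // deterministic ordering, matching the sorted reasons in the expected messages
		return fmt.Sprintf("0/%v nodes are available: %v.", numAllNodes, strings.Join(parts, ", "))
	}

	func main() {
		fmt.Println(reasonMessage(1, map[string][]string{
			"node1": {"Node failed Permit plugin Permit-1"},
		}))
		// Output: 0/1 nodes are available: 1 Node failed Permit plugin Permit-1.
	}
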
pkg/scheduler/schedule_one.go
@@ -184,9 +184,7 @@ func (sched *Scheduler) schedulingCycle(
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		return ScheduleResult{nominatingInfo: clearNominatedNode},
-			assumedPodInfo,
-			framework.AsStatus(err)
+		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
 	}
 
 	// Run the Reserve method of reserve plugins.
@@ -197,9 +195,7 @@ func (sched *Scheduler) schedulingCycle(
 			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 
-		return ScheduleResult{nominatingInfo: clearNominatedNode},
-			assumedPodInfo,
-			sts
+		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
 	}
 
 	// Run "permit" plugins.
@@ -211,9 +207,19 @@ func (sched *Scheduler) schedulingCycle(
 			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
 		}
 
-		return ScheduleResult{nominatingInfo: clearNominatedNode},
-			assumedPodInfo,
-			runPermitStatus
+		if runPermitStatus.IsUnschedulable() {
+			fitErr := &framework.FitError{
+				NumAllNodes: 1,
+				Pod:         pod,
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
+					UnschedulablePlugins: sets.New(runPermitStatus.FailedPlugin()),
+				},
+			}
+			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
+		}
+
+		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
 	}
 
 	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
@@ -923,7 +929,6 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
 	errMsg := status.Message()
 
 	if err == ErrNoNodesAvailable {
-
 		logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
 	} else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
 		podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins

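The remainder of the change is a test refactor in support of the fix: the scheduler integration-test helpers (initTestSchedulerForFrameworkTest, initRegistryAndConfig, and the newPlugin factory) move out of the plugin test file into the shared test/integration/scheduler/util.go and are exported, so the new rescheduling_test.go added below can reuse them; the plugin tests then call them through the schedulerutils import, as the following hunks show.
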
test/integration/scheduler/plugins/plugins_test.go
@@ -47,6 +47,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
 	testutils "k8s.io/kubernetes/test/integration/util"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	"k8s.io/utils/pointer"
@@ -64,6 +65,9 @@ var (
 	waitForPodUnschedulable         = testutils.WaitForPodUnschedulable
 	waitForPodSchedulingGated       = testutils.WaitForPodSchedulingGated
 	waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
+	initRegistryAndConfig           = func(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
+		return schedulerutils.InitRegistryAndConfig(t, newPlugin, plugins...)
+	}
 )
 
 type PreEnqueuePlugin struct {
@@ -659,7 +663,7 @@ func TestPreFilterPlugin(t *testing.T) {
 			preFilterPlugin := &PreFilterPlugin{}
 			registry, prof := initRegistryAndConfig(t, preFilterPlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -837,7 +841,7 @@ func TestPostFilterPlugin(t *testing.T) {
 					},
 				}}})
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes),
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes),
 				scheduler.WithProfiles(cfg.Profiles...),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry),
 			)
@@ -904,7 +908,7 @@ func TestScorePlugin(t *testing.T) {
 			scorePlugin := &ScorePlugin{}
 			registry, prof := initRegistryAndConfig(t, scorePlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 10,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -947,7 +951,7 @@ func TestNormalizeScorePlugin(t *testing.T) {
 	scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{}
 	registry, prof := initRegistryAndConfig(t, scoreWithNormalizePlugin)
 
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 
@@ -995,7 +999,7 @@ func TestReservePluginReserve(t *testing.T) {
 			reservePlugin := &ReservePlugin{}
 			registry, prof := initRegistryAndConfig(t, reservePlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1110,7 +1114,7 @@ func TestPrebindPlugin(t *testing.T) {
 				},
 			})
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, nodesNum,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum,
 				scheduler.WithProfiles(cfg.Profiles...),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1265,7 +1269,7 @@ func TestUnReserveReservePlugins(t *testing.T) {
 			}
 			registry, prof := initRegistryAndConfig(t, pls...)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1358,7 +1362,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
 			}
 			registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(profile),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1430,7 +1434,7 @@ func TestUnReservePreBindPlugins(t *testing.T) {
 			}
 			registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(profile),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1501,7 +1505,7 @@ func TestUnReserveBindPlugins(t *testing.T) {
 
 			test.plugin.client = testContext.ClientSet
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(profile),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1634,7 +1638,7 @@ func TestBindPlugin(t *testing.T) {
 				}},
 			})
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(cfg.Profiles...),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry),
 			)
@@ -1755,7 +1759,7 @@ func TestPostBindPlugin(t *testing.T) {
 			}
 
 			registry, prof := initRegistryAndConfig(t, preBindPlugin, postBindPlugin)
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1846,7 +1850,7 @@ func TestPermitPlugin(t *testing.T) {
 			perPlugin := &PermitPlugin{name: permitPluginName}
 			registry, prof := initRegistryAndConfig(t, perPlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1895,7 +1899,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
 	registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
 
 	// Create the API server and the scheduler with the test plugin set.
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 
@@ -1947,7 +1951,7 @@ func TestPermitPluginsCancelled(t *testing.T) {
 	registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
 
 	// Create the API server and the scheduler with the test plugin set.
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 
@@ -2010,7 +2014,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
 			permitPlugin := &PermitPlugin{name: permitPluginName}
 			registry, prof := initRegistryAndConfig(t, permitPlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -2092,7 +2096,7 @@ func TestFilterPlugin(t *testing.T) {
 			filterPlugin := &FilterPlugin{}
 			registry, prof := initRegistryAndConfig(t, filterPlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 1,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -2148,7 +2152,7 @@ func TestPreScorePlugin(t *testing.T) {
 			preScorePlugin := &PreScorePlugin{}
 			registry, prof := initRegistryAndConfig(t, preScorePlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -2209,7 +2213,7 @@ func TestPreEnqueuePlugin(t *testing.T) {
 			preFilterPlugin := &PreFilterPlugin{}
 			registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 1,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -2338,7 +2342,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 				},
 			})
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 0,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0,
 				scheduler.WithProfiles(cfg.Profiles...),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry),
 			)
@@ -2518,7 +2522,7 @@ func TestActivatePods(t *testing.T) {
 	})
 
 	// Create the API server and the scheduler with the test plugin set.
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
 		scheduler.WithProfiles(cfg.Profiles...),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 
@@ -2563,96 +2567,3 @@ func TestActivatePods(t *testing.T) {
 		t.Errorf("JobPlugin's pods activation logic is not called")
 	}
 }
-
-// The returned shutdown func will delete created resources and scheduler, resources should be those
-// that will affect the scheduling result, like nodes, pods, etc.. Namespaces should not be
-// deleted here because it's created together with the apiserver, they should be deleted
-// simultaneously or we'll have no namespace.
-// This should only be called when you want to kill the scheduler alone, away from apiserver.
-// For example, in scheduler integration tests, recreating apiserver is performance consuming,
-// then shutdown the scheduler and recreate it between each test case is a better approach.
-func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
-	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
-	testutils.SyncSchedulerInformerFactory(testCtx)
-	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
-
-	if nodeCount > 0 {
-		if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
-			// Make sure to cleanup the resources when initializing error.
-			testutils.CleanupTest(t, testCtx)
-			t.Fatal(err)
-		}
-	}
-
-	teardown := func() {
-		err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
-		if err != nil {
-			t.Errorf("error while deleting all nodes: %v", err)
-		}
-		err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
-		if err != nil {
-			t.Errorf("error while deleting pod: %v", err)
-		}
-		// Wait for all pods to be deleted, or will failed to create same name pods
-		// required in other test cases.
-		err = wait.PollUntilContextTimeout(testCtx.SchedulerCtx, time.Millisecond, wait.ForeverTestTimeout, true,
-			testutils.PodsCleanedUp(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.NS.Name))
-		if err != nil {
-			t.Errorf("error while waiting for all pods to be deleted: %v", err)
-		}
-		// Kill the scheduler.
-		testCtx.SchedulerCloseFn()
-	}
-
-	return testCtx, teardown
-}
-
-// initRegistryAndConfig returns registry and plugins config based on give plugins.
-func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
-	if len(plugins) == 0 {
-		return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
-	}
-
-	registry := frameworkruntime.Registry{}
-	pls := &configv1.Plugins{}
-
-	for _, p := range plugins {
-		registry.Register(p.Name(), newPlugin(p))
-		plugin := configv1.Plugin{Name: p.Name()}
-
-		switch p.(type) {
-		case *PreEnqueuePlugin:
-			pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
-		case *PreFilterPlugin:
-			pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
-		case *FilterPlugin:
-			pls.Filter.Enabled = append(pls.Filter.Enabled, plugin)
-		case *PreScorePlugin:
-			pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin)
-		case *ScorePlugin, *ScoreWithNormalizePlugin:
-			pls.Score.Enabled = append(pls.Score.Enabled, plugin)
-		case *ReservePlugin:
-			pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin)
-		case *PreBindPlugin:
-			pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin)
-		case *BindPlugin:
-			pls.Bind.Enabled = append(pls.Bind.Enabled, plugin)
-			// It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail
-			// a pod's scheduling, as well as the test BindPlugin's execution.
-			pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}}
-		case *PostBindPlugin:
-			pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin)
-		case *PermitPlugin:
-			pls.Permit.Enabled = append(pls.Permit.Enabled, plugin)
-		}
-	}
-
-	versionedCfg := configv1.KubeSchedulerConfiguration{
-		Profiles: []configv1.KubeSchedulerProfile{{
-			SchedulerName: pointer.String(v1.DefaultSchedulerName),
-			Plugins:       pls,
-		}},
-	}
-	cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg)
-	return registry, cfg.Profiles[0]
-}

test/integration/scheduler/rescheduling_test.go (new file)
@@ -0,0 +1,171 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/scheduler"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	testutils "k8s.io/kubernetes/test/integration/util"
+)
+
+var _ framework.PermitPlugin = &PermitPlugin{}
+var _ framework.EnqueueExtensions = &PermitPlugin{}
+
+type PermitPlugin struct {
+	name            string
+	statusCode      framework.Code
+	numPermitCalled int
+}
+
+func (pp *PermitPlugin) Name() string {
+	return pp.name
+}
+
+func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
+	pp.numPermitCalled += 1
+
+	if pp.statusCode == framework.Error {
+		return framework.NewStatus(framework.Error, "failed to permit"), 0
+	}
+
+	if pp.statusCode == framework.Unschedulable {
+		if pp.numPermitCalled <= 1 {
+			return framework.NewStatus(framework.Unschedulable, "reject to permit"), 0
+		}
+	}
+
+	return nil, 0
+}
+
+func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
+	return []framework.ClusterEventWithHint{
+		{
+			Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
+			QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
+				return framework.QueueImmediately
+			},
+		},
+	}
+}
+
+func TestReScheduling(t *testing.T) {
+	testContext := testutils.InitTestAPIServer(t, "permit-plugin", nil)
+	tests := []struct {
+		name    string
+		plugins []framework.Plugin
+		action  func() error
+		// The first time for pod scheduling, we make pod scheduled error or unschedulable on purpose.
+		// This is controlled by wantFirstSchedulingError. By default, pod is unschedulable.
+		wantFirstSchedulingError bool
+
+		// wantScheduled/wantError means the final expected scheduling result.
+		wantScheduled bool
+		wantError     bool
+	}{
+		{
+			name: "Rescheduling pod rejected by Permit Plugin",
+			plugins: []framework.Plugin{
+				&PermitPlugin{name: "permit", statusCode: framework.Unschedulable},
+			},
+			action: func() error {
+				_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
+				return err
+			},
+			wantScheduled: true,
+		},
+		{
+			name: "Rescheduling pod rejected by Permit Plugin with unrelated event",
+			plugins: []framework.Plugin{
+				&PermitPlugin{name: "permit", statusCode: framework.Unschedulable},
+			},
+			action: func() error {
+				_, err := createPausePod(testContext.ClientSet,
+					initPausePod(&testutils.PausePodConfig{Name: "test-pod-2", Namespace: testContext.NS.Name}))
+				return err
+			},
+			wantScheduled: false,
+		},
+		{
+			name: "Rescheduling pod failed by Permit Plugin",
+			plugins: []framework.Plugin{
+				&PermitPlugin{name: "permit", statusCode: framework.Error},
+			},
+			action: func() error {
+				_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
+				return err
+			},
+			wantFirstSchedulingError: true,
+			wantError:                true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Create a plugin registry for testing. Register only a permit plugin.
+			registry, prof := InitRegistryAndConfig(t, nil, test.plugins...)
+
+			testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2,
+				scheduler.WithProfiles(prof),
+				scheduler.WithFrameworkOutOfTreeRegistry(registry))
+			defer teardown()
+
+			pod, err := createPausePod(testCtx.ClientSet,
+				initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
+			if err != nil {
+				t.Errorf("Error while creating a test pod: %v", err)
+			}
+
+			// The first time for scheduling, pod is error or unschedulable, controlled by wantFirstSchedulingError
+			if test.wantFirstSchedulingError {
+				if err = wait.Poll(10*time.Millisecond, 30*time.Second, testutils.PodSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
+					t.Errorf("Expected a scheduling error, but got: %v", err)
+				}
+			} else {
+				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
+				}
+			}
+
+			// Trigger the configured event, e.g. create a node so the rejected pod is retried.
+			if test.action != nil {
+				if err = test.action(); err != nil {
+					t.Errorf("Perform action() error: %v", err)
+				}
+			}
+
+			if test.wantScheduled {
+				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Didn't expect the pod to be unschedulable. error: %v", err)
+				}
+			} else if test.wantError {
+				if err = wait.Poll(10*time.Millisecond, 30*time.Second, testutils.PodSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
+					t.Errorf("Expected a scheduling error, but got: %v", err)
+				}
+			} else {
+				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
+					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
+				}
+			}
+		})
+	}
+}

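Assuming the repository's standard integration-test environment (these tests expect a local etcd, like the rest of test/integration), the new test should be runnable on its own with the usual tooling, e.g. go test ./test/integration/scheduler/ -run TestReScheduling, or through the repo's make test-integration target; the exact invocation is a suggestion, not taken from this PR.
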
test/integration/scheduler/util.go
@@ -17,7 +17,23 @@ limitations under the License.
 package scheduler
 
 import (
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	configv1 "k8s.io/kube-scheduler/config/v1"
+	"k8s.io/kubernetes/pkg/scheduler"
+	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	testutils "k8s.io/kubernetes/test/integration/util"
+	"k8s.io/utils/pointer"
 )
 
 var (
@@ -35,3 +51,107 @@ var (
 	waitForPodUnschedulable    = testutils.WaitForPodUnschedulable
 	waitForReflection          = testutils.WaitForReflection
 )
+
+// The returned shutdown func will delete created resources and scheduler, resources should be those
+// that will affect the scheduling result, like nodes, pods, etc.. Namespaces should not be
+// deleted here because it's created together with the apiserver, they should be deleted
+// simultaneously or we'll have no namespace.
+// This should only be called when you want to kill the scheduler alone, away from apiserver.
+// For example, in scheduler integration tests, recreating apiserver is performance consuming,
+// then shutdown the scheduler and recreate it between each test case is a better approach.
+func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
+	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
+	testutils.SyncSchedulerInformerFactory(testCtx)
+	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
+
+	if nodeCount > 0 {
+		if _, err := testutils.CreateAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
+			// Make sure to cleanup the resources when initializing error.
+			testutils.CleanupTest(t, testCtx)
+			t.Fatal(err)
+		}
+	}
+
+	teardown := func() {
+		err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
+		if err != nil {
+			t.Errorf("error while deleting all nodes: %v", err)
+		}
+		err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
+		if err != nil {
+			t.Errorf("error while deleting pod: %v", err)
+		}
+		// Wait for all pods to be deleted, or will failed to create same name pods
+		// required in other test cases.
+		err = wait.PollUntilContextTimeout(testCtx.SchedulerCtx, time.Millisecond, wait.ForeverTestTimeout, true,
+			testutils.PodsCleanedUp(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.NS.Name))
+		if err != nil {
+			t.Errorf("error while waiting for all pods to be deleted: %v", err)
+		}
+		// Kill the scheduler.
+		testCtx.SchedulerCloseFn()
+	}
+
+	return testCtx, teardown
+}
+
+// NewPlugin returns a plugin factory with specified Plugin.
+func NewPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
+	return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+		return plugin, nil
+	}
+}
+
+// InitRegistryAndConfig returns registry and plugins config based on the given plugins.
+func InitRegistryAndConfig(t *testing.T, factory func(plugin framework.Plugin) frameworkruntime.PluginFactory, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
+	if len(plugins) == 0 {
+		return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
+	}
+
+	if factory == nil {
+		factory = NewPlugin
+	}
+
+	registry := frameworkruntime.Registry{}
+	pls := &configv1.Plugins{}
+
+	for _, p := range plugins {
+		registry.Register(p.Name(), factory(p))
+		plugin := configv1.Plugin{Name: p.Name()}
+
+		switch p.(type) {
+		case framework.PreEnqueuePlugin:
+			pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
+		case framework.PreFilterPlugin:
+			pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
+		case framework.FilterPlugin:
+			pls.Filter.Enabled = append(pls.Filter.Enabled, plugin)
+		case framework.PreScorePlugin:
+			pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin)
+		case framework.ScorePlugin:
+			pls.Score.Enabled = append(pls.Score.Enabled, plugin)
+		case framework.ReservePlugin:
+			pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin)
+		case framework.PreBindPlugin:
+			pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin)
+		case framework.BindPlugin:
+			pls.Bind.Enabled = append(pls.Bind.Enabled, plugin)
+			// It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail
+			// a pod's scheduling, as well as the test BindPlugin's execution.
+			pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}}
+		case framework.PostBindPlugin:
+			pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin)
+		case framework.PermitPlugin:
+			pls.Permit.Enabled = append(pls.Permit.Enabled, plugin)
+		}
+	}
+
+	versionedCfg := configv1.KubeSchedulerConfiguration{
+		Profiles: []configv1.KubeSchedulerProfile{{
+			SchedulerName: pointer.String(v1.DefaultSchedulerName),
+			Plugins:       pls,
+		}},
+	}
+	cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg)
+	return registry, cfg.Profiles[0]
+}
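
One behavioral detail of the refactor worth noting: the switch in InitRegistryAndConfig now matches on the framework interfaces rather than on the concrete test plugin types, so plugins defined in other packages (like the PermitPlugin in rescheduling_test.go) can be wired up too. Since a Go type switch picks only the first matching case, a test plugin implementing several extension points is enabled only for the first interface it matches, in case order. A minimal illustration of that semantics (toy interfaces, not the scheduler's):

	package main

	import "fmt"

	// Hypothetical stand-ins for the framework's extension-point interfaces.
	type PreFilter interface{ PreFilter() }
	type Filter interface{ Filter() }

	type both struct{}

	func (both) PreFilter() {}
	func (both) Filter()    {}

	func main() {
		var p interface{} = both{}
		switch p.(type) {
		case PreFilter:
			fmt.Println("matched PreFilter only") // first matching case wins
		case Filter:
			fmt.Println("matched Filter")
		}
	}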