Merge pull request #121867 from lianghao208/preeption
feat: Support score extension function in preemption.
This commit is contained in:
		@@ -264,6 +264,11 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNo
 | 
			
		||||
	return true, ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted.
 | 
			
		||||
func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// podTerminatingByPreemption returns the pod's terminating state if feature PodDisruptionConditions is not enabled.
 | 
			
		||||
// Otherwise, it additionally checks if the termination state is caused by scheduler preemption.
 | 
			
		||||
func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {
 | 
			
		||||
 
 | 
			
		||||
@@ -1389,7 +1389,7 @@ func TestSelectBestCandidate(t *testing.T) {
 | 
			
		||||
			}
 | 
			
		||||
			offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
 | 
			
		||||
			candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
 | 
			
		||||
			s := pe.SelectCandidate(logger, candidates)
 | 
			
		||||
			s := pe.SelectCandidate(ctx, candidates)
 | 
			
		||||
			if s == nil || len(s.Name()) == 0 {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
@@ -116,6 +116,10 @@ type Interface interface {
 | 
			
		||||
	// Note that both `state` and `nodeInfo` are deep copied.
 | 
			
		||||
	SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
 | 
			
		||||
		pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
 | 
			
		||||
	// OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted.
 | 
			
		||||
	// The ordered score functions will be processed one by one iff we find more than one node with the highest score.
 | 
			
		||||
	// Default score functions will be processed if nil returned here for backwards-compatibility.
 | 
			
		||||
	OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Evaluator struct {
 | 
			
		||||
@@ -190,7 +194,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 4) Find the best candidate.
 | 
			
		||||
	bestCandidate := ev.SelectCandidate(logger, candidates)
 | 
			
		||||
	bestCandidate := ev.SelectCandidate(ctx, candidates)
 | 
			
		||||
	if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
 | 
			
		||||
		return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
 | 
			
		||||
	}
 | 
			
		||||
@@ -309,7 +313,9 @@ func (ev *Evaluator) callExtenders(logger klog.Logger, pod *v1.Pod, candidates [
 | 
			
		||||
 | 
			
		||||
// SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
 | 
			
		||||
// NOTE: This method is exported for easier testing in default preemption.
 | 
			
		||||
func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate) Candidate {
 | 
			
		||||
func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate) Candidate {
 | 
			
		||||
	logger := klog.FromContext(ctx)
 | 
			
		||||
 | 
			
		||||
	if len(candidates) == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
@@ -318,7 +324,8 @@ func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	victimsMap := ev.CandidatesToVictimsMap(candidates)
 | 
			
		||||
	candidateNode := pickOneNodeForPreemption(logger, victimsMap)
 | 
			
		||||
	scoreFuncs := ev.OrderedScoreFuncs(ctx, victimsMap)
 | 
			
		||||
	candidateNode := pickOneNodeForPreemption(logger, victimsMap, scoreFuncs)
 | 
			
		||||
 | 
			
		||||
	// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
 | 
			
		||||
	// preemption plugins that exercise different candidates on the same nominated node.
 | 
			
		||||
@@ -428,8 +435,10 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
 | 
			
		||||
// pods in each map entry are ordered by decreasing priority.
 | 
			
		||||
// pickOneNodeForPreemption chooses one node among the given nodes.
 | 
			
		||||
// It assumes pods in each map entry are ordered by decreasing priority.
 | 
			
		||||
// If the scoreFuns is not empty, It picks a node based on score scoreFuns returns.
 | 
			
		||||
// If the scoreFuns is empty,
 | 
			
		||||
// It picks a node based on the following criteria:
 | 
			
		||||
// 1. A node with minimum number of PDB violations.
 | 
			
		||||
// 2. A node with minimum highest priority victim is picked.
 | 
			
		||||
@@ -439,7 +448,7 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
 | 
			
		||||
// 6. If there are still ties, the first such node is picked (sort of randomly).
 | 
			
		||||
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 | 
			
		||||
// allocation and garbage collection time.
 | 
			
		||||
func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims) string {
 | 
			
		||||
func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims, scoreFuncs []func(node string) int64) string {
 | 
			
		||||
	if len(nodesToVictims) == 0 {
 | 
			
		||||
		return ""
 | 
			
		||||
	}
 | 
			
		||||
@@ -449,6 +458,7 @@ func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*ext
 | 
			
		||||
		allCandidates = append(allCandidates, node)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(scoreFuncs) == 0 {
 | 
			
		||||
		minNumPDBViolatingScoreFunc := func(node string) int64 {
 | 
			
		||||
			// The smaller the NumPDBViolations, the higher the score.
 | 
			
		||||
			return -nodesToVictims[node].NumPDBViolations
 | 
			
		||||
@@ -476,7 +486,7 @@ func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*ext
 | 
			
		||||
			return -int64(len(nodesToVictims[node].Pods))
 | 
			
		||||
		}
 | 
			
		||||
		latestStartTimeScoreFunc := func(node string) int64 {
 | 
			
		||||
		// Get earliest start time of all pods on the current node.
 | 
			
		||||
			// Get the earliest start time of all pods on the current node.
 | 
			
		||||
			earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
 | 
			
		||||
			if earliestStartTimeOnNode == nil {
 | 
			
		||||
				logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
 | 
			
		||||
@@ -489,7 +499,7 @@ func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*ext
 | 
			
		||||
		// Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
 | 
			
		||||
		// with the highest score. If and only if the scoreFunc has more than one node with the highest
 | 
			
		||||
		// score, we will execute the other scoreFunc in order of precedence.
 | 
			
		||||
	scoreFuncs := []func(string) int64{
 | 
			
		||||
		scoreFuncs = []func(string) int64{
 | 
			
		||||
			// A node with a minimum number of PDB is preferable.
 | 
			
		||||
			minNumPDBViolatingScoreFunc,
 | 
			
		||||
			// A node with a minimum highest priority victim is preferable.
 | 
			
		||||
@@ -502,6 +512,7 @@ func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*ext
 | 
			
		||||
			latestStartTimeScoreFunc,
 | 
			
		||||
			// If there are still ties, then the first Node in the list is selected.
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, f := range scoreFuncs {
 | 
			
		||||
		selectedNodes := []string{}
 | 
			
		||||
 
 | 
			
		||||
@@ -82,6 +82,47 @@ func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominate
 | 
			
		||||
	return true, ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type FakePreemptionScorePostFilterPlugin struct{}
 | 
			
		||||
 | 
			
		||||
func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
 | 
			
		||||
	ctx context.Context, state *framework.CycleState, pod *v1.Pod,
 | 
			
		||||
	nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
 | 
			
		||||
	return append(victims, nodeInfo.Pods[0].Pod), 1, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *FakePreemptionScorePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
 | 
			
		||||
	return 0, nodes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *FakePreemptionScorePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
 | 
			
		||||
	m := make(map[string]*extenderv1.Victims, len(candidates))
 | 
			
		||||
	for _, c := range candidates {
 | 
			
		||||
		m[c.Name()] = c.Victims()
 | 
			
		||||
	}
 | 
			
		||||
	return m
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *FakePreemptionScorePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
 | 
			
		||||
	return true, ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *FakePreemptionScorePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
 | 
			
		||||
	return []func(string) int64{
 | 
			
		||||
		func(node string) int64 {
 | 
			
		||||
			var sumContainers int64
 | 
			
		||||
			for _, pod := range nodesToVictims[node].Pods {
 | 
			
		||||
				sumContainers += int64(len(pod.Spec.Containers) + len(pod.Spec.InitContainers))
 | 
			
		||||
			}
 | 
			
		||||
			// The smaller the sumContainers, the higher the score.
 | 
			
		||||
			return -sumContainers
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNodesWherePreemptionMightHelp(t *testing.T) {
 | 
			
		||||
	// Prepare 4 nodes names.
 | 
			
		||||
	nodeNames := []string{"node1", "node2", "node3", "node4"}
 | 
			
		||||
@@ -337,3 +378,100 @@ func TestDryRunPreemption(t *testing.T) {
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSelectCandidate(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name      string
 | 
			
		||||
		nodeNames []string
 | 
			
		||||
		pod       *v1.Pod
 | 
			
		||||
		testPods  []*v1.Pod
 | 
			
		||||
		expected  string
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:      "pod has different number of containers on each node",
 | 
			
		||||
			nodeNames: []string{"node1", "node2", "node3"},
 | 
			
		||||
			pod:       st.MakePod().Name("p").UID("p").Priority(highPriority).Req(veryLargeRes).Obj(),
 | 
			
		||||
			testPods: []*v1.Pod{
 | 
			
		||||
				st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(midPriority).Containers([]v1.Container{
 | 
			
		||||
					st.MakeContainer().Name("container1").Obj(),
 | 
			
		||||
					st.MakeContainer().Name("container2").Obj(),
 | 
			
		||||
				}).Obj(),
 | 
			
		||||
				st.MakePod().Name("p2.1").UID("p2.1").Node("node2").Priority(midPriority).Containers([]v1.Container{
 | 
			
		||||
					st.MakeContainer().Name("container1").Obj(),
 | 
			
		||||
				}).Obj(),
 | 
			
		||||
				st.MakePod().Name("p3.1").UID("p3.1").Node("node3").Priority(midPriority).Containers([]v1.Container{
 | 
			
		||||
					st.MakeContainer().Name("container1").Obj(),
 | 
			
		||||
					st.MakeContainer().Name("container2").Obj(),
 | 
			
		||||
					st.MakeContainer().Name("container3").Obj(),
 | 
			
		||||
				}).Obj(),
 | 
			
		||||
			},
 | 
			
		||||
			expected: "node2",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
			nodes := make([]*v1.Node, len(tt.nodeNames))
 | 
			
		||||
			for i, nodeName := range tt.nodeNames {
 | 
			
		||||
				nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
 | 
			
		||||
			}
 | 
			
		||||
			registeredPlugins := append([]tf.RegisterPluginFunc{
 | 
			
		||||
				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
 | 
			
		||||
				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 | 
			
		||||
			)
 | 
			
		||||
			var objs []runtime.Object
 | 
			
		||||
			objs = append(objs, tt.pod)
 | 
			
		||||
			for _, pod := range tt.testPods {
 | 
			
		||||
				objs = append(objs, pod)
 | 
			
		||||
			}
 | 
			
		||||
			informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
 | 
			
		||||
			snapshot := internalcache.NewSnapshot(tt.testPods, nodes)
 | 
			
		||||
			_, ctx := ktesting.NewTestContext(t)
 | 
			
		||||
			ctx, cancel := context.WithCancel(ctx)
 | 
			
		||||
			defer cancel()
 | 
			
		||||
			fwk, err := tf.NewFramework(
 | 
			
		||||
				ctx,
 | 
			
		||||
				registeredPlugins,
 | 
			
		||||
				"",
 | 
			
		||||
				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 | 
			
		||||
				frameworkruntime.WithSnapshotSharedLister(snapshot),
 | 
			
		||||
				frameworkruntime.WithLogger(logger),
 | 
			
		||||
			)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			state := framework.NewCycleState()
 | 
			
		||||
			// Some tests rely on PreFilter plugin to compute its CycleState.
 | 
			
		||||
			if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
 | 
			
		||||
				t.Errorf("Unexpected PreFilter Status: %v", status)
 | 
			
		||||
			}
 | 
			
		||||
			nodeInfos, err := snapshot.NodeInfos().List()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
 | 
			
		||||
 | 
			
		||||
			for _, pod := range tt.testPods {
 | 
			
		||||
				state := framework.NewCycleState()
 | 
			
		||||
				pe := Evaluator{
 | 
			
		||||
					PluginName: "FakePreemptionScorePostFilter",
 | 
			
		||||
					Handler:    fwk,
 | 
			
		||||
					Interface:  fakePreemptionScorePostFilterPlugin,
 | 
			
		||||
					State:      state,
 | 
			
		||||
				}
 | 
			
		||||
				candidates, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
 | 
			
		||||
				s := pe.SelectCandidate(ctx, candidates)
 | 
			
		||||
				if s == nil || len(s.Name()) == 0 {
 | 
			
		||||
					t.Errorf("expect any node in %v, but no candidate selected", tt.expected)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if diff := cmp.Diff(tt.expected, s.Name()); diff != "" {
 | 
			
		||||
					t.Errorf("expect any node in %v, but got %v", tt.expected, s.Name())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user