Merge pull request #114688 from sanposhiho/sanposhiho/scheduling-one-score
feature(schedule_one): use heap to find the highest score node
This commit is contained in:
		@@ -17,7 +17,9 @@ limitations under the License.
 | 
			
		||||
package scheduler
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"container/heap"
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"strconv"
 | 
			
		||||
@@ -56,6 +58,9 @@ const (
 | 
			
		||||
	// to ensure that a certain minimum of nodes are checked for feasibility.
 | 
			
		||||
	// This in turn helps ensure a minimum level of spreading.
 | 
			
		||||
	minFeasibleNodesPercentageToFind = 5
 | 
			
		||||
	// numberOfHighestScoredNodesToReport is the number of node scores
 | 
			
		||||
	// to be included in ScheduleResult.
 | 
			
		||||
	numberOfHighestScoredNodesToReport = 3
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
 | 
			
		||||
@@ -374,7 +379,7 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
 | 
			
		||||
		return result, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	host, err := selectHost(priorityList)
 | 
			
		||||
	host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
 | 
			
		||||
	trace.Step("Prioritizing done")
 | 
			
		||||
 | 
			
		||||
	return ScheduleResult{
 | 
			
		||||
@@ -747,29 +752,81 @@ func prioritizeNodes(
 | 
			
		||||
	return nodesScores, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var errEmptyPriorityList = errors.New("empty priorityList")
 | 
			
		||||
 | 
			
		||||
// selectHost takes a prioritized list of nodes and then picks one
 | 
			
		||||
// in a reservoir sampling manner from the nodes that had the highest score.
 | 
			
		||||
func selectHost(nodeScores []framework.NodePluginScores) (string, error) {
 | 
			
		||||
	if len(nodeScores) == 0 {
 | 
			
		||||
		return "", fmt.Errorf("empty priorityList")
 | 
			
		||||
// It also returns the top {count} Nodes,
 | 
			
		||||
// and the top of the list will be always the selected host.
 | 
			
		||||
func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) {
 | 
			
		||||
	if len(nodeScoreList) == 0 {
 | 
			
		||||
		return "", nil, errEmptyPriorityList
 | 
			
		||||
	}
 | 
			
		||||
	maxScore := nodeScores[0].TotalScore
 | 
			
		||||
	selected := nodeScores[0].Name
 | 
			
		||||
 | 
			
		||||
	var h nodeScoreHeap = nodeScoreList
 | 
			
		||||
	heap.Init(&h)
 | 
			
		||||
	cntOfMaxScore := 1
 | 
			
		||||
	for _, ns := range nodeScores[1:] {
 | 
			
		||||
		if ns.TotalScore > maxScore {
 | 
			
		||||
			maxScore = ns.TotalScore
 | 
			
		||||
			selected = ns.Name
 | 
			
		||||
			cntOfMaxScore = 1
 | 
			
		||||
		} else if ns.TotalScore == maxScore {
 | 
			
		||||
	selectedIndex := 0
 | 
			
		||||
	// The top of the heap is the NodeScoreResult with the highest score.
 | 
			
		||||
	sortedNodeScoreList := make([]framework.NodePluginScores, 0, count)
 | 
			
		||||
	sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores))
 | 
			
		||||
 | 
			
		||||
	// This for-loop will continue until all Nodes with the highest scores get checked for a reservoir sampling,
 | 
			
		||||
	// and sortedNodeScoreList gets (count - 1) elements.
 | 
			
		||||
	for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) {
 | 
			
		||||
		if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if ns.TotalScore == sortedNodeScoreList[0].TotalScore {
 | 
			
		||||
			cntOfMaxScore++
 | 
			
		||||
			if rand.Intn(cntOfMaxScore) == 0 {
 | 
			
		||||
				// Replace the candidate with probability of 1/cntOfMaxScore
 | 
			
		||||
				selected = ns.Name
 | 
			
		||||
				selectedIndex = cntOfMaxScore - 1
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		sortedNodeScoreList = append(sortedNodeScoreList, ns)
 | 
			
		||||
 | 
			
		||||
		if h.Len() == 0 {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	return selected, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if selectedIndex != 0 {
 | 
			
		||||
		// replace the first one with selected one
 | 
			
		||||
		previous := sortedNodeScoreList[0]
 | 
			
		||||
		sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex]
 | 
			
		||||
		sortedNodeScoreList[selectedIndex] = previous
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(sortedNodeScoreList) > count {
 | 
			
		||||
		sortedNodeScoreList = sortedNodeScoreList[:count]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// nodeScoreHeap is a heap of framework.NodePluginScores.
 | 
			
		||||
type nodeScoreHeap []framework.NodePluginScores
 | 
			
		||||
 | 
			
		||||
// nodeScoreHeap implements heap.Interface.
 | 
			
		||||
var _ heap.Interface = &nodeScoreHeap{}
 | 
			
		||||
 | 
			
		||||
func (h nodeScoreHeap) Len() int           { return len(h) }
 | 
			
		||||
func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore }
 | 
			
		||||
func (h nodeScoreHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
 | 
			
		||||
 | 
			
		||||
func (h *nodeScoreHeap) Push(x interface{}) {
 | 
			
		||||
	*h = append(*h, x.(framework.NodePluginScores))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *nodeScoreHeap) Pop() interface{} {
 | 
			
		||||
	old := *h
 | 
			
		||||
	n := len(old)
 | 
			
		||||
	x := old[n-1]
 | 
			
		||||
	*h = old[0 : n-1]
 | 
			
		||||
	return x
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
 | 
			
		||||
 
 | 
			
		||||
@@ -1444,50 +1444,109 @@ func TestUpdatePod(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSelectHost(t *testing.T) {
 | 
			
		||||
func Test_SelectHost(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name              string
 | 
			
		||||
		list              []framework.NodePluginScores
 | 
			
		||||
		possibleHosts sets.Set[string]
 | 
			
		||||
		expectsErr    bool
 | 
			
		||||
		topNodesCnt       int
 | 
			
		||||
		possibleNodes     sets.Set[string]
 | 
			
		||||
		possibleNodeLists [][]framework.NodePluginScores
 | 
			
		||||
		wantError         error
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "unique properly ordered scores",
 | 
			
		||||
			list: []framework.NodePluginScores{
 | 
			
		||||
				{Name: "node1.1", TotalScore: 1},
 | 
			
		||||
				{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				{Name: "node1", TotalScore: 1},
 | 
			
		||||
				{Name: "node2", TotalScore: 2},
 | 
			
		||||
			},
 | 
			
		||||
			topNodesCnt:   2,
 | 
			
		||||
			possibleNodes: sets.New("node2"),
 | 
			
		||||
			possibleNodeLists: [][]framework.NodePluginScores{
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2", TotalScore: 2},
 | 
			
		||||
					{Name: "node1", TotalScore: 1},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "numberOfNodeScoresToReturn > len(list)",
 | 
			
		||||
			list: []framework.NodePluginScores{
 | 
			
		||||
				{Name: "node1", TotalScore: 1},
 | 
			
		||||
				{Name: "node2", TotalScore: 2},
 | 
			
		||||
			},
 | 
			
		||||
			topNodesCnt:   100,
 | 
			
		||||
			possibleNodes: sets.New("node2"),
 | 
			
		||||
			possibleNodeLists: [][]framework.NodePluginScores{
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2", TotalScore: 2},
 | 
			
		||||
					{Name: "node1", TotalScore: 1},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			possibleHosts: sets.New("node2.1"),
 | 
			
		||||
			expectsErr:    false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "equal scores",
 | 
			
		||||
			list: []framework.NodePluginScores{
 | 
			
		||||
				{Name: "node1.1", TotalScore: 1},
 | 
			
		||||
				{Name: "node1.2", TotalScore: 2},
 | 
			
		||||
				{Name: "node1.3", TotalScore: 2},
 | 
			
		||||
				{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				{Name: "node2.2", TotalScore: 2},
 | 
			
		||||
				{Name: "node2.3", TotalScore: 2},
 | 
			
		||||
			},
 | 
			
		||||
			topNodesCnt:   2,
 | 
			
		||||
			possibleNodes: sets.New("node2.1", "node2.2", "node2.3"),
 | 
			
		||||
			possibleNodeLists: [][]framework.NodePluginScores{
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
					{Name: "node2.2", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
					{Name: "node2.3", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2.2", TotalScore: 2},
 | 
			
		||||
					{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
			possibleHosts: sets.New("node1.2", "node1.3", "node2.1"),
 | 
			
		||||
			expectsErr:    false,
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2.2", TotalScore: 2},
 | 
			
		||||
					{Name: "node2.3", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2.3", TotalScore: 2},
 | 
			
		||||
					{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node2.3", TotalScore: 2},
 | 
			
		||||
					{Name: "node2.2", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "out of order scores",
 | 
			
		||||
			list: []framework.NodePluginScores{
 | 
			
		||||
				{Name: "node1.1", TotalScore: 3},
 | 
			
		||||
				{Name: "node1.2", TotalScore: 3},
 | 
			
		||||
				{Name: "node3.1", TotalScore: 3},
 | 
			
		||||
				{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				{Name: "node3.1", TotalScore: 1},
 | 
			
		||||
				{Name: "node1.3", TotalScore: 3},
 | 
			
		||||
				{Name: "node1.1", TotalScore: 1},
 | 
			
		||||
				{Name: "node3.2", TotalScore: 3},
 | 
			
		||||
			},
 | 
			
		||||
			topNodesCnt:   3,
 | 
			
		||||
			possibleNodes: sets.New("node3.1", "node3.2"),
 | 
			
		||||
			possibleNodeLists: [][]framework.NodePluginScores{
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node3.1", TotalScore: 3},
 | 
			
		||||
					{Name: "node3.2", TotalScore: 3},
 | 
			
		||||
					{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					{Name: "node3.2", TotalScore: 3},
 | 
			
		||||
					{Name: "node3.1", TotalScore: 3},
 | 
			
		||||
					{Name: "node2.1", TotalScore: 2},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			possibleHosts: sets.New("node1.1", "node1.2", "node1.3"),
 | 
			
		||||
			expectsErr:    false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:          "empty priority list",
 | 
			
		||||
			list:          []framework.NodePluginScores{},
 | 
			
		||||
			possibleHosts: sets.New[string](),
 | 
			
		||||
			expectsErr:    true,
 | 
			
		||||
			possibleNodes: sets.Set[string]{},
 | 
			
		||||
			wantError:     errEmptyPriorityList,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -1495,19 +1554,28 @@ func TestSelectHost(t *testing.T) {
 | 
			
		||||
		t.Run(test.name, func(t *testing.T) {
 | 
			
		||||
			// increase the randomness
 | 
			
		||||
			for i := 0; i < 10; i++ {
 | 
			
		||||
				got, err := selectHost(test.list)
 | 
			
		||||
				if test.expectsErr {
 | 
			
		||||
					if err == nil {
 | 
			
		||||
						t.Error("Unexpected non-error")
 | 
			
		||||
				got, scoreList, err := selectHost(test.list, test.topNodesCnt)
 | 
			
		||||
				if err != test.wantError {
 | 
			
		||||
					t.Fatalf("unexpected error is returned from selectHost: got: %v want: %v", err, test.wantError)
 | 
			
		||||
				}
 | 
			
		||||
				} else {
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						t.Errorf("Unexpected error: %v", err)
 | 
			
		||||
				if test.possibleNodes.Len() == 0 {
 | 
			
		||||
					if got != "" {
 | 
			
		||||
						t.Fatalf("expected nothing returned as selected Node, but actually %s is returned from selectHost", got)
 | 
			
		||||
					}
 | 
			
		||||
					if !test.possibleHosts.Has(got) {
 | 
			
		||||
						t.Errorf("got %s is not in the possible map %v", got, test.possibleHosts)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if !test.possibleNodes.Has(got) {
 | 
			
		||||
					t.Errorf("got %s is not in the possible map %v", got, test.possibleNodes)
 | 
			
		||||
				}
 | 
			
		||||
				if got != scoreList[0].Name {
 | 
			
		||||
					t.Errorf("The head of list should be the selected Node's score: got: %v, expected: %v", scoreList[0], got)
 | 
			
		||||
				}
 | 
			
		||||
				for _, list := range test.possibleNodeLists {
 | 
			
		||||
					if cmp.Equal(list, scoreList) {
 | 
			
		||||
						return
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				t.Errorf("Unexpected scoreList: %v", scoreList)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user