Merge pull request #84262 from Huang-Wei/eps-prio-migration
Migrate EvenPodsSpread priority implementation as Score plugin with map/reduce style
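For readers new to the map/reduce priority style this PR migrates to: the Map step computes a raw per-node score independently, and the Reduce step normalizes all raw scores at the end — here flipping the scale so that fewer matching pods yields a higher final score. Below is a minimal, self-contained sketch of that pattern with simplified stand-in types; it is not the scheduler's actual API, and the numbers are only illustrative.

package main

import "fmt"

// NodeScore is a simplified stand-in for the scheduler's framework.NodeScore.
type NodeScore struct {
	Name  string
	Score int64
}

// mapScore is the "Map" step: compute a raw score for one node independently.
// Here the raw score is simply the number of matching pods counted for that node.
func mapScore(matchingPods map[string]int64, nodeName string) NodeScore {
	return NodeScore{Name: nodeName, Score: matchingPods[nodeName]}
}

// reduceScores is the "Reduce" step: normalize all raw scores at once, flipping
// the scale so that fewer matching pods yields a higher final score.
func reduceScores(scores []NodeScore, maxNodeScore int64) {
	var total int64
	min := int64(1) << 62
	for _, s := range scores {
		total += s.Score
		if s.Score < min {
			min = s.Score
		}
	}
	diff := total - min
	for i := range scores {
		if diff == 0 {
			scores[i].Score = maxNodeScore
			continue
		}
		scores[i].Score = maxNodeScore * (total - scores[i].Score) / diff
	}
}

func main() {
	matching := map[string]int64{"node-a": 4, "node-b": 1, "node-x": 3}
	var scores []NodeScore
	for _, n := range []string{"node-a", "node-b", "node-x"} {
		scores = append(scores, mapScore(matching, n))
	}
	reduceScores(scores, 100)
	fmt.Println(scores) // [{node-a 57} {node-b 100} {node-x 71}]
}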
		| @@ -18,6 +18,7 @@ package priorities | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
| 	"sync/atomic" | ||||
|  | ||||
| @@ -26,6 +27,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" | ||||
| 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" | ||||
| 	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" | ||||
| 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" | ||||
| 	schedutil "k8s.io/kubernetes/pkg/scheduler/util" | ||||
|  | ||||
| 	"k8s.io/klog" | ||||
| @@ -36,67 +38,32 @@ type topologyPair struct { | ||||
| 	value string | ||||
| } | ||||
|  | ||||
| type topologySpreadConstraintsMap struct { | ||||
| 	// nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods. | ||||
| 	nodeNameToPodCounts map[string]int32 | ||||
| type podTopologySpreadMap struct { | ||||
| 	// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present. | ||||
| 	nodeNameSet map[string]struct{} | ||||
| 	// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods. | ||||
| 	topologyPairToPodCounts map[topologyPair]*int32 | ||||
| 	topologyPairToPodCounts map[topologyPair]*int64 | ||||
| } | ||||
|  | ||||
| func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap { | ||||
| 	return &topologySpreadConstraintsMap{ | ||||
| 		nodeNameToPodCounts:     make(map[string]int32), | ||||
| 		topologyPairToPodCounts: make(map[topologyPair]*int32), | ||||
| func newTopologySpreadConstraintsMap() *podTopologySpreadMap { | ||||
| 	return &podTopologySpreadMap{ | ||||
| 		nodeNameSet:             make(map[string]struct{}), | ||||
| 		topologyPairToPodCounts: make(map[topologyPair]*int64), | ||||
| 	} | ||||
| } | ||||
|  | ||||
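Two Go idioms are worth noting in the new podTopologySpreadMap: a map with struct{} values is the conventional zero-memory set, and pre-allocating the *int64 counters up front lets worker goroutines increment them with sync/atomic without ever writing to the map concurrently. A small self-contained illustration, using hypothetical names rather than the scheduler code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// Set: struct{} values take no space; only membership matters.
	nodeNameSet := map[string]struct{}{"node-a": {}, "node-b": {}}
	if _, ok := nodeNameSet["node-a"]; ok {
		fmt.Println("node-a is eligible")
	}

	// Counters: the *int64 values are allocated once (single-threaded),
	// then incremented concurrently; the map itself is never mutated after that.
	counts := map[string]*int64{"zone1": new(int64), "zone2": new(int64)}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			zone := "zone1"
			if i%2 == 0 {
				zone = "zone2"
			}
			atomic.AddInt64(counts[zone], 1)
		}(i)
	}
	wg.Wait()
	fmt.Println(*counts["zone1"], *counts["zone2"]) // 5 5
}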
| // Note: the <nodes> passed in are the "filtered" nodes which have passed Predicates. | ||||
| // This function iterates <nodes> to filter out the nodes which don't have required topologyKey(s), | ||||
| // and initialize two maps: | ||||
| // 1) t.topologyPairToPodCounts: keyed with both eligible topology pair and node names. | ||||
| // 2) t.nodeNameToPodCounts: keyed with node name, and valued with a *int32 pointer for eligible node only. | ||||
| func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) { | ||||
| 	constraints := getSoftTopologySpreadConstraints(pod) | ||||
| 	for _, node := range nodes { | ||||
| 		if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) { | ||||
| 			continue | ||||
| 		} | ||||
| 		for _, constraint := range constraints { | ||||
| 			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} | ||||
| 			if t.topologyPairToPodCounts[pair] == nil { | ||||
| 				t.topologyPairToPodCounts[pair] = new(int32) | ||||
| 			} | ||||
| 		} | ||||
| 		t.nodeNameToPodCounts[node.Name] = 0 | ||||
| 		// For those nodes which don't have all required topologyKeys present, it's intentional to keep | ||||
| 		// those entries absent in nodeNameToPodCounts, so that we're able to score them to 0 afterwards. | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // CalculateEvenPodsSpreadPriority computes a score by checking through the topologySpreadConstraints | ||||
| // that are with WhenUnsatisfiable=ScheduleAnyway (a.k.a soft constraint). | ||||
| // The function works as below: | ||||
| // 1) In all nodes, calculate the number of pods which match <pod>'s soft topology spread constraints. | ||||
| // 2) Group the number calculated in 1) by topologyPair, and sum up to corresponding candidate nodes. | ||||
| // 3) Finally normalize the number to 0~10. The node with the highest score is the most preferred. | ||||
| // Note: Symmetry is not applicable. We only weigh how incomingPod matches existingPod. | ||||
| // Whether existingPod matches incomingPod doesn't contribute to the final score. | ||||
| // This is different from the Affinity API. | ||||
| func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters.SharedLister, nodes []*v1.Node) (framework.NodeScoreList, error) { | ||||
| 	result := make(framework.NodeScoreList, len(nodes)) | ||||
| // buildPodTopologySpreadMap prepares the necessary data (podTopologySpreadMap) for the incoming pod on the filteredNodes. | ||||
| // The Priority Map/Reduce functions later use 'podTopologySpreadMap' to perform the scoring calculations. | ||||
| func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) *podTopologySpreadMap { | ||||
| 	// return if incoming pod doesn't have soft topology spread constraints. | ||||
| 	constraints := getSoftTopologySpreadConstraints(pod) | ||||
| 	if len(constraints) == 0 { | ||||
| 		return result, nil | ||||
| 	if len(constraints) == 0 || len(filteredNodes) == 0 || len(allNodes) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	allNodes, err := sharedLister.NodeInfos().List() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	t := newTopologySpreadConstraintsMap() | ||||
| 	t.initialize(pod, nodes) | ||||
| 	// initialize podTopologySpreadMap which will be used in Score plugin. | ||||
| 	m := newTopologySpreadConstraintsMap() | ||||
| 	m.initialize(pod, filteredNodes) | ||||
|  | ||||
| 	errCh := schedutil.NewErrorChannel() | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| @@ -117,12 +84,12 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters. | ||||
| 			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} | ||||
| 			// If current topology pair is not associated with any candidate node, | ||||
| 			// continue to avoid unnecessary calculation. | ||||
| 			if t.topologyPairToPodCounts[pair] == nil { | ||||
| 			if m.topologyPairToPodCounts[pair] == nil { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// <matchSum> indicates how many pods (on current node) match the <constraint>. | ||||
| 			matchSum := int32(0) | ||||
| 			matchSum := int64(0) | ||||
| 			for _, existingPod := range nodeInfo.Pods() { | ||||
| 				match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint) | ||||
| 				if err != nil { | ||||
| @@ -133,67 +100,132 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, sharedLister schedulerlisters. | ||||
| 					matchSum++ | ||||
| 				} | ||||
| 			} | ||||
| 			atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum) | ||||
| 			atomic.AddInt64(m.topologyPairToPodCounts[pair], matchSum) | ||||
| 		} | ||||
| 	} | ||||
| 	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode) | ||||
| 	if err := errCh.ReceiveError(); err != nil { | ||||
| 		return nil, err | ||||
| 		klog.Error(err) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var minCount int32 = math.MaxInt32 | ||||
| 	// <total> sums up the number of matching pods on each qualified topology pair | ||||
| 	var total int32 | ||||
| 	for _, node := range nodes { | ||||
| 		if _, ok := t.nodeNameToPodCounts[node.Name]; !ok { | ||||
| 	return m | ||||
| } | ||||
|  | ||||
| // initialize iterates over "filteredNodes" to filter out the nodes which don't have the required topologyKey(s), | ||||
| // and initializes two maps: | ||||
| // 1) m.topologyPairToPodCounts: keyed with an eligible topology pair, and valued with a pointer to the count of matching pods. | ||||
| // 2) m.nodeNameSet: a set holding the names of the nodes which have all required topologyKeys present. | ||||
| func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) { | ||||
| 	constraints := getSoftTopologySpreadConstraints(pod) | ||||
| 	for _, node := range filteredNodes { | ||||
| 		if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) { | ||||
| 			continue | ||||
| 		} | ||||
| 		for _, constraint := range constraints { | ||||
| 			pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]} | ||||
| 			if m.topologyPairToPodCounts[pair] == nil { | ||||
| 				m.topologyPairToPodCounts[pair] = new(int64) | ||||
| 			} | ||||
| 		} | ||||
| 		m.nodeNameSet[node.Name] = struct{}{} | ||||
| 		// For those nodes which don't have all required topologyKeys present, it's intentional to leave | ||||
| 		// their entries absent in nodeNameSet, so that we're able to score them to 0 afterwards. | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // CalculateEvenPodsSpreadPriorityMap calculates the number of matching pods on the passed-in "node", | ||||
| // and returns that number as the node's Score. | ||||
| func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { | ||||
| 	node := nodeInfo.Node() | ||||
| 	if node == nil { | ||||
| 		return framework.NodeScore{}, fmt.Errorf("node not found") | ||||
| 	} | ||||
|  | ||||
| 	var m *podTopologySpreadMap | ||||
| 	if priorityMeta, ok := meta.(*priorityMetadata); ok { | ||||
| 		m = priorityMeta.podTopologySpreadMap | ||||
| 	} | ||||
| 	if m == nil { | ||||
| 		return framework.NodeScore{}, nil | ||||
| 	} | ||||
|  | ||||
| 	// no need to continue if the node is not qualified. | ||||
| 	if _, ok := m.nodeNameSet[node.Name]; !ok { | ||||
| 		return framework.NodeScore{Name: node.Name, Score: 0}, nil | ||||
| 	} | ||||
|  | ||||
| 	constraints := getSoftTopologySpreadConstraints(pod) | ||||
| 	// For each present <pair>, current node gets a credit of <matchSum>. | ||||
| 		// And we add <matchSum> to <t.total> to reverse the final score later. | ||||
| 	// And we sum up <matchSum> and return it as this node's score. | ||||
| 	var score int64 | ||||
| 	for _, constraint := range constraints { | ||||
| 		if tpVal, ok := node.Labels[constraint.TopologyKey]; ok { | ||||
| 			pair := topologyPair{key: constraint.TopologyKey, value: tpVal} | ||||
| 				matchSum := *t.topologyPairToPodCounts[pair] | ||||
| 				t.nodeNameToPodCounts[node.Name] += matchSum | ||||
| 				total += matchSum | ||||
| 			matchSum := *m.topologyPairToPodCounts[pair] | ||||
| 			score += matchSum | ||||
| 		} | ||||
| 	} | ||||
| 		if t.nodeNameToPodCounts[node.Name] < minCount { | ||||
| 			minCount = t.nodeNameToPodCounts[node.Name] | ||||
| 	return framework.NodeScore{Name: node.Name, Score: score}, nil | ||||
| } | ||||
|  | ||||
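For illustration of the Map step above: suppose the incoming pod has two soft constraints with topologyKeys "zone" and "node", and node-a carries the labels zone=zone1 and node=node-a. If the precomputed podTopologySpreadMap counted 3 matching pods under the pair {zone, zone1} (accumulated across all nodes in that zone) and 1 matching pod under {node, node-a}, then node-a's Map score is 3 + 1 = 4. These numbers are hypothetical; the point is that each node's raw score is simply the sum of the counts for the topology pairs it belongs to.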
| // CalculateEvenPodsSpreadPriorityReduce normalizes the score for each filtered node. | ||||
| // The basic rule is: the bigger the score (i.e. the number of matching pods) is, the smaller the | ||||
| // final normalized score will be. | ||||
| func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, | ||||
| 	result framework.NodeScoreList) error { | ||||
| 	var m *podTopologySpreadMap | ||||
| 	if priorityMeta, ok := meta.(*priorityMetadata); ok { | ||||
| 		m = priorityMeta.podTopologySpreadMap | ||||
| 	} | ||||
| 	if m == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// Calculate the summed <total> score and <minScore>. | ||||
| 	var minScore int64 = math.MaxInt64 | ||||
| 	var total int64 | ||||
| 	for _, score := range result { | ||||
| 		// it's mandatory to check if <score.Name> is present in m.nodeNameSet | ||||
| 		if _, ok := m.nodeNameSet[score.Name]; !ok { | ||||
| 			continue | ||||
| 		} | ||||
| 		total += score.Score | ||||
| 		if score.Score < minScore { | ||||
| 			minScore = score.Score | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// calculate final priority score for each node | ||||
| 	// TODO(Huang-Wei): in the alpha version, we keep the formula as simple as possible. | ||||
| 	// The current version ranks the nodes properly, but it doesn't take MaxSkew into | ||||
| 	// consideration; we may come up with a better formula in the future. | ||||
| 	maxMinDiff := total - minCount | ||||
| 	for i := range nodes { | ||||
| 		node := nodes[i] | ||||
| 		result[i].Name = node.Name | ||||
|  | ||||
| 		// debugging purpose: print the value for each node | ||||
| 		// score must be pointer here, otherwise it's always 0 | ||||
| 	maxMinDiff := total - minScore | ||||
| 	for i := range result { | ||||
| 		nodeInfo, err := sharedLister.NodeInfos().Get(result[i].Name) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		node := nodeInfo.Node() | ||||
| 		// Debugging purpose: print the score for each node. | ||||
| 		// Score must be a pointer here, otherwise it's always 0. | ||||
| 		if klog.V(10) { | ||||
| 			defer func(score *int64, nodeName string) { | ||||
| 				klog.Infof("%v -> %v: EvenPodsSpreadPriority, Score: (%d)", pod.Name, nodeName, *score) | ||||
| 				klog.Infof("%v -> %v: PodTopologySpread NormalizeScore, Score: (%d)", pod.Name, nodeName, *score) | ||||
| 			}(&result[i].Score, node.Name) | ||||
| 		} | ||||
|  | ||||
| 		if _, ok := t.nodeNameToPodCounts[node.Name]; !ok { | ||||
| 			result[i].Score = 0 | ||||
| 			continue | ||||
| 		} | ||||
| 		if maxMinDiff == 0 { | ||||
| 			result[i].Score = framework.MaxNodeScore | ||||
| 			continue | ||||
| 		} | ||||
| 		fScore := float64(framework.MaxNodeScore) * (float64(total-t.nodeNameToPodCounts[node.Name]) / float64(maxMinDiff)) | ||||
| 		result[i].Score = int64(fScore) | ||||
|  | ||||
| 		if _, ok := m.nodeNameSet[node.Name]; !ok { | ||||
| 			result[i].Score = 0 | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 	return result, nil | ||||
| 		flippedScore := total - result[i].Score | ||||
| 		fScore := float64(framework.MaxNodeScore) * (float64(flippedScore) / float64(maxMinDiff)) | ||||
| 		result[i].Score = int64(fScore) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
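A worked example of the normalization above, assuming framework.MaxNodeScore is 100: if the qualified nodes arrive with Map scores 4, 1 and 3, then total = 8, minScore = 1 and maxMinDiff = 7, so the final scores are int64(100*(8-4)/7) = 57, int64(100*(8-1)/7) = 100 and int64(100*(8-3)/7) = 71. A filtered node whose name is absent from nodeNameSet is pinned to 0, and when all qualified nodes tie (maxMinDiff == 0) they all receive the maximum score.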
| // TODO(Huang-Wei): combine this with getHardTopologySpreadConstraints() in predicates package | ||||
|   | ||||
| @@ -26,13 +26,13 @@ import ( | ||||
| 	st "k8s.io/kubernetes/pkg/scheduler/testing" | ||||
| ) | ||||
|  | ||||
| func Test_topologySpreadConstraintsMap_initialize(t *testing.T) { | ||||
| func Test_podTopologySpreadMap_initialize(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name                string | ||||
| 		pod                 *v1.Pod | ||||
| 		nodes               []*v1.Node | ||||
| 		wantNodeNameMap     map[string]int32 | ||||
| 		wantTopologyPairMap map[topologyPair]*int32 | ||||
| 		wantNodeNameSet     map[string]struct{} | ||||
| 		wantTopologyPairMap map[topologyPair]*int64 | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "normal case", | ||||
| @@ -45,17 +45,17 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) { | ||||
| 				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(), | ||||
| 				st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(), | ||||
| 			}, | ||||
| 			wantNodeNameMap: map[string]int32{ | ||||
| 				"node-a": 0, | ||||
| 				"node-b": 0, | ||||
| 				"node-x": 0, | ||||
| 			wantNodeNameSet: map[string]struct{}{ | ||||
| 				"node-a": {}, | ||||
| 				"node-b": {}, | ||||
| 				"node-x": {}, | ||||
| 			}, | ||||
| 			wantTopologyPairMap: map[topologyPair]*int32{ | ||||
| 				{key: "zone", value: "zone1"}:  new(int32), | ||||
| 				{key: "zone", value: "zone2"}:  new(int32), | ||||
| 				{key: "node", value: "node-a"}: new(int32), | ||||
| 				{key: "node", value: "node-b"}: new(int32), | ||||
| 				{key: "node", value: "node-x"}: new(int32), | ||||
| 			wantTopologyPairMap: map[topologyPair]*int64{ | ||||
| 				{key: "zone", value: "zone1"}:  new(int64), | ||||
| 				{key: "zone", value: "zone2"}:  new(int64), | ||||
| 				{key: "node", value: "node-a"}: new(int64), | ||||
| 				{key: "node", value: "node-b"}: new(int64), | ||||
| 				{key: "node", value: "node-x"}: new(int64), | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| @@ -69,26 +69,26 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) { | ||||
| 				st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(), | ||||
| 				st.MakeNode().Name("node-x").Label("node", "node-x").Obj(), | ||||
| 			}, | ||||
| 			wantNodeNameMap: map[string]int32{ | ||||
| 				"node-a": 0, | ||||
| 				"node-b": 0, | ||||
| 			wantNodeNameSet: map[string]struct{}{ | ||||
| 				"node-a": {}, | ||||
| 				"node-b": {}, | ||||
| 			}, | ||||
| 			wantTopologyPairMap: map[topologyPair]*int32{ | ||||
| 				{key: "zone", value: "zone1"}:  new(int32), | ||||
| 				{key: "node", value: "node-a"}: new(int32), | ||||
| 				{key: "node", value: "node-b"}: new(int32), | ||||
| 			wantTopologyPairMap: map[topologyPair]*int64{ | ||||
| 				{key: "zone", value: "zone1"}:  new(int64), | ||||
| 				{key: "node", value: "node-a"}: new(int64), | ||||
| 				{key: "node", value: "node-b"}: new(int64), | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			tMap := newTopologySpreadConstraintsMap() | ||||
| 			tMap.initialize(tt.pod, tt.nodes) | ||||
| 			if !reflect.DeepEqual(tMap.nodeNameToPodCounts, tt.wantNodeNameMap) { | ||||
| 				t.Errorf("initilize().nodeNameToPodCounts = %#v, want %#v", tMap.nodeNameToPodCounts, tt.wantNodeNameMap) | ||||
| 			m := newTopologySpreadConstraintsMap() | ||||
| 			m.initialize(tt.pod, tt.nodes) | ||||
| 			if !reflect.DeepEqual(m.nodeNameSet, tt.wantNodeNameSet) { | ||||
| 				t.Errorf("initilize().nodeNameSet = %#v, want %#v", m.nodeNameSet, tt.wantNodeNameSet) | ||||
| 			} | ||||
| 			if !reflect.DeepEqual(tMap.topologyPairToPodCounts, tt.wantTopologyPairMap) { | ||||
| 				t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", tMap.topologyPairToPodCounts, tt.wantTopologyPairMap) | ||||
| 			if !reflect.DeepEqual(m.topologyPairToPodCounts, tt.wantTopologyPairMap) { | ||||
| 				t.Errorf("initilize().topologyPairToPodCounts = %#v, want %#v", m.topologyPairToPodCounts, tt.wantTopologyPairMap) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| @@ -435,9 +435,23 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) { | ||||
| 			allNodes := append([]*v1.Node{}, tt.nodes...) | ||||
| 			allNodes = append(allNodes, tt.failedNodes...) | ||||
| 			snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, allNodes) | ||||
| 			got, _ := CalculateEvenPodsSpreadPriority(tt.pod, snapshot, tt.nodes) | ||||
| 			if !reflect.DeepEqual(got, tt.want) { | ||||
| 				t.Errorf("CalculateEvenPodsSpreadPriority() = %#v, want %#v", got, tt.want) | ||||
|  | ||||
| 			meta := &priorityMetadata{ | ||||
| 				podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList), | ||||
| 			} | ||||
| 			var gotList framework.NodeScoreList | ||||
| 			for _, n := range tt.nodes { | ||||
| 				nodeName := n.Name | ||||
| 				nodeScore, err := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName]) | ||||
| 				if err != nil { | ||||
| 					t.Error(err) | ||||
| 				} | ||||
| 				gotList = append(gotList, nodeScore) | ||||
| 			} | ||||
|  | ||||
| 			CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList) | ||||
| 			if !reflect.DeepEqual(gotList, tt.want) { | ||||
| 				t.Errorf("CalculateEvenPodsSpreadPriorityReduce() = %#v, want %#v", gotList, tt.want) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| @@ -484,9 +498,19 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) { | ||||
| 		b.Run(tt.name, func(b *testing.B) { | ||||
| 			existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum) | ||||
| 			snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes) | ||||
| 			meta := &priorityMetadata{ | ||||
| 				podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList), | ||||
| 			} | ||||
| 			b.ResetTimer() | ||||
|  | ||||
| 			for i := 0; i < b.N; i++ { | ||||
| 				CalculateEvenPodsSpreadPriority(tt.pod, snapshot, filteredNodes) | ||||
| 				var gotList framework.NodeScoreList | ||||
| 				for _, n := range filteredNodes { | ||||
| 					nodeName := n.Name | ||||
| 					nodeScore, _ := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName]) | ||||
| 					gotList = append(gotList, nodeScore) | ||||
| 				} | ||||
| 				CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
|   | ||||
| @@ -59,18 +59,21 @@ type priorityMetadata struct { | ||||
| 	controllerRef           *metav1.OwnerReference | ||||
| 	podFirstServiceSelector labels.Selector | ||||
| 	totalNumNodes           int | ||||
| 	podTopologySpreadMap    *podTopologySpreadMap | ||||
| } | ||||
|  | ||||
| // PriorityMetadata is a PriorityMetadataProducer.  Node info can be nil. | ||||
| func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, _ []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} { | ||||
| func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, filteredNodes []*v1.Node, sharedLister schedulerlisters.SharedLister) interface{} { | ||||
| 	// If we cannot compute metadata, just return nil | ||||
| 	if pod == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	totalNumNodes := 0 | ||||
| 	var allNodes []*schedulernodeinfo.NodeInfo | ||||
| 	if sharedLister != nil { | ||||
| 		if l, err := sharedLister.NodeInfos().List(); err == nil { | ||||
| 			totalNumNodes = len(l) | ||||
| 			allNodes = l | ||||
| 		} | ||||
| 	} | ||||
| 	return &priorityMetadata{ | ||||
| @@ -81,6 +84,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, _ []*v1.Node, | ||||
| 		controllerRef:           metav1.GetControllerOf(pod), | ||||
| 		podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister), | ||||
| 		totalNumNodes:           totalNumNodes, | ||||
| 		podTopologySpreadMap:    buildPodTopologySpreadMap(pod, filteredNodes, allNodes), | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
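The metadata change above is what ties the two halves together: the spread map is built once per scheduling cycle inside PriorityMetadata and then threaded, as an interface{} value, into every per-node Map call and the single Reduce call, which type-assert it back and degrade to a no-op when it is absent. A rough sketch of that hand-off pattern with simplified, hypothetical types (not the real priorityMetadata):

package main

import "fmt"

// spreadMap and priorityMeta are simplified stand-ins for the scheduler's
// podTopologySpreadMap and priorityMetadata.
type spreadMap struct{ counts map[string]int64 }

type priorityMeta struct{ spread *spreadMap }

// buildMeta is computed once per scheduling cycle.
func buildMeta() *priorityMeta {
	return &priorityMeta{spread: &spreadMap{counts: map[string]int64{"node-a": 2}}}
}

// mapScore type-asserts the shared metadata back and falls back to a zero
// score when the metadata (or the spread map) is missing.
func mapScore(meta interface{}, nodeName string) int64 {
	m, ok := meta.(*priorityMeta)
	if !ok || m.spread == nil {
		return 0
	}
	return m.spread.counts[nodeName]
}

func main() {
	meta := buildMeta()
	fmt.Println(mapScore(meta, "node-a"), mapScore(nil, "node-a")) // 2 0
}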
| @@ -68,7 +68,12 @@ func ApplyFeatureGates() (restore func()) { | ||||
| 		scheduler.RegisterFitPredicate(predicates.EvenPodsSpreadPred, predicates.EvenPodsSpreadPredicate) | ||||
| 		// register priority | ||||
| 		scheduler.InsertPriorityKeyToAlgorithmProviderMap(priorities.EvenPodsSpreadPriority) | ||||
| 		scheduler.RegisterPriorityFunction(priorities.EvenPodsSpreadPriority, priorities.CalculateEvenPodsSpreadPriority, 1) | ||||
| 		scheduler.RegisterPriorityMapReduceFunction( | ||||
| 			priorities.EvenPodsSpreadPriority, | ||||
| 			priorities.CalculateEvenPodsSpreadPriorityMap, | ||||
| 			priorities.CalculateEvenPodsSpreadPriorityReduce, | ||||
| 			1, | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	// Prioritizes nodes that satisfy pod's resource limits | ||||
|   | ||||
| @@ -1196,15 +1196,15 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { | ||||
| 			featureGates: map[featuregate.Feature]bool{ | ||||
| 				features.EvenPodsSpread: true, | ||||
| 			}, | ||||
| 			wantPrioritizers: sets.NewString( | ||||
| 				"EvenPodsSpreadPriority", | ||||
| 			), | ||||
| 			wantPlugins: map[string][]kubeschedulerconfig.Plugin{ | ||||
| 				"FilterPlugin": { | ||||
| 					{Name: "NodeUnschedulable"}, | ||||
| 					{Name: "TaintToleration"}, | ||||
| 					{Name: "PodTopologySpread"}, | ||||
| 				}, | ||||
| 				"ScorePlugin": { | ||||
| 					{Name: "PodTopologySpread", Weight: 2}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
|   | ||||
| @@ -229,25 +229,26 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { | ||||
| 			plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight) | ||||
| 			return | ||||
| 		}) | ||||
|  | ||||
| 	registry.RegisterPriority(priorities.MostRequestedPriority, | ||||
| 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { | ||||
| 			plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight) | ||||
| 			return | ||||
| 		}) | ||||
|  | ||||
| 	registry.RegisterPriority(priorities.BalancedResourceAllocation, | ||||
| 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { | ||||
| 			plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight) | ||||
| 			return | ||||
| 		}) | ||||
|  | ||||
| 	registry.RegisterPriority(priorities.LeastRequestedPriority, | ||||
| 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { | ||||
| 			plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight) | ||||
| 			return | ||||
| 		}) | ||||
|  | ||||
| 	registry.RegisterPriority(priorities.EvenPodsSpreadPriority, | ||||
| 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { | ||||
| 			plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight) | ||||
| 			return | ||||
| 		}) | ||||
| 	registry.RegisterPriority(requestedtocapacityratio.Name, | ||||
| 		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { | ||||
| 			plugins.Score = appendToPluginSet(plugins.Score, requestedtocapacityratio.Name, &args.Weight) | ||||
|   | ||||
| @@ -7,6 +7,7 @@ go_library( | ||||
|     visibility = ["//visibility:public"], | ||||
|     deps = [ | ||||
|         "//pkg/scheduler/algorithm/predicates:go_default_library", | ||||
|         "//pkg/scheduler/algorithm/priorities:go_default_library", | ||||
|         "//pkg/scheduler/framework/plugins/migration:go_default_library", | ||||
|         "//pkg/scheduler/framework/v1alpha1:go_default_library", | ||||
|         "//pkg/scheduler/nodeinfo:go_default_library", | ||||
|   | ||||
| @@ -23,15 +23,19 @@ import ( | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" | ||||
| 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo" | ||||
| ) | ||||
|  | ||||
| // PodTopologySpread is a plugin that ensures a pod's topologySpreadConstraints are satisfied. | ||||
| type PodTopologySpread struct{} | ||||
| type PodTopologySpread struct { | ||||
| 	handle framework.FrameworkHandle | ||||
| } | ||||
|  | ||||
| var _ framework.FilterPlugin = &PodTopologySpread{} | ||||
| var _ framework.ScorePlugin = &PodTopologySpread{} | ||||
|  | ||||
| // Name is the name of the plugin used in the plugin registry and configurations. | ||||
| const Name = "PodTopologySpread" | ||||
| @@ -51,7 +55,33 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C | ||||
| 	return migration.PredicateResultToFrameworkStatus(reasons, err) | ||||
| } | ||||
|  | ||||
| // New initializes a new plugin and returns it. | ||||
| func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { | ||||
| 	return &PodTopologySpread{}, nil | ||||
| // Score invoked at the Score extension point. | ||||
| // The "score" returned in this function is the matching number of pods on the `nodeName`, | ||||
| // it is normalized later. | ||||
| func (pl *PodTopologySpread) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { | ||||
| 	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) | ||||
| 	if err != nil { | ||||
| 		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) | ||||
| 	} | ||||
|  | ||||
| 	meta := migration.PriorityMetadata(state) | ||||
| 	s, err := priorities.CalculateEvenPodsSpreadPriorityMap(pod, meta, nodeInfo) | ||||
| 	return s.Score, migration.ErrorToFrameworkStatus(err) | ||||
| } | ||||
|  | ||||
| // NormalizeScore invoked after scoring all nodes. | ||||
| func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { | ||||
| 	meta := migration.PriorityMetadata(state) | ||||
| 	err := priorities.CalculateEvenPodsSpreadPriorityReduce(pod, meta, pl.handle.SnapshotSharedLister(), scores) | ||||
| 	return migration.ErrorToFrameworkStatus(err) | ||||
| } | ||||
|  | ||||
| // ScoreExtensions of the Score plugin. | ||||
| func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions { | ||||
| 	return pl | ||||
| } | ||||
|  | ||||
| // New initializes a new plugin and returns it. | ||||
| func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) { | ||||
| 	return &PodTopologySpread{handle: h}, nil | ||||
| } | ||||
|   | ||||
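For orientation on the plugin side: a Score plugin that also returns itself from ScoreExtensions() is driven by the framework roughly as follows — Score is called once per filtered node, NormalizeScore is called once with the accumulated list, and the plugin weight is applied afterwards. A hypothetical, heavily simplified driver sketch (the real framework parallelizes the calls, propagates *Status errors, and handles weights differently):

package main

import "fmt"

type nodeScore struct {
	Name  string
	Score int64
}

// scorer is a simplified stand-in for framework.ScorePlugin plus ScoreExtensions.
type scorer interface {
	Score(nodeName string) int64
	NormalizeScore(scores []nodeScore)
}

// runScorePlugin shows the calling order: Score per node, then one
// NormalizeScore over the whole list, then the weight.
func runScorePlugin(p scorer, nodes []string, weight int64) []nodeScore {
	scores := make([]nodeScore, 0, len(nodes))
	for _, n := range nodes {
		scores = append(scores, nodeScore{Name: n, Score: p.Score(n)})
	}
	p.NormalizeScore(scores)
	for i := range scores {
		scores[i].Score *= weight
	}
	return scores
}

// constScorer is a trivial plugin used only to exercise the driver above.
type constScorer struct{}

func (constScorer) Score(nodeName string) int64 { return int64(len(nodeName)) }

func (constScorer) NormalizeScore(scores []nodeScore) {
	for i := range scores {
		scores[i].Score = 100 - scores[i].Score // toy normalization
	}
}

func main() {
	fmt.Println(runScorePlugin(constScorer{}, []string{"node-a", "node-b"}, 2))
}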
| @@ -23,14 +23,16 @@ import ( | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" | ||||
| 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" | ||||
| 	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" | ||||
| 	st "k8s.io/kubernetes/pkg/scheduler/testing" | ||||
| ) | ||||
|  | ||||
| var hardSpread = v1.DoNotSchedule | ||||
| var ( | ||||
| 	hardSpread = v1.DoNotSchedule | ||||
| ) | ||||
|  | ||||
| func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { | ||||
| func TestPodTopologySpread_Filter_SingleConstraint(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name         string | ||||
| 		pod          *v1.Pod | ||||
| @@ -269,7 +271,7 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) | ||||
| 			meta := predicates.GetPredicateMetadata(tt.pod, snapshot) | ||||
| 			state := v1alpha1.NewCycleState() | ||||
| 			state := framework.NewCycleState() | ||||
| 			state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) | ||||
| 			plugin, _ := New(nil, nil) | ||||
| 			for _, node := range tt.nodes { | ||||
| @@ -283,7 +285,7 @@ func TestPodTopologySpreadFilter_SingleConstraint(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) { | ||||
| func TestPodTopologySpread_Filter_MultipleConstraints(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name         string | ||||
| 		pod          *v1.Pod | ||||
| @@ -466,7 +468,7 @@ func TestPodTopologySpreadFilter_MultipleConstraints(t *testing.T) { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			snapshot := nodeinfosnapshot.NewSnapshot(tt.existingPods, tt.nodes) | ||||
| 			meta := predicates.GetPredicateMetadata(tt.pod, snapshot) | ||||
| 			state := v1alpha1.NewCycleState() | ||||
| 			state := framework.NewCycleState() | ||||
| 			state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) | ||||
| 			plugin, _ := New(nil, nil) | ||||
| 			for _, node := range tt.nodes { | ||||
|   | ||||