Merge pull request #76973 from Huang-Wei/lazy-init-int64-ptr
scheduler: fix performance regression for cases without (anti-)affinity pods
This commit is contained in:
		| @@ -19,6 +19,7 @@ package priorities | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 	"sync/atomic" | ||||||
|  |  | ||||||
| 	"k8s.io/api/core/v1" | 	"k8s.io/api/core/v1" | ||||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
| @@ -63,7 +64,7 @@ type podAffinityPriorityMap struct { | |||||||
| 	nodes []*v1.Node | 	nodes []*v1.Node | ||||||
| 	// counts store the mapping from node name to so-far computed score of | 	// counts store the mapping from node name to so-far computed score of | ||||||
| 	// the node. | 	// the node. | ||||||
| 	counts map[string]float64 | 	counts map[string]*int64 | ||||||
| 	// The first error that we faced. | 	// The first error that we faced. | ||||||
| 	firstError error | 	firstError error | ||||||
| } | } | ||||||
| @@ -71,7 +72,7 @@ type podAffinityPriorityMap struct { | |||||||
| func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { | func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { | ||||||
| 	return &podAffinityPriorityMap{ | 	return &podAffinityPriorityMap{ | ||||||
| 		nodes:  nodes, | 		nodes:  nodes, | ||||||
| 		counts: make(map[string]float64, len(nodes)), | 		counts: make(map[string]*int64, len(nodes)), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -83,7 +84,7 @@ func (p *podAffinityPriorityMap) setError(err error) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight float64) { | func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) { | ||||||
| 	namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) | 	namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) | ||||||
| 	selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) | 	selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -92,22 +93,18 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini | |||||||
| 	} | 	} | ||||||
| 	match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) | 	match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) | ||||||
| 	if match { | 	if match { | ||||||
| 		func() { | 		for _, node := range p.nodes { | ||||||
| 			p.Lock() | 			if priorityutil.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) { | ||||||
| 			defer p.Unlock() | 				atomic.AddInt64(p.counts[node.Name], weight) | ||||||
| 			for _, node := range p.nodes { |  | ||||||
| 				if priorityutil.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) { |  | ||||||
| 					p.counts[node.Name] += weight |  | ||||||
| 				} |  | ||||||
| 			} | 			} | ||||||
| 		}() | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { | func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { | ||||||
| 	for i := range terms { | 	for i := range terms { | ||||||
| 		term := &terms[i] | 		term := &terms[i] | ||||||
| 		p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, float64(term.Weight*int32(multiplier))) | 		p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -121,17 +118,21 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node | |||||||
| 	hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil | 	hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil | ||||||
| 	hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil | 	hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil | ||||||
|  |  | ||||||
| 	allNodeNames := make([]string, 0, len(nodeNameToInfo)) |  | ||||||
| 	for name := range nodeNameToInfo { |  | ||||||
| 		allNodeNames = append(allNodeNames, name) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// convert the topology key based weights to the node name based weights |  | ||||||
| 	var maxCount float64 |  | ||||||
| 	var minCount float64 |  | ||||||
| 	// priorityMap stores the mapping from node name to so-far computed score of | 	// priorityMap stores the mapping from node name to so-far computed score of | ||||||
| 	// the node. | 	// the node. | ||||||
| 	pm := newPodAffinityPriorityMap(nodes) | 	pm := newPodAffinityPriorityMap(nodes) | ||||||
|  | 	allNodeNames := make([]string, 0, len(nodeNameToInfo)) | ||||||
|  | 	lazyInit := hasAffinityConstraints || hasAntiAffinityConstraints | ||||||
|  | 	for name := range nodeNameToInfo { | ||||||
|  | 		allNodeNames = append(allNodeNames, name) | ||||||
|  | 		// if pod has affinity defined, or target node has affinityPods | ||||||
|  | 		if lazyInit || len(nodeNameToInfo[name].PodsWithAffinity()) != 0 { | ||||||
|  | 			pm.counts[name] = new(int64) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// convert the topology key based weights to the node name based weights | ||||||
|  | 	var maxCount, minCount int64 | ||||||
|  |  | ||||||
| 	processPod := func(existingPod *v1.Pod) error { | 	processPod := func(existingPod *v1.Pod) error { | ||||||
| 		existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) | 		existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) | ||||||
| @@ -172,7 +173,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node | |||||||
| 				//	terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) | 				//	terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) | ||||||
| 				//} | 				//} | ||||||
| 				for _, term := range terms { | 				for _, term := range terms { | ||||||
| 					pm.processTerm(&term, existingPod, pod, existingPodNode, float64(ipa.hardPodAffinityWeight)) | 					pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 			// For every soft pod affinity term of <existingPod>, if <pod> matches the term, | 			// For every soft pod affinity term of <existingPod>, if <pod> matches the term, | ||||||
| @@ -194,7 +195,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node | |||||||
| 		nodeInfo := nodeNameToInfo[allNodeNames[i]] | 		nodeInfo := nodeNameToInfo[allNodeNames[i]] | ||||||
| 		if nodeInfo.Node() != nil { | 		if nodeInfo.Node() != nil { | ||||||
| 			if hasAffinityConstraints || hasAntiAffinityConstraints { | 			if hasAffinityConstraints || hasAntiAffinityConstraints { | ||||||
| 				// We need to process all the nodes. | 				// We need to process all the pods. | ||||||
| 				for _, existingPod := range nodeInfo.Pods() { | 				for _, existingPod := range nodeInfo.Pods() { | ||||||
| 					if err := processPod(existingPod); err != nil { | 					if err := processPod(existingPod); err != nil { | ||||||
| 						pm.setError(err) | 						pm.setError(err) | ||||||
| @@ -217,20 +218,24 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, node := range nodes { | 	for _, node := range nodes { | ||||||
| 		if pm.counts[node.Name] > maxCount { | 		if pm.counts[node.Name] == nil { | ||||||
| 			maxCount = pm.counts[node.Name] | 			continue | ||||||
| 		} | 		} | ||||||
| 		if pm.counts[node.Name] < minCount { | 		if *pm.counts[node.Name] > maxCount { | ||||||
| 			minCount = pm.counts[node.Name] | 			maxCount = *pm.counts[node.Name] | ||||||
|  | 		} | ||||||
|  | 		if *pm.counts[node.Name] < minCount { | ||||||
|  | 			minCount = *pm.counts[node.Name] | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// calculate final priority score for each node | 	// calculate final priority score for each node | ||||||
| 	result := make(schedulerapi.HostPriorityList, 0, len(nodes)) | 	result := make(schedulerapi.HostPriorityList, 0, len(nodes)) | ||||||
|  | 	maxMinDiff := maxCount - minCount | ||||||
| 	for _, node := range nodes { | 	for _, node := range nodes { | ||||||
| 		fScore := float64(0) | 		fScore := float64(0) | ||||||
| 		if (maxCount - minCount) > 0 { | 		if maxMinDiff > 0 && pm.counts[node.Name] != nil { | ||||||
| 			fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount)) | 			fScore = float64(schedulerapi.MaxPriority) * (float64(*pm.counts[node.Name]-minCount) / float64(maxCount-minCount)) | ||||||
| 		} | 		} | ||||||
| 		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) | 		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) | ||||||
| 		if klog.V(10) { | 		if klog.V(10) { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot