Merge pull request #28596 from wojtek-t/scheduler_optimizations
Automatic merge from submit-queue. Some scheduler optimizations. Ref #28590. This PR doesn't do anything fancy - it just reduces the number of memory allocations in the scheduler, which in turn significantly speeds up scheduling.
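Concretely, the diffs below apply two recurring patterns: result slices are pre-sized with make(..., 0, len(nodes.Items)) so append never has to grow and re-copy the backing array, and node lists are walked by index so the code takes &nodes.Items[i] instead of copying each api.Node (a large struct) on every range iteration. A minimal standalone sketch of both patterns (the Item type and sizes are illustrative, not from the PR):

    package main

    import "fmt"

    // Item stands in for a large struct such as api.Node; copying it on every
    // loop iteration is the cost the PR avoids.
    type Item struct {
        Name    string
        Payload [4096]byte
    }

    func main() {
        items := make([]Item, 100)

        // Before: ranging by value copies each Item, and the result slice
        // grows (re-allocating its backing array) as it is appended to.
        before := []string{}
        for _, it := range items {
            before = append(before, it.Name)
        }

        // After: pre-size the result and index into the slice, taking a
        // pointer so no Item is copied.
        after := make([]string, 0, len(items))
        for i := range items {
            it := &items[i]
            after = append(after, it.Name)
        }

        fmt.Println(len(before), len(after))
    }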
pkg/client/cache/listers.go (vendored): 6 changed lines
@@ -122,7 +122,7 @@ func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
 
 // NodeConditionPredicate is a function that indicates whether the given node's conditions meet
 // some set of criteria defined by the function.
-type NodeConditionPredicate func(node api.Node) bool
+type NodeConditionPredicate func(node *api.Node) bool
 
 // StoreToNodeLister makes a Store have the List method of the client.NodeInterface
 // The Store must contain (only) Nodes.
@@ -153,9 +153,9 @@ type storeToNodeConditionLister struct {
 // List returns a list of nodes that match the conditions defined by the predicate functions in the storeToNodeConditionLister.
 func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
     for _, m := range s.store.List() {
-        node := *m.(*api.Node)
+        node := m.(*api.Node)
         if s.predicate(node) {
-            nodes.Items = append(nodes.Items, node)
+            nodes.Items = append(nodes.Items, *node)
         } else {
             glog.V(5).Infof("Node %s matches none of the conditions", node.Name)
         }
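For callers, only the signature changes; the body reads fields exactly as before. A small sketch mirroring the predicate in the test below (import paths assume the Kubernetes 1.3 tree layout):

    import (
        "k8s.io/kubernetes/pkg/api"
        "k8s.io/kubernetes/pkg/client/cache"
    )

    // The predicate now receives *api.Node: the lister hands over the cached
    // object's pointer instead of copying the whole Node struct per call, and
    // only dereferences it once, when appending to the result list.
    var notOutOfDisk cache.NodeConditionPredicate = func(node *api.Node) bool {
        for _, cond := range node.Status.Conditions {
            if cond.Type == api.NodeOutOfDisk && cond.Status == api.ConditionTrue {
                return false
            }
        }
        return true
    }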
pkg/client/cache/listers_test.go (vendored): 2 changed lines
@@ -97,7 +97,7 @@ func TestStoreToNodeConditionLister(t *testing.T) {
         store.Add(n)
     }
 
-    predicate := func(node api.Node) bool {
+    predicate := func(node *api.Node) bool {
         for _, cond := range node.Status.Conditions {
             if cond.Type == api.NodeOutOfDisk && cond.Status == api.ConditionTrue {
                 return false
@@ -641,7 +641,7 @@ func hostsFromNodeList(list *api.NodeList) []string {
 }
 
 func getNodeConditionPredicate() cache.NodeConditionPredicate {
-    return func(node api.Node) bool {
+    return func(node *api.Node) bool {
         // We add the master to the node list, but its unschedulable. So we use this to filter
         // the master.
         // TODO: Use a node annotation to indicate the master
@@ -319,7 +319,7 @@ func TestGetNodeConditionPredicate(t *testing.T) {
     }
     pred := getNodeConditionPredicate()
     for _, test := range tests {
-        accept := pred(test.node)
+        accept := pred(&test.node)
         if accept != test.expectAccept {
             t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept)
         }
@@ -204,6 +204,12 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace
 }
 
 func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+    // If a pod doesn't have any volume attached to it, the predicate will always be true.
+    // Thus we make a fast path for it, to avoid unnecessary computations in this case.
+    if len(pod.Spec.Volumes) == 0 {
+        return true, nil
+    }
+
     newVolumes := make(map[string]bool)
     if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
         return false, err
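Taken on its own, the fast path added here is an ordering choice: do the O(1) length check before any allocation, so the common case of a pod with no volumes never pays for building the filter map. A generic sketch of that shape (volumePredicate and check are illustrative names, not code from the PR):

    import "k8s.io/kubernetes/pkg/api"

    // volumePredicate sketches the pattern used by MaxPDVolumeCountChecker and
    // VolumeZoneChecker above: return before allocating anything when the pod
    // has no volumes, because the answer is then trivially true.
    func volumePredicate(pod *api.Pod, check func(map[string]bool) (bool, error)) (bool, error) {
        if len(pod.Spec.Volumes) == 0 {
            return true, nil
        }
        newVolumes := make(map[string]bool) // only allocated on the slow path
        for _, v := range pod.Spec.Volumes {
            newVolumes[v.Name] = true // stand-in for the real filtering work
        }
        return check(newVolumes)
    }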
@@ -302,6 +308,12 @@ func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolum
 }
 
 func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+    // If a pod doesn't have any volume attached to it, the predicate will always be true.
+    // Thus we make a fast path for it, to avoid unnecessary computations in this case.
+    if len(pod.Spec.Volumes) == 0 {
+        return true, nil
+    }
+
     node := nodeInfo.Node()
     if node == nil {
         return false, fmt.Errorf("node not found")
@@ -83,7 +83,8 @@ func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInf
     }
 
     result := []schedulerapi.HostPriority{}
-    for _, node := range nodes.Items {
+    for i := range nodes.Items {
+        node := &nodes.Items[i]
         fScore := float64(0)
         if maxCount > 0 {
             fScore = 10 * (float64(counts[node.Name]) / float64(maxCount))
@@ -44,7 +44,8 @@ func calculateScore(requested int64, capacity int64, node string) int {
 
 // Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
-func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+// TODO: Use Node() from nodeInfo instead of passing it.
+func calculateResourceOccupancy(pod *api.Pod, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
     totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
     totalMemory := nodeInfo.NonZeroRequest().Memory
     capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
@@ -84,8 +85,9 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca
         return schedulerapi.HostPriorityList{}, err
     }
 
-    list := schedulerapi.HostPriorityList{}
-    for _, node := range nodes.Items {
+    list := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
+    for i := range nodes.Items {
+        node := &nodes.Items[i]
         list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
     }
     return list, nil
@@ -120,7 +122,7 @@ func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, nodeName
         labeledNodes[node.Name] = (exists && n.presence) || (!exists && !n.presence)
     }
 
-    result := []schedulerapi.HostPriority{}
+    result := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
     //score int - scale of 0-10
     // 0 being the lowest priority and 10 being the highest
     for nodeName, success := range labeledNodes {
@@ -155,7 +157,8 @@ func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercac
     }
 
     for _, container := range pod.Spec.Containers {
-        for _, node := range nodes.Items {
+        for i := range nodes.Items {
+            node := &nodes.Items[i]
             // Check if this container's image is present and get its size.
             imageSize := checkContainerImageOnNode(node, container)
             // Add this size to the total result of this node.
@@ -163,7 +166,7 @@ func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercac
         }
     }
 
-    result := []schedulerapi.HostPriority{}
+    result := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
     // score int - scale of 0-10
     // 0 being the lowest priority and 10 being the highest.
     for nodeName, sumSize := range sumSizeMap {
@@ -174,7 +177,7 @@ func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercac
 }
 
 // checkContainerImageOnNode checks if a container image is present on a node and returns its size.
-func checkContainerImageOnNode(node api.Node, container api.Container) int64 {
+func checkContainerImageOnNode(node *api.Node, container api.Container) int64 {
     for _, image := range node.Status.Images {
         for _, name := range image.Names {
             if container.Image == name {
@@ -218,14 +221,16 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
         return schedulerapi.HostPriorityList{}, err
     }
 
-    list := schedulerapi.HostPriorityList{}
-    for _, node := range nodes.Items {
+    list := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
+    for i := range nodes.Items {
+        node := &nodes.Items[i]
         list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
     }
     return list, nil
 }
 
-func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+// TODO: Use Node() from nodeInfo instead of passing it.
+func calculateBalancedResourceAllocation(pod *api.Pod, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
     totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
     totalMemory := nodeInfo.NonZeroRequest().Memory
     score := int(0)
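The TODO comments added above point at a possible follow-up: NodeInfo already carries the node (nodeInfo.Node() is used in the VolumeZoneChecker hunk earlier), so the separate node parameter could eventually be dropped. A hedged sketch of that shape, not part of this PR, with package paths assumed from the 1.3 tree layout:

    import (
        "k8s.io/kubernetes/pkg/api"
        schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
        "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
    )

    // Hypothetical follow-up: read the node from nodeInfo instead of passing
    // it separately (illustrative only).
    func calculateBalancedResourceAllocationFromInfo(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
        node := nodeInfo.Node() // may be nil if the node is not in the cache
        if node == nil {
            return schedulerapi.HostPriority{}
        }
        // ... same scoring as calculateBalancedResourceAllocation, using
        // node.Status.Allocatable and nodeInfo.NonZeroRequest().
        return schedulerapi.HostPriority{Host: node.Name}
    }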
@@ -207,7 +207,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
         }
     }
 
-    result := []schedulerapi.HostPriority{}
+    result := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
     //score int - scale of 0-maxPriority
     // 0 being the lowest priority and maxPriority being the highest
     for i := range nodes.Items {
@@ -83,7 +83,8 @@ func (s *TaintToleration) ComputeTaintTolerationPriority(pod *api.Pod, nodeNameT
     tolerationList := getAllTolerationPreferNoSchedule(tolerations)
 
     // calculate the intolerable taints for all the nodes
-    for _, node := range nodes.Items {
+    for i := range nodes.Items {
+        node := &nodes.Items[i]
         taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
         if err != nil {
             return nil, err
@@ -99,7 +100,7 @@ func (s *TaintToleration) ComputeTaintTolerationPriority(pod *api.Pod, nodeNameT
     // The maximum priority value to give to a node
     // Priority values range from 0 - maxPriority
     const maxPriority = 10
-    result := []schedulerapi.HostPriority{}
+    result := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
     for _, node := range nodes.Items {
         fScore := float64(maxPriority)
         if maxCount > 0 {
@@ -432,7 +432,7 @@ func (f *ConfigFactory) responsibleForPod(pod *api.Pod) bool {
 }
 
 func getNodeConditionPredicate() cache.NodeConditionPredicate {
-    return func(node api.Node) bool {
+    return func(node *api.Node) bool {
         for _, cond := range node.Status.Conditions {
             // We consider the node for scheduling only when its:
             // - NodeReady condition status is ConditionTrue,
@@ -464,7 +464,7 @@ func TestNodeConditionPredicate(t *testing.T) {
 
     nodeNames := []string{}
     for _, node := range nodeList.Items {
-        if nodeFunc(node) {
+        if nodeFunc(&node) {
             nodeNames = append(nodeNames, node.Name)
         }
     }
@@ -135,7 +135,8 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
 func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) {
-    filtered := []api.Node{}
+    // Create filtered list with enough space to avoid growing it.
+    filtered := make([]api.Node, 0, len(nodes.Items))
     failedPredicateMap := FailedPredicateMap{}
 
     if len(predicateFuncs) == 0 {
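A quick way to see the effect of these two patterns outside the scheduler is to compare allocation counts with the standard library's testing.AllocsPerRun; the snippet below is a standalone illustration, not a benchmark from this PR:

    package main

    import (
        "fmt"
        "testing"
    )

    func main() {
        items := make([]int, 1024)

        // Append into an empty slice: the backing array is re-allocated and
        // re-copied several times as it grows.
        grow := func() {
            out := []int{}
            for _, v := range items {
                out = append(out, v)
            }
            _ = out
        }

        // Pre-size the slice and walk by index: a single allocation per run.
        presized := func() {
            out := make([]int, 0, len(items))
            for i := range items {
                out = append(out, items[i])
            }
            _ = out
        }

        // testing.AllocsPerRun reports the average number of heap allocations
        // per call of the given function.
        fmt.Println("grow:    ", testing.AllocsPerRun(100, grow))
        fmt.Println("presized:", testing.AllocsPerRun(100, presized))
    }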