Add preemption in scheduler extender
Add verb and preemption for scheduler extender
Update bazel
Use simple preemption in extender
Use node name instead of v1.Node
Fix support method
Fix preemption duplication
Remove unneeded logic
Remove nodeInfo from param to extender
Update bazel for scheduler types
Mock extender cache with nodeInfo
Add nodeInfo as extender cache
Choose node name or node based on cache flag
Always return meta victims in result
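Note: the squashed messages above cover the scheduler-side half of the feature; the other half lives in each extender, which now has to answer a preemption verb alongside its filter/prioritize verbs and report victims (or meta victims) back per node. As a rough illustration of that server side only, here is a minimal HTTP handler sketch. The "/preempt" route and the wire types are placeholders invented for this example; the real extender API types carry full pod objects and meta victims keyed by node name.

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// Hypothetical wire types for this sketch; the real extender preemption API is
// richer and includes meta-victim variants keyed by node name.
type preemptionArgs struct {
	PodName           string              `json:"podName"`
	NodeNameToVictims map[string][]string `json:"nodeNameToVictims"`
}

type preemptionResult struct {
	NodeNameToVictims map[string][]string `json:"nodeNameToVictims"`
}

// handlePreempt receives the scheduler's candidate map and returns the subset
// of nodes (and victims) this extender is willing to preempt on.
func handlePreempt(w http.ResponseWriter, r *http.Request) {
	var args preemptionArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	result := preemptionResult{NodeNameToVictims: map[string][]string{}}
	for node, victims := range args.NodeNameToVictims {
		// Example policy: refuse to preempt more than two pods on any node.
		if len(victims) <= 2 {
			result.NodeNameToVictims[node] = victims
		}
	}
	json.NewEncoder(w).Encode(result)
}

func main() {
	// "/preempt" stands in for whatever preempt verb the extender registers
	// in the scheduler policy configuration.
	http.HandleFunc("/preempt", handlePreempt)
	log.Fatal(http.ListenAndServe(":8888", nil))
}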
@@ -54,13 +54,6 @@ type FitError struct {
 	FailedPredicates FailedPredicateMap
 }
-
-// Victims describes pod victims.
-type Victims struct {
-	pods             []*v1.Pod
-	numPDBViolations int
-}
-
 // ErrNoNodesAvailable defines an error of no nodes available.
 var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
 
 const (
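The unexported Victims struct removed above is replaced by an exported type that the rest of this diff refers to as schedulerapi.Victims. Reconstructed only from the field accesses that appear later (victims.Pods, victims.NumPDBViolations), it presumably looks roughly like the sketch below; the package name matches the alias used in the diff, but its import path and any extra fields are assumptions.

// Sketch of the exported victims type assumed by this diff.
package schedulerapi

import "k8s.io/api/core/v1"

// Victims describes the pods that would need to be preempted on a candidate node.
type Victims struct {
	// Pods are the victims that must be evicted so the preemptor can fit.
	Pods []*v1.Pod
	// NumPDBViolations counts how many of those victims violate a PodDisruptionBudget.
	NumPDBViolations int
}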
@@ -234,34 +227,64 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs)
+	nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
+		g.predicateMetaProducer, g.schedulingQueue, pdbs)
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	for len(nodeToVictims) > 0 {
-		node := pickOneNodeForPreemption(nodeToVictims)
-		if node == nil {
-			return nil, nil, nil, err
-		}
-		passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders)
-		if passes && pErr == nil {
-			// Lower priority pods nominated to run on this node, may no longer fit on
-			// this node. So, we should remove their nomination. Removing their
-			// nomination updates these pods and moves them to the active queue. It
-			// lets scheduler find another place for them.
-			nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name)
-			return node, nodeToVictims[node].pods, nominatedPods, err
-		}
-		if pErr != nil {
-			glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
-		}
-		// Remove the node from the map and try to pick a different node.
-		delete(nodeToVictims, node)
+
+	// We will only check nodeToVictims with extenders that support preemption.
+	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
+	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
+	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	candidateNode := pickOneNodeForPreemption(nodeToVictims)
+	if candidateNode == nil {
+		return nil, nil, nil, err
+	}
+
+	// Lower priority pods nominated to run on this node, may no longer fit on
+	// this node. So, we should remove their nomination. Removing their
+	// nomination updates these pods and moves them to the active queue. It
+	// lets scheduler find another place for them.
+	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
+	if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
+		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
+	} else {
+		return nil, nil, nil, fmt.Errorf(
+			"preemption failed: the target node %s has been deleted from scheduler cache",
+			candidateNode.Name)
 	}
-	return nil, nil, nil, err
 }
 
-// GetLowerPriorityNominatedPods returns pods whose priority is smaller than the
+// processPreemptionWithExtenders processes preemption with extenders
+func (g *genericScheduler) processPreemptionWithExtenders(
+	pod *v1.Pod,
+	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
+) (map[*v1.Node]*schedulerapi.Victims, error) {
+	if len(nodeToVictims) > 0 {
+		for _, extender := range g.extenders {
+			if extender.SupportsPreemption() {
+				var err error
+				// Replace nodeToVictims with result after preemption from extender.
+				if nodeToVictims, err = extender.ProcessPreemption(pod, nodeToVictims, g.cachedNodeInfoMap); err != nil {
+					return nil, err
+				}
+				// If node list is empty, no preemption will happen, skip other extenders.
+				if len(nodeToVictims) == 0 {
+					break
+				}
+			}
+		}
+	}
+
+	return nodeToVictims, nil
+}
+
+// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
 // priority of the given "pod" and are nominated to run on the given node.
 // Note: We could possibly check if the nominated lower priority pods still fit
 // and return those that no longer fit, but that would require lots of
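The new processPreemptionWithExtenders above only calls into extenders that report SupportsPreemption() and lets each one shrink (or veto) the candidate map via ProcessPreemption. To make that contract concrete, here is a minimal fake extender of the kind one might use in a test. The interface shown is a hypothetical, pared-down stand-in limited to the two calls visible in this diff, and Victims is replaced by a local placeholder type so the sketch stays self-contained.

package main

import "fmt"

// victims is a local stand-in for schedulerapi.Victims, illustration only.
type victims struct {
	PodNames         []string
	NumPDBViolations int
}

// preemptingExtender models only the two methods the scheduler calls here.
type preemptingExtender interface {
	SupportsPreemption() bool
	ProcessPreemption(podName string, nodeToVictims map[string]*victims) (map[string]*victims, error)
}

// fakeExtender vetoes any node whose victim count exceeds maxVictims.
type fakeExtender struct {
	maxVictims int
}

func (f *fakeExtender) SupportsPreemption() bool { return true }

func (f *fakeExtender) ProcessPreemption(podName string, nodeToVictims map[string]*victims) (map[string]*victims, error) {
	filtered := map[string]*victims{}
	for node, v := range nodeToVictims {
		if len(v.PodNames) <= f.maxVictims {
			filtered[node] = v
		}
	}
	return filtered, nil
}

func main() {
	candidates := map[string]*victims{
		"node-a": {PodNames: []string{"p1", "p2"}},
		"node-b": {PodNames: []string{"p3"}},
	}
	var ext preemptingExtender = &fakeExtender{maxVictims: 1}
	if ext.SupportsPreemption() {
		candidates, _ = ext.ProcessPreemption("preemptor", candidates)
	}
	fmt.Println(len(candidates)) // 1: only node-b survives the extender's filter
}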
@@ -270,6 +293,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 // small number of nominated pods per node.
 func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
 	pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
+
 	if len(pods) == 0 {
 		return nil
 	}
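getLowerPriorityNominatedPods, continued in the hunk above, boils down to filtering the queue's nominated pods for a node by priority. A self-contained sketch of that filter, with a plain priority field standing in for util.GetPodPriority, might look like this (names here are illustrative only):

package main

import "fmt"

// nominated stands in for a pod returned by schedulingQueue.WaitingPodsForNode.
type nominated struct {
	name     string
	priority int32
}

// lowerPriorityThan keeps only the nominated pods whose priority is strictly
// below the preemptor's, mirroring what getLowerPriorityNominatedPods returns.
func lowerPriorityThan(preemptorPriority int32, pods []nominated) []nominated {
	var lower []nominated
	for _, p := range pods {
		if p.priority < preemptorPriority {
			lower = append(lower, p)
		}
	}
	return lower
}

func main() {
	pods := []nominated{{"a", 10}, {"b", 1000}, {"c", 5}}
	fmt.Println(lowerPriorityThan(100, pods)) // [{a 10} {c 5}]
}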
@@ -683,7 +707,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
 // 5. If there are still ties, the first such node is picked (sort of randomly).
 // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 // allocation and garbage collection time.
-func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
+func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
 	if len(nodesToVictims) == 0 {
 		return nil
 	}
@@ -691,14 +715,14 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
 	var minNodes1 []*v1.Node
 	lenNodes1 := 0
 	for node, victims := range nodesToVictims {
-		if len(victims.pods) == 0 {
+		if len(victims.Pods) == 0 {
 			// We found a node that doesn't need any preemption. Return it!
 			// This should happen rarely when one or more pods are terminated between
 			// the time that scheduler tries to schedule the pod and the time that
 			// preemption logic tries to find nodes for preemption.
 			return node
 		}
-		numPDBViolatingPods := victims.numPDBViolations
+		numPDBViolatingPods := victims.NumPDBViolations
 		if numPDBViolatingPods < minNumPDBViolatingPods {
 			minNumPDBViolatingPods = numPDBViolatingPods
 			minNodes1 = nil
@@ -722,7 +746,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
 		node := minNodes1[i]
 		victims := nodesToVictims[node]
 		// highestPodPriority is the highest priority among the victims on this node.
-		highestPodPriority := util.GetPodPriority(victims.pods[0])
+		highestPodPriority := util.GetPodPriority(victims.Pods[0])
 		if highestPodPriority < minHighestPriority {
 			minHighestPriority = highestPodPriority
 			lenNodes2 = 0
@@ -743,7 +767,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
 	for i := 0; i < lenNodes2; i++ {
 		var sumPriorities int64
 		node := minNodes2[i]
-		for _, pod := range nodesToVictims[node].pods {
+		for _, pod := range nodesToVictims[node].Pods {
 			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
 			// needed so that a node with a few pods with negative priority is not
 			// picked over a node with a smaller number of pods with the same negative
@@ -769,7 +793,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node {
 	lenNodes2 = 0
 	for i := 0; i < lenNodes1; i++ {
 		node := minNodes1[i]
-		numPods := len(nodesToVictims[node].pods)
+		numPods := len(nodesToVictims[node].Pods)
 		if numPods < minNumPods {
 			minNumPods = numPods
 			lenNodes2 = 0
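The four hunks above touch the successive tie-breaking rules of pickOneNodeForPreemption as its victims move to the exported schedulerapi.Victims fields: fewest PDB violations first, then the lowest "highest victim priority", then the smallest priority sum, then the fewest victim pods. For intuition, here is a small, self-contained sketch of just the first two rules; the type and helper below are illustrative only and deliberately ignore the later tie-breakers.

package main

import "fmt"

// candidate is an illustrative stand-in for a node plus its Victims summary.
type candidate struct {
	node             string
	numPDBViolations int
	highestPriority  int32 // priority of the highest-priority victim on the node
}

// pickIllustrative applies only the first two rules: fewest PDB violations,
// then the lowest highest-victim priority.
func pickIllustrative(cands []candidate) string {
	best := cands[0]
	for _, c := range cands[1:] {
		if c.numPDBViolations < best.numPDBViolations ||
			(c.numPDBViolations == best.numPDBViolations && c.highestPriority < best.highestPriority) {
			best = c
		}
	}
	return best.node
}

func main() {
	fmt.Println(pickIllustrative([]candidate{
		{node: "node-a", numPDBViolations: 1, highestPriority: 100},
		{node: "node-b", numPDBViolations: 0, highestPriority: 1000},
	})) // node-b: PDB violations dominate even though its victim priority is higher
}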
@@ -797,9 +821,9 @@ func selectNodesForPreemption(pod *v1.Pod,
 	metadataProducer algorithm.PredicateMetadataProducer,
 	queue SchedulingQueue,
 	pdbs []*policy.PodDisruptionBudget,
-) (map[*v1.Node]*Victims, error) {
+) (map[*v1.Node]*schedulerapi.Victims, error) {
 
-	nodeNameToVictims := map[*v1.Node]*Victims{}
+	nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
 	var resultLock sync.Mutex
 
 	// We can use the same metadata producer for all nodes.
@@ -813,50 +837,16 @@ func selectNodesForPreemption(pod *v1.Pod,
 		pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
 		if fits {
 			resultLock.Lock()
-			victims := Victims{
-				pods:             pods,
-				numPDBViolations: numPDBViolations,
+			victims := schedulerapi.Victims{
+				Pods:             pods,
+				NumPDBViolations: numPDBViolations,
 			}
-			nodeNameToVictims[potentialNodes[i]] = &victims
+			nodeToVictims[potentialNodes[i]] = &victims
 			resultLock.Unlock()
 		}
 	}
 	workqueue.Parallelize(16, len(potentialNodes), checkNode)
-	return nodeNameToVictims, nil
-}
-
-func nodePassesExtendersForPreemption(
-	pod *v1.Pod,
-	nodeName string,
-	victims []*v1.Pod,
-	nodeNameToInfo map[string]*schedulercache.NodeInfo,
-	extenders []algorithm.SchedulerExtender) (bool, error) {
-	// If there are any extenders, run them and filter the list of candidate nodes.
-	if len(extenders) == 0 {
-		return true, nil
-	}
-	// Remove the victims from the corresponding nodeInfo and send nodes to the
-	// extenders for filtering.
-	originalNodeInfo := nodeNameToInfo[nodeName]
-	nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
-	for _, victim := range victims {
-		nodeInfoCopy.RemovePod(victim)
-	}
-	nodeNameToInfo[nodeName] = nodeInfoCopy
-	defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }()
-	filteredNodes := []*v1.Node{nodeInfoCopy.Node()}
-	for _, extender := range extenders {
-		var err error
-		var failedNodesMap map[string]string
-		filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo)
-		if err != nil {
-			return false, err
-		}
-		if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 {
-			return false, nil
-		}
-	}
-	return true, nil
+	return nodeToVictims, nil
 }
 
 // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
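selectNodesForPreemption above fans the per-node victim check out across 16 workers via workqueue.Parallelize and funnels results into a shared map under a mutex. A plain-goroutine equivalent of that fan-out/collect pattern, written with standard-library primitives only so it stays self-contained (the checkNode stub and node names are invented for the example), looks roughly like this:

package main

import (
	"fmt"
	"sync"
)

// checkNode is a stand-in for the real per-node victim selection; it just
// decides whether the preemptor "fits" and which victims it would evict.
func checkNode(node string) (victims []string, fits bool) {
	if node == "node-b" {
		return nil, false // pretend preemption cannot help on this node
	}
	return []string{"low-priority-pod"}, true
}

func main() {
	potentialNodes := []string{"node-a", "node-b", "node-c"}
	nodeToVictims := map[string][]string{}

	var (
		resultLock sync.Mutex
		wg         sync.WaitGroup
	)
	// Bound the fan-out to a fixed number of workers, mirroring Parallelize(16, ...).
	sem := make(chan struct{}, 16)
	for _, node := range potentialNodes {
		wg.Add(1)
		sem <- struct{}{}
		go func(node string) {
			defer wg.Done()
			defer func() { <-sem }()
			if victims, fits := checkNode(node); fits {
				resultLock.Lock()
				nodeToVictims[node] = victims
				resultLock.Unlock()
			}
		}(node)
	}
	wg.Wait()
	fmt.Println(nodeToVictims) // node-a and node-c each map to their victim list
}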
@@ -996,6 +986,7 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
 	// (which is the case today), the !found case should never happen, but we'd prefer
 	// to rely less on such assumptions in the code when checking does not impose
 	// significant overhead.
+	// Also, we currently assume all failures returned by extender as resolvable.
 	for _, failedPredicate := range failedPredicates {
 		switch failedPredicate {
 		case