Merge pull request #55933 from bsalamat/starvation3

Automatic merge from submit-queue (batch tested with PRs 54316, 53400, 55933, 55786, 55794). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add support to take nominated pods into account during scheduling to avoid starvation of higher priority pods

**What this PR does / why we need it**:
When a pod preempts lower priority pods, the preemptor gets a "nominated node name" annotation. We call such a pod a nominated pod. This PR adds the logic to take such nominated pods into account when scheduling other pods on the same node that the nominated pod is expected to run. This is needed to avoid starvation of preemptor pods. Otherwise, lower priority pods may fill up the space freed after preemption before the preemptor gets a chance to get scheduled.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #54501

**Special notes for your reviewer**: This PR is built on top of #55109 and includes all the changes there as well.

**Release note**:

```release-note
Add support to take nominated pods into account during scheduling to avoid starvation of higher priority pods.
```

/sig scheduling
ref/ #47604
This commit is contained in:
Kubernetes Submit Queue
2017-11-21 15:04:28 -08:00
committed by GitHub
14 changed files with 869 additions and 390 deletions

View File

@@ -49,8 +49,9 @@ type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate

View File

@@ -45,6 +45,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/core",
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/util/hash:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",

View File

@@ -315,8 +315,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
queue := NewSchedulingQueue()
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {

View File

@@ -81,6 +81,7 @@ func (f *FitError) Error() string {
type genericScheduler struct {
cache schedulercache.Cache
equivalenceCache *EquivalenceCache
schedulingQueue SchedulingQueue
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
@@ -114,7 +115,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
}
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache)
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue)
if err != nil {
return "", err
}
@@ -177,53 +178,54 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
// returns 1) the node, 2) the list of preempted pods if such a node is found,
// 3) A list of pods whose nominated node name should be cleared, and 4) any
// possible error.
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
return nil, nil, nil
return nil, nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, ErrNoNodesAvailable
return nil, nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
return nil, nil, nil
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for len(nodeToPods) > 0 {
node := pickOneNodeForPreemption(nodeToPods)
if node == nil {
return nil, nil, err
return nil, nil, nil, err
}
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
if passes && pErr == nil {
return node, nodeToPods[node], err
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name)
return node, nodeToPods[node], nominatedPods, err
}
if pErr != nil {
glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
@@ -231,7 +233,30 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
// Remove the node from the map and try to pick a different node.
delete(nodeToPods, node)
}
return nil, nil, err
return nil, nil, nil, err
}
// GetLowerPriorityNominatedPods returns pods whose priority is smaller than the
// priority of the given "pod" and are nominated to run on the given node.
// Note: We could possibly check if the nominated lower priority pods still fit
// and return those that no longer fit, but that would require lots of
// manipulation of NodeInfo and PredicateMeta per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
if len(pods) == 0 {
return nil
}
var lowerPriorityPods []*v1.Pod
podPriority := util.GetPodPriority(pod)
for _, p := range pods {
if util.GetPodPriority(p) < podPriority {
lowerPriorityPods = append(lowerPriorityPods, p)
}
}
return lowerPriorityPods
}
// Filters the nodes to find the ones that fit based on the given predicate functions
@@ -244,6 +269,7 @@ func findNodesThatFit(
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.PredicateMetadataProducer,
ecache *EquivalenceCache,
schedulingQueue SchedulingQueue,
) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
@@ -262,7 +288,7 @@ func findNodesThatFit(
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := nodes[i].Name
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
@@ -306,9 +332,52 @@ func findNodesThatFit(
return filtered, failedPredicateMap, nil
}
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
func podFitsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) {
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
// any pod was found, 2) augmented meta data, 3) augmented nodeInfo.
func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo, queue SchedulingQueue) (bool, algorithm.PredicateMetadata,
*schedulercache.NodeInfo) {
if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen only in tests.
return false, meta, nodeInfo
}
nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
if nominatedPods == nil || len(nominatedPods) == 0 {
return false, meta, nodeInfo
}
var metaOut algorithm.PredicateMetadata = nil
if meta != nil {
metaOut = meta.ShallowCopy()
}
nodeInfoOut := nodeInfo.Clone()
for _, p := range nominatedPods {
if util.GetPodPriority(p) >= podPriority {
nodeInfoOut.AddPod(p)
if metaOut != nil {
metaOut.AddPod(p, nodeInfoOut)
}
}
}
return true, metaOut, nodeInfoOut
}
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache,
queue SchedulingQueue,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
equivalenceHash uint64
failedPredicates []algorithm.PredicateFailureReason
@@ -318,34 +387,85 @@ func podFitsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedule
reasons []algorithm.PredicateFailureReason
err error
)
predicateResults := make(map[string]HostPredicate)
if ecache != nil {
// getHashEquivalencePod will return immediately if no equivalence pod found
equivalenceHash, eCacheAvailable = ecache.getHashEquivalencePod(pod)
}
for predicateKey, predicate := range predicateFuncs {
// If equivalenceCache is available
if eCacheAvailable {
// PredicateWithECache will returns it's cached predicate results
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
podsAdded := false
// We run predicates twice in some cases. If the node has greater or equal priority
// nominated pods, we run them when those pods are added to meta and nodeInfo.
// If all predicates succeed in this pass, we run them again when these
// nominated pods are not added. This second pass is necessary because some
// predicates such as inter-pod affinity may not pass without the nominated pods.
// If there are no nominated pods for the node or if the first run of the
// predicates fail, we don't run the second pass.
// We consider only equal or higher priority pods in the first pass, because
// those are the current "pod" must yield to them and not take a space opened
// for running them. It is ok if the current "pod" take resources freed for
// lower priority pods.
// Requiring that the new pod is schedulable in both circumstances ensures that
// we are making a conservative decision: predicates like resources and inter-pod
// anti-affinity are more likely to fail when the nominated pods are treated
// as running, while predicates like pod affinity are more likely to fail when
// the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, meta, info)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
// Bypass eCache if node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = eCacheAvailable && !podsAdded
for predicateKey, predicate := range predicateFuncs {
if eCacheAvailable {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}
// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if eCacheAvailable {
// Store data to update eCache after this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
}
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
}
}
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
// TODO(bsalamat): This way of updating equiv. cache has a race condition against
// cache invalidations invoked in event handlers. This race has existed despite locks
// in eCache implementation. If cache is invalidated after a predicate is executed
// and before we update the cache, the updates should not be written to the cache.
if eCacheAvailable {
nodeName := info.Node().GetName()
for predKey, result := range predicateResults {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivalenceHash)
}
}
return len(failedPredicates) == 0, failedPredicates, nil
@@ -597,6 +717,7 @@ func selectNodesForPreemption(pod *v1.Pod,
potentialNodes []*v1.Node,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
queue SchedulingQueue,
) (map[*v1.Node][]*v1.Pod, error) {
nodeNameToPods := map[*v1.Node][]*v1.Pod{}
@@ -610,7 +731,7 @@ func selectNodesForPreemption(pod *v1.Pod,
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue)
if fits {
resultLock.Lock()
nodeNameToPods[potentialNodes[i]] = pods
@@ -672,7 +793,9 @@ func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
fitPredicates map[string]algorithm.FitPredicate,
queue SchedulingQueue,
) ([]*v1.Pod, bool) {
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
@@ -703,7 +826,7 @@ func selectVictimsOnNode(
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@@ -714,7 +837,7 @@ func selectVictimsOnNode(
for _, p := range potentialVictims.Items {
lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
removePod(lpp)
victims = append(victims, lpp)
glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
@@ -764,7 +887,6 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
// TODO(bsalamat): Revisit this algorithm once scheduling by priority is added.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
if nodeInfo, found := nodeNameToInfo[nodeName]; found {
@@ -782,6 +904,7 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedule
func NewGenericScheduler(
cache schedulercache.Cache,
eCache *EquivalenceCache,
podQueue SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
@@ -790,6 +913,7 @@ func NewGenericScheduler(
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,

View File

@@ -311,7 +311,7 @@ func TestGenericScheduler(t *testing.T) {
}
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) {
@@ -331,7 +331,7 @@ func TestFindFitAllError(t *testing.T) {
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(),
}
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil)
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -365,7 +365,7 @@ func TestFindFitSomeError(t *testing.T) {
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil)
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -790,7 +790,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
if err != nil {
t.Error(err)
}
@@ -947,7 +947,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
@@ -1190,9 +1190,9 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender)
}
scheduler := NewGenericScheduler(
cache, nil, map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
// Call Preempt and check the expected results.
node, victims, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}
@@ -1220,7 +1220,7 @@ func TestPreempt(t *testing.T) {
test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}

View File

@@ -22,7 +22,7 @@ limitations under the License.
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ.
// FIFO is here for flag-gating purposes and allows us to use the traditional
// scheduling queue when Pod Priority flag is false.
// scheduling queue when util.PodPriorityEnabled() returns false.
package core
@@ -34,6 +34,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
@@ -217,6 +218,11 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
return err
}
func isPodUnschedulable(pod *v1.Pod) bool {
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
}
// AddUnschedulableIfNotPresent does nothing if the pod is present in either
// queue. Otherwise it adds the pod to the unschedulable queue if
// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
@@ -229,11 +235,15 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if p.receivedMoveRequest {
return p.activeQ.Add(pod)
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.unschedulableQ.Add(pod)
return nil
}
p.unschedulableQ.Add(pod)
return nil
err := p.activeQ.Add(pod)
if err == nil {
p.cond.Broadcast()
}
return err
}
// Pop removes the head of the active queue and returns it. It blocks if the
@@ -259,6 +269,7 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
strip := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
p.ResourceVersion = ""
p.Generation = 0
p.Status = v1.PodStatus{}
return p
}
@@ -274,15 +285,12 @@ func (p *PriorityQueue) Update(pod *v1.Pod) error {
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(pod); exists {
err := p.activeQ.Update(pod)
if err == nil {
p.cond.Broadcast()
}
return err
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if oldPod := p.unschedulableQ.Get(pod); oldPod != nil {
if isPodUpdated(oldPod, pod) {
p.unschedulableQ.Delete(pod)
p.unschedulableQ.Delete(oldPod)
err := p.activeQ.Add(pod)
if err == nil {
p.cond.Broadcast()
@@ -386,7 +394,18 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
return p.unschedulableQ.GetPodsWaitingForNode(nodeName)
p.lock.RLock()
defer p.lock.RUnlock()
pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName)
for _, obj := range p.activeQ.List() {
pod := obj.(*v1.Pod)
if pod.Annotations != nil {
if n, ok := pod.Annotations[NominatedNodeAnnotationKey]; ok && n == nodeName {
pods = append(pods, pod)
}
}
}
return pods
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure

View File

@@ -399,7 +399,6 @@ func (c *configFactory) onPvcDelete(obj interface{}) {
}
c.invalidatePredicatesForPvc(pvc)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
@@ -831,7 +830,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
glog.Info("Created equivalence class cache")
}
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
@@ -1038,6 +1037,7 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)
}
}
backoff.Gc()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
@@ -1048,10 +1048,16 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
Name: pod.Name,
}
entry := backoff.GetEntry(podID)
if !entry.TryWait(backoff.MaxDuration()) {
glog.Warningf("Request for pod %v already in flight, abandoning", podID)
return
// When pod priority is enabled, we would like to place an unschedulable
// pod in the unschedulable queue. This ensures that if the pod is nominated
// to run on a node, scheduler takes the pod into account when running
// predicates for the node.
if !util.PodPriorityEnabled() {
entry := backoff.GetEntry(podID)
if !entry.TryWait(backoff.MaxDuration()) {
glog.Warningf("Request for pod %v already in flight, abandoning", podID)
return
}
}
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
@@ -1059,7 +1065,7 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
pod, err := factory.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddIfNotPresent(pod)
podQueue.AddUnschedulableIfNotPresent(pod)
}
break
}
@@ -1147,3 +1153,24 @@ func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
}
func (p *podPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
return nil
}
if _, exists := podCopy.Annotations[core.NominatedNodeAnnotationKey]; !exists {
return nil
}
// Note: Deleting the entry from the annotations and passing it to Patch() will
// not remove the annotation. That's why we set it to empty string.
podCopy.Annotations[core.NominatedNodeAnnotationKey] = ""
ret := &unstructured.Unstructured{}
ret.SetAnnotations(podCopy.Annotations)
patchData, err := json.Marshal(ret)
if err != nil {
return err
}
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
}

View File

@@ -53,6 +53,7 @@ type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error
RemoveNominatedNodeAnnotation(pod *v1.Pod) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
@@ -203,29 +204,40 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
glog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
if err != nil {
glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
return "", err
}
if node == nil {
return "", err
}
glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
var nodeName = ""
if node != nil {
nodeName = node.Name
annotations := map[string]string{core.NominatedNodeAnnotationKey: nodeName}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
return node.Name, err
// Clearing nominated pods should happen outside of "if node != nil". Node could
// be nil when a pod with nominated node name is eligible to preempt again,
// but preemption logic does not find any node for it. In that case Preempt()
// function of generic_scheduler.go returns the pod itself for removal of the annotation.
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeAnnotation(p)
if rErr != nil {
glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.

View File

@@ -52,6 +52,24 @@ func (fc fakePodConditionUpdater) Update(pod *v1.Pod, podCondition *v1.PodCondit
return nil
}
type fakePodPreemptor struct{}
func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return pod, nil
}
func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
return nil
}
func (fp fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error {
return nil
}
func (fp fakePodPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
return nil
}
func podWithID(id, desiredHost string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: id, SelfLink: util.Test.SelfLink(string(v1.ResourcePods), id)},
@@ -103,8 +121,8 @@ func (es mockScheduler) Prioritizers() []algorithm.PriorityConfig {
return nil
}
func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
return nil, nil, nil
func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
return nil, nil, nil, nil
}
func TestScheduler(t *testing.T) {
@@ -505,6 +523,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
algo := core.NewGenericScheduler(
scache,
nil,
nil,
predicateMap,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
@@ -529,6 +548,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
},
}
@@ -541,6 +561,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algo := core.NewGenericScheduler(
scache,
nil,
nil,
predicateMap,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
@@ -568,6 +589,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
},
}