Add logic to account for pods that are nominated to run on nodes but are not running there yet.

Add tests for the new logic.
Bobby (Babak) Salamat
2017-11-07 17:09:21 -08:00
parent 2cbb07a439
commit 8a17ae241d
12 changed files with 853 additions and 388 deletions

View File

@@ -49,8 +49,9 @@ type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
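To make the widened contract concrete, here is a minimal, self-contained sketch of a caller consuming the four return values. The types and names are toy stand-ins, not the scheduler's; the real caller is Scheduler.preempt in scheduler.go further down in this commit.

    package main

    import "fmt"

    // Toy stand-ins for *v1.Pod and *v1.Node; assumptions for this sketch only.
    type pod struct{ name string }
    type node struct{ name string }

    // preempt mirrors the widened contract: the selected node, the victims to
    // delete, the pods whose nominated node name must be cleared, and an error.
    func preempt(p *pod) (*node, []*pod, []*pod, error) {
        // Pretend preempting on "node1" evicts "low-1" and invalidates the
        // nomination of "low-2".
        return &node{"node1"}, []*pod{{"low-1"}}, []*pod{{"low-2"}}, nil
    }

    func main() {
        n, victims, nominatedToClear, err := preempt(&pod{"high-prio"})
        if err != nil || n == nil {
            return
        }
        for _, v := range victims {
            fmt.Printf("delete victim %s on %s\n", v.name, n.name)
        }
        for _, p := range nominatedToClear {
            fmt.Printf("clear nominated node name of %s\n", p.name)
        }
    }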

View File

@@ -315,8 +315,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
queue := NewSchedulingQueue()
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {

View File

@@ -81,6 +81,7 @@ func (f *FitError) Error() string {
type genericScheduler struct {
cache schedulercache.Cache
equivalenceCache *EquivalenceCache
schedulingQueue SchedulingQueue
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.PredicateMetadataProducer
@@ -114,7 +115,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
}
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache)
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue)
if err != nil {
return "", err
}
@@ -177,53 +178,54 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
// returns 1) the node, 2) the list of preempted pods if such a node is found,
// 3) A list of pods whose nominated node name should be cleared, and 4) any
// possible error.
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil
return nil, nil, nil, nil
}
err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
return nil, nil, nil
return nil, nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, ErrNoNodesAvailable
return nil, nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
return nil, nil, nil
// In this case, we should clean up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for len(nodeToPods) > 0 {
node := pickOneNodeForPreemption(nodeToPods)
if node == nil {
return nil, nil, err
return nil, nil, nil, err
}
passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
if passes && pErr == nil {
return node, nodeToPods[node], err
// Lower priority pods nominated to run on this node may no longer fit on
// it. So, we should remove their nomination. Removing the nomination
// updates these pods and moves them to the active queue, letting the
// scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name)
return node, nodeToPods[node], nominatedPods, err
}
if pErr != nil {
glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
@@ -231,7 +233,30 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
// Remove the node from the map and try to pick a different node.
delete(nodeToPods, node)
}
return nil, nil, err
return nil, nil, nil, err
}
// getLowerPriorityNominatedPods returns pods that are nominated to run on the
// given node and whose priority is lower than that of the given "pod".
// Note: We could possibly check if the nominated lower priority pods still fit
// and return those that no longer fit, but that would require lots of
// manipulation of NodeInfo and PredicateMeta per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
if len(pods) == 0 {
return nil
}
var lowerPriorityPods []*v1.Pod
podPriority := util.GetPodPriority(pod)
for _, p := range pods {
if util.GetPodPriority(p) < podPriority {
lowerPriorityPods = append(lowerPriorityPods, p)
}
}
return lowerPriorityPods
}
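As a rough illustration of the filter above, here is a minimal, self-contained sketch. The podPriority helper is an assumption that mirrors util.GetPodPriority (a nil Spec.Priority counts as the default priority 0).

    package main

    import (
        "fmt"

        "k8s.io/api/core/v1"
    )

    // podPriority mirrors util.GetPodPriority as an assumption for this
    // sketch: a nil Spec.Priority counts as the default priority 0.
    func podPriority(p *v1.Pod) int32 {
        if p.Spec.Priority != nil {
            return *p.Spec.Priority
        }
        return 0
    }

    func main() {
        prio := func(v int32) *int32 { return &v }
        preemptor := &v1.Pod{Spec: v1.PodSpec{Priority: prio(100)}}
        nominated := []*v1.Pod{
            {Spec: v1.PodSpec{Priority: prio(50)}},  // lower priority: nomination cleared
            {Spec: v1.PodSpec{Priority: prio(100)}}, // equal: kept
            {Spec: v1.PodSpec{Priority: prio(200)}}, // higher: kept
        }
        var lowerPriorityPods []*v1.Pod
        for _, p := range nominated {
            if podPriority(p) < podPriority(preemptor) {
                lowerPriorityPods = append(lowerPriorityPods, p)
            }
        }
        fmt.Printf("%d of %d nominated pods lose their nomination\n",
            len(lowerPriorityPods), len(nominated))
    }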
// Filters the nodes to find the ones that fit based on the given predicate functions
@@ -244,6 +269,7 @@ func findNodesThatFit(
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.PredicateMetadataProducer,
ecache *EquivalenceCache,
schedulingQueue SchedulingQueue,
) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
@@ -262,7 +288,7 @@ func findNodesThatFit(
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := nodes[i].Name
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
@@ -306,9 +332,45 @@ func findNodesThatFit(
return filtered, failedPredicateMap, nil
}
// addNominatedPods adds pods nominated to run on the node given in nodeInfo,
// with priority equal to or higher than podPriority, to meta and nodeInfo. It
// returns 1) whether any pod was found, 2) augmented metadata, 3) augmented
// nodeInfo.
func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo, queue SchedulingQueue) (bool, algorithm.PredicateMetadata,
*schedulercache.NodeInfo) {
if queue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen only in tests.
return false, meta, nodeInfo
}
nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
if len(nominatedPods) == 0 {
return false, meta, nodeInfo
}
var metaOut algorithm.PredicateMetadata
if meta != nil {
metaOut = meta.ShallowCopy()
}
nodeInfoOut := nodeInfo.Clone()
for _, p := range nominatedPods {
if util.GetPodPriority(p) >= podPriority {
nodeInfoOut.AddPod(p)
if metaOut != nil {
metaOut.AddPod(p, nodeInfoOut)
}
}
}
return true, metaOut, nodeInfoOut
}
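The clone-before-mutate shape matters here: the caller's meta and nodeInfo must stay untouched so the second predicate pass, run without the nominated pods, can reuse them. A minimal sketch of that copy-on-write pattern with a toy NodeInfo, not the real type:

    package main

    import "fmt"

    // nodeInfo is a toy stand-in for schedulercache.NodeInfo, used only to
    // show the copy-on-write pattern addNominatedPods relies on.
    type nodeInfo struct{ pods []string }

    func (n *nodeInfo) clone() *nodeInfo {
        c := &nodeInfo{pods: make([]string, len(n.pods))}
        copy(c.pods, n.pods)
        return c
    }

    // addNominated augments a clone, leaving the original usable for the
    // second predicate pass that runs without the nominated pods.
    func addNominated(orig *nodeInfo, nominated []string) *nodeInfo {
        out := orig.clone()
        out.pods = append(out.pods, nominated...)
        return out
    }

    func main() {
        orig := &nodeInfo{pods: []string{"running-1"}}
        augmented := addNominated(orig, []string{"nominated-1"})
        fmt.Println(len(orig.pods), len(augmented.pods)) // 1 2: original unchanged
    }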
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
func podFitsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) {
func podFitsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache,
queue SchedulingQueue,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
equivalenceHash uint64
failedPredicates []algorithm.PredicateFailureReason
@@ -318,34 +380,85 @@ func podFitsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedule
reasons []algorithm.PredicateFailureReason
err error
)
predicateResults := make(map[string]HostPredicate)
if ecache != nil {
// getHashEquivalencePod will return immediately if no equivalence pod found
equivalenceHash, eCacheAvailable = ecache.getHashEquivalencePod(pod)
}
for predicateKey, predicate := range predicateFuncs {
// If equivalenceCache is available
if eCacheAvailable {
// PredicateWithECache will returns it's cached predicate results
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
podsAdded := false
// We run predicates twice in some cases. If the node has nominated pods of
// equal or higher priority, we run the predicates first with those pods added
// to meta and nodeInfo. If all predicates succeed in that pass, we run them
// again without the nominated pods added. The second pass is necessary because
// some predicates, such as inter-pod affinity, may not pass without the
// nominated pods.
// If there are no nominated pods for the node, or if the first run of the
// predicates fails, we don't run the second pass.
// We consider only equal or higher priority pods in the first pass, because
// those are the pods the current "pod" must yield to; it must not take space
// opened for running them. It is ok if the current "pod" takes resources
// freed for lower priority pods.
// Requiring that the new pod is schedulable in both circumstances ensures that
// we are making a conservative decision: predicates like resources and inter-pod
// anti-affinity are more likely to fail when the nominated pods are treated
// as running, while predicates like pod affinity are more likely to fail when
// the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, meta, info)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
// Bypass eCache if the node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = eCacheAvailable && !podsAdded
for predicateKey, predicate := range predicateFuncs {
if eCacheAvailable {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}
// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if eCacheAvailable {
// Store data to update eCache after this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
}
}
if !fit {
// The predicate result is unfit; record the failure reasons.
failedPredicates = append(failedPredicates, reasons...)
}
}
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
// TODO(bsalamat): This way of updating equiv. cache has a race condition against
// cache invalidations invoked in event handlers. This race has existed despite locks
// in eCache implementation. If cache is invalidated after a predicate is executed
// and before we update the cache, the updates should not be written to the cache.
if eCacheAvailable {
nodeName := info.Node().GetName()
for predKey, result := range predicateResults {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivalenceHash)
}
}
return len(failedPredicates) == 0, failedPredicates, nil
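Stripped of the eCache plumbing, the control flow above reduces to roughly the following. This is a sketch with a generic predicate signature, not the scheduler's types:

    package main

    import "fmt"

    func main() {
        // A predicate sees the node either with or without nominated pods added.
        predicates := []func(withNominated bool) bool{
            func(withNominated bool) bool { return true }, // e.g. a resource-fit check
        }
        podsAdded := true // pretend the node has equal-or-higher priority nominated pods
        fits := true
        for i := 0; i < 2; i++ {
            withNominated := i == 0
            if i == 1 && (!podsAdded || !fits) {
                break // no nominated pods, or the first pass already failed
            }
            for _, pred := range predicates {
                if !pred(withNominated) {
                    fits = false
                    break
                }
            }
        }
        fmt.Println("fits with and without nominated pods:", fits)
    }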
@@ -597,6 +710,7 @@ func selectNodesForPreemption(pod *v1.Pod,
potentialNodes []*v1.Node,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
queue SchedulingQueue,
) (map[*v1.Node][]*v1.Pod, error) {
nodeNameToPods := map[*v1.Node][]*v1.Pod{}
@@ -610,7 +724,7 @@ func selectNodesForPreemption(pod *v1.Pod,
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue)
if fits {
resultLock.Lock()
nodeNameToPods[potentialNodes[i]] = pods
@@ -672,7 +786,9 @@ func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
fitPredicates map[string]algorithm.FitPredicate,
queue SchedulingQueue,
) ([]*v1.Pod, bool) {
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
@@ -703,7 +819,7 @@ func selectVictimsOnNode(
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@@ -714,7 +830,7 @@ func selectVictimsOnNode(
for _, p := range potentialVictims.Items {
lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
removePod(lpp)
victims = append(victims, lpp)
glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
@@ -764,7 +880,6 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat
// considered for preemption.
// We look at the node that is nominated for this pod. As long as there are
// terminating pods on that node, we don't consider this pod for preempting
// more pods.
// TODO(bsalamat): Revisit this algorithm once scheduling by priority is added.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
if nodeInfo, found := nodeNameToInfo[nodeName]; found {
@@ -782,6 +897,7 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedule
func NewGenericScheduler(
cache schedulercache.Cache,
eCache *EquivalenceCache,
podQueue SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
@@ -790,6 +906,7 @@ func NewGenericScheduler(
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,

View File

@@ -311,7 +311,7 @@ func TestGenericScheduler(t *testing.T) {
}
scheduler := NewGenericScheduler(
cache, nil, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) {
@@ -331,7 +331,7 @@ func TestFindFitAllError(t *testing.T) {
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(),
}
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil)
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -365,7 +365,7 @@ func TestFindFitSomeError(t *testing.T) {
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil)
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -790,7 +790,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
if err != nil {
t.Error(err)
}
@@ -947,7 +947,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
@@ -1190,9 +1190,9 @@ func TestPreempt(t *testing.T) {
extenders = append(extenders, extender)
}
scheduler := NewGenericScheduler(
cache, nil, map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders)
// Call Preempt and check the expected results.
node, victims, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}
@@ -1220,7 +1220,7 @@ func TestPreempt(t *testing.T) {
test.pod.Annotations[NominatedNodeAnnotationKey] = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {
t.Errorf("test [%v]: unexpected error in preemption: %v", test.name, err)
}

View File

@@ -22,7 +22,7 @@ limitations under the License.
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ.
// FIFO is here for flag-gating purposes and allows us to use the traditional
// scheduling queue when Pod Priority flag is false.
// scheduling queue when util.PodPriorityEnabled() returns false.
package core
@@ -34,6 +34,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
@@ -217,6 +218,11 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
return err
}
func isPodUnschedulable(pod *v1.Pod) bool {
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
}
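For reference, a pod the scheduler has failed to place carries exactly the condition this helper checks. A small self-contained sketch:

    package main

    import (
        "fmt"

        "k8s.io/api/core/v1"
    )

    func main() {
        // The condition the scheduler sets when it fails to place a pod.
        pod := &v1.Pod{Status: v1.PodStatus{Conditions: []v1.PodCondition{{
            Type:   v1.PodScheduled,
            Status: v1.ConditionFalse,
            Reason: v1.PodReasonUnschedulable,
        }}}}
        for _, c := range pod.Status.Conditions {
            if c.Type == v1.PodScheduled {
                // Mirrors isPodUnschedulable above; prints true.
                fmt.Println(c.Status == v1.ConditionFalse &&
                    c.Reason == v1.PodReasonUnschedulable)
            }
        }
    }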
// AddUnschedulableIfNotPresent does nothing if the pod is present in either
// queue. Otherwise it adds the pod to the unschedulable queue if
// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
@@ -229,11 +235,15 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if p.receivedMoveRequest {
return p.activeQ.Add(pod)
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.unschedulableQ.Add(pod)
return nil
}
p.unschedulableQ.Add(pod)
return nil
err := p.activeQ.Add(pod)
if err == nil {
p.cond.Broadcast()
}
return err
}
// Pop removes the head of the active queue and returns it. It blocks if the
@@ -259,6 +269,7 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
strip := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
p.ResourceVersion = ""
p.Generation = 0
p.Status = v1.PodStatus{}
return p
}
@@ -274,15 +285,12 @@ func (p *PriorityQueue) Update(pod *v1.Pod) error {
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(pod); exists {
err := p.activeQ.Update(pod)
if err == nil {
p.cond.Broadcast()
}
return err
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if oldPod := p.unschedulableQ.Get(pod); oldPod != nil {
if isPodUpdated(oldPod, pod) {
p.unschedulableQ.Delete(pod)
p.unschedulableQ.Delete(oldPod)
err := p.activeQ.Add(pod)
if err == nil {
p.cond.Broadcast()
@@ -386,7 +394,18 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
return p.unschedulableQ.GetPodsWaitingForNode(nodeName)
p.lock.RLock()
defer p.lock.RUnlock()
pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName)
for _, obj := range p.activeQ.List() {
pod := obj.(*v1.Pod)
if n, ok := pod.Annotations[NominatedNodeAnnotationKey]; ok && n == nodeName {
pods = append(pods, pod)
}
}
return pods
}
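The activeQ scan keys off the nominated-node annotation; indexing a nil Annotations map is safe in Go, so no nil check is needed. A self-contained sketch of that filter, where the annotation key literal is an assumption standing in for NominatedNodeAnnotationKey:

    package main

    import (
        "fmt"

        "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // nominatedNodeKey stands in for NominatedNodeAnnotationKey; the literal
    // is an assumption for this sketch.
    const nominatedNodeKey = "NominatedNodeName"

    func main() {
        pods := []*v1.Pod{
            {ObjectMeta: metav1.ObjectMeta{Name: "a",
                Annotations: map[string]string{nominatedNodeKey: "node-1"}}},
            {ObjectMeta: metav1.ObjectMeta{Name: "b"}}, // no nomination
        }
        var waiting []*v1.Pod
        for _, p := range pods {
            // A lookup on a nil map simply misses, as for pod "b" here.
            if n, ok := p.Annotations[nominatedNodeKey]; ok && n == "node-1" {
                waiting = append(waiting, p)
            }
        }
        fmt.Printf("%d pod(s) waiting for node-1\n", len(waiting))
    }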
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure

View File

@@ -399,7 +399,6 @@ func (c *configFactory) onPvcDelete(obj interface{}) {
}
c.invalidatePredicatesForPvc(pvc)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
@@ -831,7 +830,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
glog.Info("Created equivalence class cache")
}
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
@@ -1038,6 +1037,7 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)
}
}
backoff.Gc()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
@@ -1048,10 +1048,16 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
Name: pod.Name,
}
entry := backoff.GetEntry(podID)
if !entry.TryWait(backoff.MaxDuration()) {
glog.Warningf("Request for pod %v already in flight, abandoning", podID)
return
// When pod priority is enabled, we would like to place an unschedulable
// pod in the unschedulable queue. This ensures that if the pod is nominated
// to run on a node, the scheduler takes the pod into account when running
// predicates for the node.
if !util.PodPriorityEnabled() {
entry := backoff.GetEntry(podID)
if !entry.TryWait(backoff.MaxDuration()) {
glog.Warningf("Request for pod %v already in flight, abandoning", podID)
return
}
}
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
@@ -1059,7 +1065,7 @@ func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, pod
pod, err := factory.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddIfNotPresent(pod)
podQueue.AddUnschedulableIfNotPresent(pod)
}
break
}
@@ -1147,3 +1153,24 @@ func (p *podPreemptor) UpdatePodAnnotations(pod *v1.Pod, annotations map[string]
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
}
func (p *podPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
podCopy := pod.DeepCopy()
if podCopy.Annotations == nil {
return nil
}
if _, exists := podCopy.Annotations[core.NominatedNodeAnnotationKey]; !exists {
return nil
}
// Note: Deleting the entry from the annotations and passing it to Patch()
// will not remove the annotation. That's why we set it to an empty string.
podCopy.Annotations[core.NominatedNodeAnnotationKey] = ""
ret := &unstructured.Unstructured{}
ret.SetAnnotations(podCopy.Annotations)
patchData, err := json.Marshal(ret)
if err != nil {
return err
}
_, error := p.Client.CoreV1().Pods(podCopy.Namespace).Patch(podCopy.Name, types.MergePatchType, patchData, "status")
return error
}
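For intuition, the sketch below prints a merge-patch body like the one this method sends. Merge-patch semantics only touch the keys present in the patch, which is why the value is set to "" instead of deleting the map entry client-side. The annotation key literal is an assumption, and the patch map only approximates what unstructured.SetAnnotations builds above.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        // Approximates the body built via unstructured.SetAnnotations above.
        patch := map[string]interface{}{
            "metadata": map[string]interface{}{
                "annotations": map[string]string{"NominatedNodeName": ""},
            },
        }
        data, err := json.Marshal(patch)
        if err != nil {
            panic(err)
        }
        // Prints: {"metadata":{"annotations":{"NominatedNodeName":""}}}
        fmt.Println(string(data))
    }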

View File

@@ -53,6 +53,7 @@ type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error
RemoveNominatedNodeAnnotation(pod *v1.Pod) error
}
// Scheduler watches for new unscheduled pods. It attempts to find
@@ -203,29 +204,36 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
glog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
if err != nil {
glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
return "", err
}
if node == nil {
return "", err
}
glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
var nodeName = ""
if node != nil {
nodeName = node.Name
annotations := map[string]string{core.NominatedNodeAnnotationKey: nodeName}
err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
return node.Name, err
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeAnnotation(p)
if rErr != nil {
glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.

View File

@@ -52,6 +52,24 @@ func (fc fakePodConditionUpdater) Update(pod *v1.Pod, podCondition *v1.PodCondit
return nil
}
type fakePodPreemptor struct{}
func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return pod, nil
}
func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
return nil
}
func (fp fakePodPreemptor) UpdatePodAnnotations(pod *v1.Pod, annots map[string]string) error {
return nil
}
func (fp fakePodPreemptor) RemoveNominatedNodeAnnotation(pod *v1.Pod) error {
return nil
}
func podWithID(id, desiredHost string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: id, SelfLink: util.Test.SelfLink(string(v1.ResourcePods), id)},
@@ -103,8 +121,8 @@ func (es mockScheduler) Prioritizers() []algorithm.PriorityConfig {
return nil
}
func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
return nil, nil, nil
func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
return nil, nil, nil, nil
}
func TestScheduler(t *testing.T) {
@@ -505,6 +523,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
algo := core.NewGenericScheduler(
scache,
nil,
nil,
predicateMap,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
@@ -529,6 +548,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
},
}
@@ -541,6 +561,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algo := core.NewGenericScheduler(
scache,
nil,
nil,
predicateMap,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
@@ -568,6 +589,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
},
}