/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package defaultpreemption

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"sync"
	"sync/atomic"

	"k8s.io/klog/v2"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1beta1"
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
	kubefeatures "k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
	// Name of the plugin used in the plugin registry and configurations.
	Name = "DefaultPreemption"
)

// DefaultPreemption is a PostFilter plugin that implements the preemption logic.
type DefaultPreemption struct {
	fh        framework.Handle
	args      config.DefaultPreemptionArgs
	podLister corelisters.PodLister
	pdbLister policylisters.PodDisruptionBudgetLister
}

var _ framework.PostFilterPlugin = &DefaultPreemption{}

// Name returns name of the plugin. It is used in logs, etc.
func (pl *DefaultPreemption) Name() string {
	return Name
}

// New initializes a new plugin and returns it.
func New(dpArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
	args, ok := dpArgs.(*config.DefaultPreemptionArgs)
	if !ok {
		return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs)
	}
	if err := validation.ValidateDefaultPreemptionArgs(*args); err != nil {
		return nil, err
	}
	pl := DefaultPreemption{
		fh:        fh,
		args:      *args,
		podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
		pdbLister: getPDBLister(fh.SharedInformerFactory()),
	}
	return &pl, nil
}

// PostFilter is invoked at the postFilter extension point.
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	defer func() {
		metrics.PreemptionAttempts.Inc()
	}()

	nnn, err := pl.preempt(ctx, state, pod, m)
	if err != nil {
		if _, ok := err.(*framework.FitError); ok {
			return nil, framework.NewStatus(framework.Unschedulable, err.Error())
		}
		return nil, framework.AsStatus(err)
	}
	// This happens when the pod is not eligible for preemption or extenders filtered all candidates.
	if nnn == "" {
		return nil, framework.NewStatus(framework.Unschedulable)
	}
	return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
}

// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns 1) the node name which is picked for preemption, 2) any possible error.
// preempt does not update its snapshot. It uses the same snapshot used in the
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
// nodes without preempting any pod. When there are many pending pods in the
// scheduling queue, a nominated pod will go back to the queue and behind
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
	cs := pl.fh.ClientSet()
	ph := pl.fh.PreemptHandle()
	nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()

	// 0) Fetch the latest version of <pod>.
	// It's safe to directly fetch pod here because the informer cache has already been
	// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
	// However, tests may need to manually initialize the shared pod informer.
	pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
	if err != nil {
		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}

	// 1) Ensure the preemptor is eligible to preempt other pods.
	if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return "", nil
	}

	// 2) Find all preemption candidates.
	candidates, evaluatedNodes, nodeToStatusMap, err := pl.FindCandidates(ctx, state, pod, m)
	if err != nil {
		return "", err
	}

	// Return a FitError only when there are no candidates that fit the pod.
	if len(candidates) == 0 {
		return "", &framework.FitError{
			Pod:         pod,
			NumAllNodes: len(evaluatedNodes),
			Diagnosis: framework.Diagnosis{
				NodeToStatusMap: nodeToStatusMap,
				// Leave FailedPlugins as nil as it won't be used on moving Pods.
			},
		}
	}

	// 3) Interact with registered Extenders to filter out some candidates if needed.
	candidates, err = CallExtenders(ph.Extenders(), pod, nodeLister, candidates)
	if err != nil {
		return "", err
	}

	// 4) Find the best candidate.
	bestCandidate := SelectCandidate(candidates)
	if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
		return "", nil
	}

	// 5) Perform preparation work before nominating the selected candidate.
	if err := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); err != nil {
		return "", err
	}

	return bestCandidate.Name(), nil
}

// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
// candidates returned will never be greater than <numNodes>.
func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
	n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
	if n < pl.args.MinCandidateNodesAbsolute {
		n = pl.args.MinCandidateNodesAbsolute
	}
	if n > numNodes {
		n = numNodes
	}
	return n
}
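
// Worked example (illustrative values, not defaults enforced here): with
// numNodes=1000, MinCandidateNodesPercentage=10 and MinCandidateNodesAbsolute=100,
// the percentage term yields 100, which already satisfies the absolute floor, so
// 100 candidates are requested. With numNodes=50 and the same args, the floor of
// 100 is capped to 50, the total number of nodes.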

// getOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
func (pl *DefaultPreemption) getOffsetAndNumCandidates(numNodes int32) (int32, int32) {
	return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
}

// FindCandidates calculates a slice of preemption candidates.
// Each candidate describes a node and the victim pods whose removal would make
// the given <pod> schedulable there. It also returns the nodes that were
// evaluated and a map recording why certain nodes were filtered out during the dry run.
func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, []*framework.NodeInfo, framework.NodeToStatusMap, error) {
	allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
	if err != nil {
		return nil, nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, nil, fmt.Errorf("no nodes available")
	}
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		if err := util.ClearNominatedNodeName(pl.fh.ClientSet(), pod); err != nil {
			klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
			// We do not return as this error is not critical.
		}
		return nil, nil, nil, nil
	}

	pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
	if err != nil {
		return nil, nil, nil, err
	}

	offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(potentialNodes)))
	if klog.V(5).Enabled() {
		var sample []string
		for i := offset; i < offset+10 && i < int32(len(potentialNodes)); i++ {
			sample = append(sample, potentialNodes[i].Node().Name)
		}
		klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
	}
	candidates, nodeStatuses := dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates)
	return candidates, potentialNodes, nodeStatuses, nil
}

// PodEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods of lower priority on the node, we don't consider this pod
// for preempting more pods.
func PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status) bool {
	if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
		klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
		return false
	}
	nomNodeName := pod.Status.NominatedNodeName
	if len(nomNodeName) > 0 {
		// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
		// then the pod should be considered for preempting again.
		if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
			return true
		}

		if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
			podPriority := corev1helpers.PodPriority(pod)
			for _, p := range nodeInfo.Pods {
				if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {
					// There is a terminating pod on the nominated node.
					return false
				}
			}
		}
	}
	return true
}

// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
	var potentialNodes []*framework.NodeInfo
	for _, node := range nodes {
		name := node.Node().Name
		// We rely on the status reported by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
		// to determine whether preemption may help or not on the node.
		if m[name].Code() == framework.UnschedulableAndUnresolvable {
			continue
		}
		potentialNodes = append(potentialNodes, node)
	}
	return potentialNodes
}

// candidateList is a fixed-capacity collection of preemption candidates.
// add() may be called concurrently from multiple goroutines; size() and get()
// expect all add() operations to have completed.
type candidateList struct {
	idx   int32
	items []Candidate
}

func newCandidateList(size int32) *candidateList {
	return &candidateList{idx: -1, items: make([]Candidate, size)}
}

// add adds a new candidate to the internal array atomically.
func (cl *candidateList) add(c *candidate) {
	if idx := atomic.AddInt32(&cl.idx, 1); idx < int32(len(cl.items)) {
		cl.items[idx] = c
	}
}

// size returns the number of candidates stored. Note that some add() operations
// might still be executing when this is called, so care must be taken to
// ensure that all add() operations complete before accessing the elements of
// the list.
func (cl *candidateList) size() int32 {
	n := atomic.LoadInt32(&cl.idx) + 1
	if n >= int32(len(cl.items)) {
		n = int32(len(cl.items))
	}
	return n
}

// get returns the internal candidate array. This function is NOT atomic and
// assumes that all add() operations have been completed.
func (cl *candidateList) get() []Candidate {
	return cl.items[:cl.size()]
}
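
// Typical usage (see dryRunPreemption below): allocate a list sized to the
// desired number of candidates, call add() from parallel workers as nodes pass
// the dry run, and read the results with get() only after parallelize.Until
// has returned and all workers have finished.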

// dryRunPreemption simulates preemption logic on <potentialNodes> in parallel,
// and returns the preemption candidates along with a map recording the statuses
// of nodes that were filtered out.
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
func dryRunPreemption(ctx context.Context, fh framework.Handle,
	state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
	nonViolatingCandidates := newCandidateList(numCandidates)
	violatingCandidates := newCandidateList(numCandidates)
	parallelCtx, cancel := context.WithCancel(ctx)
	// Ensure the derived context is always released, even if the dry run never
	// collects enough candidates to trigger the early cancel below.
	defer cancel()
	nodeStatuses := make(framework.NodeToStatusMap)
	var statusesLock sync.Mutex
	checkNode := func(i int) {
		nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
		stateCopy := state.Clone()
		pods, numPDBViolations, status := selectVictimsOnNode(ctx, fh, stateCopy, pod, nodeInfoCopy, pdbs)
		if status.IsSuccess() {
			victims := extenderv1.Victims{
				Pods:             pods,
				NumPDBViolations: int64(numPDBViolations),
			}
			c := &candidate{
				victims: &victims,
				name:    nodeInfoCopy.Node().Name,
			}
			if numPDBViolations == 0 {
				nonViolatingCandidates.add(c)
			} else {
				violatingCandidates.add(c)
			}
			nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
			if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
				// Enough candidates have been gathered; cancel the remaining workers.
				cancel()
			}
		} else {
			statusesLock.Lock()
			nodeStatuses[nodeInfoCopy.Node().Name] = status
			statusesLock.Unlock()
		}
	}
	parallelize.Until(parallelCtx, len(potentialNodes), checkNode)
	return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
}

// CallExtenders calls given <extenders> to select the list of feasible candidates.
// We will only check <candidates> with extenders that support preemption.
// Extenders which do not support preemption may later prevent the preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
func CallExtenders(extenders []framework.Extender, pod *v1.Pod, nodeLister framework.NodeInfoLister,
	candidates []Candidate) ([]Candidate, error) {
	if len(extenders) == 0 {
		return candidates, nil
	}

	// Migrate candidate slice to victimsMap to adapt to the Extender interface.
	// It's only applicable for candidate slices that have unique nominated node names.
	victimsMap := candidatesToVictimsMap(candidates)
	if len(victimsMap) == 0 {
		return candidates, nil
	}
	for _, extender := range extenders {
		if !extender.SupportsPreemption() || !extender.IsInterested(pod) {
			continue
		}
		nodeNameToVictims, err := extender.ProcessPreemption(pod, victimsMap, nodeLister)
		if err != nil {
			if extender.IsIgnorable() {
				klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
					extender, err)
				continue
			}
			return nil, err
		}
		// Replace victimsMap with the new result after preemption, so the
		// rest of the extenders can continue to use it as a parameter.
		victimsMap = nodeNameToVictims

		// If the node list becomes empty, no preemption can happen regardless of other extenders.
		if len(victimsMap) == 0 {
			break
		}
	}

	var newCandidates []Candidate
	for nodeName := range victimsMap {
		newCandidates = append(newCandidates, &candidate{
			victims: victimsMap[nodeName],
			name:    nodeName,
		})
	}
	return newCandidates, nil
}

// candidatesToVictimsMap converts a list of candidates to a map keyed by the
// candidates' node names.
// This function is not applicable for out-of-tree preemption plugins that exercise
// different preemption candidates on the same nominated node.
func candidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
	m := make(map[string]*extenderv1.Victims)
	for _, c := range candidates {
		m[c.Name()] = c.Victims()
	}
	return m
}

// SelectCandidate chooses the best-fit candidate from the given <candidates> and returns it.
func SelectCandidate(candidates []Candidate) Candidate {
	if len(candidates) == 0 {
		return nil
	}
	if len(candidates) == 1 {
		return candidates[0]
	}

	victimsMap := candidatesToVictimsMap(candidates)
	candidateNode := pickOneNodeForPreemption(victimsMap)

	// Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
	// preemption plugins that exercise different candidates on the same nominated node.
	if victims := victimsMap[candidateNode]; victims != nil {
		return &candidate{
			victims: victims,
			name:    candidateNode,
		}
	}

	// We shouldn't reach here.
	klog.Errorf("should not reach here, no candidate selected from %v.", candidates)
	// To not break the whole flow, return the first candidate.
	return candidates[0]
}

// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. A node with the minimum number of PDB violations.
// 2. A node with the minimum highest-priority victim.
// 3. Ties are broken by the sum of priorities of all victims.
// 4. If there are still ties, the node with the minimum number of victims is picked.
// 5. If there are still ties, the node with the latest start time of all highest-priority victims is picked.
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' slices are reused here to save memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
	if len(nodesToVictims) == 0 {
		return ""
	}
	minNumPDBViolatingPods := int64(math.MaxInt32)
	var minNodes1 []string
	lenNodes1 := 0
	for node, victims := range nodesToVictims {
		numPDBViolatingPods := victims.NumPDBViolations
		if numPDBViolatingPods < minNumPDBViolatingPods {
			minNumPDBViolatingPods = numPDBViolatingPods
			minNodes1 = nil
			lenNodes1 = 0
		}
		if numPDBViolatingPods == minNumPDBViolatingPods {
			minNodes1 = append(minNodes1, node)
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

	// There is more than one node with the minimum number of PDB-violating pods.
	// Find the one with the minimum highest-priority victim.
	minHighestPriority := int32(math.MaxInt32)
	var minNodes2 = make([]string, lenNodes1)
	lenNodes2 := 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		victims := nodesToVictims[node]
		// highestPodPriority is the highest priority among the victims on this node.
		highestPodPriority := corev1helpers.PodPriority(victims.Pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			lenNodes2 = 0
		}
		if highestPodPriority == minHighestPriority {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

	// There are a few nodes with the minimum highest-priority victim. Find the
	// smallest sum of priorities.
	minSumPriorities := int64(math.MaxInt64)
	lenNodes1 = 0
	for i := 0; i < lenNodes2; i++ {
		var sumPriorities int64
		node := minNodes2[i]
		for _, pod := range nodesToVictims[node].Pods {
			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
			// needed so that a node with a few pods with negative priority is not
			// picked over a node with a smaller number of pods with the same negative
			// priority (and similar scenarios).
			sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			lenNodes1 = 0
		}
		if sumPriorities == minSumPriorities {
			minNodes1[lenNodes1] = node
			lenNodes1++
		}
	}
	if lenNodes1 == 1 {
		return minNodes1[0]
	}

	// There are a few nodes with the minimum highest-priority victim and sum of priorities.
	// Find the one with the minimum number of pods.
	minNumPods := math.MaxInt32
	lenNodes2 = 0
	for i := 0; i < lenNodes1; i++ {
		node := minNodes1[i]
		numPods := len(nodesToVictims[node].Pods)
		if numPods < minNumPods {
			minNumPods = numPods
			lenNodes2 = 0
		}
		if numPods == minNumPods {
			minNodes2[lenNodes2] = node
			lenNodes2++
		}
	}
	if lenNodes2 == 1 {
		return minNodes2[0]
	}

	// There are a few nodes with the same number of pods.
	// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)).
	latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
	if latestStartTime == nil {
		// If the earliest start time of all pods on the 1st node is nil, just return it,
		// which is not expected to happen.
		klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
		return minNodes2[0]
	}
	nodeToReturn := minNodes2[0]
	for i := 1; i < lenNodes2; i++ {
		node := minNodes2[i]
		// Get the earliest start time of all pods on the current node.
		earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
		if earliestStartTimeOnNode == nil {
			klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
			continue
		}
		if earliestStartTimeOnNode.After(latestStartTime.Time) {
			latestStartTime = earliestStartTimeOnNode
			nodeToReturn = node
		}
	}

	return nodeToReturn
}
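
// Illustrative tie-break (not exhaustive): if node A's only victim has priority
// 100 while node B's victims have priorities 50 and 50, and neither violates a
// PDB, criterion 2 selects node B because its highest-priority victim (50) is
// lower than node A's (100); the victim count only matters once criteria 2 and 3
// tie.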

// selectVictimsOnNode finds a minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
// is never preempted when a lower-priority pod could be (higher/lower relative
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower-priority pods are gone. If so, it sorts all the lower-priority pods by
// their priority and then puts them into two groups: those whose PodDisruptionBudget
// would be violated if preempted, and the other non-violating pods. Both groups are
// sorted by priority. It first tries to reprieve as many PDB-violating pods as
// possible and then does the same for non-PDB-violating pods, while checking
// that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func selectVictimsOnNode(
	ctx context.Context,
	fh framework.Handle,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, *framework.Status) {
	var potentialVictims []*framework.PodInfo
	ph := fh.PreemptHandle()
	removePod := func(rpi *framework.PodInfo) error {
		if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
			return err
		}
		status := ph.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	addPod := func(api *framework.PodInfo) error {
		nodeInfo.AddPodInfo(api)
		status := ph.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	// As the first step, remove all the lower-priority pods from the node and
	// check if the given pod can be scheduled.
	podPriority := corev1helpers.PodPriority(pod)
	for _, pi := range nodeInfo.Pods {
		if corev1helpers.PodPriority(pi.Pod) < podPriority {
			potentialVictims = append(potentialVictims, pi)
			if err := removePod(pi); err != nil {
				return nil, 0, framework.AsStatus(err)
			}
		}
	}

	// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
	if len(potentialVictims) == 0 {
		message := fmt.Sprintf("No victims found on node %v for preemptor pod %v", nodeInfo.Node().Name, pod.Name)
		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, message)
	}

	// If the new pod does not fit after removing all the lower-priority pods,
	// we are almost done and this node is not suitable for preemption. The only
	// condition that we could check is if the "pod" is failing to schedule due to
	// inter-pod affinity to one or more victims, but we have decided not to
	// support this case for performance reasons. Having affinity to lower-priority
	// pods is not a recommended configuration anyway.
	if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
		return nil, 0, status
	}
	var victims []*v1.Pod
	numViolatingVictim := 0
	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })
	// Try to reprieve as many pods as possible. We first try to reprieve the PDB
	// violating victims and then other non-violating ones. In both cases, we start
	// from the highest-priority victims.
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
	reprievePod := func(pi *framework.PodInfo) (bool, error) {
		if err := addPod(pi); err != nil {
			return false, err
		}
		status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
		fits := status.IsSuccess()
		if !fits {
			if err := removePod(pi); err != nil {
				return false, err
			}
			rpi := pi.Pod
			victims = append(victims, rpi)
			klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", rpi.Namespace, rpi.Name, nodeInfo.Node().Name)
		}
		return fits, nil
	}
	for _, p := range violatingVictims {
		if fits, err := reprievePod(p); err != nil {
			return nil, 0, framework.AsStatus(err)
		} else if !fits {
			numViolatingVictim++
		}
	}
	// Now we try to reprieve non-violating victims.
	for _, p := range nonViolatingVictims {
		if _, err := reprievePod(p); err != nil {
			return nil, 0, framework.AsStatus(err)
		}
	}
	return victims, numViolatingVictim, framework.NewStatus(framework.Success)
}

// PrepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) error {
	for _, victim := range c.Victims().Pods {
		if err := util.DeletePod(cs, victim); err != nil {
			klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return err
		}
		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
			waitingPod.Reject(pluginName, "preempted")
		}
		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
			pod.Namespace, pod.Name, c.Name())
	}
	metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))

	// Lower-priority pods nominated to run on this node may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, c.Name())
	if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
		klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
		// We do not return as this error is not critical.
	}

	return nil
}

// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
// priority of the given "pod" and are nominated to run on the given node.
// Note: We could possibly check if the nominated lower priority pods still fit
// and return those that no longer fit, but that would require lots of
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
	pods := pn.NominatedPodsForNode(nodeName)

	if len(pods) == 0 {
		return nil
	}

	var lowerPriorityPods []*v1.Pod
	podPriority := corev1helpers.PodPriority(pod)
	for _, p := range pods {
		if corev1helpers.PodPriority(p) < podPriority {
			lowerPriorityPods = append(lowerPriorityPods, p)
		}
	}
	return lowerPriorityPods
}

// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
// and "nonViolatingPods" based on whether their PDBs will be violated if they are
// preempted.
// This function is stable and does not change the order of received pods. So, if it
// receives a sorted list, grouping will preserve the order of the input list.
func filterPodsWithPDBViolation(podInfos []*framework.PodInfo, pdbs []*policy.PodDisruptionBudget) (violatingPodInfos, nonViolatingPodInfos []*framework.PodInfo) {
	pdbsAllowed := make([]int32, len(pdbs))
	for i, pdb := range pdbs {
		pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
	}

	for _, podInfo := range podInfos {
		pod := podInfo.Pod
		pdbForPodIsViolated := false
		// A pod with no labels will not match any PDB. So, no need to check.
		if len(pod.Labels) != 0 {
			for i, pdb := range pdbs {
				if pdb.Namespace != pod.Namespace {
					continue
				}
				selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
				if err != nil {
					continue
				}
				// A PDB with a nil or empty selector matches nothing.
				if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
					continue
				}

				// Existing in DisruptedPods means it has been processed in API server,
				// we don't treat it as a violating case.
				if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
					continue
				}
				// Only decrement the matched pdb when it's not in its <DisruptedPods>;
				// otherwise we may over-decrement the budget number.
				pdbsAllowed[i]--
				// We have found a matching PDB.
				if pdbsAllowed[i] < 0 {
					pdbForPodIsViolated = true
				}
			}
		}
		if pdbForPodIsViolated {
			violatingPodInfos = append(violatingPodInfos, podInfo)
		} else {
			nonViolatingPodInfos = append(nonViolatingPodInfos, podInfo)
		}
	}
	return violatingPodInfos, nonViolatingPodInfos
}
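
// For example (illustrative): if a matching PDB currently allows one disruption
// and three potential victims in its namespace match its selector, the first
// victim evaluated is grouped as non-violating (the budget drops to 0) and the
// remaining two are grouped as violating (the budget would go negative).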

func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
		return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
	}
	return nil
}

func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
	if pdbLister != nil {
		return pdbLister.List(labels.Everything())
	}
	return nil, nil
}