move inter pod affinity predicate logic to its Filter plugin

This commit is contained in:
Abdullah Gharaibeh
2019-12-19 20:35:57 -05:00
parent 641d0290e4
commit 429448cb40
14 changed files with 1686 additions and 1713 deletions

View File

@@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/rand"
@@ -40,8 +39,6 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@@ -1056,341 +1053,6 @@ func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.N
return len(predicateFails) == 0, predicateFails, nil
}
// PodAffinityChecker contains information to check pod affinity.
type PodAffinityChecker struct {
nodeInfoLister schedulerlisters.NodeInfoLister
podLister schedulerlisters.PodLister
}
// NewPodAffinityChecker returns a PodAffinityChecker.
func NewPodAffinityChecker(sharedLister schedulerlisters.SharedLister) *PodAffinityChecker {
return &PodAffinityChecker{
nodeInfoLister: sharedLister.NodeInfos(),
podLister: sharedLister.Pods(),
}
}
// NewPodAffinityPredicate creates a PodAffinityChecker.
func NewPodAffinityPredicate(nodeInfoLister schedulerlisters.NodeInfoLister, podLister schedulerlisters.PodLister) FitPredicate {
return func(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
return false, nil, fmt.Errorf("This function should never be called")
}
}
// InterPodAffinityMatches checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
// First return value indicates whether a pod can be scheduled on the specified node while the second return value indicates the
// predicate failure reasons if the pod cannot be scheduled on the specified node.
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta *PodAffinityMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
if failedPredicates, error := c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo); failedPredicates != nil {
failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
return false, failedPredicates, error
}
// Now check if <pod> requirements will be satisfied on this node.
affinity := pod.Spec.Affinity
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return true, nil, nil
}
if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
return false, failedPredicates, error
}
if klog.V(10) {
// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
podName(pod), node.Name)
}
return true, nil, nil
}
// podMatchesPodAffinityTerms checks if the "targetPod" matches the given "terms"
// of the "pod" on the given "nodeInfo".Node(). It returns three values: 1) whether
// targetPod matches all the terms and their topologies, 2) whether targetPod
// matches all the terms label selector and namespaces (AKA term properties),
// 3) any error.
func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) (bool, bool, error) {
if len(terms) == 0 {
return false, false, fmt.Errorf("terms array is empty")
}
props, err := getAffinityTermProperties(pod, terms)
if err != nil {
return false, false, err
}
if !podMatchesAllAffinityTermProperties(targetPod, props) {
return false, false, nil
}
// Namespace and selector of the terms have matched. Now we check topology of the terms.
targetPodNodeInfo, err := c.nodeInfoLister.Get(targetPod.Spec.NodeName)
if err != nil {
return false, false, err
}
for _, term := range terms {
if len(term.TopologyKey) == 0 {
return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
}
if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo.Node(), term.TopologyKey) {
return false, true, nil
}
}
return true, true, nil
}
// GetPodAffinityTerms gets pod affinity terms by a pod affinity object.
func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) {
if podAffinity != nil {
if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity.
func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) {
if podAntiAffinity != nil {
if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
// (1) Whether it has PodAntiAffinity
// (2) Whether ANY AffinityTerm matches the incoming pod
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (topologyToMatchedTermCount, error) {
affinity := existingPod.Spec.Affinity
if affinity == nil || affinity.PodAntiAffinity == nil {
return nil, nil
}
topologyMap := make(topologyToMatchedTermCount)
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
topologyMap[pair]++
}
}
}
return topologyMap, nil
}
func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.Pod, existingPods []*v1.Pod) (topologyToMatchedTermCount, error) {
topologyMaps := make(topologyToMatchedTermCount)
for _, existingPod := range existingPods {
existingPodNodeInfo, err := c.nodeInfoLister.Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Pod %s has NodeName %q but node is not found", podName(existingPod), existingPod.Spec.NodeName)
continue
}
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNodeInfo.Node())
if err != nil {
return nil, err
}
topologyMaps.appendMaps(existingPodTopologyMaps)
}
return topologyMaps, nil
}
// Checks if scheduling the pod onto this node would break any anti-affinity
// terms indicated by the existing pods.
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta *PodAffinityMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("node not found")
}
var topologyMap topologyToMatchedTermCount
if meta != nil {
topologyMap = meta.topologyToMatchedExistingAntiAffinityTerms
} else {
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
// present in nodeInfo. Pods on other nodes pass the filter.
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
errMessage := fmt.Sprintf("Failed to get all pods: %v", err)
klog.Error(errMessage)
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
}
if topologyMap, err = c.getMatchingAntiAffinityTopologyPairsOfPods(pod, filteredPods); err != nil {
errMessage := fmt.Sprintf("Failed to get all terms that match pod %s: %v", podName(pod), err)
klog.Error(errMessage)
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
}
}
// Iterate over topology pairs to get any of the pods being affected by
// the scheduled pod anti-affinity terms
for topologyKey, topologyValue := range node.Labels {
if topologyMap[topologyPair{key: topologyKey, value: topologyValue}] > 0 {
klog.V(10).Infof("Cannot schedule pod %+v onto node %v", podName(pod), node.Name)
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
}
}
if klog.V(10) {
// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
klog.Infof("Schedule Pod %+v on Node %+v is allowed, existing pods anti-affinity terms satisfied.",
podName(pod), node.Name)
}
return nil, nil
}
// nodeMatchesAllTopologyTerms checks whether "nodeInfo" matches
// topology of all the "terms" for the given "pod".
func (c *PodAffinityChecker) nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
node := nodeInfo.Node()
for _, term := range terms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
if topologyPairs[pair] <= 0 {
return false
}
} else {
return false
}
}
return true
}
// nodeMatchesAnyTopologyTerm checks whether "nodeInfo" matches
// topology of any "term" for the given "pod".
func (c *PodAffinityChecker) nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
node := nodeInfo.Node()
for _, term := range terms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
if topologyPairs[pair] > 0 {
return true
}
}
}
return false
}
// satisfiesPodsAffinityAntiAffinity checks if scheduling the pod onto this node would break any term of this pod.
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
predicateMeta *PodAffinityMetadata, nodeInfo *schedulernodeinfo.NodeInfo,
affinity *v1.Affinity) (PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return ErrPodAffinityRulesNotMatch, fmt.Errorf("node not found")
}
if predicateMeta != nil {
// Check all affinity terms.
topologyToMatchedAffinityTerms := predicateMeta.topologyToMatchedAffinityTerms
if affinityTerms := GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
matchExists := c.nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
if !matchExists {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
podName(pod), node.Name)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
topologyToMatchedAntiAffinityTerms := predicateMeta.topologyToMatchedAntiAffinityTerms
if antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
matchExists := c.nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
if matchExists {
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinity",
podName(pod), node.Name)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
} else { // We don't have precomputed metadata. We have to follow a slow path to check affinity terms.
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return ErrPodAffinityRulesNotMatch, err
}
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
matchFound, termsSelectorMatchFound := false, false
for _, targetPod := range filteredPods {
// Check all affinity terms.
if !matchFound && len(affinityTerms) > 0 {
affTermsMatch, termsSelectorMatch, err := c.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, affinityTerms)
if err != nil {
errMessage := fmt.Sprintf("Cannot schedule pod %s onto node %s, because of PodAffinity: %v", podName(pod), node.Name, err)
klog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
if termsSelectorMatch {
termsSelectorMatchFound = true
}
if affTermsMatch {
matchFound = true
}
}
// Check all anti-affinity terms.
if len(antiAffinityTerms) > 0 {
antiAffTermsMatch, _, err := c.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, antiAffinityTerms)
if err != nil || antiAffTermsMatch {
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm, err: %v",
podName(pod), node.Name, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
}
if !matchFound && len(affinityTerms) > 0 {
// We have not been able to find any matches for the pod's affinity terms.
// This pod may be the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if termsSelectorMatchFound {
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
podName(pod), node.Name)
return ErrPodAffinityRulesNotMatch, nil
}
// Check if pod matches its own affinity properties (namespace and label selector).
if !targetPodMatchesAffinityOfPod(pod, pod) {
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
podName(pod), node.Name)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
if klog.V(10) {
// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod affinity/anti-affinity constraints satisfied.",
podName(pod), node.Name)
}
return nil, nil
}
// CheckNodeUnschedulablePredicate checks if a pod can be scheduled on a node with Unschedulable spec.
func CheckNodeUnschedulablePredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
if nodeInfo == nil || nodeInfo.Node() == nil {