Fix an issue in inter-pod affinity predicate that cause affinity to self being processed incorrectly

This commit is contained in:
Bobby (Babak) Salamat
2018-04-06 17:43:51 -07:00
parent cb5f1ad9f7
commit c590ec7ae9
3 changed files with 81 additions and 63 deletions

View File

@@ -1164,39 +1164,36 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta algorithm
return true, nil, nil
}
// anyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
// First return value indicates whether a matching pod exists on a node that matches the topology key,
// while the second return value indicates whether a matching pod exists anywhere.
// TODO: Do we really need any pod matching, or all pods matching? I think the latter.
func (c *PodAffinityChecker) anyPodMatchesPodAffinityTerm(pod *v1.Pod, pods []*v1.Pod, nodeInfo *schedulercache.NodeInfo, term *v1.PodAffinityTerm) (bool, bool, error) {
if len(term.TopologyKey) == 0 {
return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
// podMatchesPodAffinityTerms checks if the "targetPod" matches the given "terms"
// of the "pod" on the given "nodeInfo".Node(). It returns three values: 1) whether
// targetPod matches all the terms and their topologies, 2) whether targetPod
// matches all the terms label selector and namespaces (AKA term properties),
// 3) any error.
func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod *v1.Pod, targetPod *v1.Pod, nodeInfo *schedulercache.NodeInfo, terms []v1.PodAffinityTerm) (bool, bool, error) {
if len(terms) == 0 {
return false, false, fmt.Errorf("terms array is empty")
}
matchingPodExists := false
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
props, err := getAffinityTermProperties(pod, terms)
if err != nil {
return false, false, err
}
// Special case: When the topological domain is node, we can limit our
// search to pods on that node without searching the entire cluster.
if term.TopologyKey == kubeletapis.LabelHostname {
pods = nodeInfo.Pods()
affinityTermPropertiesMatch := podMatchesAffinityTermProperties(targetPod, props)
if !affinityTermPropertiesMatch {
return false, false, nil
}
for _, existingPod := range pods {
match := priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, namespaces, selector)
if match {
matchingPodExists = true
existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
if err != nil {
return false, matchingPodExists, err
}
if priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), existingPodNode, term.TopologyKey) {
return true, matchingPodExists, nil
}
targetPodNode, err := c.info.GetNodeInfo(targetPod.Spec.NodeName)
if err != nil {
return false, false, err
}
for _, term := range terms {
if len(term.TopologyKey) == 0 {
return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
}
if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNode, term.TopologyKey) {
return false, affinityTermPropertiesMatch, nil
}
}
return false, matchingPodExists, nil
return true, affinityTermPropertiesMatch, nil
}
// GetPodAffinityTerms gets pod affinity terms by a pod affinity object.
@@ -1460,49 +1457,57 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
return ErrPodAffinityRulesNotMatch, err
}
// Check all affinity terms.
for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil {
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
if !termMatches {
// If the requirement matches a pod's own labels are namespace, and there are
// no other such pods, then disregard the requirement. This is necessary to
// not block forever because the first pod of the collection can't be scheduled.
if matchingPodExists {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
matchFound, termsSelectorMatchFound := false, false
for _, targetPod := range filteredPods {
// Check all affinity terms.
if !matchFound && len(affinityTerms) > 0 {
affTermsMatch, termsSelectorMatch, err := c.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, affinityTerms)
if err != nil {
errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err)
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinity, err: %v", podName(pod), node.Name, err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector)
if !match {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
if termsSelectorMatch {
termsSelectorMatchFound = true
}
if affTermsMatch {
matchFound = true
}
}
// Check all anti-affinity terms.
if len(antiAffinityTerms) > 0 {
antiAffTermsMatch, _, err := c.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, antiAffinityTerms)
if err != nil || antiAffTermsMatch {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm, err: %v",
podName(pod), node.Name, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
if !matchFound && len(affinityTerms) > 0 {
// We have not been able to find any matches for the pod's affinity rules.
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if termsSelectorMatchFound {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
podName(pod), node.Name)
return ErrPodAffinityRulesNotMatch, nil
}
// Check if pod matches its own affinity properties (namespace and label selector).
if !targetPodMatchesAffinityOfPod(pod, pod) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
podName(pod), node.Name)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.