Cleanup "slow-path" logic in scheduler Filters

This commit is contained in:
notpad
2020-02-10 22:48:49 +08:00
parent 06c639fb4a
commit a7057f8df0
11 changed files with 65 additions and 421 deletions

View File

@@ -388,10 +388,8 @@ func (pl *InterPodAffinity) RemovePod(ctx context.Context, cycleState *framework
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The preFilterState wasn't pre-computed in prefilter. We ignore the error for now since
// Filter is able to handle that by computing it again.
klog.V(5).Infof("Error reading %q from cycleState: %v", preFilterStateKey, err)
return nil, nil
// The preFilterState wasn't pre-computed in prefilter.
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}
s, ok := c.(*preFilterState)
@@ -405,20 +403,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
// terms indicated by the existing pods.
func (pl *InterPodAffinity) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, state *preFilterState, nodeInfo *nodeinfo.NodeInfo) (bool, error) {
node := nodeInfo.Node()
var topologyMap topologyToMatchedTermCount
if state != nil {
topologyMap = state.topologyToMatchedExistingAntiAffinityTerms
} else {
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
// present in nodeInfo. Pods on other nodes pass the filter.
filteredPods, err := pl.sharedLister.Pods().FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return false, fmt.Errorf("Failed to get all pods: %v", err)
}
if topologyMap, err = pl.getMatchingAntiAffinityTopologyPairsOfPods(pod, filteredPods); err != nil {
return false, fmt.Errorf("Failed to get all terms that match pod: %v", err)
}
}
topologyMap := state.topologyToMatchedExistingAntiAffinityTerms
// Iterate over topology pairs to get any of the pods being affected by
// the scheduled pod anti-affinity terms
@@ -462,37 +447,6 @@ func nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTerm
return false
}
// podMatchesPodAffinityTerms checks if the "targetPod" matches the given "terms"
// of the "pod" on the given "nodeInfo".Node(). It returns three values: 1) whether
// targetPod matches all the terms and their topologies, 2) whether targetPod
// matches all the terms label selector and namespaces 3) any error.
func (pl *InterPodAffinity) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod, nodeInfo *nodeinfo.NodeInfo, terms []v1.PodAffinityTerm) (bool, bool, error) {
if len(terms) == 0 {
return false, false, fmt.Errorf("terms array is empty")
}
props, err := getAffinityTerms(pod, terms)
if err != nil {
return false, false, err
}
if !podMatchesAllAffinityTerms(targetPod, props) {
return false, false, nil
}
// Namespace and selector of the terms have matched. Now we check topology of the terms.
targetPodNodeInfo, err := pl.sharedLister.NodeInfos().Get(targetPod.Spec.NodeName)
if err != nil {
return false, false, err
}
for _, term := range terms {
if len(term.TopologyKey) == 0 {
return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
}
if !schedutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo.Node(), term.TopologyKey) {
return false, true, nil
}
}
return true, true, nil
}
// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
// (1) Whether it has PodAntiAffinity
// (2) Whether ANY AffinityTerm matches the incoming pod
@@ -519,24 +473,6 @@ func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.P
return topologyMap, nil
}
func (pl *InterPodAffinity) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.Pod, existingPods []*v1.Pod) (topologyToMatchedTermCount, error) {
topologyMaps := make(topologyToMatchedTermCount)
for _, existingPod := range existingPods {
existingPodNodeInfo, err := pl.sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Pod %s has NodeName %q but node is not found", existingPod.Name, existingPod.Spec.NodeName)
continue
}
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNodeInfo.Node())
if err != nil {
return nil, err
}
topologyMaps.append(existingPodTopologyMaps)
}
return topologyMaps, nil
}
// satisfiesPodsAffinityAntiAffinity checks if scheduling the pod onto this node would break any term of this pod.
// This function returns two boolean flags. The first boolean flag indicates whether the pod matches affinity rules
// or not. The second boolean flag indicates if the pod matches anti-affinity rules.
@@ -547,79 +483,31 @@ func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
if node == nil {
return false, false, fmt.Errorf("node not found")
}
if state != nil {
// Check all affinity terms.
topologyToMatchedAffinityTerms := state.topologyToMatchedAffinityTerms
if affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
matchExists := nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
if !matchExists {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
return false, false, nil
}
}
}
// Check all anti-affinity terms.
topologyToMatchedAntiAffinityTerms := state.topologyToMatchedAntiAffinityTerms
if antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
matchExists := nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
if matchExists {
return true, false, nil
}
}
} else { // We don't have precomputed preFilterState. We have to follow a slow path to check affinity terms.
filteredPods, err := pl.sharedLister.Pods().FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return false, false, err
}
affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity)
antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
matchFound, termsSelectorMatchFound := false, false
for _, targetPod := range filteredPods {
// Check all affinity terms.
if !matchFound && len(affinityTerms) > 0 {
affTermsMatch, termsSelectorMatch, err := pl.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, affinityTerms)
if err != nil {
return false, false, err
}
if termsSelectorMatch {
termsSelectorMatchFound = true
}
if affTermsMatch {
matchFound = true
}
}
// Check all anti-affinity terms.
if len(antiAffinityTerms) > 0 {
antiAffTermsMatch, _, err := pl.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, antiAffinityTerms)
if err != nil || antiAffTermsMatch {
return true, false, err
}
}
}
if !matchFound && len(affinityTerms) > 0 {
// We have not been able to find any matches for the pod's affinity terms.
// This pod may be the first pod in a series that have affinity to themselves. In order
// Check all affinity terms.
topologyToMatchedAffinityTerms := state.topologyToMatchedAffinityTerms
if affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
matchExists := nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
if !matchExists {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if termsSelectorMatchFound {
return false, false, nil
}
// Check if pod matches its own affinity terms (namespace and label selector).
if !targetPodMatchesAffinityOfPod(pod, pod) {
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
return false, false, nil
}
}
}
// Check all anti-affinity terms.
topologyToMatchedAntiAffinityTerms := state.topologyToMatchedAntiAffinityTerms
if antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
matchExists := nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
if matchExists {
return true, false, nil
}
}
return true, true, nil
}

View File

@@ -1634,124 +1634,6 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
}
}
func TestPreFilterDisabled(t *testing.T) {
labelRgChina := map[string]string{
"region": "China",
}
labelRgChinaAzAz1 := map[string]string{
"region": "China",
"az": "az1",
}
labelRgIndia := map[string]string{
"region": "India",
}
labelRgUS := map[string]string{
"region": "US",
}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
wantStatuses []*framework.Status
name string
}{
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "123"}},
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar"},
},
},
},
TopologyKey: "region",
},
},
},
},
},
},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "nodeA"}, ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}},
{
Spec: v1.PodSpec{
NodeName: "nodeC",
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"123"},
},
},
},
TopologyKey: "region",
},
},
},
},
},
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: labelRgChinaAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeD", Labels: labelRgUS}},
},
wantStatuses: []*framework.Status{
framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonExistingAntiAffinityRulesNotMatch,
),
nil,
},
name: "NodeA and nodeB have same topologyKey and label value. NodeA has an existing pod that matches the inter pod affinity rule. NodeC has an existing pod that match the inter pod affinity rule. The pod can not be scheduled onto nodeA, nodeB and nodeC but can be schedulerd onto nodeD",
},
}
for indexTest, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, test.nodes)
for indexNode, node := range test.nodes {
p := &InterPodAffinity{
sharedLister: snapshot,
}
state := framework.NewCycleState()
nodeInfo := mustGetNodeInfo(t, snapshot, node.Name)
gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) {
t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode])
}
}
})
}
}
func TestPreFilterStateAddRemovePod(t *testing.T) {
var label1 = map[string]string{
"region": "r1",