Merge pull request #122627 from sanposhiho/remove-AssignedPodUpdated
take PodTopologySpread into consideration when requeueing Pods based on Pod related events
@@ -205,7 +205,20 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
		logger.Error(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
	}

	// SchedulingQueue.AssignedPodAdded has a problem:
	// It internally pre-filters Pods to move to activeQ,
	// while taking only in-tree plugins into consideration.
	// Consequently, if custom plugins that subscribe to Pod/Add events reject Pods,
	// those Pods will never be requeued to activeQ by assigned Pod related events,
	// and they may be stuck in unschedulableQ.
	//
	// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
	// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concerns.)
	if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodAdd, nil, pod, nil)
	} else {
		sched.SchedulingQueue.AssignedPodAdded(logger, pod)
	}
}

func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {

@@ -226,7 +239,20 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
		logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
	}

	// SchedulingQueue.AssignedPodUpdated has a problem:
	// It internally pre-filters Pods to move to activeQ,
	// while taking only in-tree plugins into consideration.
	// Consequently, if custom plugins that subscribe to Pod/Update events reject Pods,
	// those Pods will never be requeued to activeQ by assigned Pod related events,
	// and they may be stuck in unschedulableQ.
	//
	// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
	// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concerns.)
	if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil)
	} else {
		sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
	}
}

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
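The gist of the change above: with the SchedulerQueueingHints feature gate enabled, every registered queueing-hint callback (including ones from out-of-tree plugins) gets to decide whether an assigned-Pod event should requeue a rejected Pod, whereas the legacy AssignedPodAdded/AssignedPodUpdated path applies only the in-tree pre-filter. The following self-contained Go sketch illustrates that difference with hypothetical, simplified types; QueueingHint, hintFn, and requeueOnAssignedPodAdd are illustrative names, not the scheduler's real API.

package main

import "fmt"

// QueueingHint is a hypothetical stand-in for the framework's hint result.
type QueueingHint int

const (
	QueueSkip QueueingHint = iota // leave the Pod in unschedulableQ
	Queue                         // move the Pod back to activeQ/backoffQ
)

// hintFn is a simplified per-plugin callback deciding whether an assigned-Pod
// event in eventNS could make a Pod rejected in rejectedNS schedulable again.
type hintFn func(eventNS, rejectedNS string) QueueingHint

// requeueOnAssignedPodAdd mirrors the gate-controlled branch in addPodToCache:
// with queueing hints enabled, all hint functions are consulted; otherwise only
// a fixed in-tree pre-filter runs, which is what can strand Pods rejected by
// custom plugins in unschedulableQ.
func requeueOnAssignedPodAdd(hintsEnabled bool, hints map[string]hintFn, eventNS string, rejected map[string]string) []string {
	var requeued []string
	for pod, ns := range rejected {
		if !hintsEnabled {
			// Stand-in for the in-tree pre-filter (affinity / topology spread checks).
			if ns == eventNS {
				requeued = append(requeued, pod)
			}
			continue
		}
		for _, fn := range hints {
			if fn(eventNS, ns) == Queue {
				requeued = append(requeued, pod)
				break
			}
		}
	}
	return requeued
}

func main() {
	hints := map[string]hintFn{
		"CustomPlugin": func(eventNS, rejectedNS string) QueueingHint {
			if eventNS == rejectedNS {
				return Queue
			}
			return QueueSkip
		},
	}
	fmt.Println(requeueOnAssignedPodAdd(true, hints, "ns1", map[string]string{"pod-a": "ns1"})) // [pod-a]
}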
@@ -48,6 +48,7 @@ import (
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
	"k8s.io/kubernetes/pkg/scheduler/internal/heap"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"

@@ -1092,7 +1093,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
	p.lock.Lock()
	p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd, nil, pod)

	// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
	// because Pod related events shouldn't make Pods that were rejected by a single-node scheduling requirement schedulable.
	p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), AssignedPodAdd, nil, pod)
	p.lock.Unlock()
}

@@ -1115,9 +1119,13 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool {
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) {
	p.lock.Lock()
	if isPodResourcesResizedDown(newPod) {
		// In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm
		// because Pod related events may make Pods that were rejected by NodeResourceFit schedulable.
		p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil)
	} else {
		p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
		// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
		// because Pod related events can only make Pods rejected by a cross topology term schedulable.
		p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
	}
	p.lock.Unlock()
}

@@ -1258,22 +1266,29 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
	}
}

// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// getUnschedulablePodsWithCrossTopologyTerm returns unschedulable pods for which either of the following conditions is met:
// - have any affinity term that matches "pod".
// - rejected by the PodTopologySpread plugin.
// NOTE: this function assumes the lock has been acquired by the caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo {
func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo {
	nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(logger, pod.Namespace, p.nsLister)

	var podsToMove []*framework.QueuedPodInfo
	for _, pInfo := range p.unschedulablePods.podInfoMap {
		if pInfo.UnschedulablePlugins.Has(podtopologyspread.Name) && pod.Namespace == pInfo.Pod.Namespace {
			// This Pod may be schedulable now because of this Pod event.
			podsToMove = append(podsToMove, pInfo)
			continue
		}

		for _, term := range pInfo.RequiredAffinityTerms {
			if term.Matches(pod, nsLabels) {
				podsToMove = append(podsToMove, pInfo)
				break
			}
		}

	}

	return podsToMove
}
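To make the new pre-filter easier to follow, here is a rough, self-contained sketch of the selection rule that getUnschedulablePodsWithCrossTopologyTerm implements, using simplified placeholder types instead of *framework.QueuedPodInfo and v1.Pod: a Pod is a candidate for requeueing if PodTopologySpread rejected it and it shares a namespace with the event Pod, or if one of its required affinity terms matches the event Pod. The struct and function names below are illustrative only.

package main

import "fmt"

// queuedPod is a simplified stand-in for the queue's bookkeeping type; the real
// code works on *framework.QueuedPodInfo and v1.Pod objects.
type queuedPod struct {
	name                 string
	namespace            string
	unschedulablePlugins map[string]bool // plugins that rejected the Pod
	affinityTermMatches  func(eventPodLabels map[string]string, eventPodNamespace string) bool
}

// crossTopologyCandidates mirrors the selection rule of
// getUnschedulablePodsWithCrossTopologyTerm: keep a Pod if it was rejected by
// PodTopologySpread in the same namespace as the event Pod, or if one of its
// required affinity terms matches the event Pod.
func crossTopologyCandidates(unschedulable []queuedPod, eventPodNS string, eventPodLabels map[string]string) []string {
	const podTopologySpread = "PodTopologySpread"
	var out []string
	for _, p := range unschedulable {
		if p.unschedulablePlugins[podTopologySpread] && p.namespace == eventPodNS {
			out = append(out, p.name)
			continue
		}
		if p.affinityTermMatches != nil && p.affinityTermMatches(eventPodLabels, eventPodNS) {
			out = append(out, p.name)
		}
	}
	return out
}

func main() {
	pods := []queuedPod{
		{name: "tsp", namespace: "ns1", unschedulablePlugins: map[string]bool{"PodTopologySpread": true}},
		{name: "other", namespace: "ns2", unschedulablePlugins: map[string]bool{"NodeResourcesFit": true}},
	}
	// An assigned Pod with label service=securityscan lands in ns1.
	fmt.Println(crossTopologyCandidates(pods, "ns1", map[string]string{"service": "securityscan"})) // [tsp]
}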
@@ -1932,14 +1932,57 @@ func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) {
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that
// when a pod with pod affinity is in unschedulablePods and another pod with a
// matching label is added, the unschedulable pod is moved to activeQ.
func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
	tests := []struct {
		name               string
		unschedPod         *v1.Pod
		unschedPlugin      string
		updatedAssignedPod *v1.Pod
		wantToRequeue      bool
	}{
		{
			name:               "Pod rejected by pod affinity is requeued with matching Pod's update",
			unschedPod:         st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(),
			unschedPlugin:      names.InterPodAffinity,
			updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(),
			wantToRequeue:      true,
		},
		{
			name:               "Pod rejected by pod affinity isn't requeued with unrelated Pod's update",
			unschedPod:         st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(),
			unschedPlugin:      names.InterPodAffinity,
			updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(),
			wantToRequeue:      false,
		},
		{
			name:               "Pod rejected by pod topology spread is requeued with Pod's update in the same namespace",
			unschedPod:         st.MakePod().Name("tsp").Namespace("ns1").UID("tsp").SpreadConstraint(1, "node", v1.DoNotSchedule, nil, nil, nil, nil, nil).Obj(),
			unschedPlugin:      names.PodTopologySpread,
			updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(),
			wantToRequeue:      true,
		},
		{
			name:               "Pod rejected by pod topology spread isn't requeued with unrelated Pod's update",
			unschedPod:         st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(),
			unschedPlugin:      names.PodTopologySpread,
			updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(),
			wantToRequeue:      false,
		},
		{
			name:               "Pod rejected by other plugins isn't requeued with any Pod's update",
			unschedPod:         st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Obj(),
			unschedPlugin:      "fakePlugin",
			updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(),
			wantToRequeue:      false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			logger, ctx := ktesting.NewTestContext(t)
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()

	affinityPod := st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Priority(mediumPriority).NominatedNodeName("node1").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj()
	labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj()

			c := testingclock.NewFakeClock(time.Now())
			m := makeEmptyQueueingHintMapPerProfile()
			m[""][AssignedPodAdd] = []*QueueingHintFunction{

@@ -1947,41 +1990,39 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
					PluginName:     "fakePlugin",
					QueueingHintFn: queueHintReturnQueue,
				},
				{
					PluginName:     names.InterPodAffinity,
					QueueingHintFn: queueHintReturnQueue,
				},
				{
					PluginName:     names.PodTopologySpread,
					QueueingHintFn: queueHintReturnQueue,
				},
			}
			q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))

			// To simulate that the pod failed scheduling in the real world, Pop() the pod from activeQ before the AddUnschedulableIfNotPresent() calls below.
	q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
	if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
		t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
			if err := q.activeQ.Add(q.newQueuedPodInfo(tt.unschedPod)); err != nil {
				t.Errorf("failed to add pod to activeQ: %v", err)
			}
	q.activeQ.Add(q.newQueuedPodInfo(affinityPod))
	if p, err := q.Pop(logger); err != nil || p.Pod != affinityPod {
		t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
			if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod {
				t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name)
			}
	q.Add(logger, medPriorityPodInfo.Pod)
	err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
	if err != nil {
		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
	}
	err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle())

			err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(tt.unschedPod, tt.unschedPlugin), q.SchedulingCycle())
			if err != nil {
				t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
			}

			// Move clock to make the unschedulable pods complete backoff.
			c.Step(DefaultPodInitialBackoffDuration + time.Second)
	// Simulate addition of an assigned pod. The pod has matching labels for
	// affinityPod. So, affinityPod should go to activeQ.
	q.AssignedPodAdded(logger, labelPod)
	if getUnschedulablePod(q, affinityPod) != nil {
		t.Error("affinityPod is still in the unschedulablePods.")

			q.AssignedPodAdded(logger, tt.updatedAssignedPod)

			if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(tt.unschedPod)); exists != tt.wantToRequeue {
				t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, exists)
			}
	if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(affinityPod)); !exists {
		t.Error("affinityPod is not moved to activeQ.")
	}
	// Check that the other pod is still in the unschedulablePods.
	if getUnschedulablePod(q, unschedulablePodInfo.Pod) == nil {
		t.Error("unschedulablePodInfo is not in the unschedulablePods.")
		})
	}
}
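One detail worth calling out in the test flow above: Pods sitting in unschedulableQ only become eligible to move once their backoff window has passed, which is why the test advances a fake clock instead of sleeping. Below is a minimal sketch of that pattern using k8s.io/utils/clock/testing; the 1-second value is an assumption mirroring the scheduler's default initial backoff, and DefaultPodInitialBackoffDuration in the diff is the queue package's own constant, not the local name used here.

package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	// Assumed to match the scheduler's default initial backoff (1s); only a
	// reminder of that value, not the real exported symbol.
	const podInitialBackoff = 1 * time.Second

	c := testingclock.NewFakeClock(time.Now())
	deadline := c.Now().Add(podInitialBackoff)

	fmt.Println("backoff over?", c.Now().After(deadline)) // false
	c.Step(podInitialBackoff + time.Second)               // same pattern as c.Step(DefaultPodInitialBackoffDuration + time.Second)
	fmt.Println("backoff over?", c.Now().After(deadline)) // true
}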
@@ -53,6 +53,7 @@ import (
	testutils "k8s.io/kubernetes/test/integration/util"
	imageutils "k8s.io/kubernetes/test/utils/image"
	"k8s.io/utils/pointer"
	"k8s.io/utils/ptr"
)

func TestSchedulingGates(t *testing.T) {

@@ -362,6 +363,25 @@ func TestCoreResourceEnqueue(t *testing.T) {
			},
			wantRequeuedPods: sets.New("pod1"),
		},
		{
			name:         "Pods with PodTopologySpread should be requeued when a Pod with matching label is scheduled",
			initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()},
			initialPod:   st.MakePod().Name("pod1").Label("key", "val").Container("image").Node("fake-node").Obj(),
			pods: []*v1.Pod{
				// - Pod2 will be rejected by the PodTopologySpread plugin.
				st.MakePod().Name("pod2").Label("key", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
			},
			triggerFn: func(testCtx *testutils.TestContext) error {
				// Trigger an assigned Pod add event.
				pod := st.MakePod().Name("pod3").Label("key", "val").Node("fake-node").Container("image").Obj()
				if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
					return fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
				}

				return nil
			},
			wantRequeuedPods: sets.New("pod2"),
		},
	}

	for _, tt := range tests {
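For readers less familiar with the test helpers, the SpreadConstraint(...) builder call on pod2 above corresponds roughly to the following plain API object. This is an approximation for illustration; the st.MakePod helper may set additional fields, and the pod name and labels are taken from the test case.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// Roughly what st.MakePod().SpreadConstraint(1, "node", v1.DoNotSchedule,
// st.MakeLabelSelector().Exists("key").Obj(), ptr.To(int32(3)), nil, nil, nil)
// produces on the Pod spec.
func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"key": "val"}},
		Spec: v1.PodSpec{
			TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
				MaxSkew:           1,
				TopologyKey:       "node",
				WhenUnsatisfiable: v1.DoNotSchedule,
				LabelSelector: &metav1.LabelSelector{
					MatchExpressions: []metav1.LabelSelectorRequirement{{
						Key:      "key",
						Operator: metav1.LabelSelectorOpExists,
					}},
				},
				MinDomains: ptr.To(int32(3)),
			}},
		},
	}
	fmt.Printf("%+v\n", pod.Spec.TopologySpreadConstraints[0])
}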