fix(scheduling queue): ignore events that interest no registered plugin
This commit is contained in:
		| @@ -409,6 +409,26 @@ const ( | ||||
| 	queueImmediately | ||||
| ) | ||||
|  | ||||
| // isEventOfInterest returns true if the event is of interest by some plugins. | ||||
| func (p *PriorityQueue) isEventOfInterest(logger klog.Logger, event framework.ClusterEvent) bool { | ||||
| 	if event.IsWildCard() { | ||||
| 		return true | ||||
| 	} | ||||
|  | ||||
| 	for _, hintMap := range p.queueingHintMap { | ||||
| 		for eventToMatch := range hintMap { | ||||
| 			if eventToMatch.Match(event) { | ||||
| 				// This event is interested by some plugins. | ||||
| 				return true | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	logger.V(6).Info("receive an event that isn't interested by any enabled plugins", "event", event) | ||||
|  | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| // isPodWorthRequeuing calls QueueingHintFn of only plugins registered in pInfo.unschedulablePlugins and pInfo.PendingPlugins. | ||||
| // | ||||
| // If any of pInfo.PendingPlugins return Queue, | ||||
| @@ -1077,6 +1097,12 @@ func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v | ||||
| // if Pop() is waiting for an item, it receives the signal after all the pods are in the | ||||
| // queue and the head is the highest priority pod. | ||||
| func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) { | ||||
| 	if !p.isEventOfInterest(logger, event) { | ||||
| 		// No plugin is interested in this event. | ||||
| 		// Return early before iterating all pods in unschedulablePods for preCheck. | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap)) | ||||
| 	for _, pInfo := range p.unschedulablePods.podInfoMap { | ||||
| 		if preCheck == nil || preCheck(pInfo.Pod) { | ||||
| @@ -1141,6 +1167,11 @@ func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *fra | ||||
|  | ||||
| // NOTE: this function assumes lock has been acquired in caller | ||||
| func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) { | ||||
| 	if !p.isEventOfInterest(logger, event) { | ||||
| 		// No plugin is interested in this event. | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	activated := false | ||||
| 	for _, pInfo := range podInfoList { | ||||
| 		schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj) | ||||
|   | ||||
| @@ -56,9 +56,7 @@ const queueMetricMetadata = ` | ||||
| 	` | ||||
|  | ||||
| var ( | ||||
| 	TestEvent    = framework.ClusterEvent{Resource: "test"} | ||||
| 	NodeAllEvent = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All} | ||||
| 	EmptyEvent   = framework.ClusterEvent{} | ||||
|  | ||||
| 	lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000) | ||||
| 	mediumPriority                         = (lowPriority + highPriority) / 2 | ||||
| @@ -223,9 +221,175 @@ func Test_InFlightPods(t *testing.T) { | ||||
| 			}, | ||||
| 			wantInFlightPods:   nil, | ||||
| 			wantInFlightEvents: nil, | ||||
| 			queueingHintMap: QueueingHintMapPerProfile{ | ||||
| 				"": { | ||||
| 					PvAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:        "when SchedulingQueueHint is disabled, which queue to enqueue Pod should be decided without SchedulingQueueHint", | ||||
| 			name:                         "Pod and interested events are registered in inFlightPods/inFlightEvents", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				// This gets added for the pod. | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				// This doesn't get added because no plugin is interested in PvUpdate. | ||||
| 				{eventHappens: &PvUpdate}, | ||||
| 			}, | ||||
| 			wantInFlightPods:   []*v1.Pod{pod}, | ||||
| 			wantInFlightEvents: []interface{}{pod, PvAdd}, | ||||
| 			queueingHintMap: QueueingHintMapPerProfile{ | ||||
| 				"": { | ||||
| 					PvAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "Pod, registered in inFlightPods, is enqueued back to activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod, pod2}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				{podPopped: pod2}, | ||||
| 				{eventHappens: &NodeAdd}, | ||||
| 				// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod)}, | ||||
| 			}, | ||||
| 			wantBackoffQPodNames: []string{"targetpod"}, | ||||
| 			wantInFlightPods:     []*v1.Pod{pod2}, // only pod2 is registered because pod is already enqueued back. | ||||
| 			wantInFlightEvents:   []interface{}{pod2, NodeAdd}, | ||||
| 			queueingHintMap: QueueingHintMapPerProfile{ | ||||
| 				"": { | ||||
| 					PvAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					NodeAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					PvcAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "All Pods registered in inFlightPods are enqueued back to activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod, pod2}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				{podPopped: pod2}, | ||||
| 				{eventHappens: &NodeAdd}, | ||||
| 				// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod)}, | ||||
| 				{eventHappens: &CSINodeUpdate}, | ||||
| 				// This pod will be requeued to backoffQ because no plugin is registered as unschedulable plugin. | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod2)}, | ||||
| 			}, | ||||
| 			wantBackoffQPodNames: []string{"targetpod", "targetpod2"}, | ||||
| 			wantInFlightPods:     nil, // empty | ||||
| 			queueingHintMap: QueueingHintMapPerProfile{ | ||||
| 				"": { | ||||
| 					PvAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					NodeAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					PvcAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					CSINodeUpdate: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "One intermediate Pod registered in inFlightPods is enqueued back to activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod, pod2, pod3}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				{podPopped: pod2}, | ||||
| 				{eventHappens: &NodeAdd}, | ||||
| 				// This Pod won't be requeued again. | ||||
| 				{podPopped: pod3}, | ||||
| 				{eventHappens: &AssignedPodAdd}, | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod2)}, | ||||
| 			}, | ||||
| 			wantBackoffQPodNames: []string{"targetpod2"}, | ||||
| 			wantInFlightPods:     []*v1.Pod{pod, pod3}, | ||||
| 			wantInFlightEvents:   []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd}, | ||||
| 			queueingHintMap: QueueingHintMapPerProfile{ | ||||
| 				"": { | ||||
| 					PvAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					NodeAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 					AssignedPodAdd: { | ||||
| 						{ | ||||
| 							PluginName:     "fooPlugin1", | ||||
| 							QueueingHintFn: queueHintReturnQueue, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:        "pod is enqueued to queue without QueueingHint when SchedulingQueueHint is disabled", | ||||
| 			initialPods: []*v1.Pod{pod}, | ||||
| 			actions: []action{ | ||||
| 				{podPopped: pod}, | ||||
| @@ -249,83 +413,14 @@ func Test_InFlightPods(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "Pod is registered in inFlightPods when Pod is popped from activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				// This gets added for the pod. | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 			}, | ||||
| 			wantInFlightPods:   []*v1.Pod{pod}, | ||||
| 			wantInFlightEvents: []interface{}{pod, PvAdd}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "Pod, registered in inFlightPods, is enqueued back to activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod, pod2}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				{podPopped: pod2}, | ||||
| 				{eventHappens: &NodeAdd}, | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod)}, | ||||
| 			}, | ||||
| 			wantBackoffQPodNames: []string{"targetpod"}, | ||||
| 			wantInFlightPods:     []*v1.Pod{pod2}, | ||||
| 			wantInFlightEvents:   []interface{}{pod2, NodeAdd}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "All Pods registered in inFlightPods are enqueued back to activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod, pod2}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				{podPopped: pod2}, | ||||
| 				{eventHappens: &NodeAdd}, | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod)}, | ||||
| 				{eventHappens: &CSINodeUpdate}, | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod2)}, | ||||
| 			}, | ||||
| 			wantBackoffQPodNames: []string{"targetpod", "targetpod2"}, | ||||
| 			wantInFlightPods:     nil, | ||||
| 			wantInFlightEvents:   nil, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "One intermediate Pod registered in inFlightPods is enqueued back to activeQ", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod, pod2, pod3}, | ||||
| 			actions: []action{ | ||||
| 				// This won't be added to inFlightEvents because no inFlightPods at this point. | ||||
| 				{eventHappens: &PvcAdd}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &PvAdd}, | ||||
| 				{podPopped: pod2}, | ||||
| 				{eventHappens: &NodeAdd}, | ||||
| 				// This Pod won't be requeued again. | ||||
| 				{podPopped: pod3}, | ||||
| 				{eventHappens: &AssignedPodAdd}, | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod2)}, | ||||
| 			}, | ||||
| 			wantBackoffQPodNames: []string{"targetpod2"}, | ||||
| 			wantInFlightPods:     []*v1.Pod{pod, pod3}, | ||||
| 			wantInFlightEvents:   []interface{}{pod, PvAdd, NodeAdd, pod3, AssignedPodAdd}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                         "events before popping Pod are ignored", | ||||
| 			name:                         "events before popping Pod are ignored when Pod is enqueued back to queue", | ||||
| 			isSchedulingQueueHintEnabled: true, | ||||
| 			initialPods:                  []*v1.Pod{pod}, | ||||
| 			actions: []action{ | ||||
| 				{eventHappens: &WildCardEvent}, | ||||
| 				{podPopped: pod}, | ||||
| 				{eventHappens: &AssignedPodAdd}, | ||||
| 				// This Pod won't be requeued to activeQ/backoffQ because fooPlugin1 returns QueueSkip. | ||||
| 				{podEnqueued: newQueuedPodInfoForLookup(pod, "fooPlugin1")}, | ||||
| 			}, | ||||
| 			wantUnschedPodPoolPodNames: []string{"targetpod"}, | ||||
| @@ -1854,7 +1949,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { | ||||
| 		t.Errorf("Unexpected pending pods summary: want %v, but got %v.", wantSummary, gotSummary) | ||||
| 	} | ||||
| 	// Move all to active queue. We should still see the same set of pods. | ||||
| 	q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil) | ||||
| 	q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil) | ||||
| 	gotPods, gotSummary = q.PendingPods() | ||||
| 	if diff := cmp.Diff(expectedSet, makeSet(gotPods)); diff != "" { | ||||
| 		t.Errorf("Unexpected list of pending Pods (-want, +got):\n%s", diff) | ||||
| @@ -2271,7 +2366,7 @@ func TestHighPriorityBackoff(t *testing.T) { | ||||
| 		t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) | ||||
| 	} | ||||
| 	// Move all unschedulable pods to the active queue. | ||||
| 	q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil) | ||||
| 	q.MoveAllToActiveOrBackoffQueue(logger, WildCardEvent, nil, nil, nil) | ||||
|  | ||||
| 	p, err = q.Pop(logger) | ||||
| 	if err != nil { | ||||
| @@ -3182,22 +3277,26 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { | ||||
| 		name            string | ||||
| 		preEnqueueCheck PreEnqueueCheck | ||||
| 		podInfos        []*framework.QueuedPodInfo | ||||
| 		event           framework.ClusterEvent | ||||
| 		want            []string | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:     "nil PreEnqueueCheck", | ||||
| 			podInfos: podInfos, | ||||
| 			event:    WildCardEvent, | ||||
| 			want:     []string{"p0", "p1", "p2", "p3", "p4"}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:            "move Pods with priority greater than 2", | ||||
| 			podInfos:        podInfos, | ||||
| 			event:           WildCardEvent, | ||||
| 			preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 }, | ||||
| 			want:            []string{"p2", "p3", "p4"}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:     "move Pods with even priority and greater than 2", | ||||
| 			podInfos: podInfos, | ||||
| 			event:    WildCardEvent, | ||||
| 			preEnqueueCheck: func(pod *v1.Pod) bool { | ||||
| 				return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2 | ||||
| 			}, | ||||
| @@ -3206,10 +3305,19 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { | ||||
| 		{ | ||||
| 			name:     "move Pods with even and negative priority", | ||||
| 			podInfos: podInfos, | ||||
| 			event:    WildCardEvent, | ||||
| 			preEnqueueCheck: func(pod *v1.Pod) bool { | ||||
| 				return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority < 0 | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:     "preCheck isn't called if the event is not interested by any plugins", | ||||
| 			podInfos: podInfos, | ||||
| 			event:    PvAdd, // No plugin is interested in this event. | ||||
| 			preEnqueueCheck: func(pod *v1.Pod) bool { | ||||
| 				panic("preCheck shouldn't be called") | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tt := range tests { | ||||
| @@ -3236,7 +3344,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { | ||||
| 				// See: https://github.com/golang/go/issues/8687 | ||||
| 				podInfo.Timestamp = podInfo.Timestamp.Add(time.Duration((i - len(tt.podInfos))) * time.Millisecond) | ||||
| 			} | ||||
| 			q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, tt.preEnqueueCheck) | ||||
| 			q.MoveAllToActiveOrBackoffQueue(logger, tt.event, nil, nil, tt.preEnqueueCheck) | ||||
| 			var got []string | ||||
| 			for q.podBackoffQ.Len() != 0 { | ||||
| 				obj, err := q.podBackoffQ.Pop() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kensei Nakada
					Kensei Nakada