fix: skip isPodWorthRequeuing only when SchedulingGates gates the pod

This commit is contained in:
Kensei Nakada
2024-06-15 22:36:42 +00:00
parent c3689b9f8b
commit dd3af9a85b
3 changed files with 20 additions and 9 deletions

View File

@@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/internal/heap" "k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
@@ -1195,9 +1196,10 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
for _, pInfo := range podInfoList { for _, pInfo := range podInfoList {
// Since there may be many gated pods and they will not move from the // Since there may be many gated pods and they will not move from the
// unschedulable pool, we skip calling the expensive isPodWorthRequeueing. // unschedulable pool, we skip calling the expensive isPodWorthRequeueing.
if pInfo.Gated { if pInfo.Gated && pInfo.UnschedulablePlugins.Has(names.SchedulingGates) {
continue continue
} }
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj) schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
if schedulingHint == queueSkip { if schedulingHint == queueSkip {
// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event. // QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.

View File

@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -1499,13 +1500,21 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
expectedQ: unschedulablePods, expectedQ: unschedulablePods,
}, },
{ {
name: "QueueHintFunction is not called when Pod is gated", name: "QueueHintFunction is not called when Pod is gated by SchedulingGates plugin",
podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}), podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New(names.SchedulingGates, "foo")}),
hint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { hint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
return framework.Queue, fmt.Errorf("QueueingHintFn should not be called as pod is gated") return framework.Queue, fmt.Errorf("QueueingHintFn should not be called as pod is gated")
}, },
expectedQ: unschedulablePods, expectedQ: unschedulablePods,
}, },
{
name: "QueueHintFunction is called when Pod is gated by a plugin other than SchedulingGates",
podInfo: setQueuedPodInfoGated(&framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p), UnschedulablePlugins: sets.New("foo")}),
hint: queueHintReturnQueue,
// FIXME: This should be backoffQ.
// https://github.com/kubernetes/kubernetes/issues/125538
expectedQ: activeQ,
},
} }
for _, test := range tests { for _, test := range tests {
@@ -1518,14 +1527,13 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.
QueueingHintFn: test.hint, QueueingHintFn: test.hint,
}, },
} }
test.podInfo.UnschedulablePlugins = sets.New("foo")
cl := testingclock.NewFakeClock(now) cl := testingclock.NewFakeClock(now)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
// add to unsched pod pool
q.activeQ.Add(q.newQueuedPodInfo(test.podInfo.Pod)) q.activeQ.Add(q.newQueuedPodInfo(test.podInfo.Pod))
if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod { if p, err := q.Pop(logger); err != nil || p.Pod != test.podInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", test.podInfo.Pod.Name, p.Pod.Name)
} }
// add to unsched pod pool
err := q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle()) err := q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle())
if err != nil { if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err) t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)

View File

@@ -2668,8 +2668,9 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
} }
tests := []struct { tests := []struct {
name string name string
withEvents bool withEvents bool
// count is the expected number of calls to PreEnqueue().
count int count int
queueHintEnabled []bool queueHintEnabled []bool
expectedScheduled []bool expectedScheduled []bool
@@ -2686,7 +2687,7 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
{ {
name: "preEnqueue plugin with event registered", name: "preEnqueue plugin with event registered",
withEvents: true, withEvents: true,
count: 2, count: 3,
queueHintEnabled: []bool{false, true}, queueHintEnabled: []bool{false, true},
expectedScheduled: []bool{true, true}, expectedScheduled: []bool{true, true},
}, },
@@ -2700,7 +2701,7 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
t.Run(tt.name+fmt.Sprintf(" queueHint(%v)", queueHintEnabled), func(t *testing.T) { t.Run(tt.name+fmt.Sprintf(" queueHint(%v)", queueHintEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, queueHintEnabled) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, queueHintEnabled)
// new plugin every time to clear counts // use new plugin every time to clear counts
var plugin framework.PreEnqueuePlugin var plugin framework.PreEnqueuePlugin
if tt.withEvents { if tt.withEvents {
plugin = &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}} plugin = &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}}