schedulingQueue update pod by queueHint

This commit is contained in:
AxeZhan
2023-12-08 16:12:13 +08:00
parent 68091805a5
commit d66f8f9413
14 changed files with 671 additions and 214 deletions

View File

@@ -34,9 +34,12 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
@@ -2665,129 +2668,160 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
}
tests := []struct {
name string
enqueuePlugin framework.PreEnqueuePlugin
count int
name string
withEvents bool
count int
queueHintEnabled []bool
expectedScheduled []bool
}{
{
name: "preEnqueue plugin without event registered",
enqueuePlugin: &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{}},
count: 2,
name: "preEnqueue plugin without event registered",
withEvents: false,
count: 2,
// This test case doesn't expect that the pod is scheduled again after the pod is updated
// when queuehint is enabled, because it doesn't register any events in EventsToRegister.
queueHintEnabled: []bool{false, true},
expectedScheduled: []bool{true, false},
},
{
name: "preEnqueue plugin with event registered",
enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}},
count: 2,
name: "preEnqueue plugin with event registered",
withEvents: true,
count: 2,
queueHintEnabled: []bool{false, true},
expectedScheduled: []bool{true, true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registry := frameworkruntime.Registry{
tt.enqueuePlugin.Name(): newPlugin(tt.enqueuePlugin),
}
for i := 0; i < len(tt.queueHintEnabled); i++ {
queueHintEnabled := tt.queueHintEnabled[i]
expectedScheduled := tt.expectedScheduled[i]
// Setup plugins for testing.
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: &configv1.Plugins{
PreEnqueue: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: tt.enqueuePlugin.Name()},
},
Disabled: []configv1.Plugin{
{Name: "*"},
t.Run(tt.name+fmt.Sprintf(" queueHint(%v)", queueHintEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, queueHintEnabled)
// new plugin every time to clear counts
var plugin framework.PreEnqueuePlugin
if tt.withEvents {
plugin = &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}}
} else {
plugin = &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{}}
}
registry := frameworkruntime.Registry{
plugin.Name(): newPlugin(plugin),
}
// Setup plugins for testing.
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: &configv1.Plugins{
PreEnqueue: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: plugin.Name()},
},
Disabled: []configv1.Plugin{
{Name: "*"},
},
},
},
},
}},
}},
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
defer teardown()
// Create a pod with schedulingGates.
gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name).
SchedulingGates([]string{"foo"}).
PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq).
Container("pause").Obj()
gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod)
if err != nil {
t.Errorf("Error while creating a gated pod: %v", err)
return
}
if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be gated, but got: %v", err)
return
}
if num(plugin) != 1 {
t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(plugin))
return
}
// Create a best effort pod.
pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{
Name: "pause-pod",
Namespace: testCtx.NS.Name,
Labels: map[string]string{"foo": "bar"},
}))
if err != nil {
t.Errorf("Error while creating a pod: %v", err)
return
}
// Wait for the pod schedulabled.
if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
return
}
// Update the pod which will trigger the requeue logic if plugin registers the events.
pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error while getting a pod: %v", err)
return
}
pausePod.Annotations = map[string]string{"foo": "bar"}
_, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{})
if err != nil {
t.Errorf("Error while updating a pod: %v", err)
return
}
// Pod should still be unschedulable because scheduling gates still exist, theoretically, it's a waste rescheduling.
if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be gated, but got: %v", err)
return
}
if num(plugin) != tt.count {
t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(plugin))
return
}
// Remove gated pod's scheduling gates.
gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error while getting a pod: %v", err)
return
}
gatedPod.Spec.SchedulingGates = nil
_, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{})
if err != nil {
t.Errorf("Error while updating a pod: %v", err)
return
}
if expectedScheduled {
if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
}
return
}
// wait for some time to ensure that the schedulerQueue has completed processing the podUpdate event.
time.Sleep(time.Second)
// pod shouldn't be scheduled if we didn't register podUpdate event for schedulingGates plugin
if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be gated, but got: %v", err)
return
}
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
defer teardown()
// Create a pod with schedulingGates.
gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name).
SchedulingGates([]string{"foo"}).
PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq).
Container("pause").Obj()
gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod)
if err != nil {
t.Errorf("Error while creating a gated pod: %v", err)
return
}
if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be gated, but got: %v", err)
return
}
if num(tt.enqueuePlugin) != 1 {
t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(tt.enqueuePlugin))
return
}
// Create a best effort pod.
pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{
Name: "pause-pod",
Namespace: testCtx.NS.Name,
Labels: map[string]string{"foo": "bar"},
}))
if err != nil {
t.Errorf("Error while creating a pod: %v", err)
return
}
// Wait for the pod schedulabled.
if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
return
}
// Update the pod which will trigger the requeue logic if plugin registers the events.
pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error while getting a pod: %v", err)
return
}
pausePod.Annotations = map[string]string{"foo": "bar"}
_, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{})
if err != nil {
t.Errorf("Error while updating a pod: %v", err)
return
}
// Pod should still be unschedulable because scheduling gates still exist, theoretically, it's a waste rescheduling.
if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be gated, but got: %v", err)
return
}
if num(tt.enqueuePlugin) != tt.count {
t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(tt.enqueuePlugin))
return
}
// Remove gated pod's scheduling gates.
gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Error while getting a pod: %v", err)
return
}
gatedPod.Spec.SchedulingGates = nil
_, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{})
if err != nil {
t.Errorf("Error while updating a pod: %v", err)
return
}
// Ungated pod should be schedulable now.
if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
t.Errorf("Expected the pod to be schedulable, but got: %v", err)
return
}
})
}
}
}