Unregister events in schedulingGates plugin
Signed-off-by: kerthcet <kerthcet@gmail.com>
This commit is contained in:
		@@ -370,6 +370,10 @@ type EnqueueExtensions interface {
 | 
			
		||||
	// and leveraged to build event handlers dynamically.
 | 
			
		||||
	// Note: the returned list needs to be static (not depend on configuration parameters);
 | 
			
		||||
	// otherwise it would lead to undefined behavior.
 | 
			
		||||
	//
 | 
			
		||||
	// The returned events could be nil to indicate that no events other than the pod's own update
 | 
			
		||||
	// can make the pod re-schedulable. An example is SchedulingGates plugin.
 | 
			
		||||
	// Appropriate implementation of this function will make Pod's re-scheduling accurate and performant.
 | 
			
		||||
	EventsToRegister() []ClusterEventWithHint
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -53,12 +53,10 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework
 | 
			
		||||
	return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EventsToRegister returns the possible events that may make a Pod
 | 
			
		||||
// failed by this plugin schedulable.
 | 
			
		||||
// EventsToRegister returns nil here to indicate that schedulingGates plugin is not
 | 
			
		||||
// interested in any event but its own update.
 | 
			
		||||
func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
	return []framework.ClusterEventWithHint{
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New initializes a new plugin and returns it.
 | 
			
		||||
 
 | 
			
		||||
@@ -373,6 +373,13 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei
 | 
			
		||||
	for _, e := range es {
 | 
			
		||||
		events := e.EventsToRegister()
 | 
			
		||||
 | 
			
		||||
		// This will happen when plugin registers with empty events, it's usually the case a pod
 | 
			
		||||
		// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
 | 
			
		||||
		// will enter into the activeQ via priorityQueue.Update().
 | 
			
		||||
		if len(events) == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
 | 
			
		||||
		// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
 | 
			
		||||
		// cannot be moved by any regular cluster event.
 | 
			
		||||
 
 | 
			
		||||
@@ -662,6 +662,7 @@ const (
 | 
			
		||||
	emptyEventsToRegister          = "emptyEventsToRegister"
 | 
			
		||||
	queueSort                      = "no-op-queue-sort-plugin"
 | 
			
		||||
	fakeBind                       = "bind-plugin"
 | 
			
		||||
	emptyEventExtensions           = "emptyEventExtensions"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Test_buildQueueingHintMap(t *testing.T) {
 | 
			
		||||
@@ -729,6 +730,23 @@ func Test_buildQueueingHintMap(t *testing.T) {
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:    "register plugin with empty event",
 | 
			
		||||
			plugins: []framework.Plugin{&emptyEventPlugin{}},
 | 
			
		||||
			want:    map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:    "register plugins including emptyEventPlugin",
 | 
			
		||||
			plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}},
 | 
			
		||||
			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
 | 
			
		||||
				{Resource: framework.Pod, ActionType: framework.Add}: {
 | 
			
		||||
					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
 | 
			
		||||
				},
 | 
			
		||||
				{Resource: framework.Node, ActionType: framework.Add}: {
 | 
			
		||||
					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
@@ -1009,6 +1027,18 @@ func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type emptyEventPlugin struct{}
 | 
			
		||||
 | 
			
		||||
func (*emptyEventPlugin) Name() string { return emptyEventExtensions }
 | 
			
		||||
 | 
			
		||||
func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
 | 
			
		||||
// This can simulate a plugin registered at scheduler setup, but does nothing
 | 
			
		||||
// due to some disabled feature gate.
 | 
			
		||||
 
 | 
			
		||||
@@ -45,6 +45,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
 | 
			
		||||
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 | 
			
		||||
	st "k8s.io/kubernetes/pkg/scheduler/testing"
 | 
			
		||||
	schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
 | 
			
		||||
@@ -2588,3 +2589,197 @@ func TestActivatePods(t *testing.T) {
 | 
			
		||||
		t.Errorf("JobPlugin's pods activation logic is not called")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ framework.PreEnqueuePlugin = &SchedulingGatesPluginWithEvents{}
 | 
			
		||||
var _ framework.EnqueueExtensions = &SchedulingGatesPluginWithEvents{}
 | 
			
		||||
var _ framework.PreEnqueuePlugin = &SchedulingGatesPluginWOEvents{}
 | 
			
		||||
var _ framework.EnqueueExtensions = &SchedulingGatesPluginWOEvents{}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	schedulingGatesPluginWithEvents = "scheduling-gates-with-events"
 | 
			
		||||
	schedulingGatesPluginWOEvents   = "scheduling-gates-without-events"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type SchedulingGatesPluginWithEvents struct {
 | 
			
		||||
	called int
 | 
			
		||||
	schedulinggates.SchedulingGates
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *SchedulingGatesPluginWithEvents) Name() string {
 | 
			
		||||
	return schedulingGatesPluginWithEvents
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *SchedulingGatesPluginWithEvents) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
 | 
			
		||||
	pl.called++
 | 
			
		||||
	return pl.SchedulingGates.PreEnqueue(ctx, p)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *SchedulingGatesPluginWithEvents) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
	return []framework.ClusterEventWithHint{
 | 
			
		||||
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type SchedulingGatesPluginWOEvents struct {
 | 
			
		||||
	called int
 | 
			
		||||
	schedulinggates.SchedulingGates
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *SchedulingGatesPluginWOEvents) Name() string {
 | 
			
		||||
	return schedulingGatesPluginWOEvents
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *SchedulingGatesPluginWOEvents) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
 | 
			
		||||
	pl.called++
 | 
			
		||||
	return pl.SchedulingGates.PreEnqueue(ctx, p)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pl *SchedulingGatesPluginWOEvents) EventsToRegister() []framework.ClusterEventWithHint {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// This test helps to verify registering nil events for schedulingGates plugin works as expected.
 | 
			
		||||
func TestSchedulingGatesPluginEventsToRegister(t *testing.T) {
 | 
			
		||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)()
 | 
			
		||||
 | 
			
		||||
	testContext := testutils.InitTestAPIServer(t, "preenqueue-plugin", nil)
 | 
			
		||||
 | 
			
		||||
	num := func(pl framework.Plugin) int {
 | 
			
		||||
		switch item := pl.(type) {
 | 
			
		||||
		case *SchedulingGatesPluginWithEvents:
 | 
			
		||||
			return item.called
 | 
			
		||||
		case *SchedulingGatesPluginWOEvents:
 | 
			
		||||
			return item.called
 | 
			
		||||
		default:
 | 
			
		||||
			t.Error("unsupported plugin")
 | 
			
		||||
		}
 | 
			
		||||
		return 0
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name          string
 | 
			
		||||
		enqueuePlugin framework.PreEnqueuePlugin
 | 
			
		||||
		count         int
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:          "preEnqueue plugin without event registered",
 | 
			
		||||
			enqueuePlugin: &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{EnablePodSchedulingReadiness: true}},
 | 
			
		||||
			count:         2,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:          "preEnqueue plugin with event registered",
 | 
			
		||||
			enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{EnablePodSchedulingReadiness: true}},
 | 
			
		||||
			count:         3,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			registry := frameworkruntime.Registry{
 | 
			
		||||
				tt.enqueuePlugin.Name(): newPlugin(tt.enqueuePlugin),
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Setup plugins for testing.
 | 
			
		||||
			cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
 | 
			
		||||
				Profiles: []configv1.KubeSchedulerProfile{{
 | 
			
		||||
					SchedulerName: pointer.String(v1.DefaultSchedulerName),
 | 
			
		||||
					Plugins: &configv1.Plugins{
 | 
			
		||||
						PreEnqueue: configv1.PluginSet{
 | 
			
		||||
							Enabled: []configv1.Plugin{
 | 
			
		||||
								{Name: tt.enqueuePlugin.Name()},
 | 
			
		||||
							},
 | 
			
		||||
							Disabled: []configv1.Plugin{
 | 
			
		||||
								{Name: "*"},
 | 
			
		||||
							},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				}},
 | 
			
		||||
			})
 | 
			
		||||
 | 
			
		||||
			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 | 
			
		||||
				scheduler.WithProfiles(cfg.Profiles...),
 | 
			
		||||
				scheduler.WithFrameworkOutOfTreeRegistry(registry),
 | 
			
		||||
			)
 | 
			
		||||
			defer teardown()
 | 
			
		||||
 | 
			
		||||
			// Create a pod with schedulingGates.
 | 
			
		||||
			gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name).
 | 
			
		||||
				SchedulingGates([]string{"foo"}).
 | 
			
		||||
				PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq).
 | 
			
		||||
				Container("pause").Obj()
 | 
			
		||||
			gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while creating a gated pod: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if err := testutils.WaitForPodSchedulingGated(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
 | 
			
		||||
				t.Errorf("Expected the pod to be gated, but got: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if num(tt.enqueuePlugin) != 1 {
 | 
			
		||||
				t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(tt.enqueuePlugin))
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Create a best effort pod.
 | 
			
		||||
			pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{
 | 
			
		||||
				Name:      "pause-pod",
 | 
			
		||||
				Namespace: testCtx.NS.Name,
 | 
			
		||||
				Labels:    map[string]string{"foo": "bar"},
 | 
			
		||||
			}))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while creating a pod: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Wait for the pod schedulabled.
 | 
			
		||||
			if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil {
 | 
			
		||||
				t.Errorf("Expected the pod to be schedulable, but got: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Update the pod which will trigger the requeue logic if plugin registers the events.
 | 
			
		||||
			pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while getting a pod: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			pausePod.Annotations = map[string]string{"foo": "bar"}
 | 
			
		||||
			_, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while updating a pod: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Pod should still be unschedulable because scheduling gates still exist, theoretically, it's a waste rescheduling.
 | 
			
		||||
			if err := testutils.WaitForPodSchedulingGated(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
 | 
			
		||||
				t.Errorf("Expected the pod to be gated, but got: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if num(tt.enqueuePlugin) != tt.count {
 | 
			
		||||
				t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(tt.enqueuePlugin))
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Remove gated pod's scheduling gates.
 | 
			
		||||
			gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while getting a pod: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			gatedPod.Spec.SchedulingGates = nil
 | 
			
		||||
			_, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("Error while updating a pod: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Ungated pod should be schedulable now.
 | 
			
		||||
			if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil {
 | 
			
		||||
				t.Errorf("Expected the pod to be schedulable, but got: %v", err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user