Merge pull request #122234 from AxeZhan/podUpdateEvent
[Scheduler] Put pod into the correct queue during podUpdate
		| @@ -27,4 +27,5 @@ type Features struct { | ||||
| 	EnablePodDisruptionConditions                bool | ||||
| 	EnableInPlacePodVerticalScaling              bool | ||||
| 	EnableSidecarContainers                      bool | ||||
| 	EnableSchedulingQueueHint                    bool | ||||
| } | ||||
|   | ||||
| @@ -300,18 +300,20 @@ func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldOb | ||||
| 	return framework.Queue, nil | ||||
| } | ||||
|  | ||||
| // isResourceScaleDown checks whether the resource request of the modified pod is less than the original pod | ||||
| // for the resources requested by the pod we are trying to schedule. | ||||
| func (f *Fit) isResourceScaleDown(targetPod, originalOtherPod, modifiedOtherPod *v1.Pod) bool { | ||||
| 	if modifiedOtherPod.Spec.NodeName == "" { | ||||
| 		// no resource is freed up whatever the pod is modified. | ||||
| // isResourceScaleDown checks whether an update event may make the pod schedulable. Specifically: | ||||
| // - Returns true when an update event shows a scheduled pod's resource request got reduced. | ||||
| // - Returns true when an update event is for the unscheduled pod itself, and it shows the pod's resource request got reduced. | ||||
| func (f *Fit) isResourceScaleDown(targetPod, originalPod, modifiedPod *v1.Pod) bool { | ||||
| 	if modifiedPod.UID != targetPod.UID && modifiedPod.Spec.NodeName == "" { | ||||
| 		// If the update event is neither for targetPod itself nor for a Pod that is already scheduled, | ||||
| 		// it cannot free up resources and wouldn't make targetPod schedulable. | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	// the other pod was scheduled, so modification or deletion may free up some resources. | ||||
| 	originalMaxResourceReq, modifiedMaxResourceReq := &framework.Resource{}, &framework.Resource{} | ||||
| 	originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) | ||||
| 	modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) | ||||
| 	originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) | ||||
| 	modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})) | ||||
|  | ||||
| 	// check whether the resource request of the modified pod is less than the original pod. | ||||
| 	podRequests := resource.PodRequests(targetPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}) | ||||
|   | ||||
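To make the comparison above easier to follow, here is a simplified, self-contained sketch of the scale-down check over plain v1.ResourceList values. It is an illustration only, not the plugin's code, which folds the whole pod's effective request together via resource.PodRequests and framework.Resource.SetMaxResource before comparing.

package example

import v1 "k8s.io/api/core/v1"

// requestShrankForAny is a simplified stand-in for the comparison in
// isResourceScaleDown: it reports whether, for any resource the pending pod
// asks for, the updated pod now requests strictly less than it did before.
func requestShrankForAny(pendingReq, originalReq, modifiedReq v1.ResourceList) bool {
	for name := range pendingReq {
		orig := originalReq[name]
		modified := modifiedReq[name]
		if modified.Cmp(orig) < 0 {
			return true
		}
	}
	return false
}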
| @@ -1145,9 +1145,9 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { | ||||
| 			expectedHint:                    framework.Queue, | ||||
| 			expectedErr:                     true, | ||||
| 		}, | ||||
| 		"queue-on-deleted": { | ||||
| 			pod:                             st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 		"queue-on-other-pod-deleted": { | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.Queue, | ||||
| 		}, | ||||
| @@ -1158,44 +1158,51 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { | ||||
| 			expectedHint:                    framework.QueueSkip, | ||||
| 		}, | ||||
| 		"skip-queue-on-disable-inplace-pod-vertical-scaling": { | ||||
| 			pod:                             st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: false, | ||||
| 			expectedHint:                    framework.QueueSkip, | ||||
| 		}, | ||||
| 		"skip-queue-on-unscheduled-pod": { | ||||
| 			pod:                             st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), | ||||
| 			newObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 		"skip-queue-on-other-unscheduled-pod": { | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).UID("uid0").Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).UID("uid1").Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).UID("uid1").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.QueueSkip, | ||||
| 		}, | ||||
| 		"skip-queue-on-non-resource-changes": { | ||||
| 		"skip-queue-on-other-pod-non-resource-changes": { | ||||
| 			pod:                             &v1.Pod{}, | ||||
| 			oldObj:                          st.MakePod().Label("k", "v").Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Label("foo", "bar").Node("fake").Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Label("k", "v").Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod2").Label("foo", "bar").Node("fake").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.QueueSkip, | ||||
| 		}, | ||||
| 		"skip-queue-on-unrelated-resource-changes": { | ||||
| 			pod:                             st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "2"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceMemory: "1"}).Node("fake").Obj(), | ||||
| 		"skip-queue-on-other-pod-unrelated-resource-changes": { | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceMemory: "2"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceMemory: "1"}).Node("fake").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.QueueSkip, | ||||
| 		}, | ||||
| 		"skip-queue-on-resource-scale-up": { | ||||
| 			pod:                             st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), | ||||
| 		"skip-queue-on-other-pod-resource-scale-up": { | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.QueueSkip, | ||||
| 		}, | ||||
| 		"queue-on-some-resource-scale-down": { | ||||
| 			pod:                             st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 		"queue-on-other-pod-some-resource-scale-down": { | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("fake").Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("fake").Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.Queue, | ||||
| 		}, | ||||
| 		"queue-on-target-pod-some-resource-scale-down": { | ||||
| 			pod:                             st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			oldObj:                          st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), | ||||
| 			newObj:                          st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), | ||||
| 			enableInPlacePodVerticalScaling: true, | ||||
| 			expectedHint:                    framework.Queue, | ||||
| 		}, | ||||
|   | ||||
| @@ -53,12 +53,13 @@ func NewInTreeRegistry() runtime.Registry { | ||||
| 		EnablePodDisruptionConditions:                feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions), | ||||
| 		EnableInPlacePodVerticalScaling:              feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling), | ||||
| 		EnableSidecarContainers:                      feature.DefaultFeatureGate.Enabled(features.SidecarContainers), | ||||
| 		EnableSchedulingQueueHint:                    feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints), | ||||
| 	} | ||||
|  | ||||
| 	registry := runtime.Registry{ | ||||
| 		dynamicresources.Name:                runtime.FactoryAdapter(fts, dynamicresources.New), | ||||
| 		imagelocality.Name:                   imagelocality.New, | ||||
| 		tainttoleration.Name:                 tainttoleration.New, | ||||
| 		tainttoleration.Name:                 runtime.FactoryAdapter(fts, tainttoleration.New), | ||||
| 		nodename.Name:                        nodename.New, | ||||
| 		nodeports.Name:                       nodeports.New, | ||||
| 		nodeaffinity.Name:                    nodeaffinity.New, | ||||
| @@ -78,7 +79,7 @@ func NewInTreeRegistry() runtime.Registry { | ||||
| 		queuesort.Name:                       queuesort.New, | ||||
| 		defaultbinder.Name:                   defaultbinder.New, | ||||
| 		defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New), | ||||
| 		schedulinggates.Name:                 schedulinggates.New, | ||||
| 		schedulinggates.Name:                 runtime.FactoryAdapter(fts, schedulinggates.New), | ||||
| 	} | ||||
|  | ||||
| 	return registry | ||||
|   | ||||
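Because tainttoleration.New and schedulinggates.New now take a feature.Features argument, the registry wraps them with runtime.FactoryAdapter above. A hedged sketch of the same pattern for a hypothetical out-of-tree plugin (ExamplePlugin and its constructor are invented purely for illustration):

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// ExamplePlugin stands in for any plugin that wants to branch on the
// SchedulerQueueingHints feature gate, as SchedulingGates does in this PR.
type ExamplePlugin struct {
	enableSchedulingQueueHint bool
}

func (p *ExamplePlugin) Name() string { return "ExamplePlugin" }

// NewExample has the "factory with features" signature; FactoryAdapter closes
// over the Features value so the registry can keep storing plain factories.
func NewExample(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) {
	return &ExamplePlugin{enableSchedulingQueueHint: fts.EnableSchedulingQueueHint}, nil
}

func buildRegistry(fts feature.Features) frameworkruntime.Registry {
	return frameworkruntime.Registry{
		"ExamplePlugin": frameworkruntime.FactoryAdapter(fts, NewExample),
	}
}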
| @@ -22,15 +22,21 @@ import ( | ||||
|  | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/klog/v2" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/util" | ||||
| ) | ||||
|  | ||||
| // Name of the plugin used in the plugin registry and configurations. | ||||
| const Name = names.SchedulingGates | ||||
|  | ||||
| // SchedulingGates checks if a Pod carries .spec.schedulingGates. | ||||
| type SchedulingGates struct{} | ||||
| type SchedulingGates struct { | ||||
| 	enableSchedulingQueueHint bool | ||||
| } | ||||
|  | ||||
| var _ framework.PreEnqueuePlugin = &SchedulingGates{} | ||||
| var _ framework.EnqueueExtensions = &SchedulingGates{} | ||||
| @@ -50,13 +56,42 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework | ||||
| 	return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates)) | ||||
| } | ||||
|  | ||||
| // EventsToRegister returns nil here to indicate that schedulingGates plugin is not | ||||
| // interested in any event but its own update. | ||||
| // EventsToRegister returns the possible events that may make a Pod | ||||
| // failed by this plugin schedulable. | ||||
| func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { | ||||
| 	return nil | ||||
| 	if !pl.enableSchedulingQueueHint { | ||||
| 		return nil | ||||
| 	} | ||||
| 	// When the QueueingHint feature is enabled, | ||||
| 	// the scheduling queue uses Pod/Update Queueing Hint | ||||
| 	// to determine whether a Pod's update makes the Pod schedulable or not. | ||||
| 	// https://github.com/kubernetes/kubernetes/pull/122234 | ||||
| 	return []framework.ClusterEventWithHint{ | ||||
| 		// Pods may become schedulable once their scheduling gates are removed. | ||||
| 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // New initializes a new plugin and returns it. | ||||
| func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { | ||||
| 	return &SchedulingGates{}, nil | ||||
| func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { | ||||
| 	return &SchedulingGates{ | ||||
| 		enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (pl *SchedulingGates) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { | ||||
| 	_, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) | ||||
| 	if err != nil { | ||||
| 		return framework.Queue, err | ||||
| 	} | ||||
|  | ||||
| 	if modifiedPod.UID != pod.UID { | ||||
| 		// The update event is for a different Pod, so it wouldn't make this Pod schedulable. | ||||
| 		return framework.QueueSkip, nil | ||||
| 	} | ||||
|  | ||||
| 	if len(modifiedPod.Spec.SchedulingGates) == 0 { | ||||
| 		return framework.Queue, nil | ||||
| 	} | ||||
| 	return framework.QueueSkip, nil | ||||
| } | ||||
|   | ||||
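For context, a rough client-side sketch (the helper name is invented for illustration) of the kind of Pod update this new hint reacts to, namely clearing a pending pod's scheduling gates; once the updated pod has an empty SchedulingGates slice, isSchedulableAfterPodChange returns Queue for that pod and the scheduler retries it.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// removeSchedulingGates clears a pending pod's scheduling gates, producing the
// Pod/Update event that the SchedulingGates queueing hint acts on.
func removeSchedulingGates(ctx context.Context, cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
	updated := pod.DeepCopy()
	updated.Spec.SchedulingGates = nil
	return cs.CoreV1().Pods(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
}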
| @@ -20,9 +20,11 @@ import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	"github.com/stretchr/testify/require" | ||||
|  | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" | ||||
| 	st "k8s.io/kubernetes/pkg/scheduler/testing" | ||||
| 	"k8s.io/kubernetes/test/utils/ktesting" | ||||
| ) | ||||
| @@ -48,7 +50,7 @@ func TestPreEnqueue(t *testing.T) { | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			_, ctx := ktesting.NewTestContext(t) | ||||
| 			p, err := New(ctx, nil, nil) | ||||
| 			p, err := New(ctx, nil, nil, feature.Features{}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Creating plugin: %v", err) | ||||
| 			} | ||||
| @@ -60,3 +62,60 @@ func TestPreEnqueue(t *testing.T) { | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func Test_isSchedulableAfterPodChange(t *testing.T) { | ||||
| 	testcases := map[string]struct { | ||||
| 		pod            *v1.Pod | ||||
| 		oldObj, newObj interface{} | ||||
| 		expectedHint   framework.QueueingHint | ||||
| 		expectedErr    bool | ||||
| 	}{ | ||||
| 		"backoff-wrong-old-object": { | ||||
| 			pod:          &v1.Pod{}, | ||||
| 			oldObj:       "not-a-pod", | ||||
| 			expectedHint: framework.Queue, | ||||
| 			expectedErr:  true, | ||||
| 		}, | ||||
| 		"backoff-wrong-new-object": { | ||||
| 			pod:          &v1.Pod{}, | ||||
| 			newObj:       "not-a-pod", | ||||
| 			expectedHint: framework.Queue, | ||||
| 			expectedErr:  true, | ||||
| 		}, | ||||
| 		"skip-queue-on-other-pod-updated": { | ||||
| 			pod:          st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).UID("uid0").Obj(), | ||||
| 			oldObj:       st.MakePod().Name("p1").SchedulingGates([]string{"foo", "bar"}).UID("uid1").Obj(), | ||||
| 			newObj:       st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).UID("uid1").Obj(), | ||||
| 			expectedHint: framework.QueueSkip, | ||||
| 		}, | ||||
| 		"skip-queue-on-gates-not-empty": { | ||||
| 			pod:          st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), | ||||
| 			oldObj:       st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), | ||||
| 			newObj:       st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), | ||||
| 			expectedHint: framework.QueueSkip, | ||||
| 		}, | ||||
| 		"queue-on-gates-become-empty": { | ||||
| 			pod:          st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), | ||||
| 			oldObj:       st.MakePod().Name("p").SchedulingGates([]string{"foo"}).Obj(), | ||||
| 			newObj:       st.MakePod().Name("p").SchedulingGates([]string{}).Obj(), | ||||
| 			expectedHint: framework.Queue, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for name, tc := range testcases { | ||||
| 		t.Run(name, func(t *testing.T) { | ||||
| 			logger, ctx := ktesting.NewTestContext(t) | ||||
| 			p, err := New(ctx, nil, nil, feature.Features{}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("Creating plugin: %v", err) | ||||
| 			} | ||||
| 			actualHint, err := p.(*SchedulingGates).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) | ||||
| 			if tc.expectedErr { | ||||
| 				require.Error(t, err) | ||||
| 				return | ||||
| 			} | ||||
| 			require.NoError(t, err) | ||||
| 			require.Equal(t, tc.expectedHint, actualHint) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -25,6 +25,7 @@ import ( | ||||
| 	v1helper "k8s.io/component-helpers/scheduling/corev1" | ||||
| 	"k8s.io/klog/v2" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/util" | ||||
| @@ -32,7 +33,8 @@ import ( | ||||
|  | ||||
| // TaintToleration is a plugin that checks if a pod tolerates a node's taints. | ||||
| type TaintToleration struct { | ||||
| 	handle framework.Handle | ||||
| 	handle                    framework.Handle | ||||
| 	enableSchedulingQueueHint bool | ||||
| } | ||||
|  | ||||
| var _ framework.FilterPlugin = &TaintToleration{} | ||||
| @@ -57,9 +59,19 @@ func (pl *TaintToleration) Name() string { | ||||
| // EventsToRegister returns the possible events that may make a Pod | ||||
| // failed by this plugin schedulable. | ||||
| func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint { | ||||
| 	return []framework.ClusterEventWithHint{ | ||||
| 	clusterEventWithHint := []framework.ClusterEventWithHint{ | ||||
| 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, | ||||
| 	} | ||||
| 	if !pl.enableSchedulingQueueHint { | ||||
| 		return clusterEventWithHint | ||||
| 	} | ||||
| 	// When the QueueingHint feature is enabled, | ||||
| 	// the scheduling queue uses Pod/Update Queueing Hint | ||||
| 	// to determine whether a Pod's update makes the Pod schedulable or not. | ||||
| 	// https://github.com/kubernetes/kubernetes/pull/122234 | ||||
| 	clusterEventWithHint = append(clusterEventWithHint, | ||||
| 		framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}) | ||||
| 	return clusterEventWithHint | ||||
| } | ||||
|  | ||||
| // isSchedulableAfterNodeChange is invoked for all node events reported by | ||||
| @@ -191,6 +203,32 @@ func (pl *TaintToleration) ScoreExtensions() framework.ScoreExtensions { | ||||
| } | ||||
|  | ||||
| // New initializes a new plugin and returns it. | ||||
| func New(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) { | ||||
| 	return &TaintToleration{handle: h}, nil | ||||
| func New(_ context.Context, _ runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { | ||||
| 	return &TaintToleration{ | ||||
| 		handle:                    h, | ||||
| 		enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // isSchedulableAfterPodChange is invoked whenever a pod changed. It checks whether | ||||
| // that change made a previously unschedulable pod schedulable. | ||||
| // When an unscheduled Pod, which was rejected by TaintToleration, is updated to have a new toleration, | ||||
| // it may make the Pod schedulable. | ||||
| func (pl *TaintToleration) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { | ||||
| 	originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj) | ||||
| 	if err != nil { | ||||
| 		return framework.Queue, err | ||||
| 	} | ||||
|  | ||||
| 	if pod.UID == modifiedPod.UID && | ||||
| 		len(originalPod.Spec.Tolerations) != len(modifiedPod.Spec.Tolerations) { | ||||
| 		// The unscheduled Pod got a new toleration. | ||||
| 		// Due to API validation, users can add tolerations but cannot modify or remove them, | ||||
| 		// so checking the length of the tolerations is enough to notice the update, | ||||
| 		// and any added toleration could make the Pod schedulable. | ||||
| 		logger.V(5).Info("a new toleration is added for the Pod, and it may make it schedulable", "pod", klog.KObj(modifiedPod)) | ||||
| 		return framework.Queue, nil | ||||
| 	} | ||||
|  | ||||
| 	return framework.QueueSkip, nil | ||||
| } | ||||
|   | ||||
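Analogously to the SchedulingGates example, a hedged sketch (the helper is invented for illustration) of the append-only toleration update that the length comparison above detects:

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// addToleration appends a toleration to a pending pod. Tolerations can only be
// added to an existing pod (API validation forbids modifying or removing them),
// which is why the hint above only compares len(Spec.Tolerations) to detect the change.
func addToleration(ctx context.Context, cs clientset.Interface, pod *v1.Pod, toleration v1.Toleration) (*v1.Pod, error) {
	updated := pod.DeepCopy()
	updated.Spec.Tolerations = append(updated.Spec.Tolerations, toleration)
	return cs.CoreV1().Pods(updated.Namespace).Update(ctx, updated, metav1.UpdateOptions{})
}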
| @@ -21,10 +21,12 @@ import ( | ||||
| 	"reflect" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/klog/v2/ktesting" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/internal/cache" | ||||
| 	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" | ||||
| @@ -238,7 +240,7 @@ func TestTaintTolerationScore(t *testing.T) { | ||||
| 			snapshot := cache.NewSnapshot(nil, test.nodes) | ||||
| 			fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) | ||||
|  | ||||
| 			p, err := New(ctx, nil, fh) | ||||
| 			p, err := New(ctx, nil, fh, feature.Features{}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("creating plugin: %v", err) | ||||
| 			} | ||||
| @@ -342,7 +344,7 @@ func TestTaintTolerationFilter(t *testing.T) { | ||||
| 			_, ctx := ktesting.NewTestContext(t) | ||||
| 			nodeInfo := framework.NewNodeInfo() | ||||
| 			nodeInfo.SetNode(test.node) | ||||
| 			p, err := New(ctx, nil, nil) | ||||
| 			p, err := New(ctx, nil, nil, feature.Features{}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("creating plugin: %v", err) | ||||
| 			} | ||||
| @@ -418,3 +420,130 @@ func TestIsSchedulableAfterNodeChange(t *testing.T) { | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func Test_isSchedulableAfterPodChange(t *testing.T) { | ||||
| 	testcases := map[string]struct { | ||||
| 		pod            *v1.Pod | ||||
| 		oldObj, newObj interface{} | ||||
| 		expectedHint   framework.QueueingHint | ||||
| 		expectedErr    bool | ||||
| 	}{ | ||||
| 		"backoff-wrong-new-object": { | ||||
| 			pod:          &v1.Pod{}, | ||||
| 			newObj:       "not-a-pod", | ||||
| 			expectedHint: framework.Queue, | ||||
| 			expectedErr:  true, | ||||
| 		}, | ||||
| 		"backoff-wrong-old-object": { | ||||
| 			pod:          &v1.Pod{}, | ||||
| 			oldObj:       "not-a-pod", | ||||
| 			newObj:       &v1.Pod{}, | ||||
| 			expectedHint: framework.Queue, | ||||
| 			expectedErr:  true, | ||||
| 		}, | ||||
| 		"skip-updates-other-pod": { | ||||
| 			pod: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 					UID:       "uid0", | ||||
| 				}}, | ||||
| 			oldObj: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-2", | ||||
| 					Namespace: "ns-1", | ||||
| 					UID:       "uid1", | ||||
| 				}}, | ||||
| 			newObj: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-2", | ||||
| 					Namespace: "ns-1", | ||||
| 					UID:       "uid1", | ||||
| 				}, | ||||
| 				Spec: v1.PodSpec{ | ||||
| 					Tolerations: []v1.Toleration{ | ||||
| 						{ | ||||
| 							Key:    "foo", | ||||
| 							Effect: v1.TaintEffectNoSchedule, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedHint: framework.QueueSkip, | ||||
| 			expectedErr:  false, | ||||
| 		}, | ||||
| 		"skip-updates-not-toleration": { | ||||
| 			pod: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 				}}, | ||||
| 			oldObj: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 				}}, | ||||
| 			newObj: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 					Labels:    map[string]string{"foo": "bar"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedHint: framework.QueueSkip, | ||||
| 			expectedErr:  false, | ||||
| 		}, | ||||
| 		"queue-on-toleration-added": { | ||||
| 			pod: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 				}}, | ||||
| 			oldObj: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 				}}, | ||||
| 			newObj: &v1.Pod{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:      "pod-1", | ||||
| 					Namespace: "ns-1", | ||||
| 				}, | ||||
| 				Spec: v1.PodSpec{ | ||||
| 					Tolerations: []v1.Toleration{ | ||||
| 						{ | ||||
| 							Key:    "foo", | ||||
| 							Effect: v1.TaintEffectNoSchedule, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedHint: framework.Queue, | ||||
| 			expectedErr:  false, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for name, tc := range testcases { | ||||
| 		t.Run(name, func(t *testing.T) { | ||||
| 			logger, ctx := ktesting.NewTestContext(t) | ||||
| 			p, err := New(ctx, nil, nil, feature.Features{}) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("creating plugin: %v", err) | ||||
| 			} | ||||
| 			actualHint, err := p.(*TaintToleration).isSchedulableAfterPodChange(logger, tc.pod, tc.oldObj, tc.newObj) | ||||
| 			if tc.expectedErr { | ||||
| 				if err == nil { | ||||
| 					t.Errorf("unexpected success") | ||||
| 				} | ||||
| 				return | ||||
| 			} | ||||
| 			if err != nil { | ||||
| 				t.Errorf("unexpected error") | ||||
| 				return | ||||
| 			} | ||||
| 			if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" { | ||||
| 				t.Errorf("Unexpected hint (-want, +got):\n%s", diff) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -72,6 +72,17 @@ const ( | ||||
| 	//           - a Pod that is deleted | ||||
| 	//           - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle. | ||||
| 	//           - an existing Pod that was unscheduled but gets scheduled to a Node. | ||||
| 	// | ||||
| 	// Note that the Pod event type includes the events for the unscheduled Pod itself. | ||||
| 	// i.e., when unscheduled Pods are updated, the scheduling queue checks with Pod/Update QueueingHint(s) whether the update may make the pods schedulable, | ||||
| 	// and requeues them to activeQ/backoffQ when at least one QueueingHint returns Queue. | ||||
| 	// Plugins **have to** implement a QueueingHint for Pod/Update event | ||||
| 	// if the rejection from them could be resolved by updating unscheduled Pods themselves. | ||||
| 	// Example: a Pod that requests excessive resources is rejected by the noderesources plugin; | ||||
| 	// if that unscheduled Pod is later updated to request fewer resources, | ||||
| 	// the previous rejection from the noderesources plugin can be resolved. | ||||
| 	// That plugin would therefore implement a QueueingHint for the Pod/Update event | ||||
| 	// that returns Queue when such a resource-request reduction is made to an unscheduled Pod. | ||||
| 	Pod GVK = "Pod" | ||||
| 	// A note about NodeAdd event and UpdateNodeTaint event: | ||||
| 	// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. | ||||
|   | ||||
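A minimal sketch of the Pod/Update QueueingHint shape the comment above asks plugins to provide, following the pattern this PR adds to SchedulingGates and TaintToleration; podUpdateMayResolveRejection is a placeholder for the plugin-specific check (for noderesources it would be whether the resource request shrank).

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// isSchedulableAfterPodUpdate ignores updates to other pods and returns Queue
// only when the update to the pending pod itself could resolve the earlier rejection.
func isSchedulableAfterPodUpdate(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
	if err != nil {
		// Return Queue together with the error, matching the plugins in this PR.
		return framework.Queue, err
	}
	if modifiedPod.UID != pod.UID {
		// The update is for a different pod; it cannot fix this pod's own spec.
		return framework.QueueSkip, nil
	}
	if podUpdateMayResolveRejection(originalPod, modifiedPod) {
		logger.V(5).Info("unscheduled pod updated in a way that may make it schedulable", "pod", klog.KObj(modifiedPod))
		return framework.Queue, nil
	}
	return framework.QueueSkip, nil
}

// podUpdateMayResolveRejection is where a real plugin would put its own logic.
func podUpdateMayResolveRejection(_, _ *v1.Pod) bool { return false }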
| @@ -43,6 +43,8 @@ var ( | ||||
| 	// AssignedPodUpdate is the event when a pod is updated that causes pods with matching affinity | ||||
| 	// terms to be more schedulable. | ||||
| 	AssignedPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "AssignedPodUpdate"} | ||||
| 	// UnscheduledPodUpdate is the event when an unscheduled pod is updated. | ||||
| 	UnscheduledPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodUpdate"} | ||||
| 	// AssignedPodDelete is the event when a pod is deleted that causes pods with matching affinity | ||||
| 	// terms to be more schedulable. | ||||
| 	AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"} | ||||
|   | ||||
| @@ -1007,27 +1007,45 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error | ||||
| 	if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil { | ||||
| 		pInfo := updatePod(usPodInfo, newPod) | ||||
| 		p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo) | ||||
| 		gated := usPodInfo.Gated | ||||
| 		if p.isSchedulingQueueHintEnabled { | ||||
| 			// When unscheduled Pods are updated, we check with QueueingHint | ||||
| 			// whether the update may make the pods schedulable. | ||||
| 			// Plugins have to implement a QueueingHint for Pod/Update event | ||||
| 			// if the rejection from them could be resolved by updating the unscheduled Pod itself. | ||||
| 			hint := p.isPodWorthRequeuing(logger, pInfo, UnscheduledPodUpdate, oldPod, newPod) | ||||
| 			queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, UnscheduledPodUpdate.Label) | ||||
| 			if queue != unschedulablePods { | ||||
| 				logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", PodUpdate, "queue", queue) | ||||
| 				p.unschedulablePods.delete(usPodInfo.Pod, gated) | ||||
| 			} | ||||
| 			if queue == activeQ { | ||||
| 				p.cond.Broadcast() | ||||
| 			} | ||||
| 			return nil | ||||
| 		} | ||||
| 		if isPodUpdated(oldPod, newPod) { | ||||
| 			gated := usPodInfo.Gated | ||||
|  | ||||
| 			if p.isPodBackingoff(usPodInfo) { | ||||
| 				if err := p.podBackoffQ.Add(pInfo); err != nil { | ||||
| 					return err | ||||
| 				} | ||||
| 				p.unschedulablePods.delete(usPodInfo.Pod, gated) | ||||
| 				logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQ) | ||||
| 			} else { | ||||
| 				if added, err := p.addToActiveQ(logger, pInfo); !added { | ||||
| 					return err | ||||
| 				} | ||||
| 				p.unschedulablePods.delete(usPodInfo.Pod, gated) | ||||
| 				logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQ) | ||||
| 				p.cond.Broadcast() | ||||
| 				return nil | ||||
| 			} | ||||
| 		} else { | ||||
| 			// Pod update didn't make it schedulable, keep it in the unschedulable queue. | ||||
| 			p.unschedulablePods.addOrUpdate(pInfo) | ||||
|  | ||||
| 			if added, err := p.addToActiveQ(logger, pInfo); !added { | ||||
| 				return err | ||||
| 			} | ||||
| 			p.unschedulablePods.delete(usPodInfo.Pod, gated) | ||||
| 			logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQ) | ||||
| 			p.cond.Broadcast() | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		// Pod update didn't make it schedulable, keep it in the unschedulable queue. | ||||
| 		p.unschedulablePods.addOrUpdate(pInfo) | ||||
| 		return nil | ||||
| 	} | ||||
| 	// If pod is not in any of the queues, we put it in the active queue. | ||||
|   | ||||
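The new branch above delegates to isPodWorthRequeuing and requeuePodViaQueueingHint; as a simplified summary (not the actual implementation), the updated unschedulable pod lands in one of three places:

package example

// targetQueueForUpdate is a simplified decision sketch of where an updated
// unschedulable pod ends up once queueing hints are enabled; the real code
// consults every registered Pod/Update QueueingHint and the pod's backoff timer.
func targetQueueForUpdate(anyHintReturnsQueue, stillBackingOff bool) string {
	switch {
	case !anyHintReturnsQueue:
		return "unschedulablePods" // no plugin thinks this update helps; stay put
	case stillBackingOff:
		return "backoffQ" // worth retrying, but the backoff period isn't over yet
	default:
		return "activeQ" // retry scheduling right away
	}
}

Keeping the pod in unschedulablePods when every hint returns QueueSkip is the point of the change: an update that cannot resolve the earlier rejection no longer triggers a wasted scheduling attempt.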
| @@ -43,6 +43,7 @@ import ( | ||||
| 	podutil "k8s.io/kubernetes/pkg/api/v1/pod" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| 	plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/metrics" | ||||
| @@ -975,6 +976,23 @@ func TestPriorityQueue_Pop(t *testing.T) { | ||||
| func TestPriorityQueue_Update(t *testing.T) { | ||||
| 	c := testingclock.NewFakeClock(time.Now()) | ||||
|  | ||||
| 	queuePlugin := "queuePlugin" | ||||
| 	skipPlugin := "skipPlugin" | ||||
| 	queueingHintMap := QueueingHintMapPerProfile{ | ||||
| 		"": { | ||||
| 			UnscheduledPodUpdate: { | ||||
| 				{ | ||||
| 					PluginName:     queuePlugin, | ||||
| 					QueueingHintFn: queueHintReturnQueue, | ||||
| 				}, | ||||
| 				{ | ||||
| 					PluginName:     skipPlugin, | ||||
| 					QueueingHintFn: queueHintReturnSkip, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		name  string | ||||
| 		wantQ string | ||||
| @@ -984,6 +1002,8 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 		// This function returns two values; | ||||
| 		// - oldPod/newPod: each test will call Update() with these oldPod and newPod. | ||||
| 		prepareFunc func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) | ||||
| 		// schedulingHintsEnablement lists the SchedulerQueueingHints feature gate values this case is run with. | ||||
| 		schedulingHintsEnablement []bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:  "add highPriorityPodInfo to activeQ", | ||||
| @@ -991,6 +1011,7 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 			prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { | ||||
| 				return nil, highPriorityPodInfo.Pod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:  "Update pod that didn't exist in the queue", | ||||
| @@ -1000,6 +1021,7 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 				updatedPod.Annotations["foo"] = "test" | ||||
| 				return medPriorityPodInfo.Pod, updatedPod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:                 "Update highPriorityPodInfo and add a nominatedNodeName to it", | ||||
| @@ -1008,6 +1030,7 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 			prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { | ||||
| 				return highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:  "When updating a pod that is already in activeQ, the pod should remain in activeQ after Update()", | ||||
| @@ -1019,6 +1042,7 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 				} | ||||
| 				return highPriorityPodInfo.Pod, highPriorityPodInfo.Pod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:  "When updating a pod that is in backoff queue and is still backing off, it will be updated in backoff queue", | ||||
| @@ -1030,22 +1054,24 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 				} | ||||
| 				return podInfo.Pod, podInfo.Pod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:  "when updating a pod which is in unschedulable queue and is backing off, it will be moved to backoff queue", | ||||
| 			wantQ: backoffQ, | ||||
| 			prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { | ||||
| 				q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin")) | ||||
| 				q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)) | ||||
| 				updatedPod := medPriorityPodInfo.Pod.DeepCopy() | ||||
| 				updatedPod.Annotations["foo"] = "test" | ||||
| 				return medPriorityPodInfo.Pod, updatedPod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:  "when updating a pod which is in unschedulable queue and is not backing off, it will be moved to active queue", | ||||
| 			wantQ: activeQ, | ||||
| 			prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { | ||||
| 				q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, "plugin")) | ||||
| 				q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, queuePlugin)) | ||||
| 				updatedPod := medPriorityPodInfo.Pod.DeepCopy() | ||||
| 				updatedPod.Annotations["foo"] = "test1" | ||||
| 				// Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, | ||||
| @@ -1053,56 +1079,71 @@ func TestPriorityQueue_Update(t *testing.T) { | ||||
| 				c.Step(q.podInitialBackoffDuration) | ||||
| 				return medPriorityPodInfo.Pod, updatedPod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:  "when updating a pod which is in unschedulable pods but the plugin returns skip, it will remain in unschedulablePods", | ||||
| 			wantQ: unschedulablePods, | ||||
| 			prepareFunc: func(t *testing.T, logger klog.Logger, q *PriorityQueue) (oldPod, newPod *v1.Pod) { | ||||
| 				q.unschedulablePods.addOrUpdate(q.newQueuedPodInfo(medPriorityPodInfo.Pod, skipPlugin)) | ||||
| 				updatedPod := medPriorityPodInfo.Pod.DeepCopy() | ||||
| 				updatedPod.Annotations["foo"] = "test1" | ||||
| 				return medPriorityPodInfo.Pod, updatedPod | ||||
| 			}, | ||||
| 			schedulingHintsEnablement: []bool{true}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			logger, ctx := ktesting.NewTestContext(t) | ||||
| 			objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod} | ||||
| 			ctx, cancel := context.WithCancel(ctx) | ||||
| 			defer cancel() | ||||
| 			q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c)) | ||||
| 		for _, qHintEnabled := range tt.schedulingHintsEnablement { | ||||
| 			t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) { | ||||
| 				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled) | ||||
| 				logger, ctx := ktesting.NewTestContext(t) | ||||
| 				objs := []runtime.Object{highPriorityPodInfo.Pod, unschedulablePodInfo.Pod, medPriorityPodInfo.Pod} | ||||
| 				ctx, cancel := context.WithCancel(ctx) | ||||
| 				defer cancel() | ||||
| 				q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs, WithClock(c), WithQueueingHintMapPerProfile(queueingHintMap)) | ||||
|  | ||||
| 			oldPod, newPod := tt.prepareFunc(t, logger, q) | ||||
| 				oldPod, newPod := tt.prepareFunc(t, logger, q) | ||||
|  | ||||
| 			if err := q.Update(logger, oldPod, newPod); err != nil { | ||||
| 				t.Fatalf("unexpected error from Update: %v", err) | ||||
| 			} | ||||
|  | ||||
| 			var pInfo *framework.QueuedPodInfo | ||||
|  | ||||
| 			// validate expected queue | ||||
| 			if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { | ||||
| 				if tt.wantQ != backoffQ { | ||||
| 					t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name) | ||||
| 				if err := q.Update(logger, oldPod, newPod); err != nil { | ||||
| 					t.Fatalf("unexpected error from Update: %v", err) | ||||
| 				} | ||||
| 				pInfo = obj.(*framework.QueuedPodInfo) | ||||
| 			} | ||||
|  | ||||
| 			if obj, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { | ||||
| 				if tt.wantQ != activeQ { | ||||
| 					t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) | ||||
| 				var pInfo *framework.QueuedPodInfo | ||||
|  | ||||
| 				// validate expected queue | ||||
| 				if obj, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(newPod)); exists { | ||||
| 					if tt.wantQ != backoffQ { | ||||
| 						t.Errorf("expected pod %s not to be queued to backoffQ, but it was", newPod.Name) | ||||
| 					} | ||||
| 					pInfo = obj.(*framework.QueuedPodInfo) | ||||
| 				} | ||||
| 				pInfo = obj.(*framework.QueuedPodInfo) | ||||
| 			} | ||||
|  | ||||
| 			if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { | ||||
| 				if tt.wantQ != unschedulablePods { | ||||
| 					t.Errorf("expected pod %s to not be queued to unschedulablePods, but it was", newPod.Name) | ||||
| 				if obj, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(newPod)); exists { | ||||
| 					if tt.wantQ != activeQ { | ||||
| 						t.Errorf("expected pod %s not to be queued to activeQ, but it was", newPod.Name) | ||||
| 					} | ||||
| 					pInfo = obj.(*framework.QueuedPodInfo) | ||||
| 				} | ||||
| 				pInfo = pInfoFromUnsched | ||||
| 			} | ||||
|  | ||||
| 			if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" { | ||||
| 				t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) | ||||
| 			} | ||||
| 				if pInfoFromUnsched := q.unschedulablePods.get(newPod); pInfoFromUnsched != nil { | ||||
| 					if tt.wantQ != unschedulablePods { | ||||
| 						t.Errorf("expected pod %s to not be queued to unschedulablePods, but it was", newPod.Name) | ||||
| 					} | ||||
| 					pInfo = pInfoFromUnsched | ||||
| 				} | ||||
|  | ||||
| 			if tt.wantAddedToNominated && len(q.nominator.nominatedPods) != 1 { | ||||
| 				t.Errorf("Expected one item in nomindatePods map: %v", q.nominator) | ||||
| 			} | ||||
| 				if diff := cmp.Diff(newPod, pInfo.PodInfo.Pod); diff != "" { | ||||
| 					t.Errorf("Unexpected updated pod diff (-want, +got): %s", diff) | ||||
| 				} | ||||
|  | ||||
| 		}) | ||||
| 				if tt.wantAddedToNominated && len(q.nominator.nominatedPods) != 1 { | ||||
| 					t.Errorf("Expected one item in nominatedPods map: %v", q.nominator) | ||||
| 				} | ||||
|  | ||||
| 			}) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -3746,7 +3787,7 @@ func Test_isPodWorthRequeuing(t *testing.T) { | ||||
|  | ||||
| func Test_queuedPodInfo_gatedSetUponCreationAndUnsetUponUpdate(t *testing.T) { | ||||
| 	logger, ctx := ktesting.NewTestContext(t) | ||||
| 	plugin, _ := schedulinggates.New(ctx, nil, nil) | ||||
| 	plugin, _ := schedulinggates.New(ctx, nil, nil, plfeature.Features{}) | ||||
| 	m := map[string][]framework.PreEnqueuePlugin{"": {plugin.(framework.PreEnqueuePlugin)}} | ||||
| 	q := NewTestQueue(ctx, newDefaultQueueSort(), WithPreEnqueuePluginMap(m)) | ||||
|  | ||||
|   | ||||
| @@ -34,9 +34,12 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	listersv1 "k8s.io/client-go/listers/core/v1" | ||||
| 	featuregatetesting "k8s.io/component-base/featuregate/testing" | ||||
| 	configv1 "k8s.io/kube-scheduler/config/v1" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler" | ||||
| 	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" | ||||
| 	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" | ||||
| @@ -2665,129 +2668,160 @@ func TestSchedulingGatesPluginEventsToRegister(t *testing.T) { | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		name          string | ||||
| 		enqueuePlugin framework.PreEnqueuePlugin | ||||
| 		count         int | ||||
| 		name              string | ||||
| 		withEvents        bool | ||||
| 		count             int | ||||
| 		queueHintEnabled  []bool | ||||
| 		expectedScheduled []bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:          "preEnqueue plugin without event registered", | ||||
| 			enqueuePlugin: &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{}}, | ||||
| 			count:         2, | ||||
| 			name:       "preEnqueue plugin without event registered", | ||||
| 			withEvents: false, | ||||
| 			count:      2, | ||||
| 			// When queueing hints are enabled, this case doesn't expect the gated pod to be scheduled | ||||
| 			// even after its gates are removed, because the plugin doesn't register any events in EventsToRegister. | ||||
| 			queueHintEnabled:  []bool{false, true}, | ||||
| 			expectedScheduled: []bool{true, false}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "preEnqueue plugin with event registered", | ||||
| 			enqueuePlugin: &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}}, | ||||
| 			count:         2, | ||||
| 			name:              "preEnqueue plugin with event registered", | ||||
| 			withEvents:        true, | ||||
| 			count:             2, | ||||
| 			queueHintEnabled:  []bool{false, true}, | ||||
| 			expectedScheduled: []bool{true, true}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			registry := frameworkruntime.Registry{ | ||||
| 				tt.enqueuePlugin.Name(): newPlugin(tt.enqueuePlugin), | ||||
| 			} | ||||
| 		for i := 0; i < len(tt.queueHintEnabled); i++ { | ||||
| 			queueHintEnabled := tt.queueHintEnabled[i] | ||||
| 			expectedScheduled := tt.expectedScheduled[i] | ||||
|  | ||||
| 			// Setup plugins for testing. | ||||
| 			cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ | ||||
| 				Profiles: []configv1.KubeSchedulerProfile{{ | ||||
| 					SchedulerName: pointer.String(v1.DefaultSchedulerName), | ||||
| 					Plugins: &configv1.Plugins{ | ||||
| 						PreEnqueue: configv1.PluginSet{ | ||||
| 							Enabled: []configv1.Plugin{ | ||||
| 								{Name: tt.enqueuePlugin.Name()}, | ||||
| 							}, | ||||
| 							Disabled: []configv1.Plugin{ | ||||
| 								{Name: "*"}, | ||||
| 			t.Run(tt.name+fmt.Sprintf(" queueHint(%v)", queueHintEnabled), func(t *testing.T) { | ||||
| 				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, queueHintEnabled) | ||||
|  | ||||
| 				// new plugin every time to clear counts | ||||
| 				var plugin framework.PreEnqueuePlugin | ||||
| 				if tt.withEvents { | ||||
| 					plugin = &SchedulingGatesPluginWithEvents{SchedulingGates: schedulinggates.SchedulingGates{}} | ||||
| 				} else { | ||||
| 					plugin = &SchedulingGatesPluginWOEvents{SchedulingGates: schedulinggates.SchedulingGates{}} | ||||
| 				} | ||||
|  | ||||
| 				registry := frameworkruntime.Registry{ | ||||
| 					plugin.Name(): newPlugin(plugin), | ||||
| 				} | ||||
|  | ||||
| 				// Setup plugins for testing. | ||||
| 				cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ | ||||
| 					Profiles: []configv1.KubeSchedulerProfile{{ | ||||
| 						SchedulerName: pointer.String(v1.DefaultSchedulerName), | ||||
| 						Plugins: &configv1.Plugins{ | ||||
| 							PreEnqueue: configv1.PluginSet{ | ||||
| 								Enabled: []configv1.Plugin{ | ||||
| 									{Name: plugin.Name()}, | ||||
| 								}, | ||||
| 								Disabled: []configv1.Plugin{ | ||||
| 									{Name: "*"}, | ||||
| 								}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}}, | ||||
| 					}}, | ||||
| 				}) | ||||
|  | ||||
| 				testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, | ||||
| 					scheduler.WithProfiles(cfg.Profiles...), | ||||
| 					scheduler.WithFrameworkOutOfTreeRegistry(registry), | ||||
| 				) | ||||
| 				defer teardown() | ||||
|  | ||||
| 				// Create a pod with schedulingGates. | ||||
| 				gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name). | ||||
| 					SchedulingGates([]string{"foo"}). | ||||
| 					PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq). | ||||
| 					Container("pause").Obj() | ||||
| 				gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod) | ||||
| 				if err != nil { | ||||
| 					t.Errorf("Error while creating a gated pod: %v", err) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 					t.Errorf("Expected the pod to be gated, but got: %v", err) | ||||
| 					return | ||||
| 				} | ||||
| 				if num(plugin) != 1 { | ||||
| 					t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(plugin)) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// Create a best effort pod. | ||||
| 				pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{ | ||||
| 					Name:      "pause-pod", | ||||
| 					Namespace: testCtx.NS.Name, | ||||
| 					Labels:    map[string]string{"foo": "bar"}, | ||||
| 				})) | ||||
| 				if err != nil { | ||||
| 					t.Errorf("Error while creating a pod: %v", err) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// Wait for the pod to be scheduled. | ||||
| 				if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil { | ||||
| 					t.Errorf("Expected the pod to be schedulable, but got: %v", err) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// Update the pod which will trigger the requeue logic if plugin registers the events. | ||||
| 				pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					t.Errorf("Error while getting a pod: %v", err) | ||||
| 					return | ||||
| 				} | ||||
| 				pausePod.Annotations = map[string]string{"foo": "bar"} | ||||
| 				_, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{}) | ||||
| 				if err != nil { | ||||
| 					t.Errorf("Error while updating a pod: %v", err) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// The gated Pod should still be unschedulable because its scheduling gates remain; in theory this rescheduling attempt is wasted work. | ||||
| 				if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 					t.Errorf("Expected the pod to be gated, but got: %v", err) | ||||
| 					return | ||||
| 				} | ||||
| 				if num(plugin) != tt.count { | ||||
| 					t.Errorf("Expected the preEnqueue plugin to be called %v, but got %v", tt.count, num(plugin)) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// Remove gated pod's scheduling gates. | ||||
| 				gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{}) | ||||
| 				if err != nil { | ||||
| 					t.Errorf("Error while getting a pod: %v", err) | ||||
| 					return | ||||
| 				} | ||||
| 				gatedPod.Spec.SchedulingGates = nil | ||||
| 				_, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{}) | ||||
| 				if err != nil { | ||||
| 					t.Errorf("Error while updating a pod: %v", err) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				if expectedScheduled { | ||||
| 					if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 						t.Errorf("Expected the pod to be schedulable, but got: %v", err) | ||||
| 					} | ||||
| 					return | ||||
| 				} | ||||
| 				// wait for some time to ensure that the schedulerQueue has completed processing the podUpdate event. | ||||
| 				time.Sleep(time.Second) | ||||
| 				// The pod shouldn't be scheduled because the schedulingGates plugin didn't register the Pod/Update event. | ||||
| 				if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 					t.Errorf("Expected the pod to be gated, but got: %v", err) | ||||
| 					return | ||||
| 				} | ||||
| 			}) | ||||
|  | ||||
| 			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, | ||||
| 				scheduler.WithProfiles(cfg.Profiles...), | ||||
| 				scheduler.WithFrameworkOutOfTreeRegistry(registry), | ||||
| 			) | ||||
| 			defer teardown() | ||||
|  | ||||
| 			// Create a pod with schedulingGates. | ||||
| 			gatedPod := st.MakePod().Name("p").Namespace(testContext.NS.Name). | ||||
| 				SchedulingGates([]string{"foo"}). | ||||
| 				PodAffinity("kubernetes.io/hostname", &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, st.PodAffinityWithRequiredReq). | ||||
| 				Container("pause").Obj() | ||||
| 			gatedPod, err := testutils.CreatePausePod(testCtx.ClientSet, gatedPod) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while creating a gated pod: %v", err) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 				t.Errorf("Expected the pod to be gated, but got: %v", err) | ||||
| 				return | ||||
| 			} | ||||
| 			if num(tt.enqueuePlugin) != 1 { | ||||
| 				t.Errorf("Expected the preEnqueue plugin to be called once, but got %v", num(tt.enqueuePlugin)) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// Create a best effort pod. | ||||
| 			pausePod, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{ | ||||
| 				Name:      "pause-pod", | ||||
| 				Namespace: testCtx.NS.Name, | ||||
| 				Labels:    map[string]string{"foo": "bar"}, | ||||
| 			})) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while creating a pod: %v", err) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// Wait for the pod schedulabled. | ||||
| 			if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pausePod, 10*time.Second); err != nil { | ||||
| 				t.Errorf("Expected the pod to be schedulable, but got: %v", err) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// Update the pod which will trigger the requeue logic if plugin registers the events. | ||||
| 			pausePod, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Get(testCtx.Ctx, pausePod.Name, metav1.GetOptions{}) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while getting a pod: %v", err) | ||||
| 				return | ||||
| 			} | ||||
| 			pausePod.Annotations = map[string]string{"foo": "bar"} | ||||
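| 			// Annotations don't affect schedulability; the update only matters because it emits a Pod update event for plugins that registered it. | ||||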
| 			_, err = testCtx.ClientSet.CoreV1().Pods(pausePod.Namespace).Update(testCtx.Ctx, pausePod, metav1.UpdateOptions{}) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while updating a pod: %v", err) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// The pod should still be gated because its scheduling gates still exist; theoretically, rescheduling it here is wasted work. | ||||
| 			if err := testutils.WaitForPodSchedulingGated(testCtx.Ctx, testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 				t.Errorf("Expected the pod to be gated, but got: %v", err) | ||||
| 				return | ||||
| 			} | ||||
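| 			// Whether the pause-pod update requeued the gated pod (and bumped the counter) presumably depends on | ||||
| 			// which events the plugin registered, which is what tt.count encodes. | ||||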
| 			if num(tt.enqueuePlugin) != tt.count { | ||||
| 				t.Errorf("Expected the preEnqueue plugin to be called %v times, but got %v", tt.count, num(tt.enqueuePlugin)) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// Remove the gated pod's scheduling gates. | ||||
| 			gatedPod, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Get(testCtx.Ctx, gatedPod.Name, metav1.GetOptions{}) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while getting a pod: %v", err) | ||||
| 				return | ||||
| 			} | ||||
| 			gatedPod.Spec.SchedulingGates = nil | ||||
| 			_, err = testCtx.ClientSet.CoreV1().Pods(gatedPod.Namespace).Update(testCtx.Ctx, gatedPod, metav1.UpdateOptions{}) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("Error while updating a pod: %v", err) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// Ungated pod should be schedulable now. | ||||
| 			if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, gatedPod, 10*time.Second); err != nil { | ||||
| 				t.Errorf("Expected the pod to be schedulable, but got: %v", err) | ||||
| 				return | ||||
| 			} | ||||
| 		}) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -198,6 +198,8 @@ func TestCoreResourceEnqueue(t *testing.T) { | ||||
| 		triggerFn func(testCtx *testutils.TestContext) error | ||||
| 		// wantRequeuedPods is the map of Pods that are expected to be requeued after triggerFn. | ||||
| 		wantRequeuedPods sets.Set[string] | ||||
| 		// enableSchedulingQueueHint lists the SchedulerQueueingHints feature gate values (disabled/enabled) this test case should run with. | ||||
| 		enableSchedulingQueueHint []bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:        "Pod without a required toleration to a node isn't requeued to activeQ", | ||||
| @@ -218,7 +220,8 @@ func TestCoreResourceEnqueue(t *testing.T) { | ||||
| 				} | ||||
| 				return nil | ||||
| 			}, | ||||
| 			wantRequeuedPods: sets.New("pod2"), | ||||
| 			wantRequeuedPods:          sets.New("pod2"), | ||||
| 			enableSchedulingQueueHint: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:        "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready", | ||||
| @@ -247,14 +250,90 @@ func TestCoreResourceEnqueue(t *testing.T) { | ||||
| 				} | ||||
| 				return nil | ||||
| 			}, | ||||
| 			wantRequeuedPods: sets.New("pod2"), | ||||
| 			wantRequeuedPods:          sets.New("pod2"), | ||||
| 			enableSchedulingQueueHint: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:        "Pod updated with a toleration is requeued to activeQ", | ||||
| 			initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj(), | ||||
| 			pods: []*v1.Pod{ | ||||
| 				// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. | ||||
| 				st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(), | ||||
| 			}, | ||||
| 			triggerFn: func(testCtx *testutils.TestContext) error { | ||||
| 				// Trigger a PodUpdate event by adding a toleration to Pod1. | ||||
| 				// It makes Pod1 schedulable. | ||||
| 				if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Toleration("taint-key").Obj(), metav1.UpdateOptions{}); err != nil { | ||||
| 					return fmt.Errorf("failed to update the pod: %w", err) | ||||
| 				} | ||||
| 				return nil | ||||
| 			}, | ||||
| 			wantRequeuedPods:          sets.New("pod1"), | ||||
| 			enableSchedulingQueueHint: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:        "Pod whose resource request is scaled down is requeued to activeQ", | ||||
| 			initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), | ||||
| 			pods: []*v1.Pod{ | ||||
| 				// - Pod1 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin. | ||||
| 				st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(), | ||||
| 			}, | ||||
| 			triggerFn: func(testCtx *testutils.TestContext) error { | ||||
| 				// Trigger a PodUpdate event by reducing cpu requested by pod1. | ||||
| 				// It makes Pod1 schedulable. | ||||
| 				if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(), metav1.UpdateOptions{}); err != nil { | ||||
| 					return fmt.Errorf("failed to update the pod: %w", err) | ||||
| 				} | ||||
| 				return nil | ||||
| 			}, | ||||
| 			wantRequeuedPods:          sets.New("pod1"), | ||||
| 			enableSchedulingQueueHint: []bool{false, true}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:        "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration", | ||||
| 			initialNode: st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), | ||||
| 			pods: []*v1.Pod{ | ||||
| 				// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. | ||||
| 				st.MakePod().Name("pod1").Container("image").Obj(), | ||||
| 			}, | ||||
| 			// Simulate a Pod update by directly calling `SchedulingQueue.Update` instead of actually updating a Pod | ||||
| 			// because we don't have a way to confirm the scheduler has handled a Pod update event at the moment. | ||||
| 			// TODO: actually update a Pod and confirm the scheduler has handled the Pod update event via a metric. | ||||
| 			// https://github.com/kubernetes/kubernetes/pull/122234#discussion_r1597456808 | ||||
| 			triggerFn: func(testCtx *testutils.TestContext) (err error) { | ||||
| 				// Trigger a Pod Condition update event. | ||||
| 				// It will not make pod1 schedulable. | ||||
| 				var ( | ||||
| 					oldPod *v1.Pod | ||||
| 					newPod *v1.Pod | ||||
| 				) | ||||
| 				if oldPod, err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, "pod1", metav1.GetOptions{}); err != nil { | ||||
| 					return fmt.Errorf("failed to get the pod: %w", err) | ||||
| 				} | ||||
| 				newPod = oldPod.DeepCopy() | ||||
| 				newPod.Status.Conditions[0].Message = "injected message" | ||||
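| 				// Only the condition message changes; such an update presumably cannot affect schedulability, | ||||
| 				// so the TaintToleration QHint is expected to ignore it. | ||||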
|  | ||||
| 				if err := testCtx.Scheduler.SchedulingQueue.Update( | ||||
| 					klog.FromContext(testCtx.Ctx), | ||||
| 					oldPod, | ||||
| 					newPod, | ||||
| 				); err != nil { | ||||
| 					return fmt.Errorf("failed to update the pod: %w", err) | ||||
| 				} | ||||
| 				return nil | ||||
| 			}, | ||||
| 			wantRequeuedPods: sets.Set[string]{}, | ||||
| 			// This behaviour only holds when QueueingHints are enabled, | ||||
| 			// because the TaintToleration QHint decides to ignore such a Pod update. | ||||
| 			enableSchedulingQueueHint: []bool{true}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, featureEnabled := range []bool{false, true} { | ||||
| 		for _, tt := range tests { | ||||
| 	for _, tt := range tests { | ||||
| 		for _, featureEnabled := range tt.enableSchedulingQueueHint { | ||||
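| 			// Each case now runs only with the QueueingHints values it opts into via enableSchedulingQueueHint. | ||||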
| 			t.Run(fmt.Sprintf("%s [SchedulerQueueingHints enabled: %v]", tt.name, featureEnabled), func(t *testing.T) { | ||||
| 				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, featureEnabled) | ||||
| 				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) | ||||
|  | ||||
| 				// Use zero backoff seconds to bypass backoffQ. | ||||
| 				// It's intended to not start the scheduler's queue, and hence to | ||||
| @@ -271,7 +350,7 @@ func TestCoreResourceEnqueue(t *testing.T) { | ||||
| 				defer testCtx.Scheduler.SchedulingQueue.Close() | ||||
|  | ||||
| 				cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx | ||||
| 				// Create one Node with a taint. | ||||
| 				// Create initialNode. | ||||
| 				if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil { | ||||
| 					t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err) | ||||
| 				} | ||||
|   | ||||