From 6f8d38406a7f16fc9cc9b72789a9b826105b1b54 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Thu, 8 Jun 2023 04:54:30 +0000 Subject: [PATCH] feature(scheduler): implement ClusterEventWithHint to filter out useless events --- pkg/scheduler/eventhandlers.go | 28 +- pkg/scheduler/framework/interface.go | 9 +- .../dynamicresources/dynamicresources.go | 11 +- .../plugins/interpodaffinity/plugin.go | 8 +- .../plugins/nodeaffinity/node_affinity.go | 6 +- .../framework/plugins/nodename/node_name.go | 6 +- .../framework/plugins/nodeports/node_ports.go | 8 +- .../framework/plugins/noderesources/fit.go | 8 +- .../plugins/noderesources/fit_test.go | 14 +- .../nodeunschedulable/node_unschedulable.go | 6 +- .../framework/plugins/nodevolumelimits/csi.go | 8 +- .../plugins/nodevolumelimits/non_csi.go | 8 +- .../plugins/podtopologyspread/plugin.go | 8 +- .../schedulinggates/scheduling_gates.go | 6 +- .../tainttoleration/taint_toleration.go | 6 +- .../plugins/volumebinding/volume_binding.go | 18 +- .../volumerestrictions/volume_restrictions.go | 10 +- .../plugins/volumezone/volume_zone.go | 12 +- pkg/scheduler/framework/runtime/framework.go | 75 ++- .../framework/runtime/framework_test.go | 159 ------ pkg/scheduler/framework/types.go | 56 +++ .../internal/queue/scheduling_queue.go | 224 +++++---- .../internal/queue/scheduling_queue_test.go | 472 +++++++++++++----- pkg/scheduler/schedule_one.go | 4 +- pkg/scheduler/scheduler.go | 49 +- pkg/scheduler/scheduler_test.go | 439 ++++++++++++++++ pkg/scheduler/util/utils.go | 24 + pkg/scheduler/util/utils_test.go | 104 ++++ test/integration/scheduler/queue_test.go | 6 +- 29 files changed, 1281 insertions(+), 511 deletions(-) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 15944ceb163..08adba8a0a9 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -57,7 +57,7 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { // We don't need to invalidate cached results because results will not be // cached for pod that has unbound immediate PVCs. if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil, sc, nil) } } @@ -71,7 +71,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { nodeInfo := sched.Cache.AddNode(logger, node) logger.V(3).Info("Add event for node", "node", klog.KObj(node)) - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, preCheckForNode(nodeInfo)) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, nil, node, preCheckForNode(nodeInfo)) } func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { @@ -90,7 +90,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode) // Only requeue unschedulable pods if the node became more schedulable. if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, preCheckForNode(nodeInfo)) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, oldNode, newNode, preCheckForNode(nodeInfo)) } } @@ -180,7 +180,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { // removing it from the scheduler cache. 
In this case, signal a AssignedPodDelete // event to immediately retry some unscheduled Pods. if fwk.RejectWaitingPod(pod.UID) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil) } } @@ -218,7 +218,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) } - sched.SchedulingQueue.AssignedPodUpdated(logger, newPod) + sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod) } func (sched *Scheduler) deletePodFromCache(obj interface{}) { @@ -243,7 +243,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { logger.Error(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) } - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil) } // assignedPod selects pods that are assigned (scheduled and running). @@ -332,20 +332,20 @@ func addAllEventHandlers( funcs := cache.ResourceEventHandlerFuncs{} if at&framework.Add != 0 { evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)} - funcs.AddFunc = func(_ interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil) + funcs.AddFunc = func(obj interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil, obj, nil) } } if at&framework.Update != 0 { evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)} - funcs.UpdateFunc = func(_, _ interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil) + funcs.UpdateFunc = func(old, obj interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, old, obj, nil) } } if at&framework.Delete != 0 { evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)} - funcs.DeleteFunc = func(_ interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil) + funcs.DeleteFunc = func(obj interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, obj, nil, nil) } } return funcs @@ -412,8 +412,8 @@ func addAllEventHandlers( if at&framework.Update != 0 { informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(_, _ interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, nil) + UpdateFunc: func(old, obj interface{}) { + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil) }, }, ) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 43ba3567482..6fc1564e387 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -323,13 +323,15 @@ type QueueSortPlugin interface { // move unschedulable Pods in internal scheduling queues. Plugins // that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface. type EnqueueExtensions interface { + Plugin // EventsToRegister returns a series of possible events that may cause a Pod - // failed by this plugin schedulable. + // failed by this plugin schedulable. 
Each event has a callback function that + // filters out events to reduce useless retry of Pod's scheduling. // The events will be registered when instantiating the internal scheduling queue, // and leveraged to build event handlers dynamically. // Note: the returned list needs to be static (not depend on configuration parameters); // otherwise it would lead to undefined behavior. - EventsToRegister() []ClusterEvent + EventsToRegister() []ClusterEventWithHint } // PreFilterExtensions is an interface that is included in plugins that allow specifying @@ -513,6 +515,9 @@ type Framework interface { // PreEnqueuePlugins returns the registered preEnqueue plugins. PreEnqueuePlugins() []PreEnqueuePlugin + // EnqueueExtensions returns the registered Enqueue extensions. + EnqueueExtensions() []EnqueueExtensions + // QueueSortFunc returns the function to sort pods in scheduling queue QueueSortFunc() LessFunc diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 081b77f51ce..20fe2eae469 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -245,22 +245,21 @@ func (pl *dynamicResources) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *dynamicResources) EventsToRegister() []framework.ClusterEvent { +func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint { if !pl.enabled { return nil } - - events := []framework.ClusterEvent{ + events := []framework.ClusterEventWithHint{ // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. - {Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}}, // When a driver has provided additional information, a pod waiting for that information // may be schedulable. // TODO (#113702): can we change this so that such an event does not trigger *all* pods? // Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70 - {Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}}, // A resource might depend on node labels for topology filtering. // A new or updated node may make pods schedulable. 
- {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, } return events } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 2ca14362f7b..b7131eaf48f 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -54,8 +54,8 @@ func (pl *InterPodAffinity) Name() string { // EventsToRegister returns the possible events that may make a failed Pod // schedulable -func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ +func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ // All ActionType includes the following events: // - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, // deleting an existing Pod may make it schedulable. @@ -63,8 +63,8 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEvent { // an unschedulable Pod schedulable. // - Add. An unschedulable Pod may fail due to violating pod-affinity constraints, // adding an assigned Pod may make it schedulable. - {Resource: framework.Pod, ActionType: framework.All}, - {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, } } diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index 871fd924ea4..d9d431f9aeb 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -81,9 +81,9 @@ func (s *preFilterState) Clone() framework.StateData { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Node, ActionType: framework.Add | framework.Update}, +func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name.go b/pkg/scheduler/framework/plugins/nodename/node_name.go index 1f0a0f388df..7adea806cb7 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name.go @@ -41,9 +41,9 @@ const ( // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. 
-func (pl *NodeName) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Node, ActionType: framework.Add | framework.Update}, +func (pl *NodeName) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index bfd648efe4a..237770afebb 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -101,11 +101,11 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodePorts) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ +func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ // Due to immutable fields `spec.containers[*].ports`, pod update events are ignored. - {Resource: framework.Pod, ActionType: framework.Delete}, - {Resource: framework.Node, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 3f4a352183d..dc72bf55f4e 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -234,16 +234,16 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (f *Fit) EventsToRegister() []framework.ClusterEvent { +func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint { podActionType := framework.Delete if f.enableInPlacePodVerticalScaling { // If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered // for this plugin since a Pod update may free up resources that make other Pods schedulable. 
podActionType |= framework.Update } - return []framework.ClusterEvent{ - {Resource: framework.Pod, ActionType: podActionType}, - {Resource: framework.Node, ActionType: framework.Add | framework.Update}, + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index 2ee4214683c..266e262f958 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -991,22 +991,22 @@ func TestEventsToRegister(t *testing.T) { tests := []struct { name string inPlacePodVerticalScalingEnabled bool - expectedClusterEvents []framework.ClusterEvent + expectedClusterEvents []framework.ClusterEventWithHint }{ { "Register events with InPlacePodVerticalScaling feature enabled", true, - []framework.ClusterEvent{ - {Resource: "Pod", ActionType: framework.Update | framework.Delete}, - {Resource: "Node", ActionType: framework.Add | framework.Update}, + []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Update | framework.Delete}}, + {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, }, }, { "Register events with InPlacePodVerticalScaling feature disabled", false, - []framework.ClusterEvent{ - {Resource: "Pod", ActionType: framework.Delete}, - {Resource: "Node", ActionType: framework.Add | framework.Update}, + []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Delete}}, + {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, }, }, } diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go index 47844847ff8..1ba667ed75a 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go @@ -46,9 +46,9 @@ const ( // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint}, +func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint}}, } } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index 8d345d602ee..e26401d39c2 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -74,10 +74,10 @@ func (pl *CSILimits) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. 
-func (pl *CSILimits) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.CSINode, ActionType: framework.Add}, - {Resource: framework.Pod, ActionType: framework.Delete}, +func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, } } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go index 03a012eb0f3..763c6f45ddd 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go @@ -202,10 +202,10 @@ func (pl *nonCSILimits) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Node, ActionType: framework.Add}, - {Resource: framework.Pod, ActionType: framework.Delete}, +func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, } } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go index ed2333e76df..17803e4ee0f 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -131,8 +131,8 @@ func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory) // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ +func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ // All ActionType includes the following events: // - Add. An unschedulable Pod may fail due to violating topology spread constraints, // adding an assigned Pod may make it schedulable. @@ -140,9 +140,9 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEvent { // an unschedulable Pod schedulable. // - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints, // deleting an existing Pod may make it schedulable. - {Resource: framework.Pod, ActionType: framework.All}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}}, // Node add|delete|updateLabel maybe lead an topology key changed, // and make these pod in scheduling schedulable or unschedulable. 
- {Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}}, } } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go index 249e31a3b57..5c0678cb0d8 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -55,9 +55,9 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Pod, ActionType: framework.Update}, +func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index a990754e741..9d1bbe85caf 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -54,9 +54,9 @@ func (pl *TaintToleration) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *TaintToleration) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Node, ActionType: framework.Add | framework.Update}, +func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 2209e9309bb..66756b7af13 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -87,25 +87,25 @@ func (pl *VolumeBinding) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEvent { - events := []framework.ClusterEvent{ +func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint { + events := []framework.ClusterEventWithHint{ // Pods may fail because of missing or mis-configured storage class // (e.g., allowedTopologies, volumeBindingMode), and hence may become // schedulable upon StorageClass Add or Update events. - {Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}}, // We bind PVCs with PVs, so any changes may make the pods schedulable. 
- {Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, - {Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}}, // Pods may fail to find available PVs because the node labels do not // match the storage class's allowed topologies or PV's node affinity. // A new or updated node may make pods schedulable. - {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, // We rely on CSI node to translate in-tree PV to CSI. - {Resource: framework.CSINode, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}}, // When CSIStorageCapacity is enabled, pods may become schedulable // on CSI driver & storage capacity changes. - {Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}, - {Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}}, } return events } diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index 3681da4f7ab..cfe883bd83f 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -333,17 +333,17 @@ func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework. // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ +func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ // Pods may fail to schedule because of volumes conflicting with other pods on same node. // Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable. // Due to immutable fields `spec.volumes`, pod update events are ignored. - {Resource: framework.Pod, ActionType: framework.Delete}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, // A new Node may make a pod schedulable. - {Resource: framework.Node, ActionType: framework.Add}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}}, // Pods may fail to schedule because the PVC it uses has not yet been created. // This PVC is required to exist to check its access modes. 
- {Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index e1b09f6f2a8..b0504ea7f78 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -260,18 +260,18 @@ func getErrorAsStatus(err error) *framework.Status { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *VolumeZone) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ +func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ // New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable. // Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored. - {Resource: framework.StorageClass, ActionType: framework.Add}, + {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}}, // A new node or updating a node's volume zone labels may make a pod schedulable. - {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, // A new pvc may make a pod schedulable. // Due to fields are immutable except `spec.resources`, pvc update events are ignored. - {Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, // A new pv or updating a pv's volume zone labels may make a pod schedulable. - {Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}}, } } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 780e0a7923e..35ffede14ee 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -44,15 +44,6 @@ const ( maxTimeout = 15 * time.Minute ) -var allClusterEvents = []framework.ClusterEvent{ - {Resource: framework.Pod, ActionType: framework.All}, - {Resource: framework.Node, ActionType: framework.All}, - {Resource: framework.CSINode, ActionType: framework.All}, - {Resource: framework.PersistentVolume, ActionType: framework.All}, - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}, - {Resource: framework.StorageClass, ActionType: framework.All}, -} - // frameworkImpl is the component responsible for initializing and running scheduler // plugins. 
type frameworkImpl struct { @@ -61,6 +52,7 @@ type frameworkImpl struct { waitingPods *waitingPodsMap scorePluginWeight map[string]int preEnqueuePlugins []framework.PreEnqueuePlugin + enqueueExtensions []framework.EnqueueExtensions queueSortPlugins []framework.QueueSortPlugin preFilterPlugins []framework.PreFilterPlugin filterPlugins []framework.FilterPlugin @@ -133,7 +125,6 @@ type frameworkOptions struct { podNominator framework.PodNominator extenders []framework.Extender captureProfile CaptureProfile - clusterEventMap map[framework.ClusterEvent]sets.Set[string] parallelizer parallelize.Parallelizer logger *klog.Logger } @@ -217,13 +208,6 @@ func WithCaptureProfile(c CaptureProfile) Option { } } -// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl. -func WithClusterEventMap(m map[framework.ClusterEvent]sets.Set[string]) Option { - return func(o *frameworkOptions) { - o.clusterEventMap = m - } -} - // WithMetricsRecorder sets metrics recorder for the scheduling frameworkImpl. func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option { return func(o *frameworkOptions) { @@ -242,7 +226,6 @@ func WithLogger(logger klog.Logger) Option { func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { return frameworkOptions{ metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh), - clusterEventMap: make(map[framework.ClusterEvent]sets.Set[string]), parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism), } } @@ -325,8 +308,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler } pluginsMap[name] = p - // Update ClusterEventMap in place. - fillEventToPluginMap(logger, p, options.clusterEventMap) + f.fillEnqueueExtensions(p) } // initialize plugins per individual extension points @@ -545,34 +527,40 @@ func (f *frameworkImpl) expandMultiPointPlugins(logger klog.Logger, profile *con return nil } -func fillEventToPluginMap(logger klog.Logger, p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.Set[string]) { +func (f *frameworkImpl) fillEnqueueExtensions(p framework.Plugin) { ext, ok := p.(framework.EnqueueExtensions) if !ok { - // If interface EnqueueExtensions is not implemented, register the default events - // to the plugin. This is to ensure backward compatibility. - registerClusterEvents(p.Name(), eventToPlugins, allClusterEvents) + // If interface EnqueueExtensions is not implemented, register the default enqueue extensions + // to the plugin because we don't know which events the plugin is interested in. + // This is to ensure backward compatibility. + f.enqueueExtensions = append(f.enqueueExtensions, &defaultEnqueueExtension{pluginName: p.Name()}) return } - events := ext.EventsToRegister() - // It's rare that a plugin implements EnqueueExtensions but returns nil. - // We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin - // cannot be moved by any regular cluster event. - if len(events) == 0 { - logger.Info("Plugin's EventsToRegister() returned nil", "plugin", p.Name()) - return - } - // The most common case: a plugin implements EnqueueExtensions and returns non-nil result. 
- registerClusterEvents(p.Name(), eventToPlugins, events) + f.enqueueExtensions = append(f.enqueueExtensions, ext) } -func registerClusterEvents(name string, eventToPlugins map[framework.ClusterEvent]sets.Set[string], evts []framework.ClusterEvent) { - for _, evt := range evts { - if eventToPlugins[evt] == nil { - eventToPlugins[evt] = sets.New(name) - } else { - eventToPlugins[evt].Insert(name) - } +// defaultEnqueueExtension is used when a plugin does not implement EnqueueExtensions interface. +type defaultEnqueueExtension struct { + pluginName string +} + +func (p *defaultEnqueueExtension) Name() string { return p.pluginName } +func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint { + // need to return all specific cluster events with framework.All action instead of wildcard event + // because the returning values are used to register event handlers. + // If we return the wildcard here, it won't affect the event handlers registered by the plugin + // and some events may not be registered in the event handlers. + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.All}}, + {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.All}}, } } @@ -607,6 +595,11 @@ func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin { return f.preEnqueuePlugins } +// EnqueueExtensions returns the registered reenqueue plugins. +func (f *frameworkImpl) EnqueueExtensions() []framework.EnqueueExtensions { + return f.enqueueExtensions +} + // QueueSortFunc returns the function to sort pods in scheduling queue func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { if f == nil { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 9713250a70b..a0e4cb36dab 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -853,165 +853,6 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) { } } -// fakeNoopPlugin doesn't implement interface framework.EnqueueExtensions. 
-type fakeNoopPlugin struct{} - -func (*fakeNoopPlugin) Name() string { return "fakeNoop" } - -func (*fakeNoopPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { - return nil -} - -type fakeNodePlugin struct{} - -func (*fakeNodePlugin) Name() string { return "fakeNode" } - -func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { - return nil -} - -func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Pod, ActionType: framework.All}, - {Resource: framework.Node, ActionType: framework.Delete}, - {Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}, - } -} - -type fakePodPlugin struct{} - -func (*fakePodPlugin) Name() string { return "fakePod" } - -func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { - return nil -} - -func (*fakePodPlugin) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: framework.Pod, ActionType: framework.All}, - {Resource: framework.Node, ActionType: framework.Add | framework.Delete}, - {Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}, - } -} - -// fakeNoopRuntimePlugin implement interface framework.EnqueueExtensions, but returns nil -// at runtime. This can simulate a plugin registered at scheduler setup, but does nothing -// due to some disabled feature gate. -type fakeNoopRuntimePlugin struct{} - -func (*fakeNoopRuntimePlugin) Name() string { return "fakeNoopRuntime" } - -func (*fakeNoopRuntimePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { - return nil -} - -func (*fakeNoopRuntimePlugin) EventsToRegister() []framework.ClusterEvent { return nil } - -func TestNewFrameworkFillEventToPluginMap(t *testing.T) { - tests := []struct { - name string - plugins []framework.Plugin - want map[framework.ClusterEvent]sets.Set[string] - }{ - { - name: "no-op plugin", - plugins: []framework.Plugin{&fakeNoopPlugin{}}, - want: map[framework.ClusterEvent]sets.Set[string]{ - {Resource: framework.Pod, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin), - {Resource: framework.Node, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin), - {Resource: framework.StorageClass, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin), - }, - }, - { - name: "node plugin", - plugins: []framework.Plugin{&fakeNodePlugin{}}, - want: map[framework.ClusterEvent]sets.Set[string]{ - {Resource: framework.Pod, ActionType: framework.All}: sets.New("fakeNode", bindPlugin, queueSortPlugin), - {Resource: framework.Node, ActionType: framework.Delete}: sets.New("fakeNode"), - {Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.New("fakeNode"), - {Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, 
queueSortPlugin), - {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - }, - }, - { - name: "pod plugin", - plugins: []framework.Plugin{&fakePodPlugin{}}, - want: map[framework.ClusterEvent]sets.Set[string]{ - {Resource: framework.Pod, ActionType: framework.All}: sets.New("fakePod", bindPlugin, queueSortPlugin), - {Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.New("fakePod"), - {Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.New("fakePod"), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - }, - }, - { - name: "node and pod plugin", - plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}}, - want: map[framework.ClusterEvent]sets.Set[string]{ - {Resource: framework.Node, ActionType: framework.Delete}: sets.New("fakeNode"), - {Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.New("fakePod"), - {Resource: framework.Pod, ActionType: framework.All}: sets.New("fakeNode", "fakePod", bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.New("fakeNode"), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.New("fakePod"), - {Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - }, - }, - { - name: "no-op runtime plugin", - plugins: []framework.Plugin{&fakeNoopRuntimePlugin{}}, - want: map[framework.ClusterEvent]sets.Set[string]{ - {Resource: framework.Pod, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - {Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - registry := Registry{} - cfgPls := &config.Plugins{} - for _, pl := range tt.plugins { - tmpPl := pl - if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { 
- return tmpPl, nil - }); err != nil { - t.Fatalf("fail to register filter plugin (%s)", pl.Name()) - } - cfgPls.Filter.Enabled = append(cfgPls.Filter.Enabled, config.Plugin{Name: pl.Name()}) - } - - got := make(map[framework.ClusterEvent]sets.Set[string]) - profile := config.KubeSchedulerProfile{Plugins: cfgPls} - _, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - _, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, WithClusterEventMap(got)) - if err != nil { - t.Fatal(err) - } - if diff := cmp.Diff(tt.want, got); diff != "" { - t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff) - } - }) - } -} - func TestPreEnqueuePlugins(t *testing.T) { tests := []struct { name string diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 6f9e9b295da..34d0c984d84 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -78,6 +78,62 @@ const ( WildCard GVK = "*" ) +type ClusterEventWithHint struct { + Event ClusterEvent + // QueueingHintFn is executed for the plugin rejected by this plugin when the above Event happens, + // and filters out events to reduce useless retry of Pod's scheduling. + // It's an optional field. If not set, + // the scheduling of Pods will be always retried with backoff when this Event happens. + // (the same as QueueAfterBackoff) + QueueingHintFn QueueingHintFn +} + +// QueueingHintFn returns a hint that signals whether the event can make a Pod, +// which was rejected by this plugin in the past scheduling cycle, schedulable or not. +// It's called before a Pod gets moved from unschedulableQ to backoffQ or activeQ. +// +// - `pod`: the Pod to be enqueued, which is rejected by this plugin in the past. +// - `event`: By which event the pod will be moved back to schedQ/backoffQ. +// - `oldObj` `newObj`: the object involved in that event. +// - For example, the given event is "Node deleted", the `oldObj` will be that deleted Node. +// - `oldObj` is nil if the event is add event. +// - `newObj` is nil if the event is delete event. +type QueueingHintFn func(pod *v1.Pod, oldObj, newObj interface{}) QueueingHint + +type QueueingHint int + +const ( + // QueueSkip implies that the cluster event has no impact on + // scheduling of the pod. + QueueSkip QueueingHint = iota + + // QueueAfterBackoff implies that the Pod may be schedulable by the event, + // and worth retring the scheduling again after backoff. + QueueAfterBackoff + + // QueueImmediately is returned only when it is highly possible that the Pod gets scheduled in the next scheduling. + // You should only return QueueImmediately when there is a high chance that the Pod gets scheduled in the next scheduling. + // Otherwise, it's detrimental to scheduling throughput. + // For example, when the Pod was rejected as waiting for an external resource to be provisioned, that is directly tied to the Pod, + // and the event is that the resource is provisioned, then you can return QueueImmediately. + // As a counterexample, when the Pod was rejected due to insufficient memory resource, + // and the event is that more memory on Node is available, then you should return QueueAfterBackoff instead of QueueImmediately + // because other Pods may be waiting for the same resources and only a few of them would schedule in the next scheduling cycle. 
+ QueueImmediately +) + +func (s QueueingHint) String() string { + switch s { + case QueueSkip: + return "QueueSkip" + case QueueAfterBackoff: + return "QueueAfterBackoff" + case QueueImmediately: + return "QueueImmediately" + } + return "" +} + // ClusterEvent abstracts how a system resource's state gets changed. // Resource represents the standard API resources such as Pod, Node, etc. // ActionType denotes the specific change such as Add, Update or Delete. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 15eb581c15f..fabd13007d3 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -106,9 +106,13 @@ type SchedulingQueue interface { Pop() (*framework.QueuedPodInfo, error) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error - MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) + // TODO(sanposhiho): move all PreEnqueueCkeck to Requeue and delete it from this parameter eventually. + // Some PreEnqueueCheck include event filtering logic based on some in-tree plugins + // and it affect badly to other plugins. + // See https://github.com/kubernetes/kubernetes/issues/110175 + MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) - AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) + AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) PendingPods() ([]*v1.Pod, string) // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. @@ -171,9 +175,10 @@ type PriorityQueue struct { // when we received move request. moveRequestCycle int64 - clusterEventMap map[framework.ClusterEvent]sets.Set[string] // preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins. preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin + // queueingHintMap is keyed with profile name, valued with registered queueing hint functions. + queueingHintMap QueueingHintMapPerProfile // closed indicates that the queue is closed. // It is mainly used to let Pop() exit its control loop while waiting for an item. @@ -186,6 +191,12 @@ type PriorityQueue struct { pluginMetricsSamplePercent int } +// QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName. +type QueueingHintFunction struct { + PluginName string + QueueingHintFn framework.QueueingHintFn +} + type priorityQueueOptions struct { clock clock.Clock podInitialBackoffDuration time.Duration @@ -194,8 +205,8 @@ type priorityQueueOptions struct { podLister listersv1.PodLister metricsRecorder metrics.MetricAsyncRecorder pluginMetricsSamplePercent int - clusterEventMap map[framework.ClusterEvent]sets.Set[string] preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin + queueingHintMap QueueingHintMapPerProfile } // Option configures a PriorityQueue @@ -229,13 +240,6 @@ func WithPodLister(pl listersv1.PodLister) Option { } } -// WithClusterEventMap sets clusterEventMap for PriorityQueue. -func WithClusterEventMap(m map[framework.ClusterEvent]sets.Set[string]) Option { - return func(o *priorityQueueOptions) { - o.clusterEventMap = m - } -} - // WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue. 
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option { return func(o *priorityQueueOptions) { @@ -243,6 +247,19 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option { } } +// QueueingHintMapPerProfile is keyed with profile name, valued with queueing hint map registered for the profile. +type QueueingHintMapPerProfile map[string]QueueingHintMap + +// QueueingHintMap is keyed with ClusterEvent, valued with queueing hint functions registered for the event. +type QueueingHintMap map[framework.ClusterEvent][]*QueueingHintFunction + +// WithQueueingHintMapPerProfile sets preEnqueuePluginMap for PriorityQueue. +func WithQueueingHintMapPerProfile(m QueueingHintMapPerProfile) Option { + return func(o *priorityQueueOptions) { + o.queueingHintMap = m + } +} + // WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue. func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option { return func(o *priorityQueueOptions) { @@ -314,8 +331,8 @@ func NewPriorityQueue( activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), moveRequestCycle: -1, - clusterEventMap: options.clusterEventMap, preEnqueuePluginMap: options.preEnqueuePluginMap, + queueingHintMap: options.queueingHintMap, metricsRecorder: options.metricsRecorder, pluginMetricsSamplePercent: options.pluginMetricsSamplePercent, } @@ -336,6 +353,62 @@ func (p *PriorityQueue) Run(logger klog.Logger) { }, 30*time.Second, p.stop) } +// isPodWorthRequeuing calls QueueingHintFn of only plugins registered in pInfo.unschedulablePlugins. +// If any QueueingHintFn returns QueueImmediately, the scheduling queue is supposed to enqueue this Pod to activeQ. +// If no QueueingHintFn returns QueueImmediately, but some return QueueAfterBackoff, +// the scheduling queue is supposed to enqueue this Pod to activeQ/backoffQ depending on the remaining backoff time of the Pod. +// If all QueueingHintFn returns QueueSkip, the scheduling queue enqueues the Pod back to unschedulable Pod pool +// because no plugin changes the scheduling result via the event. +func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) framework.QueueingHint { + if pInfo.UnschedulablePlugins.Len() == 0 { + logger.V(6).Info("Worth requeuing because no unschedulable plugins", "pod", klog.KObj(pInfo.Pod)) + return framework.QueueAfterBackoff + } + + if event.IsWildCard() { + logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod)) + return framework.QueueAfterBackoff + } + + hintMap, ok := p.queueingHintMap[pInfo.Pod.Spec.SchedulerName] + if !ok { + // shouldn't reach here unless bug. 
+ logger.Error(nil, "No QueueingHintMap is registered for this profile", "profile", pInfo.Pod.Spec.SchedulerName, "pod", klog.KObj(pInfo.Pod)) + return framework.QueueAfterBackoff + } + + pod := pInfo.Pod + queueHint := framework.QueueSkip + for eventToMatch, hintfns := range hintMap { + if eventToMatch.Resource != event.Resource || eventToMatch.ActionType&event.ActionType == 0 { + continue + } + + for _, hintfn := range hintfns { + if !pInfo.UnschedulablePlugins.Has(hintfn.PluginName) { + continue + } + + h := hintfn.QueueingHintFn(pod, oldObj, newObj) + if h == framework.QueueSkip { + continue + } + + if h == framework.QueueImmediately { + return h + } + + // replace queueHint with the returned value, + // but continue to other queueHintFn to check because other plugins may want to return QueueImmediately. + queueHint = h + } + } + + // No queueing hint function is registered for this event + // or no queueing hint fn returns the value other than QueueSkip. + return queueHint +} + // runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin. // It returns true if all PreEnqueue function run successfully; otherwise returns false // upon the first failure. @@ -528,7 +601,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * p.unschedulablePods.addOrUpdate(pInfo) logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods) metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() - } p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil) @@ -583,7 +655,7 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) { } if len(podsToMove) > 0 { - p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout) + p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil) } } @@ -712,7 +784,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { // may make pending pods with matching affinity terms schedulable. func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) { p.lock.Lock() - p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd) + p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd, nil, pod) p.lock.Unlock() } @@ -732,12 +804,12 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool { // AssignedPodUpdated is called when a bound pod is updated. Change of labels // may make pending pods with matching affinity terms schedulable. 
-func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) { +func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) { p.lock.Lock() - if isPodResourcesResizedDown(pod) { - p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, nil) + if isPodResourcesResizedDown(newPod) { + p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil) } else { - p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodUpdate) + p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod) } p.lock.Unlock() } @@ -747,54 +819,75 @@ func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) { // This function adds all pods and then signals the condition variable to ensure that // if Pop() is waiting for an item, it receives the signal after all the pods are in the // queue and the head is the highest priority pod. -func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) { +func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) { unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap)) for _, pInfo := range p.unschedulablePods.podInfoMap { if preCheck == nil || preCheck(pInfo.Pod) { unschedulablePods = append(unschedulablePods, pInfo) } } - p.movePodsToActiveOrBackoffQueue(logger, unschedulablePods, event) + p.movePodsToActiveOrBackoffQueue(logger, unschedulablePods, event, oldObj, newObj) } // MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ. // This function adds all pods and then signals the condition variable to ensure that // if Pop() is waiting for an item, it receives the signal after all the pods are in the // queue and the head is the highest priority pod. -func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) { +func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) { p.lock.Lock() defer p.lock.Unlock() - p.moveAllToActiveOrBackoffQueue(logger, event, preCheck) + p.moveAllToActiveOrBackoffQueue(logger, event, oldObj, newObj, preCheck) +} + +// requeuePodViaQueueingHint tries to requeue Pod to activeQ, backoffQ or unschedulable pod pool based on schedulingHint. +// It returns the queue name Pod goes. 
+// +// NOTE: this function assumes lock has been acquired in caller +func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, schedulingHint framework.QueueingHint, event string) string { + if schedulingHint == framework.QueueSkip { + p.unschedulablePods.addOrUpdate(pInfo) + metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() + return unschedulablePods + } + + pod := pInfo.Pod + if p.isPodBackingoff(pInfo) && schedulingHint == framework.QueueAfterBackoff { + if err := p.podBackoffQ.Add(pInfo); err != nil { + logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod)) + p.unschedulablePods.addOrUpdate(pInfo) + return unschedulablePods + } + + metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() + return backoffQ + } + + if added, _ := p.addToActiveQ(logger, pInfo); added { + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc() + return activeQ + } + + p.unschedulablePods.addOrUpdate(pInfo) + metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() + return unschedulablePods } // NOTE: this function assumes lock has been acquired in caller -func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) { +func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) { activated := false for _, pInfo := range podInfoList { - // If the event doesn't help making the Pod schedulable, continue. - // Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes - // either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit. - // In that case, it's desired to move it anyways. - if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) { + schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj) + if schedulingHint == framework.QueueSkip { + // QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event. 
+ logger.V(5).Info("Event is not making pod schedulable", "pod", klog.KObj(pInfo.Pod), "event", event.Label) continue } - pod := pInfo.Pod - if p.isPodBackingoff(pInfo) { - if err := p.podBackoffQ.Add(pInfo); err != nil { - logger.Error(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod)) - } else { - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ) - metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc() - p.unschedulablePods.delete(pod, pInfo.Gated) - } - } else { - gated := pInfo.Gated - if added, _ := p.addToActiveQ(logger, pInfo); added { - logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ) - activated = true - metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc() - p.unschedulablePods.delete(pod, gated) - } + + p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated) + queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label) + logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label, "queue", queue, "hint", schedulingHint) + if queue == activeQ { + activated = true } } p.moveRequestCycle = p.schedulingCycle @@ -1144,42 +1237,3 @@ func MakeNextPodFunc(logger klog.Logger, queue SchedulingQueue) func() *framewor func podInfoKeyFunc(obj interface{}) (string, error) { return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod) } - -// Checks if the Pod may become schedulable upon the event. -// This is achieved by looking up the global clusterEventMap registry. -func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, clusterEvent framework.ClusterEvent) bool { - if clusterEvent.IsWildCard() { - return true - } - - for evt, nameSet := range p.clusterEventMap { - // Firstly verify if the two ClusterEvents match: - // - either the registered event from plugin side is a WildCardEvent, - // - or the two events have identical Resource fields and *compatible* ActionType. - // Note the ActionTypes don't need to be *identical*. We check if the ANDed value - // is zero or not. In this way, it's easy to tell Update&Delete is not compatible, - // but Update&All is. - evtMatch := evt.IsWildCard() || - (evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0) - - // Secondly verify the plugin name matches. - // Note that if it doesn't match, we shouldn't continue to search. 
- if evtMatch && intersect(nameSet, podInfo.UnschedulablePlugins) { - return true - } - } - - return false -} - -func intersect(x, y sets.Set[string]) bool { - if len(x) > len(y) { - x, y = y, x - } - for v := range x { - if y.Has(v) { - return true - } - } - return false -} diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 08f482b1646..6c4792ee4e5 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -83,6 +83,16 @@ var ( cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "lock"), } + + queueHintReturnQueueAfterBackoff = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + return framework.QueueAfterBackoff + } + queueHintReturnQueueImmediately = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + return framework.QueueImmediately + } + queueHintReturnQueueSkip = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + return framework.QueueSkip + } ) func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { @@ -93,6 +103,13 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { return nil } +// makeEmptyQueueingHintMapPerProfile initializes an empty QueueingHintMapPerProfile for "" profile name. +func makeEmptyQueueingHintMapPerProfile() QueueingHintMapPerProfile { + m := make(QueueingHintMapPerProfile) + m[""] = make(QueueingHintMap) + return m +} + func TestPriorityQueue_Add(t *testing.T) { objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod} logger, ctx := ktesting.NewTestContext(t) @@ -219,7 +236,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } // move all pods to active queue when we were trying to schedule them - q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil) + q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil) oldCycle := q.SchedulingCycle() firstPod, _ := q.Pop() @@ -611,23 +628,25 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { b.StopTimer() c := testingclock.NewFakeClock(time.Now()) - m := make(map[framework.ClusterEvent]sets.Set[string]) + m := makeEmptyQueueingHintMapPerProfile() // - All plugins registered for events[0], which is NodeAdd. // - 1/2 of plugins registered for events[1] // - 1/3 of plugins registered for events[2] // - ... for j := 0; j < len(events); j++ { - m[events[j]] = sets.New[string]() for k := 0; k < len(plugins); k++ { if (k+1)%(j+1) == 0 { - m[events[j]].Insert(plugins[k]) + m[""][events[j]] = append(m[""][events[j]], &QueueingHintFunction{ + PluginName: plugins[k], + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }) } } } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) // Init pods in unschedulablePods. for j := 0; j < podsInUnschedulablePods; j++ { @@ -657,10 +676,10 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { b.StartTimer() if tt.moveEvent.Resource != "" { - q.MoveAllToActiveOrBackoffQueue(logger, tt.moveEvent, nil) + q.MoveAllToActiveOrBackoffQueue(logger, tt.moveEvent, nil, nil, nil) } else { // Random case. 
- q.MoveAllToActiveOrBackoffQueue(logger, events[i%len(events)], nil) + q.MoveAllToActiveOrBackoffQueue(logger, events[i%len(events)], nil, nil, nil) } } }) @@ -668,15 +687,93 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { } } +func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.T) { + now := time.Now() + p := st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj() + tests := []struct { + name string + podInfo *framework.QueuedPodInfo + hint framework.QueueingHintFn + // duration is the duration that the Pod has been in the unschedulable queue. + duration time.Duration + // expectedQ is the queue name (activeQ, backoffQ, or unschedulablePods) that this Pod should be quened to. + expectedQ string + }{ + { + name: "QueueImmediately queues pod to activeQ", + podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)}, + hint: queueHintReturnQueueImmediately, + expectedQ: activeQ, + }, + { + name: "QueueAfterBackoff queues pod to backoffQ if Pod is backing off", + podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)}, + hint: queueHintReturnQueueAfterBackoff, + expectedQ: backoffQ, + }, + { + name: "QueueAfterBackoff queues pod to activeQ if Pod is not backing off", + podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)}, + hint: queueHintReturnQueueAfterBackoff, + duration: DefaultPodInitialBackoffDuration, // backoff is finished + expectedQ: activeQ, + }, + { + name: "QueueSkip queues pod to unschedulablePods", + podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)}, + hint: queueHintReturnQueueSkip, + expectedQ: unschedulablePods, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + m := makeEmptyQueueingHintMapPerProfile() + m[""][NodeAdd] = []*QueueingHintFunction{ + { + PluginName: "foo", + QueueingHintFn: test.hint, + }, + } + test.podInfo.UnschedulablePlugins = sets.New("foo") + cl := testingclock.NewFakeClock(now) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl)) + // add to unsched pod pool + q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle()) + + cl.Step(test.duration) + + q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) + + if q.podBackoffQ.Len() == 0 && test.expectedQ == backoffQ { + t.Fatalf("expected pod to be queued to backoffQ, but it was not") + } + + if q.activeQ.Len() == 0 && test.expectedQ == activeQ { + t.Fatalf("expected pod to be queued to activeQ, but it was not") + } + + if q.unschedulablePods.get(test.podInfo.Pod) == nil && test.expectedQ == unschedulablePods { + t.Fatalf("expected pod to be queued to unschedulablePods, but it was not") + } + }) + } +} + func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) - m := map[framework.ClusterEvent]sets.Set[string]{ - {Resource: framework.Node, ActionType: framework.Add}: sets.New("fooPlugin"), - } logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + m := makeEmptyQueueingHintMapPerProfile() + m[""][NodeAdd] = []*QueueingHintFunction{ + { + PluginName: "fooPlugin", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + } + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) q.Add(logger, medPriorityPodInfo.Pod) q.AddUnschedulableIfNotPresent(logger, 
q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) @@ -689,7 +786,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { hpp2.Name = "hpp2" q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle()) // Pods is still backing off, move the pod into backoffQ. - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil) + q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) if q.activeQ.Len() != 1 { t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len()) } @@ -715,7 +812,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { // Move clock by podInitialBackoffDuration, so that pods in the unschedulablePods would pass the backing off, // and the pods will be moved into activeQ. c.Step(q.podInitialBackoffDuration) - q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil) + q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil) // hpp2 won't be moved regardless of its backoff timer. if q.activeQ.Len() != 4 { t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) @@ -737,8 +834,14 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj() c := testingclock.NewFakeClock(time.Now()) - m := map[framework.ClusterEvent]sets.Set[string]{AssignedPodAdd: sets.New("fakePlugin")} - q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + m := makeEmptyQueueingHintMapPerProfile() + m[""][AssignedPodAdd] = []*QueueingHintFunction{ + { + PluginName: "fakePlugin", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + } + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) q.Add(logger, medPriorityPodInfo.Pod) // Add a couple of pods to the unschedulablePods. q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle()) @@ -866,7 +969,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { t.Errorf("Unexpected pending pods summary: want %v, but got %v.", wantSummary, gotSummary) } // Move all to active queue. We should still see the same set of pods. - q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil) + q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil) gotPods, gotSummary = q.PendingPods() if !reflect.DeepEqual(expectedSet, makeSet(gotPods)) { t.Error("Unexpected list of pending Pods.") @@ -1150,7 +1253,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { q.AddUnschedulableIfNotPresent(logger, p1, q.SchedulingCycle()) c.Step(DefaultPodInitialBackoffDuration) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil) + q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) // Simulation is over. Now let's pop all pods. The pod popped first should be // the last one we pop here. for i := 0; i < 5; i++ { @@ -1193,7 +1296,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. 
- q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil) + q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) // Simulate a pod being popped by the scheduler, // At this time, unschedulable pod should be popped. @@ -1223,7 +1326,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil) + q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) // At this time, newerPod should be popped // because it is the oldest tried pod. @@ -1267,7 +1370,7 @@ func TestHighPriorityBackoff(t *testing.T) { // Put in the unschedulable queue. q.AddUnschedulableIfNotPresent(logger, p, q.SchedulingCycle()) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil) + q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil) p, err = q.Pop() if err != nil { @@ -1282,13 +1385,17 @@ func TestHighPriorityBackoff(t *testing.T) { // activeQ after one minutes if it is in unschedulablePods. func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { c := testingclock.NewFakeClock(time.Now()) - m := map[framework.ClusterEvent]sets.Set[string]{ - NodeAdd: sets.New("fakePlugin"), + m := makeEmptyQueueingHintMapPerProfile() + m[""][NodeAdd] = []*QueueingHintFunction{ + { + PluginName: "fakePlugin", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, } logger, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) midPod := st.MakePod().Name("test-midpod").Namespace("ns1").UID("tp-mid").Priority(midPriority).NominatedNodeName("node1").Obj() highPod := st.MakePod().Name("test-highpod").Namespace("ns1").UID("tp-high").Priority(highPriority).NominatedNodeName("node1").Obj() @@ -1448,7 +1555,7 @@ var ( queue.podBackoffQ.Add(pInfo) } moveAllToActiveOrBackoffQ = func(logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { - queue.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil) + queue.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) } flushBackoffQ = func(logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*testingclock.FakeClock).Step(2 * time.Second) @@ -2040,7 +2147,7 @@ func TestBackOffFlow(t *testing.T) { } // An event happens. 
- q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil) + q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil) if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { t.Errorf("pod %v is not in the backoff queue", podID) @@ -2070,102 +2177,6 @@ func TestBackOffFlow(t *testing.T) { } } -func TestPodMatchesEvent(t *testing.T) { - tests := []struct { - name string - podInfo *framework.QueuedPodInfo - event framework.ClusterEvent - clusterEventMap map[framework.ClusterEvent]sets.Set[string] - want bool - }{ - { - name: "event not registered", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj()), - event: EmptyEvent, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - NodeAllEvent: sets.New("foo"), - }, - want: false, - }, - { - name: "pod's failed plugin matches but event does not match", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"), - event: AssignedPodAdd, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - NodeAllEvent: sets.New("foo", "bar"), - }, - want: false, - }, - { - name: "wildcard event wins regardless of event matching", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"), - event: WildCardEvent, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - NodeAllEvent: sets.New("foo"), - }, - want: true, - }, - { - name: "pod's failed plugin and event both match", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"), - event: NodeTaintChange, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - NodeAllEvent: sets.New("foo", "bar"), - }, - want: true, - }, - { - name: "pod's failed plugin registers fine-grained event", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"), - event: NodeTaintChange, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - NodeAllEvent: sets.New("foo"), - NodeTaintChange: sets.New("bar"), - }, - want: true, - }, - { - name: "if pod failed by multiple plugins, a single match gets a final match", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "foo", "bar"), - event: NodeAdd, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - NodeAllEvent: sets.New("bar"), - }, - want: true, - }, - { - name: "plugin returns WildCardEvent and plugin name matches", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "foo"), - event: PvAdd, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - WildCardEvent: sets.New("foo"), - }, - want: true, - }, - { - name: "plugin returns WildCardEvent but plugin name not match", - podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "foo"), - event: PvAdd, - clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{ - WildCardEvent: sets.New("bar"), - }, - want: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - q := NewTestQueue(ctx, newDefaultQueueSort()) - q.clusterEventMap = tt.clusterEventMap - if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want { - t.Errorf("Want %v, but got %v", tt.want, got) - } - }) - } -} - func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { var podInfos []*framework.QueuedPodInfo for i := 0; i < 5; i++ { @@ -2224,7 +2235,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { // See: https://github.com/golang/go/issues/8687 podInfo.Timestamp = 
podInfo.Timestamp.Add(time.Duration((i - len(tt.podInfos))) * time.Millisecond) } - q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, tt.preEnqueueCheck) + q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, tt.preEnqueueCheck) var got []string for q.podBackoffQ.Len() != 0 { obj, err := q.podBackoffQ.Pop() @@ -2315,3 +2326,220 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo { } return podInfo } + +// Test_isPodWorthRequeuing tests isPodWorthRequeuing function. +func Test_isPodWorthRequeuing(t *testing.T) { + count := 0 + queueHintReturnQueueImmediately := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + count++ + return framework.QueueImmediately + } + queueHintReturnQueueSkip := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + count++ + return framework.QueueSkip + } + queueHintReturnQueueAfterBackoff := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { + count++ + return framework.QueueAfterBackoff + } + + tests := []struct { + name string + podInfo *framework.QueuedPodInfo + event framework.ClusterEvent + oldObj interface{} + newObj interface{} + expected framework.QueueingHint + expectedExecutionCount int // expected total execution count of queueing hint function + queueingHintMap QueueingHintMapPerProfile + }{ + { + name: "return QueueAfterBackoff when no queueing hint function is registered for the event", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: NodeAdd, + oldObj: nil, + newObj: st.MakeNode().Node, + expected: framework.QueueSkip, + expectedExecutionCount: 0, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + // no queueing hint function for NodeAdd. + AssignedPodAdd: { + { + // It will be ignored because the event is not NodeAdd. + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueImmediately, + }, + }, + }, + }, + }, + { + name: "return QueueAfterBackoff when the event is wildcard", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: WildCardEvent, + oldObj: nil, + newObj: st.MakeNode().Node, + expected: framework.QueueAfterBackoff, + expectedExecutionCount: 0, + queueingHintMap: QueueingHintMapPerProfile{}, + }, + { + name: "QueueImmediately is the highest priority", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: NodeAdd, + oldObj: nil, + newObj: st.MakeNode().Node, + expected: framework.QueueImmediately, + expectedExecutionCount: 2, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + NodeAdd: { + { + // executed + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + { + // executed + // But, no more queueing hint function is executed + // because the highest priority is QueueImmediately. 
+ PluginName: "fooPlugin2", + QueueingHintFn: queueHintReturnQueueImmediately, + }, + { + PluginName: "fooPlugin3", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + { + PluginName: "fooPlugin4", + QueueingHintFn: queueHintReturnQueueSkip, + }, + }, + }, + }, + }, + { + name: "QueueSkip is the lowest priority", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: NodeAdd, + oldObj: nil, + newObj: st.MakeNode().Node, + expected: framework.QueueAfterBackoff, + expectedExecutionCount: 3, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + NodeAdd: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + { + PluginName: "fooPlugin2", + QueueingHintFn: queueHintReturnQueueSkip, + }, + { + PluginName: "fooPlugin3", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + }, + }, + }, + }, + { + name: "Queueing hint function that isn't from the plugin, that is in the UnschedulablePlugins, is ignored", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: NodeAdd, + oldObj: nil, + newObj: st.MakeNode().Node, + expected: framework.QueueAfterBackoff, + expectedExecutionCount: 2, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + NodeAdd: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + { + PluginName: "fooPlugin2", + QueueingHintFn: queueHintReturnQueueSkip, + }, + { + PluginName: "fooPlugin3", + QueueingHintFn: queueHintReturnQueueImmediately, // It'll be ignored. + }, + }, + }, + }, + }, + { + name: "If event is specific Node update event, queueing hint function for NodeUpdate/UpdateNodeLabel is executed", + podInfo: &framework.QueuedPodInfo{ + UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"), + PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()), + }, + event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, + oldObj: nil, + newObj: st.MakeNode().Node, + expected: framework.QueueAfterBackoff, + expectedExecutionCount: 3, + queueingHintMap: QueueingHintMapPerProfile{ + "": { + framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + { + PluginName: "fooPlugin2", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + }, + framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Update}: { + { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + }, + NodeAdd: { // not executed because NodeAdd is unrelated. 
+ { + PluginName: "fooPlugin1", + QueueingHintFn: queueHintReturnQueueAfterBackoff, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + count = 0 // reset count every time + logger, ctx := ktesting.NewTestContext(t) + q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(test.queueingHintMap)) + actual := q.isPodWorthRequeuing(logger, test.podInfo, test.event, test.oldObj, test.newObj) + if actual != test.expected { + t.Errorf("isPodWorthRequeuing() = %v, want %v", actual, test.expected) + } + if count != test.expectedExecutionCount { + t.Errorf("isPodWorthRequeuing() executed queueing hint functions %v times, expected: %v", count, test.expectedExecutionCount) + } + }) + } +} diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 6c0b69b789e..6d0b4daac70 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -296,11 +296,11 @@ func (sched *Scheduler) handleBindingCycleError( // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways. if status.IsUnschedulable() { - defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool { + defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool { return assumedPod.UID != pod.UID }) } else { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, nil) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil) } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 406f2d96553..ac5f4873dfe 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -24,7 +24,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" @@ -285,7 +284,6 @@ func New(ctx context.Context, nodeLister := informerFactory.Core().V1().Nodes().Lister() snapshot := internalcache.NewEmptySnapshot() - clusterEventMap := make(map[framework.ClusterEvent]sets.Set[string]) metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, @@ -295,7 +293,6 @@ func New(ctx context.Context, frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), - frameworkruntime.WithClusterEventMap(clusterEventMap), frameworkruntime.WithParallelism(int(options.parallelism)), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithMetricsRecorder(metricsRecorder), @@ -309,18 +306,21 @@ func New(ctx context.Context, } preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) + queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile) for profileName, profile := range profiles { preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() + queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions()) } + podQueue := internalqueue.NewSchedulingQueue( profiles[options.profiles[0].SchedulerName].QueueSortFunc(), 
informerFactory, internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodLister(podLister), - internalqueue.WithClusterEventMap(clusterEventMap), internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), + internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile), internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent), internalqueue.WithMetricsRecorder(*metricsRecorder), ) @@ -349,11 +349,36 @@ func New(ctx context.Context, } sched.applyDefaultHandlers() - addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) + addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)) return sched, nil } +// defaultQueueingHintFn is the default queueing hint function. +// It always returns QueueAfterBackoff as the queueing hint. +var defaultQueueingHintFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint { + return framework.QueueAfterBackoff +} + +func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { + queueingHintMap := make(map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) + for _, e := range es { + events := e.EventsToRegister() + for _, event := range events { + fn := event.QueueingHintFn + if fn == nil { + fn = defaultQueueingHintFn + } + + queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{ + PluginName: e.Name(), + QueueingHintFn: fn, + }) + } + } + return queueingHintMap +} + // Run begins watching and scheduling. It starts scheduling and blocked until the context is done. 
func (sched *Scheduler) Run(ctx context.Context) { logger := klog.FromContext(ctx) @@ -439,13 +464,15 @@ func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profi type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) -func unionedGVKs(m map[framework.ClusterEvent]sets.Set[string]) map[framework.GVK]framework.ActionType { +func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType { gvkMap := make(map[framework.GVK]framework.ActionType) - for evt := range m { - if _, ok := gvkMap[evt.Resource]; ok { - gvkMap[evt.Resource] |= evt.ActionType - } else { - gvkMap[evt.Resource] = evt.ActionType + for _, queueingHints := range queueingHintsPerProfile { + for evt := range queueingHints { + if _, ok := gvkMap[evt.Resource]; ok { + gvkMap[evt.Resource] |= evt.ActionType + } else { + gvkMap[evt.Resource] = evt.ActionType + } } } return gvkMap diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 71f301aa94b..46114b8ec48 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -39,7 +39,9 @@ import ( "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -643,3 +645,440 @@ func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) { } return []string{nodeName}, nil } + +const ( + fakeNoop = "fakeNoop" + fakeNode = "fakeNode" + fakePod = "fakePod" + fakeNoopRuntime = "fakeNoopRuntime" + queueSort = "no-op-queue-sort-plugin" + fakeBind = "bind-plugin" +) + +func Test_buildQueueingHintMap(t *testing.T) { + tests := []struct { + name string + plugins []framework.Plugin + want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction + assertFn func(t *testing.T, got map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) bool + }{ + { + name: "no-op plugin", + plugins: []framework.Plugin{&fakeNoopPlugin{}}, + want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{ + {Resource: framework.Pod, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.Node, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.CSINode, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.CSIDriver, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: 
defaultQueueingHintFn}, + }, + {Resource: framework.CSIStorageCapacity, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.PersistentVolume, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.StorageClass, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.PodSchedulingContext, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + }, + }, + { + name: "node and pod plugin", + plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}}, + want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{ + {Resource: framework.Pod, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.Pod, ActionType: framework.Add}: { + {PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn}, + }, + {Resource: framework.Node, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.Node, ActionType: framework.Add}: { + {PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn}, + }, + {Resource: framework.CSINode, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.CSIDriver, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.CSIStorageCapacity, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.PersistentVolume, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.StorageClass, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: { + {PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.PodSchedulingContext, ActionType: framework.All}: { + {PluginName: 
fakeBind, QueueingHintFn: defaultQueueingHintFn}, + {PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn}, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := frameworkruntime.Registry{} + cfgPls := &schedulerapi.Plugins{} + plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{}) + for _, pl := range plugins { + tmpPl := pl + if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return tmpPl, nil + }); err != nil { + t.Fatalf("fail to register filter plugin (%s)", pl.Name()) + } + cfgPls.MultiPoint.Enabled = append(cfgPls.MultiPoint.Enabled, schedulerapi.Plugin{Name: pl.Name()}) + } + + profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls} + stopCh := make(chan struct{}) + defer close(stopCh) + fwk, err := newFramework(registry, profile, stopCh) + if err != nil { + t.Fatal(err) + } + + exts := fwk.EnqueueExtensions() + // need to sort to make the test result stable. + sort.Slice(exts, func(i, j int) bool { + return exts[i].Name() < exts[j].Name() + }) + + got := buildQueueingHintMap(exts) + + for e, fns := range got { + wantfns, ok := tt.want[e] + if !ok { + t.Errorf("got unexpected event %v", e) + continue + } + if len(fns) != len(wantfns) { + t.Errorf("got %v queueing hint functions, want %v", len(fns), len(wantfns)) + continue + } + for i, fn := range fns { + if fn.PluginName != wantfns[i].PluginName { + t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName) + continue + } + if fn.QueueingHintFn(nil, nil, nil) != wantfns[i].QueueingHintFn(nil, nil, nil) { + t.Errorf("got queueing hint function (%v) returning %v, expect it to return %v", fn.PluginName, fn.QueueingHintFn(nil, nil, nil), wantfns[i].QueueingHintFn(nil, nil, nil)) + continue + } + } + } + }) + } +} + +// Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap. 
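What the test below exercises can be summarized in a small sketch: unionedGVKs ORs together the ActionTypes of every event key across all profiles' hint maps, yielding one ActionType mask per resource for addAllEventHandlers. The two Node registrations in this snippet are hypothetical; the types and the |= union come from the scheduler.go hunk above, and the function would have to live in this package (with the test file's imports plus fmt) because unionedGVKs is unexported.

// Hypothetical illustration of the union computed by unionedGVKs.
func sketchUnionedGVKs() {
	hints := internalqueue.QueueingHintMapPerProfile{
		"default": internalqueue.QueueingHintMap{
			{Resource: framework.Node, ActionType: framework.Add}:             nil,
			{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}: nil,
			{Resource: framework.Pod, ActionType: framework.Add}:              nil,
		},
	}
	gvks := unionedGVKs(hints)
	fmt.Println(gvks[framework.Node] == (framework.Add | framework.UpdateNodeLabel)) // true
	fmt.Println(gvks[framework.Pod] == framework.Add)                                // true
}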
+func Test_UnionedGVKs(t *testing.T) { + tests := []struct { + name string + plugins schedulerapi.PluginSet + want map[framework.GVK]framework.ActionType + }{ + { + name: "no-op plugin", + plugins: schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: fakeNoop}, + {Name: queueSort}, + {Name: fakeBind}, + }, + Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins + }, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + }, + }, + { + name: "node plugin", + plugins: schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: fakeNode}, + {Name: queueSort}, + {Name: fakeBind}, + }, + Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins + }, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + }, + }, + { + name: "pod plugin", + plugins: schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: fakePod}, + {Name: queueSort}, + {Name: fakeBind}, + }, + Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins + }, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + }, + }, + { + name: "node and pod plugin", + plugins: schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: fakePod}, + {Name: fakeNode}, + {Name: queueSort}, + {Name: fakeBind}, + }, + Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins + }, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + }, + }, + { + name: "no-op runtime plugin", + plugins: schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: fakeNoopRuntime}, + {Name: queueSort}, + {Name: fakeBind}, + }, + Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins + }, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + }, + 
}, + { + name: "plugins with default profile", + plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, + want: map[framework.GVK]framework.ActionType{ + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := plugins.NewInTreeRegistry() + + cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins} + plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &fakeNoopPlugin{}, &fakeNoopRuntimePlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}} + for _, pl := range plugins { + tmpPl := pl + if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return tmpPl, nil + }); err != nil { + t.Fatalf("fail to register filter plugin (%s)", pl.Name()) + } + } + + profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1} + stopCh := make(chan struct{}) + defer close(stopCh) + fwk, err := newFramework(registry, profile, stopCh) + if err != nil { + t.Fatal(err) + } + + queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{ + "default": buildQueueingHintMap(fwk.EnqueueExtensions()), + } + got := unionedGVKs(queueingHintsPerProfile) + + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff) + } + }) + } +} + +func newFramework(r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile, stopCh <-chan struct{}) (framework.Framework, error) { + return frameworkruntime.NewFramework(context.Background(), r, &profile, + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)), + frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)), + ) +} + +var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{} + +// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point. +type fakeQueueSortPlugin struct{} + +func (pl *fakeQueueSortPlugin) Name() string { + return queueSort +} + +func (pl *fakeQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool { + return false +} + +var _ framework.BindPlugin = &fakebindPlugin{} + +// fakebindPlugin is a no-op implementation for Bind extension point. +type fakebindPlugin struct{} + +func (t *fakebindPlugin) Name() string { + return fakeBind +} + +func (t *fakebindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { + return nil +} + +// fakeNoopPlugin doesn't implement interface framework.EnqueueExtensions. 
+type fakeNoopPlugin struct{} + +func (*fakeNoopPlugin) Name() string { return fakeNoop } + +func (*fakeNoopPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { + return nil +} + +var hintFromFakeNode = framework.QueueingHint(100) + +type fakeNodePlugin struct{} + +var fakeNodePluginQueueingFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint { + return hintFromFakeNode +} + +func (*fakeNodePlugin) Name() string { return fakeNode } + +func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { + return nil +} + +func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn}, + } +} + +var hintFromFakePod = framework.QueueingHint(101) + +type fakePodPlugin struct{} + +var fakePodPluginQueueingFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint { + return hintFromFakePod +} + +func (*fakePodPlugin) Name() string { return fakePod } + +func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { + return nil +} + +func (*fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn}, + } +} + +// fakeNoopRuntimePlugin implement interface framework.EnqueueExtensions, but returns nil +// at runtime. This can simulate a plugin registered at scheduler setup, but does nothing +// due to some disabled feature gate. +type fakeNoopRuntimePlugin struct{} + +func (*fakeNoopRuntimePlugin) Name() string { return fakeNoopRuntime } + +func (*fakeNoopRuntimePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { + return nil +} + +func (*fakeNoopRuntimePlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil } diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index a477742032d..eb7c4301f30 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/net" @@ -159,3 +160,26 @@ func IsScalarResourceName(name v1.ResourceName) bool { return v1helper.IsExtendedResourceName(name) || v1helper.IsHugePageResourceName(name) || v1helper.IsPrefixedNativeResource(name) || v1helper.IsAttachableVolumeResourceName(name) } + +// As converts two objects to the given type. +// Both objects must be of the same type. If not, an error is returned. +// nil objects are allowed and will be converted to nil. 
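The generic As helper whose implementation follows is mainly intended for QueueingHintFn implementations, which receive untyped old/new objects from the event handlers. Below is a hedged usage sketch: the hint function and its ResourceVersion check are hypothetical, while As, its error behaviour, and its nil handling are those of the implementation below.

package example

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// isSchedulableAfterNodeUpdate shows a typical conversion inside a hint function.
func isSchedulableAfterNodeUpdate(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	oldNode, newNode, err := util.As[*v1.Node](oldObj, newObj)
	if err != nil {
		// The event did not carry Nodes; requeue conservatively rather than
		// risk missing a relevant change.
		return framework.QueueAfterBackoff
	}
	if oldNode != nil && newNode != nil && oldNode.ResourceVersion == newNode.ResourceVersion {
		// Nothing actually changed between the two versions; skip the requeue.
		return framework.QueueSkip
	}
	return framework.QueueAfterBackoff
}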
+func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) { + var oldTyped T + var newTyped T + var ok bool + if newobj != nil { + newTyped, ok = newobj.(T) + if !ok { + return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", newTyped, newobj) + } + } + + if oldObj != nil { + oldTyped, ok = oldObj.(T) + if !ok { + return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", oldTyped, oldObj) + } + } + return oldTyped, newTyped, nil +} diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go index 7dd66a0f97b..bb8115cb8e1 100644 --- a/pkg/scheduler/util/utils_test.go +++ b/pkg/scheduler/util/utils_test.go @@ -383,3 +383,107 @@ func TestPatchPodStatus(t *testing.T) { }) } } + +// Test_As tests the As function with Pod. +func Test_As_Pod(t *testing.T) { + tests := []struct { + name string + oldObj interface{} + newObj interface{} + wantOldObj *v1.Pod + wantNewObj *v1.Pod + wantErr bool + }{ + { + name: "nil old Pod", + oldObj: nil, + newObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantOldObj: nil, + wantNewObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + name: "nil new Pod", + oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + newObj: nil, + wantOldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantNewObj: nil, + }, + { + name: "two different kinds of objects", + oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + newObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + gotOld, gotNew, err := As[*v1.Pod](tc.oldObj, tc.newObj) + if err != nil && !tc.wantErr { + t.Fatalf("unexpected error: %v", err) + } + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, but got nil") + } + return + } + + if diff := cmp.Diff(tc.wantOldObj, gotOld); diff != "" { + t.Errorf("unexpected old object (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantNewObj, gotNew); diff != "" { + t.Errorf("unexpected new object (-want,+got):\n%s", diff) + } + }) + } +} + +// Test_As_Node tests the As function with Node. 
+func Test_As_Node(t *testing.T) { + tests := []struct { + name string + oldObj interface{} + newObj interface{} + wantOldObj *v1.Node + wantNewObj *v1.Node + wantErr bool + }{ + { + name: "nil old Node", + oldObj: nil, + newObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantOldObj: nil, + wantNewObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + name: "nil new Node", + oldObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + newObj: nil, + wantOldObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + wantNewObj: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + gotOld, gotNew, err := As[*v1.Node](tc.oldObj, tc.newObj) + if err != nil && !tc.wantErr { + t.Fatalf("unexpected error: %v", err) + } + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, but got nil") + } + return + } + + if diff := cmp.Diff(tc.wantOldObj, gotOld); diff != "" { + t.Errorf("unexpected old object (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantNewObj, gotNew); diff != "" { + t.Errorf("unexpected new object (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index a816a8e4fa9..cd9b0627951 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -276,9 +276,9 @@ func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1. // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEvent { - return []framework.ClusterEvent{ - {Resource: "foos.v1.example.com", ActionType: framework.All}, +func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint { + return []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}}, } }
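Pulling the plugin-facing pieces of this patch together, the sketch below shows what a plugin now returns from EventsToRegister. The ClusterEventWithHint shape, the optional QueueingHintFn field, and the fallback to defaultQueueingHintFn (QueueAfterBackoff) when that field is left nil all come from the hunks above; the plugin itself and its hint logic are hypothetical.

package example

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type hypotheticalPlugin struct{}

func (pl *hypotheticalPlugin) Name() string { return "HypotheticalPlugin" }

// EventsToRegister returns ClusterEventWithHint instead of plain ClusterEvent.
// buildQueueingHintMap registers defaultQueueingHintFn for the second entry
// because no hint function is supplied there.
func (pl *hypotheticalPlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		{
			Event:          framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
			QueueingHintFn: pl.isSchedulableAfterNodeAdd,
		},
		// No QueueingHintFn: matching events requeue the Pod after backoff.
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
	}
}

func (pl *hypotheticalPlugin) isSchedulableAfterNodeAdd(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	if _, ok := newObj.(*v1.Node); !ok {
		return framework.QueueAfterBackoff
	}
	// A real plugin would inspect the new Node here before deciding.
	return framework.QueueAfterBackoff
}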