feature(scheduler): implement ClusterEventWithHint to filter out useless events

Author: Kensei Nakada
Date: 2023-06-08 04:54:30 +00:00
parent 7cd51541cd
commit 6f8d38406a
29 changed files with 1281 additions and 511 deletions

View File

@@ -57,7 +57,7 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pods that have unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil)
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil, sc, nil)
}
}
@@ -71,7 +71,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
nodeInfo := sched.Cache.AddNode(logger, node)
logger.V(3).Info("Add event for node", "node", klog.KObj(node))
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, preCheckForNode(nodeInfo))
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, nil, node, preCheckForNode(nodeInfo))
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@@ -90,7 +90,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable.
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, preCheckForNode(nodeInfo))
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, *event, oldNode, newNode, preCheckForNode(nodeInfo))
}
}
@@ -180,7 +180,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
// removing it from the scheduler cache. In this case, signal an AssignedPodDelete
// event to immediately retry some unscheduled Pods.
if fwk.RejectWaitingPod(pod.UID) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, nil)
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
}
}
@@ -218,7 +218,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
}
-sched.SchedulingQueue.AssignedPodUpdated(logger, newPod)
+sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
}
func (sched *Scheduler) deletePodFromCache(obj interface{}) {
@@ -243,7 +243,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
logger.Error(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
}
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, nil)
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
}
// assignedPod selects pods that are assigned (scheduled and running).
@@ -332,20 +332,20 @@ func addAllEventHandlers(
funcs := cache.ResourceEventHandlerFuncs{}
if at&framework.Add != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
-funcs.AddFunc = func(_ interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil)
+funcs.AddFunc = func(obj interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil, obj, nil)
}
}
if at&framework.Update != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
-funcs.UpdateFunc = func(_, _ interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil)
+funcs.UpdateFunc = func(old, obj interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, old, obj, nil)
}
}
if at&framework.Delete != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
-funcs.DeleteFunc = func(_ interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil)
+funcs.DeleteFunc = func(obj interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, obj, nil, nil)
}
}
return funcs
@@ -412,8 +412,8 @@ func addAllEventHandlers(
if at&framework.Update != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
-UpdateFunc: func(_, _ interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, nil)
+UpdateFunc: func(old, obj interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
},
},
)
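One convention runs through all of these call sites and is documented later in the commit: for Add events only the new object is passed (oldObj is nil), for Delete events only the old object is passed (newObj is nil), and Update events pass both. A minimal standalone sketch of that convention (the names are illustrative, not from the commit):

package main

import "fmt"

// moveAll stands in for SchedulingQueue.MoveAllToActiveOrBackoffQueue, which now
// receives the objects behind the event so queueing hint functions can inspect them.
func moveAll(eventLabel string, oldObj, newObj interface{}) {
    fmt.Printf("%s: oldObj=%v newObj=%v\n", eventLabel, oldObj, newObj)
}

func main() {
    moveAll("NodeAdd", nil, "node-1")                 // add: oldObj is nil
    moveAll("NodeUpdate", "node-1", "node-1-updated") // update: both objects are set
    moveAll("AssignedPodDelete", "pod-a", nil)        // delete: newObj is nil
}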

View File

@@ -323,13 +323,15 @@ type QueueSortPlugin interface {
// move unschedulable Pods in internal scheduling queues. Plugins
// that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface.
type EnqueueExtensions interface {
Plugin
// EventsToRegister returns a series of possible events that may make a Pod
-// failed by this plugin schedulable.
+// failed by this plugin schedulable. Each event has a callback function that
+// filters out events to reduce useless retries of the Pod's scheduling.
// The events will be registered when instantiating the internal scheduling queue,
// and leveraged to build event handlers dynamically.
// Note: the returned list needs to be static (not depend on configuration parameters);
// otherwise it would lead to undefined behavior.
-EventsToRegister() []ClusterEvent
+EventsToRegister() []ClusterEventWithHint
}
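To make the new contract concrete, here is a hedged sketch of a plugin-side implementation (the plugin, its label check, and the helper name are hypothetical, not part of this commit). It pairs a Node add event with a QueueingHintFn so that Pods this plugin rejected are only retried for nodes that could plausibly help:

package fooplugin

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

type fooPlugin struct{}

func (pl *fooPlugin) Name() string { return "fooPlugin" }

// EventsToRegister pairs each event with an optional hint function.
// A nil QueueingHintFn keeps the old behavior: always retry with backoff.
func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint {
    return []framework.ClusterEventWithHint{
        {
            Event:          framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
            QueueingHintFn: pl.isSchedulableAfterNodeAdd,
        },
    }
}

// isSchedulableAfterNodeAdd filters out node-add events that cannot help the Pod.
// The "foo" label test is a made-up example of a cheap relevance check.
func (pl *fooPlugin) isSchedulableAfterNodeAdd(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
    node, ok := newObj.(*v1.Node) // newObj carries the added Node; oldObj is nil for add events
    if !ok {
        return framework.QueueAfterBackoff // unexpected type: fall back to the default retry
    }
    if _, ok := node.Labels["foo"]; !ok {
        return framework.QueueSkip // this node cannot change the plugin's verdict
    }
    return framework.QueueAfterBackoff
}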
// PreFilterExtensions is an interface that is included in plugins that allow specifying
@@ -513,6 +515,9 @@ type Framework interface {
// PreEnqueuePlugins returns the registered preEnqueue plugins.
PreEnqueuePlugins() []PreEnqueuePlugin
+// EnqueueExtensions returns the registered Enqueue extensions.
+EnqueueExtensions() []EnqueueExtensions
// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc

View File

@@ -245,22 +245,21 @@ func (pl *dynamicResources) Name() string {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *dynamicResources) EventsToRegister() []framework.ClusterEvent {
+func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {
if !pl.enabled {
return nil
}
-events := []framework.ClusterEvent{
+events := []framework.ClusterEventWithHint{
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
-{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}},
// When a driver has provided additional information, a pod waiting for that information
// may be schedulable.
// TODO (#113702): can we change this so that such an event does not trigger *all* pods?
// Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70
-{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}},
// A resource might depend on node labels for topology filtering.
// A new or updated node may make pods schedulable.
-{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
}
return events
}
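The TODO above is exactly the kind of problem the new hint field can solve. A hypothetical sketch of such a filter, not part of this commit: since a PodSchedulingContext is named after its Pod, events for other Pods' contexts could be skipped outright:

package dynamicresources

import (
    v1 "k8s.io/api/core/v1"
    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// isSchedulableAfterPodSchedulingContextChange is a hypothetical QueueingHintFn.
func isSchedulableAfterPodSchedulingContextChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
    psc, ok := newObj.(*resourcev1alpha2.PodSchedulingContext)
    if !ok {
        return framework.QueueAfterBackoff // unexpected type: keep the conservative default
    }
    if psc.Namespace == pod.Namespace && psc.Name == pod.Name {
        return framework.QueueAfterBackoff // information for this Pod arrived: worth a retry
    }
    return framework.QueueSkip // another Pod's scheduling context: ignore the event
}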

View File

@@ -54,8 +54,8 @@ func (pl *InterPodAffinity) Name() string {
// EventsToRegister returns the possible events that may make a failed Pod
// schedulable
-func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
+func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
// All ActionType includes the following events:
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints,
// deleting an existing Pod may make it schedulable.
@@ -63,8 +63,8 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEvent {
// an unschedulable Pod schedulable.
// - Add. An unschedulable Pod may fail due to violating pod-affinity constraints,
// adding an assigned Pod may make it schedulable.
-{Resource: framework.Pod, ActionType: framework.All},
-{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
}
}

View File

@@ -81,9 +81,9 @@ func (s *preFilterState) Clone() framework.StateData {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Node, ActionType: framework.Add | framework.Update},
+func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
}
}

View File

@@ -41,9 +41,9 @@ const (
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *NodeName) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Node, ActionType: framework.Add | framework.Update},
+func (pl *NodeName) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
}
}

View File

@@ -101,11 +101,11 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error)
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *NodePorts) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
+func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
-{Resource: framework.Pod, ActionType: framework.Delete},
-{Resource: framework.Node, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
}
}

View File

@@ -234,16 +234,16 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (f *Fit) EventsToRegister() []framework.ClusterEvent {
+func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
podActionType := framework.Delete
if f.enableInPlacePodVerticalScaling {
// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered
// for this plugin since a Pod update may free up resources that make other Pods schedulable.
podActionType |= framework.Update
}
-return []framework.ClusterEvent{
-{Resource: framework.Pod, ActionType: podActionType},
-{Resource: framework.Node, ActionType: framework.Add | framework.Update},
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
}
}
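The hint field introduced by this commit would also allow this registration to become cheaper later. As a hypothetical example (not added by this commit): a Pod delete event can only free resources if the deleted Pod was actually bound to a node, so unassigned Pods could be filtered out:

package noderesources

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// isSchedulableAfterPodDeleted is a hypothetical QueueingHintFn for the Pod/Delete event.
func isSchedulableAfterPodDeleted(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
    deleted, ok := oldObj.(*v1.Pod) // oldObj carries the deleted Pod; newObj is nil for delete events
    if !ok {
        return framework.QueueAfterBackoff
    }
    if deleted.Spec.NodeName == "" {
        return framework.QueueSkip // an unassigned Pod released no node resources
    }
    return framework.QueueAfterBackoff
}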

View File

@@ -991,22 +991,22 @@ func TestEventsToRegister(t *testing.T) {
tests := []struct {
name string
inPlacePodVerticalScalingEnabled bool
-expectedClusterEvents []framework.ClusterEvent
+expectedClusterEvents []framework.ClusterEventWithHint
}{
{
"Register events with InPlacePodVerticalScaling feature enabled",
true,
-[]framework.ClusterEvent{
-{Resource: "Pod", ActionType: framework.Update | framework.Delete},
-{Resource: "Node", ActionType: framework.Add | framework.Update},
+[]framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Update | framework.Delete}},
+{Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}},
},
},
{
"Register events with InPlacePodVerticalScaling feature disabled",
false,
-[]framework.ClusterEvent{
-{Resource: "Pod", ActionType: framework.Delete},
-{Resource: "Node", ActionType: framework.Add | framework.Update},
+[]framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Delete}},
+{Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}},
},
},
}

View File

@@ -46,9 +46,9 @@ const (
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint},
+func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint}},
}
}

View File

@@ -74,10 +74,10 @@ func (pl *CSILimits) Name() string {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *CSILimits) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.CSINode, ActionType: framework.Add},
-{Resource: framework.Pod, ActionType: framework.Delete},
+func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
}
}

View File

@@ -202,10 +202,10 @@ func (pl *nonCSILimits) Name() string {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Node, ActionType: framework.Add},
-{Resource: framework.Pod, ActionType: framework.Delete},
+func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
}
}

View File

@@ -131,8 +131,8 @@ func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory)
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
+func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
// All ActionType includes the following events:
// - Add. An unschedulable Pod may fail due to violating topology spread constraints,
// adding an assigned Pod may make it schedulable.
@@ -140,9 +140,9 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEvent {
// an unschedulable Pod schedulable.
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints,
// deleting an existing Pod may make it schedulable.
-{Resource: framework.Pod, ActionType: framework.All},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
// Node add|delete|updateLabel may lead to a topology key change,
// making pods in scheduling schedulable or unschedulable.
-{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}},
}
}

View File

@@ -55,9 +55,9 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Pod, ActionType: framework.Update},
+func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
}
}

View File

@@ -54,9 +54,9 @@ func (pl *TaintToleration) Name() string {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *TaintToleration) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Node, ActionType: framework.Add | framework.Update},
+func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
}
}

View File

@@ -87,25 +87,25 @@ func (pl *VolumeBinding) Name() string {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEvent {
-events := []framework.ClusterEvent{
+func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
+events := []framework.ClusterEventWithHint{
// Pods may fail because of missing or mis-configured storage class
// (e.g., allowedTopologies, volumeBindingMode), and hence may become
// schedulable upon StorageClass Add or Update events.
-{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}},
// We bind PVCs with PVs, so any changes may make the pods schedulable.
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update},
-{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
// Pods may fail to find available PVs because the node labels do not
// match the storage class's allowed topologies or PV's node affinity.
// A new or updated node may make pods schedulable.
-{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
// We rely on CSI node to translate in-tree PV to CSI.
-{Resource: framework.CSINode, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}},
// When CSIStorageCapacity is enabled, pods may become schedulable
// on CSI driver & storage capacity changes.
-{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update},
-{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
+{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
}
return events
}

View File

@@ -333,17 +333,17 @@ func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
+func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
// Due to immutable fields `spec.volumes`, pod update events are ignored.
-{Resource: framework.Pod, ActionType: framework.Delete},
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
// A new Node may make a pod schedulable.
-{Resource: framework.Node, ActionType: framework.Add},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
// Pods may fail to schedule because the PVC it uses has not yet been created.
// This PVC is required to exist to check its access modes.
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
}
}

View File

@@ -260,18 +260,18 @@ func getErrorAsStatus(err error) *framework.Status {
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
-func (pl *VolumeZone) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
+func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
+return []framework.ClusterEventWithHint{
// New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable.
// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
-{Resource: framework.StorageClass, ActionType: framework.Add},
+{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}},
// A new node or updating a node's volume zone labels may make a pod schedulable.
-{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
// A new pvc may make a pod schedulable.
// Because all fields except `spec.resources` are immutable, pvc update events are ignored.
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
// A new pv or updating a pv's volume zone labels may make a pod schedulable.
-{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
}
}

View File

@@ -44,15 +44,6 @@ const (
maxTimeout = 15 * time.Minute
)
-var allClusterEvents = []framework.ClusterEvent{
-{Resource: framework.Pod, ActionType: framework.All},
-{Resource: framework.Node, ActionType: framework.All},
-{Resource: framework.CSINode, ActionType: framework.All},
-{Resource: framework.PersistentVolume, ActionType: framework.All},
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.All},
-{Resource: framework.StorageClass, ActionType: framework.All},
-}
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
@@ -61,6 +52,7 @@ type frameworkImpl struct {
waitingPods *waitingPodsMap
scorePluginWeight map[string]int
preEnqueuePlugins []framework.PreEnqueuePlugin
+enqueueExtensions []framework.EnqueueExtensions
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
@@ -133,7 +125,6 @@ type frameworkOptions struct {
podNominator framework.PodNominator
extenders []framework.Extender
captureProfile CaptureProfile
-clusterEventMap map[framework.ClusterEvent]sets.Set[string]
parallelizer parallelize.Parallelizer
logger *klog.Logger
}
@@ -217,13 +208,6 @@ func WithCaptureProfile(c CaptureProfile) Option {
}
}
-// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
-func WithClusterEventMap(m map[framework.ClusterEvent]sets.Set[string]) Option {
-return func(o *frameworkOptions) {
-o.clusterEventMap = m
-}
-}
// WithMetricsRecorder sets metrics recorder for the scheduling frameworkImpl.
func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
return func(o *frameworkOptions) {
@@ -242,7 +226,6 @@ func WithLogger(logger klog.Logger) Option {
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
return frameworkOptions{
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
-clusterEventMap: make(map[framework.ClusterEvent]sets.Set[string]),
parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
}
}
@@ -325,8 +308,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
}
pluginsMap[name] = p
-// Update ClusterEventMap in place.
-fillEventToPluginMap(logger, p, options.clusterEventMap)
+f.fillEnqueueExtensions(p)
}
// initialize plugins per individual extension points
@@ -545,34 +527,40 @@ func (f *frameworkImpl) expandMultiPointPlugins(logger klog.Logger, profile *con
return nil
}
-func fillEventToPluginMap(logger klog.Logger, p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.Set[string]) {
+func (f *frameworkImpl) fillEnqueueExtensions(p framework.Plugin) {
ext, ok := p.(framework.EnqueueExtensions)
if !ok {
-// If interface EnqueueExtensions is not implemented, register the default events
-// to the plugin. This is to ensure backward compatibility.
-registerClusterEvents(p.Name(), eventToPlugins, allClusterEvents)
+// If interface EnqueueExtensions is not implemented, register the default enqueue extensions
+// to the plugin because we don't know which events the plugin is interested in.
+// This is to ensure backward compatibility.
+f.enqueueExtensions = append(f.enqueueExtensions, &defaultEnqueueExtension{pluginName: p.Name()})
return
}
-events := ext.EventsToRegister()
-// It's rare that a plugin implements EnqueueExtensions but returns nil.
-// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
-// cannot be moved by any regular cluster event.
-if len(events) == 0 {
-logger.Info("Plugin's EventsToRegister() returned nil", "plugin", p.Name())
-return
-}
-// The most common case: a plugin implements EnqueueExtensions and returns non-nil result.
-registerClusterEvents(p.Name(), eventToPlugins, events)
+f.enqueueExtensions = append(f.enqueueExtensions, ext)
}
-func registerClusterEvents(name string, eventToPlugins map[framework.ClusterEvent]sets.Set[string], evts []framework.ClusterEvent) {
-for _, evt := range evts {
-if eventToPlugins[evt] == nil {
-eventToPlugins[evt] = sets.New(name)
-} else {
-eventToPlugins[evt].Insert(name)
+// defaultEnqueueExtension is used when a plugin does not implement EnqueueExtensions interface.
+type defaultEnqueueExtension struct {
+pluginName string
+}
+func (p *defaultEnqueueExtension) Name() string { return p.pluginName }
+func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint {
+// We need to return all specific cluster events with framework.All action instead of the wildcard event
+// because the returned values are used to register event handlers.
+// If we returned the wildcard here, it would not be reflected in the registered event handlers,
+// and some events might never be registered.
+return []framework.ClusterEventWithHint{
+{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.All}},
+{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.All}},
+}
+}
@@ -607,6 +595,11 @@ func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin {
return f.preEnqueuePlugins
}
+// EnqueueExtensions returns the registered Enqueue extensions.
+func (f *frameworkImpl) EnqueueExtensions() []framework.EnqueueExtensions {
+return f.enqueueExtensions
+}
// QueueSortFunc returns the function to sort pods in scheduling queue
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
if f == nil {

View File

@@ -853,165 +853,6 @@ func TestNewFrameworkMultiPointExpansion(t *testing.T) {
}
}
-// fakeNoopPlugin doesn't implement interface framework.EnqueueExtensions.
-type fakeNoopPlugin struct{}
-func (*fakeNoopPlugin) Name() string { return "fakeNoop" }
-func (*fakeNoopPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
-return nil
-}
-type fakeNodePlugin struct{}
-func (*fakeNodePlugin) Name() string { return "fakeNode" }
-func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
-return nil
-}
-func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Pod, ActionType: framework.All},
-{Resource: framework.Node, ActionType: framework.Delete},
-{Resource: framework.CSINode, ActionType: framework.Update | framework.Delete},
-}
-}
-type fakePodPlugin struct{}
-func (*fakePodPlugin) Name() string { return "fakePod" }
-func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
-return nil
-}
-func (*fakePodPlugin) EventsToRegister() []framework.ClusterEvent {
-return []framework.ClusterEvent{
-{Resource: framework.Pod, ActionType: framework.All},
-{Resource: framework.Node, ActionType: framework.Add | framework.Delete},
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete},
-}
-}
-// fakeNoopRuntimePlugin implement interface framework.EnqueueExtensions, but returns nil
-// at runtime. This can simulate a plugin registered at scheduler setup, but does nothing
-// due to some disabled feature gate.
-type fakeNoopRuntimePlugin struct{}
-func (*fakeNoopRuntimePlugin) Name() string { return "fakeNoopRuntime" }
-func (*fakeNoopRuntimePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
-return nil
-}
-func (*fakeNoopRuntimePlugin) EventsToRegister() []framework.ClusterEvent { return nil }
-func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
-tests := []struct {
-name string
-plugins []framework.Plugin
-want map[framework.ClusterEvent]sets.Set[string]
-}{
-{
-name: "no-op plugin",
-plugins: []framework.Plugin{&fakeNoopPlugin{}},
-want: map[framework.ClusterEvent]sets.Set[string]{
-{Resource: framework.Pod, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin),
-{Resource: framework.Node, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin),
-{Resource: framework.CSINode, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin),
-{Resource: framework.StorageClass, ActionType: framework.All}: sets.New("fakeNoop", bindPlugin, queueSortPlugin),
-},
-},
-{
-name: "node plugin",
-plugins: []framework.Plugin{&fakeNodePlugin{}},
-want: map[framework.ClusterEvent]sets.Set[string]{
-{Resource: framework.Pod, ActionType: framework.All}: sets.New("fakeNode", bindPlugin, queueSortPlugin),
-{Resource: framework.Node, ActionType: framework.Delete}: sets.New("fakeNode"),
-{Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.New("fakeNode"),
-{Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-},
-},
-{
-name: "pod plugin",
-plugins: []framework.Plugin{&fakePodPlugin{}},
-want: map[framework.ClusterEvent]sets.Set[string]{
-{Resource: framework.Pod, ActionType: framework.All}: sets.New("fakePod", bindPlugin, queueSortPlugin),
-{Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.New("fakePod"),
-{Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.New("fakePod"),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-},
-},
-{
-name: "node and pod plugin",
-plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
-want: map[framework.ClusterEvent]sets.Set[string]{
-{Resource: framework.Node, ActionType: framework.Delete}: sets.New("fakeNode"),
-{Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.New("fakePod"),
-{Resource: framework.Pod, ActionType: framework.All}: sets.New("fakeNode", "fakePod", bindPlugin, queueSortPlugin),
-{Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.New("fakeNode"),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.New("fakePod"),
-{Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-},
-},
-{
-name: "no-op runtime plugin",
-plugins: []framework.Plugin{&fakeNoopRuntimePlugin{}},
-want: map[framework.ClusterEvent]sets.Set[string]{
-{Resource: framework.Pod, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.Node, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.CSINode, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-{Resource: framework.StorageClass, ActionType: framework.All}: sets.New(bindPlugin, queueSortPlugin),
-},
-},
-}
-for _, tt := range tests {
-t.Run(tt.name, func(t *testing.T) {
-registry := Registry{}
-cfgPls := &config.Plugins{}
-for _, pl := range tt.plugins {
-tmpPl := pl
-if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
-return tmpPl, nil
-}); err != nil {
-t.Fatalf("fail to register filter plugin (%s)", pl.Name())
-}
-cfgPls.Filter.Enabled = append(cfgPls.Filter.Enabled, config.Plugin{Name: pl.Name()})
-}
-got := make(map[framework.ClusterEvent]sets.Set[string])
-profile := config.KubeSchedulerProfile{Plugins: cfgPls}
-_, ctx := ktesting.NewTestContext(t)
-ctx, cancel := context.WithCancel(ctx)
-defer cancel()
-_, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, WithClusterEventMap(got))
-if err != nil {
-t.Fatal(err)
-}
-if diff := cmp.Diff(tt.want, got); diff != "" {
-t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
-}
-})
-}
-}
func TestPreEnqueuePlugins(t *testing.T) {
tests := []struct {
name string

View File

@@ -78,6 +78,62 @@ const (
WildCard GVK = "*"
)
+type ClusterEventWithHint struct {
+Event ClusterEvent
+// QueueingHintFn is executed for the Pod rejected by this plugin when the above Event happens,
+// and filters out events to reduce useless retries of the Pod's scheduling.
+// It's an optional field. If not set,
+// the scheduling of Pods will always be retried with backoff when this Event happens
+// (the same as QueueAfterBackoff).
+QueueingHintFn QueueingHintFn
+}
+// QueueingHintFn returns a hint that signals whether the event can make a Pod,
+// which was rejected by this plugin in the past scheduling cycle, schedulable or not.
+// It's called before a Pod gets moved from unschedulableQ to backoffQ or activeQ.
+//
+// - `pod`: the Pod to be enqueued, which was rejected by this plugin in the past.
+// - `event`: the event by which the Pod may be moved back to activeQ/backoffQ.
+// - `oldObj` `newObj`: the objects involved in that event.
+// - For example, if the given event is "Node deleted", `oldObj` is that deleted Node.
+// - `oldObj` is nil if the event is an add event.
+// - `newObj` is nil if the event is a delete event.
+type QueueingHintFn func(pod *v1.Pod, oldObj, newObj interface{}) QueueingHint
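Update events populate both objects, which lets a hint compare old and new state. A sketch under assumed semantics (the allocatable-CPU rule is illustrative, not from this commit):

package example

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// nodeAllocatableIncreased is a hypothetical QueueingHintFn for a Node update event.
func nodeAllocatableIncreased(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
    oldNode, okOld := oldObj.(*v1.Node)
    newNode, okNew := newObj.(*v1.Node)
    if !okOld || !okNew {
        return framework.QueueAfterBackoff // unexpected types: fall back to retrying
    }
    // Retry only if the update could change the filtering result, e.g. allocatable
    // CPU grew; any other node update is noise for a Pod rejected on resources.
    if newNode.Status.Allocatable.Cpu().Cmp(*oldNode.Status.Allocatable.Cpu()) > 0 {
        return framework.QueueAfterBackoff
    }
    return framework.QueueSkip
}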
+type QueueingHint int
+const (
+// QueueSkip implies that the cluster event has no impact on
+// scheduling of the pod.
+QueueSkip QueueingHint = iota
+// QueueAfterBackoff implies that the Pod may be schedulable by the event,
+// and it is worth retrying the scheduling again after backoff.
+QueueAfterBackoff
+// QueueImmediately should be returned only when there is a high chance that the Pod gets scheduled in the next scheduling cycle.
+// Otherwise, it's detrimental to scheduling throughput.
+// For example, when the Pod was rejected as waiting for an external resource to be provisioned, that is directly tied to the Pod,
+// and the event is that the resource is provisioned, then you can return QueueImmediately.
+// As a counterexample, when the Pod was rejected due to insufficient memory resource,
+// and the event is that more memory on Node is available, then you should return QueueAfterBackoff instead of QueueImmediately
+// because other Pods may be waiting for the same resources and only a few of them would schedule in the next scheduling cycle.
+QueueImmediately
+)
+func (s QueueingHint) String() string {
+switch s {
+case QueueSkip:
+return "QueueSkip"
+case QueueAfterBackoff:
+return "QueueAfterBackoff"
+case QueueImmediately:
+return "QueueImmediately"
+}
+return ""
+}
// ClusterEvent abstracts how a system resource's state gets changed.
// Resource represents the standard API resources such as Pod, Node, etc.
// ActionType denotes the specific change such as Add, Update or Delete.

View File

@@ -106,9 +106,13 @@ type SchedulingQueue interface {
Pop() (*framework.QueuedPodInfo, error)
Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
-MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck)
+// TODO(sanposhiho): move all PreEnqueueCheck to Requeue and delete it from this parameter eventually.
+// Some PreEnqueueCheck implementations include event-filtering logic based on certain in-tree plugins,
+// which badly affects other plugins.
+// See https://github.com/kubernetes/kubernetes/issues/110175
+MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck)
AssignedPodAdded(logger klog.Logger, pod *v1.Pod)
-AssignedPodUpdated(logger klog.Logger, pod *v1.Pod)
+AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod)
PendingPods() ([]*v1.Pod, string)
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
@@ -171,9 +175,10 @@ type PriorityQueue struct {
// when we received move request.
moveRequestCycle int64
-clusterEventMap map[framework.ClusterEvent]sets.Set[string]
// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
+// queueingHintMap is keyed with profile name, valued with registered queueing hint functions.
+queueingHintMap QueueingHintMapPerProfile
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
@@ -186,6 +191,12 @@ type PriorityQueue struct {
pluginMetricsSamplePercent int
}
+// QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName.
+type QueueingHintFunction struct {
+PluginName string
+QueueingHintFn framework.QueueingHintFn
+}
type priorityQueueOptions struct {
clock clock.Clock
podInitialBackoffDuration time.Duration
@@ -194,8 +205,8 @@ type priorityQueueOptions struct {
podLister listersv1.PodLister
metricsRecorder metrics.MetricAsyncRecorder
pluginMetricsSamplePercent int
-clusterEventMap map[framework.ClusterEvent]sets.Set[string]
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
+queueingHintMap QueueingHintMap
}
// Option configures a PriorityQueue
@@ -229,13 +240,6 @@ func WithPodLister(pl listersv1.PodLister) Option {
}
}
-// WithClusterEventMap sets clusterEventMap for PriorityQueue.
-func WithClusterEventMap(m map[framework.ClusterEvent]sets.Set[string]) Option {
-return func(o *priorityQueueOptions) {
-o.clusterEventMap = m
-}
-}
// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
return func(o *priorityQueueOptions) {
@@ -243,6 +247,19 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
}
}
+// QueueingHintMapPerProfile is keyed with profile name, valued with queueing hint map registered for the profile.
+type QueueingHintMapPerProfile map[string]QueueingHintMap
+// QueueingHintMap is keyed with ClusterEvent, valued with queueing hint functions registered for the event.
+type QueueingHintMap map[framework.ClusterEvent][]*QueueingHintFunction
+// WithQueueingHintMapPerProfile sets queueingHintMap for PriorityQueue.
+func WithQueueingHintMapPerProfile(m QueueingHintMapPerProfile) Option {
+return func(o *priorityQueueOptions) {
+o.queueingHintMap = m
+}
+}
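Putting the two map types together: per profile, each ClusterEvent maps to the hint functions of the plugins that registered it. A rough construction sketch (assumed to sit in this internal queue package; the profile and plugin names are placeholders):

package queue

import "k8s.io/kubernetes/pkg/scheduler/framework"

// buildExampleHintMap shows the shape the scheduler is expected to assemble from
// each profile's EnqueueExtensions and then inject via WithQueueingHintMapPerProfile.
func buildExampleHintMap(hint framework.QueueingHintFn) QueueingHintMapPerProfile {
    return QueueingHintMapPerProfile{
        "default-scheduler": QueueingHintMap{
            {Resource: framework.Node, ActionType: framework.Add}: {
                {PluginName: "fooPlugin", QueueingHintFn: hint},
            },
        },
    }
}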
// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue.
func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
return func(o *priorityQueueOptions) {
@@ -314,8 +331,8 @@ func NewPriorityQueue(
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
moveRequestCycle: -1,
-clusterEventMap: options.clusterEventMap,
preEnqueuePluginMap: options.preEnqueuePluginMap,
+queueingHintMap: options.queueingHintMap,
metricsRecorder: options.metricsRecorder,
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
}
@@ -336,6 +353,62 @@ func (p *PriorityQueue) Run(logger klog.Logger) {
}, 30*time.Second, p.stop)
}
+// isPodWorthRequeuing calls the QueueingHintFn of only the plugins registered in pInfo.unschedulablePlugins.
+// If any QueueingHintFn returns QueueImmediately, the scheduling queue is supposed to enqueue this Pod to activeQ.
+// If no QueueingHintFn returns QueueImmediately, but some return QueueAfterBackoff,
+// the scheduling queue is supposed to enqueue this Pod to activeQ/backoffQ depending on the remaining backoff time of the Pod.
+// If all QueueingHintFns return QueueSkip, the scheduling queue enqueues the Pod back to the unschedulable Pod pool
+// because no plugin would change its scheduling result via the event.
+func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) framework.QueueingHint {
+if pInfo.UnschedulablePlugins.Len() == 0 {
+logger.V(6).Info("Worth requeuing because no unschedulable plugins", "pod", klog.KObj(pInfo.Pod))
+return framework.QueueAfterBackoff
+}
+if event.IsWildCard() {
+logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod))
+return framework.QueueAfterBackoff
+}
+hintMap, ok := p.queueingHintMap[pInfo.Pod.Spec.SchedulerName]
+if !ok {
+// shouldn't reach here unless bug.
+logger.Error(nil, "No QueueingHintMap is registered for this profile", "profile", pInfo.Pod.Spec.SchedulerName, "pod", klog.KObj(pInfo.Pod))
+return framework.QueueAfterBackoff
+}
+pod := pInfo.Pod
+queueHint := framework.QueueSkip
+for eventToMatch, hintfns := range hintMap {
+if eventToMatch.Resource != event.Resource || eventToMatch.ActionType&event.ActionType == 0 {
+continue
+}
+for _, hintfn := range hintfns {
+if !pInfo.UnschedulablePlugins.Has(hintfn.PluginName) {
+continue
+}
+h := hintfn.QueueingHintFn(pod, oldObj, newObj)
+if h == framework.QueueSkip {
+continue
+}
+if h == framework.QueueImmediately {
+return h
+}
+// replace queueHint with the returned value,
+// but continue to other queueHintFns because other plugins may want to return QueueImmediately.
+queueHint = h
+}
+}
+// No queueing hint function is registered for this event,
+// or no queueing hint function returned a value other than QueueSkip.
+return queueHint
+}
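Note how event matching works here: same Resource, and a bitwise AND of the ActionTypes. A hint registered for a compound event therefore fires on any overlapping action. A small self-contained sketch of that rule (the constants mirror framework.ActionType; the values are illustrative):

package main

import "fmt"

type ActionType int64

const (
    Add ActionType = 1 << iota
    Delete
    UpdateNodeLabel
)

// matches mirrors the check in isPodWorthRequeuing: a non-zero intersection of action bits.
func matches(registered, actual ActionType) bool {
    return registered&actual != 0
}

func main() {
    registered := Add | UpdateNodeLabel // e.g. a plugin registered Node Add|UpdateNodeLabel
    fmt.Println(matches(registered, UpdateNodeLabel)) // true: overlapping bit, hint runs
    fmt.Println(matches(registered, Delete))          // false: no overlap, hint is not called
}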
// runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin.
// It returns true if all PreEnqueue functions run successfully; otherwise returns false
// upon the first failure.
@@ -528,7 +601,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
p.unschedulablePods.addOrUpdate(pInfo)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
}
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
@@ -583,7 +655,7 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
}
if len(podsToMove) > 0 {
-p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout)
+p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil)
}
}
@@ -712,7 +784,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
p.lock.Lock()
-p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd)
+p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd, nil, pod)
p.lock.Unlock()
}
@@ -732,12 +804,12 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool {
// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
-func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) {
+func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) {
p.lock.Lock()
-if isPodResourcesResizedDown(pod) {
-p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, nil)
+if isPodResourcesResizedDown(newPod) {
+p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil)
} else {
-p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodUpdate)
+p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
}
p.lock.Unlock()
}
@@ -747,54 +819,75 @@ func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) {
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
-func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) {
+func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) {
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap))
for _, pInfo := range p.unschedulablePods.podInfoMap {
if preCheck == nil || preCheck(pInfo.Pod) {
unschedulablePods = append(unschedulablePods, pInfo)
}
}
-p.movePodsToActiveOrBackoffQueue(logger, unschedulablePods, event)
+p.movePodsToActiveOrBackoffQueue(logger, unschedulablePods, event, oldObj, newObj)
}
// MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
-func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) {
+func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) {
p.lock.Lock()
defer p.lock.Unlock()
-p.moveAllToActiveOrBackoffQueue(logger, event, preCheck)
+p.moveAllToActiveOrBackoffQueue(logger, event, oldObj, newObj, preCheck)
}
+// requeuePodViaQueueingHint tries to requeue Pod to activeQ, backoffQ or unschedulable pod pool based on schedulingHint.
+// It returns the name of the queue the Pod goes to.
+//
+// NOTE: this function assumes lock has been acquired in caller
+func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, schedulingHint framework.QueueingHint, event string) string {
+if schedulingHint == framework.QueueSkip {
+p.unschedulablePods.addOrUpdate(pInfo)
+metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
+return unschedulablePods
+}
+pod := pInfo.Pod
+if p.isPodBackingoff(pInfo) && schedulingHint == framework.QueueAfterBackoff {
+if err := p.podBackoffQ.Add(pInfo); err != nil {
+logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
+p.unschedulablePods.addOrUpdate(pInfo)
+return unschedulablePods
+}
+metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
+return backoffQ
+}
+if added, _ := p.addToActiveQ(logger, pInfo); added {
+metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
+return activeQ
+}
+p.unschedulablePods.addOrUpdate(pInfo)
+metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
+return unschedulablePods
+}
// NOTE: this function assumes lock has been acquired in caller
-func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
+func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) {
activated := false
for _, pInfo := range podInfoList {
// If the event doesn't help making the Pod schedulable, continue.
// Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
// either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
// In that case, it's desired to move it anyways.
-if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
+schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
+if schedulingHint == framework.QueueSkip {
+// QueueingHintFn determined that this Pod isn't worth putting to activeQ or backoffQ by this event.
+logger.V(5).Info("Event is not making pod schedulable", "pod", klog.KObj(pInfo.Pod), "event", event.Label)
continue
}
-pod := pInfo.Pod
-if p.isPodBackingoff(pInfo) {
-if err := p.podBackoffQ.Add(pInfo); err != nil {
-logger.Error(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
-} else {
-logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
-metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
-p.unschedulablePods.delete(pod, pInfo.Gated)
-}
-} else {
-gated := pInfo.Gated
-if added, _ := p.addToActiveQ(logger, pInfo); added {
-logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
+p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
+queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label)
+logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label, "queue", queue, "hint", schedulingHint)
+if queue == activeQ {
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pod, gated)
}
}
}
p.moveRequestCycle = p.schedulingCycle
@@ -1144,42 +1237,3 @@ func MakeNextPodFunc(logger klog.Logger, queue SchedulingQueue) func() *framewor
func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
}
-// Checks if the Pod may become schedulable upon the event.
-// This is achieved by looking up the global clusterEventMap registry.
-func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, clusterEvent framework.ClusterEvent) bool {
-if clusterEvent.IsWildCard() {
-return true
-}
-for evt, nameSet := range p.clusterEventMap {
-// Firstly verify if the two ClusterEvents match:
-// - either the registered event from plugin side is a WildCardEvent,
-// - or the two events have identical Resource fields and *compatible* ActionType.
-// Note the ActionTypes don't need to be *identical*. We check if the ANDed value
-// is zero or not. In this way, it's easy to tell Update&Delete is not compatible,
-// but Update&All is.
-evtMatch := evt.IsWildCard() ||
-(evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0)
-// Secondly verify the plugin name matches.
-// Note that if it doesn't match, we shouldn't continue to search.
-if evtMatch && intersect(nameSet, podInfo.UnschedulablePlugins) {
-return true
-}
-}
-return false
-}
-func intersect(x, y sets.Set[string]) bool {
-if len(x) > len(y) {
-x, y = y, x
-}
-for v := range x {
-if y.Has(v) {
-return true
-}
-}
-return false
-}

View File

@@ -83,6 +83,16 @@ var (
cmp.AllowUnexported(nominator{}),
cmpopts.IgnoreFields(nominator{}, "podLister", "lock"),
}
+queueHintReturnQueueAfterBackoff = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
+return framework.QueueAfterBackoff
+}
+queueHintReturnQueueImmediately = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
+return framework.QueueImmediately
+}
+queueHintReturnQueueSkip = func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
+return framework.QueueSkip
+}
)
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
@@ -93,6 +103,13 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
return nil
}
+// makeEmptyQueueingHintMapPerProfile initializes an empty QueueingHintMapPerProfile for "" profile name.
+func makeEmptyQueueingHintMapPerProfile() QueueingHintMapPerProfile {
+m := make(QueueingHintMapPerProfile)
+m[""] = make(QueueingHintMap)
+return m
+}
func TestPriorityQueue_Add(t *testing.T) {
objs := []runtime.Object{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}
logger, ctx := ktesting.NewTestContext(t)
@@ -219,7 +236,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
// move all pods to active queue when we were trying to schedule them
-q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil)
+q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil)
oldCycle := q.SchedulingCycle()
firstPod, _ := q.Pop()
@@ -611,23 +628,25 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
b.StopTimer()
c := testingclock.NewFakeClock(time.Now())
m := make(map[framework.ClusterEvent]sets.Set[string])
m := makeEmptyQueueingHintMapPerProfile()
// - All plugins registered for events[0], which is NodeAdd.
// - 1/2 of plugins registered for events[1]
// - 1/3 of plugins registered for events[2]
// - ...
for j := 0; j < len(events); j++ {
m[events[j]] = sets.New[string]()
for k := 0; k < len(plugins); k++ {
if (k+1)%(j+1) == 0 {
m[events[j]].Insert(plugins[k])
m[""][events[j]] = append(m[""][events[j]], &QueueingHintFunction{
PluginName: plugins[k],
QueueingHintFn: queueHintReturnQueueAfterBackoff,
})
}
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// Init pods in unschedulablePods.
for j := 0; j < podsInUnschedulablePods; j++ {
@@ -657,10 +676,10 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
b.StartTimer()
if tt.moveEvent.Resource != "" {
q.MoveAllToActiveOrBackoffQueue(logger, tt.moveEvent, nil)
q.MoveAllToActiveOrBackoffQueue(logger, tt.moveEvent, nil, nil, nil)
} else {
// Random case.
q.MoveAllToActiveOrBackoffQueue(logger, events[i%len(events)], nil)
q.MoveAllToActiveOrBackoffQueue(logger, events[i%len(events)], nil, nil, nil)
}
}
})
@@ -668,15 +687,93 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
}
}
func TestPriorityQueue_MoveAllToActiveOrBackoffQueueWithQueueingHint(t *testing.T) {
now := time.Now()
p := st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()
tests := []struct {
name string
podInfo *framework.QueuedPodInfo
hint framework.QueueingHintFn
// duration is the duration that the Pod has been in the unschedulable queue.
duration time.Duration
// expectedQ is the queue name (activeQ, backoffQ, or unschedulablePods) that this Pod should be queued to.
expectedQ string
}{
{
name: "QueueImmediately queues pod to activeQ",
podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)},
hint: queueHintReturnQueueImmediately,
expectedQ: activeQ,
},
{
name: "QueueAfterBackoff queues pod to backoffQ if Pod is backing off",
podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)},
hint: queueHintReturnQueueAfterBackoff,
expectedQ: backoffQ,
},
{
name: "QueueAfterBackoff queues pod to activeQ if Pod is not backing off",
podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)},
hint: queueHintReturnQueueAfterBackoff,
duration: DefaultPodInitialBackoffDuration, // backoff is finished
expectedQ: activeQ,
},
{
name: "QueueSkip queues pod to unschedulablePods",
podInfo: &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(p)},
hint: queueHintReturnQueueSkip,
expectedQ: unschedulablePods,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
m := makeEmptyQueueingHintMapPerProfile()
m[""][NodeAdd] = []*QueueingHintFunction{
{
PluginName: "foo",
QueueingHintFn: test.hint,
},
}
test.podInfo.UnschedulablePlugins = sets.New("foo")
cl := testingclock.NewFakeClock(now)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(m), WithClock(cl))
// add to unsched pod pool
q.AddUnschedulableIfNotPresent(logger, test.podInfo, q.SchedulingCycle())
cl.Step(test.duration)
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
if q.podBackoffQ.Len() == 0 && test.expectedQ == backoffQ {
t.Fatalf("expected pod to be queued to backoffQ, but it was not")
}
if q.activeQ.Len() == 0 && test.expectedQ == activeQ {
t.Fatalf("expected pod to be queued to activeQ, but it was not")
}
if q.unschedulablePods.get(test.podInfo.Pod) == nil && test.expectedQ == unschedulablePods {
t.Fatalf("expected pod to be queued to unschedulablePods, but it was not")
}
})
}
}
func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
m := map[framework.ClusterEvent]sets.Set[string]{
{Resource: framework.Node, ActionType: framework.Add}: sets.New("fooPlugin"),
}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
m := makeEmptyQueueingHintMapPerProfile()
m[""][NodeAdd] = []*QueueingHintFunction{
{
PluginName: "fooPlugin",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
q.Add(logger, medPriorityPodInfo.Pod)
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
@@ -689,7 +786,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
hpp2.Name = "hpp2"
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
// Pods are still backing off; move them into backoffQ.
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil)
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
if q.activeQ.Len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
}
@@ -715,7 +812,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// Move the clock by podInitialBackoffDuration so that pods in unschedulablePods finish backing off,
// and the pods will be moved into activeQ.
c.Step(q.podInitialBackoffDuration)
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil)
q.MoveAllToActiveOrBackoffQueue(logger, NodeAdd, nil, nil, nil)
// hpp2 won't be moved regardless of its backoff timer.
if q.activeQ.Len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
@@ -737,8 +834,14 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj()
c := testingclock.NewFakeClock(time.Now())
m := map[framework.ClusterEvent]sets.Set[string]{AssignedPodAdd: sets.New("fakePlugin")}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
m := makeEmptyQueueingHintMapPerProfile()
m[""][AssignedPodAdd] = []*QueueingHintFunction{
{
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
q.Add(logger, medPriorityPodInfo.Pod)
// Add a couple of pods to the unschedulablePods.
q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
@@ -866,7 +969,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
t.Errorf("Unexpected pending pods summary: want %v, but got %v.", wantSummary, gotSummary)
}
// Move all to active queue. We should still see the same set of pods.
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil)
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil)
gotPods, gotSummary = q.PendingPods()
if !reflect.DeepEqual(expectedSet, makeSet(gotPods)) {
t.Error("Unexpected list of pending Pods.")
@@ -1150,7 +1253,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
q.AddUnschedulableIfNotPresent(logger, p1, q.SchedulingCycle())
c.Step(DefaultPodInitialBackoffDuration)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil)
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
// Simulation is over. Now let's pop all pods. The pod popped first should be
// the last one we pop here.
for i := 0; i < 5; i++ {
@@ -1193,7 +1296,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil)
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
// Simulate a pod being popped by the scheduler.
// At this time, the unschedulable pod should be popped.
@@ -1223,7 +1326,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil)
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
// At this time, newerPod should be popped
// because it is the oldest tried pod.
@@ -1267,7 +1370,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(logger, p, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil)
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, nil)
p, err = q.Pop()
if err != nil {
@@ -1282,13 +1385,17 @@ func TestHighPriorityBackoff(t *testing.T) {
// activeQ after one minute if they are in unschedulablePods.
func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
m := map[framework.ClusterEvent]sets.Set[string]{
NodeAdd: sets.New("fakePlugin"),
m := makeEmptyQueueingHintMapPerProfile()
m[""][NodeAdd] = []*QueueingHintFunction{
{
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
midPod := st.MakePod().Name("test-midpod").Namespace("ns1").UID("tp-mid").Priority(midPriority).NominatedNodeName("node1").Obj()
highPod := st.MakePod().Name("test-highpod").Namespace("ns1").UID("tp-high").Priority(highPriority).NominatedNodeName("node1").Obj()
@@ -1448,7 +1555,7 @@ var (
queue.podBackoffQ.Add(pInfo)
}
moveAllToActiveOrBackoffQ = func(logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil)
queue.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
}
flushBackoffQ = func(logger klog.Logger, queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*testingclock.FakeClock).Step(2 * time.Second)
@@ -2040,7 +2147,7 @@ func TestBackOffFlow(t *testing.T) {
}
// An event happens.
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil)
q.MoveAllToActiveOrBackoffQueue(logger, UnschedulableTimeout, nil, nil, nil)
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
t.Errorf("pod %v is not in the backoff queue", podID)
@@ -2070,102 +2177,6 @@ func TestBackOffFlow(t *testing.T) {
}
}
func TestPodMatchesEvent(t *testing.T) {
tests := []struct {
name string
podInfo *framework.QueuedPodInfo
event framework.ClusterEvent
clusterEventMap map[framework.ClusterEvent]sets.Set[string]
want bool
}{
{
name: "event not registered",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj()),
event: EmptyEvent,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
NodeAllEvent: sets.New("foo"),
},
want: false,
},
{
name: "pod's failed plugin matches but event does not match",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"),
event: AssignedPodAdd,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
NodeAllEvent: sets.New("foo", "bar"),
},
want: false,
},
{
name: "wildcard event wins regardless of event matching",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"),
event: WildCardEvent,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
NodeAllEvent: sets.New("foo"),
},
want: true,
},
{
name: "pod's failed plugin and event both match",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"),
event: NodeTaintChange,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
NodeAllEvent: sets.New("foo", "bar"),
},
want: true,
},
{
name: "pod's failed plugin registers fine-grained event",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "bar"),
event: NodeTaintChange,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
NodeAllEvent: sets.New("foo"),
NodeTaintChange: sets.New("bar"),
},
want: true,
},
{
name: "if the pod failed by multiple plugins, a single match suffices",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "foo", "bar"),
event: NodeAdd,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
NodeAllEvent: sets.New("bar"),
},
want: true,
},
{
name: "plugin returns WildCardEvent and plugin name matches",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "foo"),
event: PvAdd,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
WildCardEvent: sets.New("foo"),
},
want: true,
},
{
name: "plugin returns WildCardEvent but plugin name not match",
podInfo: newQueuedPodInfoForLookup(st.MakePod().Name("p").Obj(), "foo"),
event: PvAdd,
clusterEventMap: map[framework.ClusterEvent]sets.Set[string]{
WildCardEvent: sets.New("bar"),
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort())
q.clusterEventMap = tt.clusterEventMap
if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want {
t.Errorf("Want %v, but got %v", tt.want, got)
}
})
}
}
func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
var podInfos []*framework.QueuedPodInfo
for i := 0; i < 5; i++ {
@@ -2224,7 +2235,7 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
// See: https://github.com/golang/go/issues/8687
podInfo.Timestamp = podInfo.Timestamp.Add(time.Duration((i - len(tt.podInfos))) * time.Millisecond)
}
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, tt.preEnqueueCheck)
q.MoveAllToActiveOrBackoffQueue(logger, TestEvent, nil, nil, tt.preEnqueueCheck)
var got []string
for q.podBackoffQ.Len() != 0 {
obj, err := q.podBackoffQ.Pop()
@@ -2315,3 +2326,220 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo {
}
return podInfo
}
// Test_isPodWorthRequeuing tests the isPodWorthRequeuing function.
func Test_isPodWorthRequeuing(t *testing.T) {
count := 0
queueHintReturnQueueImmediately := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
count++
return framework.QueueImmediately
}
queueHintReturnQueueSkip := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
count++
return framework.QueueSkip
}
queueHintReturnQueueAfterBackoff := func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
count++
return framework.QueueAfterBackoff
}
tests := []struct {
name string
podInfo *framework.QueuedPodInfo
event framework.ClusterEvent
oldObj interface{}
newObj interface{}
expected framework.QueueingHint
expectedExecutionCount int // expected total execution count of queueing hint functions
queueingHintMap QueueingHintMapPerProfile
}{
{
name: "return QueueSkip when no queueing hint function is registered for the event",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: framework.QueueSkip,
expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{
"": {
// no queueing hint function for NodeAdd.
AssignedPodAdd: {
{
// It will be ignored because the event is not NodeAdd.
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueImmediately,
},
},
},
},
},
{
name: "return QueueAfterBackoff when the event is wildcard",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: WildCardEvent,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: framework.QueueAfterBackoff,
expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{},
},
{
name: "QueueImmediately is the highest priority",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3", "fooPlugin4"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: framework.QueueImmediately,
expectedExecutionCount: 2,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
{
// executed
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
{
// executed
// No more queueing hint functions are executed after this one
// because QueueImmediately is the highest priority.
PluginName: "fooPlugin2",
QueueingHintFn: queueHintReturnQueueImmediately,
},
{
PluginName: "fooPlugin3",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
{
PluginName: "fooPlugin4",
QueueingHintFn: queueHintReturnQueueSkip,
},
},
},
},
},
{
name: "QueueSkip is the lowest priority",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2", "fooPlugin3"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: framework.QueueAfterBackoff,
expectedExecutionCount: 3,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
{
PluginName: "fooPlugin2",
QueueingHintFn: queueHintReturnQueueSkip,
},
{
PluginName: "fooPlugin3",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
},
},
},
},
{
name: "queueing hint functions from plugins not in UnschedulablePlugins are ignored",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: NodeAdd,
oldObj: nil,
newObj: st.MakeNode().Node,
expected: framework.QueueAfterBackoff,
expectedExecutionCount: 2,
queueingHintMap: QueueingHintMapPerProfile{
"": {
NodeAdd: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
{
PluginName: "fooPlugin2",
QueueingHintFn: queueHintReturnQueueSkip,
},
{
PluginName: "fooPlugin3",
QueueingHintFn: queueHintReturnQueueImmediately, // It'll be ignored.
},
},
},
},
},
{
name: "if the event is a specific Node update event, queueing hint functions registered for NodeUpdate/UpdateNodeLabel are executed",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1", "fooPlugin2"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel},
oldObj: nil,
newObj: st.MakeNode().Node,
expected: framework.QueueAfterBackoff,
expectedExecutionCount: 3,
queueingHintMap: QueueingHintMapPerProfile{
"": {
framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
{
PluginName: "fooPlugin2",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
},
framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Update}: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
},
NodeAdd: { // not executed because NodeAdd is unrelated.
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintReturnQueueAfterBackoff,
},
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
count = 0 // reset count every time
logger, ctx := ktesting.NewTestContext(t)
q := NewTestQueue(ctx, newDefaultQueueSort(), WithQueueingHintMapPerProfile(test.queueingHintMap))
actual := q.isPodWorthRequeuing(logger, test.podInfo, test.event, test.oldObj, test.newObj)
if actual != test.expected {
t.Errorf("isPodWorthRequeuing() = %v, want %v", actual, test.expected)
}
if count != test.expectedExecutionCount {
t.Errorf("isPodWorthRequeuing() executed queueing hint functions %v times, expected: %v", count, test.expectedExecutionCount)
}
})
}
}
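Taken together, the cases above pin down an ordering: QueueImmediately beats QueueAfterBackoff, which beats QueueSkip, and evaluation can stop at the first QueueImmediately (hence the expectedExecutionCount values). A minimal standalone sketch of that reduction, written under these assumptions and not taken from the actual isPodWorthRequeuing body:

package main

import "fmt"

// QueueingHint stand-ins; only the ordering asserted by the tests matters here.
type QueueingHint int

const (
	QueueSkip QueueingHint = iota
	QueueAfterBackoff
	QueueImmediately
)

// aggregate folds the hints returned by the plugins that made a Pod
// unschedulable. No registered hints means QueueSkip; QueueImmediately
// short-circuits because no later hint can raise the result further.
func aggregate(hints []QueueingHint) QueueingHint {
	result := QueueSkip
	for _, h := range hints {
		if h == QueueImmediately {
			return h
		}
		if h == QueueAfterBackoff {
			result = h
		}
	}
	return result
}

func main() {
	fmt.Println(aggregate([]QueueingHint{QueueAfterBackoff, QueueImmediately})) // 2 (QueueImmediately)
	fmt.Println(aggregate([]QueueingHint{QueueAfterBackoff, QueueSkip}))        // 1 (QueueAfterBackoff)
	fmt.Println(aggregate(nil))                                                 // 0 (QueueSkip)
}

The wildcard case bypasses the hints entirely (QueueAfterBackoff with zero executions), which this sketch deliberately does not model.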

View File

@@ -296,11 +296,11 @@ func (sched *Scheduler) handleBindingCycleError(
// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
// update `q.moveRequestCycle` and thus move the assumed pod to backoffQ anyway.
if status.IsUnschedulable() {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
} else {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil)
}
}

View File

@@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
@@ -285,7 +284,6 @@ func New(ctx context.Context,
nodeLister := informerFactory.Core().V1().Nodes().Lister()
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.Set[string])
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
@@ -295,7 +293,6 @@ func New(ctx context.Context,
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
@@ -309,18 +306,21 @@ func New(ctx context.Context,
}
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
}
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodLister(podLister),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)
@@ -349,11 +349,36 @@ func New(ctx context.Context,
}
sched.applyDefaultHandlers()
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile))
return sched, nil
}
// defaultQueueingHintFn is the default queueing hint function.
// It always returns QueueAfterBackoff as the queueing hint.
var defaultQueueingHintFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
return framework.QueueAfterBackoff
}
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
queueingHintMap := make(map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction)
for _, e := range es {
events := e.EventsToRegister()
for _, event := range events {
fn := event.QueueingHintFn
if fn == nil {
fn = defaultQueueingHintFn
}
queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
PluginName: e.Name(),
QueueingHintFn: fn,
})
}
}
return queueingHintMap
}
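As a usage sketch (the plugin below is hypothetical; the ClusterEventWithHint shape matches the fake plugins later in this diff), a plugin can attach a hint to one event and omit it on another, in which case buildQueueingHintMap substitutes defaultQueueingHintFn:

// exampleNodePlugin is a hypothetical plugin; assumes the usual imports
// (v1 "k8s.io/api/core/v1" and the framework package).
type exampleNodePlugin struct{}

func (*exampleNodePlugin) Name() string { return "exampleNodePlugin" }

func (*exampleNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		// Custom hint: requeue immediately on Node additions.
		{
			Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
			QueueingHintFn: func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
				return framework.QueueImmediately
			},
		},
		// No hint given: buildQueueingHintMap falls back to defaultQueueingHintFn,
		// i.e. QueueAfterBackoff.
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
	}
}

Registering the fallback this way keeps the requeueing behavior of hint-unaware plugins identical to the pre-hint scheduler.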
// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
logger := klog.FromContext(ctx)
@@ -439,15 +464,17 @@ func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profi
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
func unionedGVKs(m map[framework.ClusterEvent]sets.Set[string]) map[framework.GVK]framework.ActionType {
func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType)
for evt := range m {
for _, queueingHints := range queueingHintsPerProfile {
for evt := range queueingHints {
if _, ok := gvkMap[evt.Resource]; ok {
gvkMap[evt.Resource] |= evt.ActionType
} else {
gvkMap[evt.Resource] = evt.ActionType
}
}
}
return gvkMap
}
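For a quick illustration (a sketch in this package; the profile names are made up): two profiles registering different Node actions collapse into one ActionType entry for Node, so a single set of event handlers serves every profile.

func exampleUnion() map[framework.GVK]framework.ActionType {
	perProfile := internalqueue.QueueingHintMapPerProfile{
		"profile-a": {
			{Resource: framework.Node, ActionType: framework.Add}: nil,
		},
		"profile-b": {
			{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}: nil,
		},
	}
	// The result maps Node to Add|UpdateNodeLabel: one handler registration
	// is enough to serve both profiles.
	return unionedGVKs(perProfile)
}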

View File

@@ -39,7 +39,9 @@ import (
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -643,3 +645,440 @@ func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
}
return []string{nodeName}, nil
}
const (
fakeNoop = "fakeNoop"
fakeNode = "fakeNode"
fakePod = "fakePod"
fakeNoopRuntime = "fakeNoopRuntime"
queueSort = "no-op-queue-sort-plugin"
fakeBind = "bind-plugin"
)
func Test_buildQueueingHintMap(t *testing.T) {
tests := []struct {
name string
plugins []framework.Plugin
want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
assertFn func(t *testing.T, got map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction) bool
}{
{
name: "no-op plugin",
plugins: []framework.Plugin{&fakeNoopPlugin{}},
want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
{Resource: framework.Pod, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.Node, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.CSINode, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.CSIDriver, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.CSIStorageCapacity, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.PersistentVolume, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.StorageClass, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.PodSchedulingContext, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: fakeNoop, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
},
},
{
name: "node and pod plugin",
plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
{Resource: framework.Pod, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.Pod, ActionType: framework.Add}: {
{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
},
{Resource: framework.Node, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.Node, ActionType: framework.Add}: {
{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
},
{Resource: framework.CSINode, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.CSIDriver, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.CSIStorageCapacity, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.PersistentVolume, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.StorageClass, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.PodSchedulingContext, ActionType: framework.All}: {
{PluginName: fakeBind, QueueingHintFn: defaultQueueingHintFn},
{PluginName: queueSort, QueueingHintFn: defaultQueueingHintFn},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registry := frameworkruntime.Registry{}
cfgPls := &schedulerapi.Plugins{}
plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{})
for _, pl := range plugins {
tmpPl := pl
if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return tmpPl, nil
}); err != nil {
t.Fatalf("fail to register filter plugin (%s)", pl.Name())
}
cfgPls.MultiPoint.Enabled = append(cfgPls.MultiPoint.Enabled, schedulerapi.Plugin{Name: pl.Name()})
}
profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls}
stopCh := make(chan struct{})
defer close(stopCh)
fwk, err := newFramework(registry, profile, stopCh)
if err != nil {
t.Fatal(err)
}
exts := fwk.EnqueueExtensions()
// need to sort to make the test result stable.
sort.Slice(exts, func(i, j int) bool {
return exts[i].Name() < exts[j].Name()
})
got := buildQueueingHintMap(exts)
for e, fns := range got {
wantfns, ok := tt.want[e]
if !ok {
t.Errorf("got unexpected event %v", e)
continue
}
if len(fns) != len(wantfns) {
t.Errorf("got %v queueing hint functions, want %v", len(fns), len(wantfns))
continue
}
for i, fn := range fns {
if fn.PluginName != wantfns[i].PluginName {
t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
continue
}
if fn.QueueingHintFn(nil, nil, nil) != wantfns[i].QueueingHintFn(nil, nil, nil) {
t.Errorf("got queueing hint function (%v) returning %v, expect it to return %v", fn.PluginName, fn.QueueingHintFn(nil, nil, nil), wantfns[i].QueueingHintFn(nil, nil, nil))
continue
}
}
}
})
}
}
// Test_UnionedGVKs tests that unionedGVKs works with buildQueueingHintMap.
func Test_UnionedGVKs(t *testing.T) {
tests := []struct {
name string
plugins schedulerapi.PluginSet
want map[framework.GVK]framework.ActionType
}{
{
name: "no-op plugin",
plugins: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: fakeNoop},
{Name: queueSort},
{Name: fakeBind},
},
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
},
},
{
name: "node plugin",
plugins: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: fakeNode},
{Name: queueSort},
{Name: fakeBind},
},
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
},
},
{
name: "pod plugin",
plugins: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: fakePod},
{Name: queueSort},
{Name: fakeBind},
},
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
},
},
{
name: "node and pod plugin",
plugins: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: fakePod},
{Name: fakeNode},
{Name: queueSort},
{Name: fakeBind},
},
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
},
},
{
name: "no-op runtime plugin",
plugins: schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: fakeNoopRuntime},
{Name: queueSort},
{Name: fakeBind},
},
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
},
},
{
name: "plugins with default profile",
plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registry := plugins.NewInTreeRegistry()
cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins}
plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &fakeNoopPlugin{}, &fakeNoopRuntimePlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}}
for _, pl := range plugins {
tmpPl := pl
if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return tmpPl, nil
}); err != nil {
t.Fatalf("fail to register filter plugin (%s)", pl.Name())
}
}
profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1}
stopCh := make(chan struct{})
defer close(stopCh)
fwk, err := newFramework(registry, profile, stopCh)
if err != nil {
t.Fatal(err)
}
queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
"default": buildQueueingHintMap(fwk.EnqueueExtensions()),
}
got := unionedGVKs(queueingHintsPerProfile)
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
}
})
}
}
func newFramework(r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile, stopCh <-chan struct{}) (framework.Framework, error) {
return frameworkruntime.NewFramework(context.Background(), r, &profile,
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)),
frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)),
)
}
var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
type fakeQueueSortPlugin struct{}
func (pl *fakeQueueSortPlugin) Name() string {
return queueSort
}
func (pl *fakeQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool {
return false
}
var _ framework.BindPlugin = &fakebindPlugin{}
// fakebindPlugin is a no-op implementation for Bind extension point.
type fakebindPlugin struct{}
func (t *fakebindPlugin) Name() string {
return fakeBind
}
func (t *fakebindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
return nil
}
// fakeNoopPlugin doesn't implement interface framework.EnqueueExtensions.
type fakeNoopPlugin struct{}
func (*fakeNoopPlugin) Name() string { return fakeNoop }
func (*fakeNoopPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
return nil
}
var hintFromFakeNode = framework.QueueingHint(100)
type fakeNodePlugin struct{}
var fakeNodePluginQueueingFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
return hintFromFakeNode
}
func (*fakeNodePlugin) Name() string { return fakeNode }
func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
return nil
}
func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
}
}
var hintFromFakePod = framework.QueueingHint(101)
type fakePodPlugin struct{}
var fakePodPluginQueueingFn = func(_ *v1.Pod, _, _ interface{}) framework.QueueingHint {
return hintFromFakePod
}
func (*fakePodPlugin) Name() string { return fakePod }
func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
return nil
}
func (*fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
}
}
// fakeNoopRuntimePlugin implements the framework.EnqueueExtensions interface, but returns nil
// at runtime. This simulates a plugin that is registered at scheduler setup but does nothing
// due to a disabled feature gate.
type fakeNoopRuntimePlugin struct{}
func (*fakeNoopRuntimePlugin) Name() string { return fakeNoopRuntime }
func (*fakeNoopRuntimePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
return nil
}
func (*fakeNoopRuntimePlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }

View File

@@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/net"
@@ -159,3 +160,26 @@ func IsScalarResourceName(name v1.ResourceName) bool {
return v1helper.IsExtendedResourceName(name) || v1helper.IsHugePageResourceName(name) ||
v1helper.IsPrefixedNativeResource(name) || v1helper.IsAttachableVolumeResourceName(name)
}
// As converts two objects to the given type.
// Both objects must be of the same type. If not, an error is returned.
// nil objects are allowed and will be returned as nil.
func As[T runtime.Object](oldObj, newObj interface{}) (T, T, error) {
	var oldTyped T
	var newTyped T
	var ok bool
	if newObj != nil {
		newTyped, ok = newObj.(T)
		if !ok {
			return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", newTyped, newObj)
		}
	}
	if oldObj != nil {
		oldTyped, ok = oldObj.(T)
		if !ok {
			return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", oldTyped, oldObj)
		}
	}
	return oldTyped, newTyped, nil
}
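A usage sketch: inside a queueing hint function, As narrows the untyped old/new objects before inspecting them (the hint body below is hypothetical; it assumes this package's As plus the v1 and framework imports):

// nodeAddedHint is a hypothetical QueueingHintFn using As to type the event objects.
func nodeAddedHint(_ *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	_, node, err := As[*v1.Node](oldObj, newObj)
	if err != nil {
		// Unexpected object type; fall back to the conservative default.
		return framework.QueueAfterBackoff
	}
	if node.Spec.Unschedulable {
		// An unschedulable Node cannot make this Pod schedulable.
		return framework.QueueSkip
	}
	return framework.QueueAfterBackoff
}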

View File

@@ -383,3 +383,107 @@ func TestPatchPodStatus(t *testing.T) {
})
}
}
// Test_As tests the As function with Pod.
func Test_As_Pod(t *testing.T) {
tests := []struct {
name string
oldObj interface{}
newObj interface{}
wantOldObj *v1.Pod
wantNewObj *v1.Pod
wantErr bool
}{
{
name: "nil old Pod",
oldObj: nil,
newObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantOldObj: nil,
wantNewObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
name: "nil new Pod",
oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
newObj: nil,
wantOldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantNewObj: nil,
},
{
name: "two different kinds of objects",
oldObj: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
newObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantErr: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotOld, gotNew, err := As[*v1.Pod](tc.oldObj, tc.newObj)
if err != nil && !tc.wantErr {
t.Fatalf("unexpected error: %v", err)
}
if tc.wantErr {
if err == nil {
t.Fatalf("expected error, but got nil")
}
return
}
if diff := cmp.Diff(tc.wantOldObj, gotOld); diff != "" {
t.Errorf("unexpected old object (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantNewObj, gotNew); diff != "" {
t.Errorf("unexpected new object (-want,+got):\n%s", diff)
}
})
}
}
// Test_As_Node tests the As function with Node.
func Test_As_Node(t *testing.T) {
tests := []struct {
name string
oldObj interface{}
newObj interface{}
wantOldObj *v1.Node
wantNewObj *v1.Node
wantErr bool
}{
{
name: "nil old Node",
oldObj: nil,
newObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantOldObj: nil,
wantNewObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
name: "nil new Node",
oldObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
newObj: nil,
wantOldObj: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
wantNewObj: nil,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotOld, gotNew, err := As[*v1.Node](tc.oldObj, tc.newObj)
if err != nil && !tc.wantErr {
t.Fatalf("unexpected error: %v", err)
}
if tc.wantErr {
if err == nil {
t.Fatalf("expected error, but got nil")
}
return
}
if diff := cmp.Diff(tc.wantOldObj, gotOld); diff != "" {
t.Errorf("unexpected old object (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantNewObj, gotNew); diff != "" {
t.Errorf("unexpected new object (-want,+got):\n%s", diff)
}
})
}
}

View File

@@ -276,9 +276,9 @@ func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEvent {
return []framework.ClusterEvent{
{Resource: "foos.v1.example.com", ActionType: framework.All},
func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}},
}
}