Merge pull request #122309 from carlory/fix-118893-1
nodeaffinity: scheduler queueing hints
This commit is contained in:
		@@ -25,11 +25,13 @@ import (
 | 
				
			|||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 | 
						"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 | 
				
			||||||
 | 
						"k8s.io/klog/v2"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 | 
						"k8s.io/kubernetes/pkg/scheduler/apis/config"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
 | 
						"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
						"k8s.io/kubernetes/pkg/scheduler/framework"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 | 
						"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
 | 
						"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/scheduler/util"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NodeAffinity is a plugin that checks if a pod node selector matches the node label.
 | 
					// NodeAffinity is a plugin that checks if a pod node selector matches the node label.
 | 
				
			||||||
@@ -83,10 +85,40 @@ func (s *preFilterState) Clone() framework.StateData {
 | 
				
			|||||||
// failed by this plugin schedulable.
 | 
					// failed by this plugin schedulable.
 | 
				
			||||||
func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint {
 | 
					func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint {
 | 
				
			||||||
	return []framework.ClusterEventWithHint{
 | 
						return []framework.ClusterEventWithHint{
 | 
				
			||||||
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
 | 
							{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether
 | 
				
			||||||
 | 
					// that change made a previously unschedulable pod schedulable.
 | 
				
			||||||
 | 
					func (pl *NodeAffinity) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
 | 
				
			||||||
 | 
						_, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return framework.Queue, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(modifiedNode) {
 | 
				
			||||||
 | 
							logger.V(4).Info("added or modified node didn't match scheduler-enforced node affinity and this event won't make the Pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
 | 
				
			||||||
 | 
							return framework.QueueSkip, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
 | 
				
			||||||
 | 
						isMatched, err := requiredNodeAffinity.Match(modifiedNode)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return framework.Queue, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if isMatched {
 | 
				
			||||||
 | 
							logger.V(4).Info("node was created or updated, and matches with the pod's NodeAffinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
 | 
				
			||||||
 | 
							return framework.Queue, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// TODO: also check if the original node meets the pod's requestments once preCheck is completely removed.
 | 
				
			||||||
 | 
						// See: https://github.com/kubernetes/kubernetes/issues/110175
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
 | 
				
			||||||
 | 
						return framework.QueueSkip, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// PreFilter builds and writes cycle state used by Filter.
 | 
					// PreFilter builds and writes cycle state used by Filter.
 | 
				
			||||||
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
 | 
					func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
 | 
				
			||||||
	affinity := pod.Spec.Affinity
 | 
						affinity := pod.Spec.Affinity
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,6 +21,7 @@ import (
 | 
				
			|||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/google/go-cmp/cmp"
 | 
						"github.com/google/go-cmp/cmp"
 | 
				
			||||||
 | 
						"github.com/stretchr/testify/require"
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
@@ -1174,3 +1175,132 @@ func TestNodeAffinityPriority(t *testing.T) {
 | 
				
			|||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func Test_isSchedulableAfterNodeChange(t *testing.T) {
 | 
				
			||||||
 | 
						podWithNodeAffinity := st.MakePod().NodeAffinityIn("foo", []string{"bar"})
 | 
				
			||||||
 | 
						testcases := map[string]struct {
 | 
				
			||||||
 | 
							args           *config.NodeAffinityArgs
 | 
				
			||||||
 | 
							pod            *v1.Pod
 | 
				
			||||||
 | 
							oldObj, newObj interface{}
 | 
				
			||||||
 | 
							expectedHint   framework.QueueingHint
 | 
				
			||||||
 | 
							expectedErr    bool
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							"backoff-wrong-new-object": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								newObj:       "not-a-node",
 | 
				
			||||||
 | 
								expectedHint: framework.Queue,
 | 
				
			||||||
 | 
								expectedErr:  true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"backoff-wrong-old-object": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								oldObj:       "not-a-node",
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.Queue,
 | 
				
			||||||
 | 
								expectedErr:  true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"skip-queue-on-add": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.QueueSkip,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"queue-on-add": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Label("foo", "bar").Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.Queue,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"skip-unrelated-changes": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								oldObj:       st.MakeNode().Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Capacity(nil).Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.QueueSkip,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"skip-unrelated-changes-on-labels": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.DeepCopy(),
 | 
				
			||||||
 | 
								oldObj:       st.MakeNode().Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Label("k", "v").Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.QueueSkip,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"skip-labels-changes-on-node-from-suitable-to-unsuitable": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.DeepCopy(),
 | 
				
			||||||
 | 
								oldObj:       st.MakeNode().Label("foo", "bar").Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Label("k", "v").Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.QueueSkip,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"queue-on-labels-change-makes-pod-schedulable": {
 | 
				
			||||||
 | 
								args:         &config.NodeAffinityArgs{},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								oldObj:       st.MakeNode().Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Label("foo", "bar").Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.Queue,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"skip-queue-on-add-scheduler-enforced-node-affinity": {
 | 
				
			||||||
 | 
								args: &config.NodeAffinityArgs{
 | 
				
			||||||
 | 
									AddedAffinity: &v1.NodeAffinity{
 | 
				
			||||||
 | 
										RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
 | 
				
			||||||
 | 
											NodeSelectorTerms: []v1.NodeSelectorTerm{
 | 
				
			||||||
 | 
												{
 | 
				
			||||||
 | 
													MatchExpressions: []v1.NodeSelectorRequirement{
 | 
				
			||||||
 | 
														{
 | 
				
			||||||
 | 
															Key:      "foo",
 | 
				
			||||||
 | 
															Operator: v1.NodeSelectorOpIn,
 | 
				
			||||||
 | 
															Values:   []string{"bar"},
 | 
				
			||||||
 | 
														},
 | 
				
			||||||
 | 
													},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.QueueSkip,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"queue-on-add-scheduler-enforced-node-affinity": {
 | 
				
			||||||
 | 
								args: &config.NodeAffinityArgs{
 | 
				
			||||||
 | 
									AddedAffinity: &v1.NodeAffinity{
 | 
				
			||||||
 | 
										RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
 | 
				
			||||||
 | 
											NodeSelectorTerms: []v1.NodeSelectorTerm{
 | 
				
			||||||
 | 
												{
 | 
				
			||||||
 | 
													MatchExpressions: []v1.NodeSelectorRequirement{
 | 
				
			||||||
 | 
														{
 | 
				
			||||||
 | 
															Key:      "foo",
 | 
				
			||||||
 | 
															Operator: v1.NodeSelectorOpIn,
 | 
				
			||||||
 | 
															Values:   []string{"bar"},
 | 
				
			||||||
 | 
														},
 | 
				
			||||||
 | 
													},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								pod:          podWithNodeAffinity.Obj(),
 | 
				
			||||||
 | 
								newObj:       st.MakeNode().Label("foo", "bar").Obj(),
 | 
				
			||||||
 | 
								expectedHint: framework.Queue,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for name, tc := range testcases {
 | 
				
			||||||
 | 
							t.Run(name, func(t *testing.T) {
 | 
				
			||||||
 | 
								logger, ctx := ktesting.NewTestContext(t)
 | 
				
			||||||
 | 
								p, err := New(ctx, tc.args, nil)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("Creating plugin: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								actualHint, err := p.(*NodeAffinity).isSchedulableAfterNodeChange(logger, tc.pod, tc.oldObj, tc.newObj)
 | 
				
			||||||
 | 
								if tc.expectedErr {
 | 
				
			||||||
 | 
									require.Error(t, err)
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								require.NoError(t, err)
 | 
				
			||||||
 | 
								require.Equal(t, tc.expectedHint, actualHint)
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user