add(scheduler/framework): implement smaller Pod update events
		| @@ -270,19 +270,22 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { | |||||||
| 		logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) | 		logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// SchedulingQueue.AssignedPodUpdated has a problem: | 	events := queue.PodSchedulingPropertiesChange(newPod, oldPod) | ||||||
| 	// It internally pre-filters Pods to move to activeQ, | 	for _, evt := range events { | ||||||
| 	// while taking only in-tree plugins into consideration. | 		// SchedulingQueue.AssignedPodUpdated has a problem: | ||||||
| 	// Consequently, if custom plugins that subscribe to Pod/Update events reject Pods, | 		// It internally pre-filters Pods to move to activeQ, | ||||||
| 	// those Pods will never be requeued to activeQ by assigned-Pod-related events, | 		// while taking only in-tree plugins into consideration. | ||||||
| 	// and they may be stuck in unschedulableQ. | 		// Consequently, if custom plugins that subscribe to Pod/Update events reject Pods, | ||||||
| 	// | 		// those Pods will never be requeued to activeQ by assigned-Pod-related events, | ||||||
| 	// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. | 		// and they may be stuck in unschedulableQ. | ||||||
| 	// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.) | 		// | ||||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { | 		// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled. | ||||||
| 		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil) | 		// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.) | ||||||
| 	} else { | 		if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { | ||||||
| 		sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod) | 			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil) | ||||||
|  | 		} else { | ||||||
|  | 			sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod, evt) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
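To make the per-event dispatch concrete, the following is a minimal sketch (not part of the patch) of what queue.PodSchedulingPropertiesChange returns when an assigned Pod changes both its labels and its resource requests; the loop above then calls AssignedPodUpdated once per specific event. The podWith helper is local to the sketch, and the internal queue package is imported purely for illustration (internal packages are not importable outside the k8s.io/kubernetes module).

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apiresource "k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)

// podWith is a local helper for this sketch: one container with a CPU request and one label.
func podWith(cpu, tier string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"tier": tier}},
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Name: "app",
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{v1.ResourceCPU: apiresource.MustParse(cpu)},
			},
		}}},
	}
}

func main() {
	oldPod := podWith("100m", "web")
	newPod := podWith("200m", "db")

	// Both the labels and the requests changed, so two specific events are returned
	// instead of one generic AssignedPodUpdate.
	for _, evt := range queue.PodSchedulingPropertiesChange(newPod, oldPod) {
		fmt.Println(evt.Label) // "PodLabelChange", then "PodRequestChange"
	}
}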
|   | |||||||
| @@ -62,7 +62,7 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu | |||||||
| 		// All ActionType includes the following events: | 		// All ActionType includes the following events: | ||||||
| 		// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, | 		// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, | ||||||
| 		// deleting an existing Pod may make it schedulable. | 		// deleting an existing Pod may make it schedulable. | ||||||
| 		// - Update. Updating on an existing Pod's labels (e.g., removal) may make | 		// - UpdatePodLabel. Updating on an existing Pod's labels (e.g., removal) may make | ||||||
| 		// an unschedulable Pod schedulable. | 		// an unschedulable Pod schedulable. | ||||||
| 		// - Add. An unschedulable Pod may fail due to violating pod-affinity constraints, | 		// - Add. An unschedulable Pod may fail due to violating pod-affinity constraints, | ||||||
| 		// adding an assigned Pod may make it schedulable. | 		// adding an assigned Pod may make it schedulable. | ||||||
| @@ -75,7 +75,7 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu | |||||||
| 		// As a workaround, we add UpdateNodeTaint event to catch the case. | 		// As a workaround, we add UpdateNodeTaint event to catch the case. | ||||||
| 		// We can remove UpdateNodeTaint when we remove the preCheck feature. | 		// We can remove UpdateNodeTaint when we remove the preCheck feature. | ||||||
| 		// See: https://github.com/kubernetes/kubernetes/issues/110175 | 		// See: https://github.com/kubernetes/kubernetes/issues/110175 | ||||||
| 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, | 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange}, | ||||||
| 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, | 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
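An out-of-tree plugin that previously had to register framework.All for Pod events can narrow its registration in the same way. The sketch below is hypothetical (ExamplePlugin and its hint function are not part of the patch, and the remaining framework.Plugin methods are omitted); only the registration pattern mirrors this hunk.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// ExamplePlugin is a hypothetical out-of-tree plugin used only to show the narrower registration.
type ExamplePlugin struct{}

func (pl *ExamplePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
	return []framework.ClusterEventWithHint{
		// Only Pod additions, label updates, and deletions can resolve this plugin's rejections,
		// so request-only resizes (UpdatePodRequest) no longer wake its unschedulable Pods.
		{
			Event:          framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete},
			QueueingHintFn: pl.isSchedulableAfterPodChange, // hypothetical hint function
		},
	}, nil
}

func (pl *ExamplePlugin) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	// A real hint would inspect oldObj/newObj; this sketch always requeues.
	return framework.Queue, nil
}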
|   | |||||||
| @@ -250,9 +250,9 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error | |||||||
| func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { | func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { | ||||||
| 	podActionType := framework.Delete | 	podActionType := framework.Delete | ||||||
| 	if f.enableInPlacePodVerticalScaling { | 	if f.enableInPlacePodVerticalScaling { | ||||||
| 		// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered | 		// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodRequestUpdate event should be registered | ||||||
| 		// for this plugin since a Pod update may free up resources that make other Pods schedulable. | 		// for this plugin since a Pod update may free up resources that make other Pods schedulable. | ||||||
| 		podActionType |= framework.Update | 		podActionType |= framework.UpdatePodRequest | ||||||
| 	} | 	} | ||||||
| 	return []framework.ClusterEventWithHint{ | 	return []framework.ClusterEventWithHint{ | ||||||
| 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange}, | 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange}, | ||||||
|   | |||||||
| @@ -1095,7 +1095,7 @@ func TestEventsToRegister(t *testing.T) { | |||||||
| 			"Register events with InPlacePodVerticalScaling feature enabled", | 			"Register events with InPlacePodVerticalScaling feature enabled", | ||||||
| 			true, | 			true, | ||||||
| 			[]framework.ClusterEventWithHint{ | 			[]framework.ClusterEventWithHint{ | ||||||
| 				{Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Update | framework.Delete}}, | 				{Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.UpdatePodRequest | framework.Delete}}, | ||||||
| 				{Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, | 				{Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
|   | |||||||
| @@ -139,11 +139,11 @@ func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.Cl | |||||||
| 		// All ActionType includes the following events: | 		// All ActionType includes the following events: | ||||||
| 		// - Add. An unschedulable Pod may fail due to violating topology spread constraints, | 		// - Add. An unschedulable Pod may fail due to violating topology spread constraints, | ||||||
| 		// adding an assigned Pod may make it schedulable. | 		// adding an assigned Pod may make it schedulable. | ||||||
| 		// - Update. Updating on an existing Pod's labels (e.g., removal) may make | 		// - UpdatePodLabel. Updating on an existing Pod's labels (e.g., removal) may make | ||||||
| 		// an unschedulable Pod schedulable. | 		// an unschedulable Pod schedulable. | ||||||
| 		// - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints, | 		// - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints, | ||||||
| 		// deleting an existing Pod may make it schedulable. | 		// deleting an existing Pod may make it schedulable. | ||||||
| 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, | 		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange}, | ||||||
| 		// Node add|delete|update may lead to a topology key change, | 		// Node add|delete|update may lead to a topology key change, | ||||||
| 		// and make Pods that are being scheduled schedulable or unschedulable. | 		// and make Pods that are being scheduled schedulable or unschedulable. | ||||||
| 		// | 		// | ||||||
|   | |||||||
| @@ -46,25 +46,39 @@ type ActionType int64 | |||||||
|  |  | ||||||
| // Constants for ActionTypes. | // Constants for ActionTypes. | ||||||
| const ( | const ( | ||||||
| 	Add    ActionType = 1 << iota // 1 | 	Add ActionType = 1 << iota | ||||||
| 	Delete                        // 10 | 	Delete | ||||||
| 	// UpdateNodeXYZ is only applicable for Node events. | 	// UpdateNodeXYZ is only applicable for Node events. | ||||||
| 	UpdateNodeAllocatable // 100 | 	// If you use UpdateNodeXYZ, | ||||||
| 	UpdateNodeLabel       // 1000 | 	// your plugin's QueueingHint is only executed for the specific sub-Update event. | ||||||
| 	UpdateNodeTaint       // 10000 | 	// It's better to narrow down the scope of the event by specifying them | ||||||
| 	UpdateNodeCondition   // 100000 | 	// for better performance in requeueing. | ||||||
| 	UpdateNodeAnnotation  // 1000000 | 	UpdateNodeAllocatable | ||||||
|  | 	UpdateNodeLabel | ||||||
|  | 	UpdateNodeTaint | ||||||
|  | 	UpdateNodeCondition | ||||||
|  | 	UpdateNodeAnnotation | ||||||
|  |  | ||||||
| 	All ActionType = 1<<iota - 1 // 1111111 | 	// UpdatePodXYZ is only applicable for Pod events. | ||||||
|  | 	UpdatePodLabel | ||||||
|  | 	// UpdatePodRequest is an update to a Pod's resource request, as calculated by the resource.PodRequests() function. | ||||||
|  | 	UpdatePodRequest | ||||||
|  |  | ||||||
|  | 	All ActionType = 1<<iota - 1 | ||||||
|  |  | ||||||
| 	// Use the general Update type if you don't either know or care the specific sub-Update type to use. | 	// Use the general Update type if you don't either know or care the specific sub-Update type to use. | ||||||
| 	Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | 	Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation | UpdatePodLabel | UpdatePodRequest | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // GVK is short for group/version/kind, which can uniquely represent a particular API resource. | // GVK is short for group/version/kind, which can uniquely represent a particular API resource. | ||||||
| type GVK string | type GVK string | ||||||
|  |  | ||||||
| // Constants for GVKs. | // Constants for GVKs. | ||||||
|  | // | ||||||
|  | // Note: | ||||||
|  | // - UpdatePodXYZ or UpdateNodeXYZ: triggered by updating a particular part of a Pod or a Node, e.g., UpdatePodLabel. | ||||||
|  | // Using specific events rather than general ones (e.g., UpdatePodLabel vs. Update) makes the requeueing process more efficient | ||||||
|  | // and consumes less memory, as fewer events are cached in the scheduler. | ||||||
| const ( | const ( | ||||||
| 	// There are a couple of notes about how the scheduler notifies the events of Pods: | 	// There are a couple of notes about how the scheduler notifies the events of Pods: | ||||||
| 	// - Add: add events could be triggered by either a newly created Pod or an existing Pod that is scheduled to a Node. | 	// - Add: add events could be triggered by either a newly created Pod or an existing Pod that is scheduled to a Node. | ||||||
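Since each ActionType constant is a single bit produced by the iota shifts above, event masks compose with | and are tested with &. A small illustration, not from the patch, assuming the declaration order shown in this hunk:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	// A plugin interested in Pod additions, label updates, and deletions.
	mask := framework.Add | framework.UpdatePodLabel | framework.Delete

	fmt.Println(mask&framework.UpdatePodLabel != 0)   // true: label updates are of interest
	fmt.Println(mask&framework.UpdatePodRequest != 0) // false: request-only resizes are filtered out

	// The general Update mask is the union of every specific Update* bit,
	// for both Node and Pod, so it still covers any of them.
	fmt.Println(framework.Update&framework.UpdatePodRequest != 0) // true
}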
| @@ -166,7 +180,8 @@ func (s QueueingHint) String() string { | |||||||
| type ClusterEvent struct { | type ClusterEvent struct { | ||||||
| 	Resource   GVK | 	Resource   GVK | ||||||
| 	ActionType ActionType | 	ActionType ActionType | ||||||
| 	Label      string | 	// Label describes this cluster event, only used in logging and metrics. | ||||||
|  | 	Label string | ||||||
| } | } | ||||||
|  |  | ||||||
| // IsWildCard returns true if ClusterEvent follows WildCard semantics | // IsWildCard returns true if ClusterEvent follows WildCard semantics | ||||||
| @@ -175,14 +190,25 @@ func (ce ClusterEvent) IsWildCard() bool { | |||||||
| } | } | ||||||
|  |  | ||||||
| // Match returns true if ClusterEvent is matched with the coming event. | // Match returns true if ClusterEvent is matched with the coming event. | ||||||
|  | // "match" means the coming event is the same as, or more specific than, ce. | ||||||
|  | // e.g., when ce.ActionType is Update, it returns true if the coming event is UpdateNodeLabel | ||||||
|  | // because UpdateNodeLabel is more specific than Update. | ||||||
|  | // On the other hand, when ce.ActionType is UpdateNodeLabel, it doesn't return true if a coming event is Update. | ||||||
|  | // This is based on the fact that the scheduler interprets the coming cluster event as a specific event when possible; | ||||||
|  | // meaning that if a coming event is Node/Update, | ||||||
|  | // the Node's update is not something that can be interpreted as any of the Node-specific Update events. | ||||||
|  | // | ||||||
| // If the ce.Resource is "*", there's no requirement for the coming event's Resource. | // If the ce.Resource is "*", there's no requirement for the coming event's Resource. | ||||||
| // Contrarily, if the coming event's Resource is "*", the ce.Resource should only be "*". | // Contrarily, if the coming event's Resource is "*", the ce.Resource should only be "*". | ||||||
|  | // (which should never happen in the current implementation of the scheduling queue.) | ||||||
| // | // | ||||||
| // Note: we have a special case here when the coming event is a wildcard event, | // Note: we have a special case here when the coming event is a wildcard event, | ||||||
| // it will force all Pods to move to activeQ/backoffQ, | // it will force all Pods to move to activeQ/backoffQ, | ||||||
| // but we take it as an unmatched event unless the ce is also a wildcard one. | // but we take it as an unmatched event unless the ce is also a wildcard one. | ||||||
| func (ce ClusterEvent) Match(event ClusterEvent) bool { | func (ce ClusterEvent) Match(comingEvent ClusterEvent) bool { | ||||||
| 	return ce.IsWildCard() || (ce.Resource == WildCard || ce.Resource == event.Resource) && ce.ActionType&event.ActionType != 0 | 	return ce.IsWildCard() || | ||||||
|  | 		(ce.Resource == WildCard || ce.Resource == comingEvent.Resource) && | ||||||
|  | 			(ce.ActionType&comingEvent.ActionType != 0 && comingEvent.ActionType <= ce.ActionType) | ||||||
| } | } | ||||||
|  |  | ||||||
| func UnrollWildCardResource() []ClusterEventWithHint { | func UnrollWildCardResource() []ClusterEventWithHint { | ||||||
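The asymmetry introduced in Match can be seen directly: a registered general Update matches a more specific incoming event, but not the other way around. A short illustration (not from the patch) mirroring the two test cases added in the next hunk:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	general := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Update}
	specific := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}

	// A plugin registered for the general Update is requeued on the more specific event.
	fmt.Println(general.Match(specific)) // true

	// A plugin registered only for UpdateNodeLabel is not requeued on a less specific Update,
	// because the scheduler emits the most specific event it can interpret.
	fmt.Println(specific.Match(general)) // false
}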
|   | |||||||
| @@ -1626,9 +1626,15 @@ func TestCloudEvent_Match(t *testing.T) { | |||||||
| 			wantResult:  true, | 			wantResult:  true, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			name:        "event with resource = 'Pod' matching with coming events carries same actionType", | 			name:        "no match if a coming event is less specific", | ||||||
| 			event:       ClusterEvent{Resource: Pod, ActionType: UpdateNodeLabel | UpdateNodeTaint}, | 			event:       ClusterEvent{Resource: Node, ActionType: UpdateNodeLabel}, | ||||||
| 			comingEvent: ClusterEvent{Resource: Pod, ActionType: UpdateNodeLabel}, | 			comingEvent: ClusterEvent{Resource: Node, ActionType: Update}, | ||||||
|  | 			wantResult:  false, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name:        "match if a coming event is more specific", | ||||||
|  | 			event:       ClusterEvent{Resource: Node, ActionType: Update}, | ||||||
|  | 			comingEvent: ClusterEvent{Resource: Node, ActionType: UpdateNodeLabel}, | ||||||
| 			wantResult:  true, | 			wantResult:  true, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
|   | |||||||
| @@ -19,6 +19,9 @@ package queue | |||||||
| import ( | import ( | ||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/api/equality" | 	"k8s.io/apimachinery/pkg/api/equality" | ||||||
|  | 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api/v1/resource" | ||||||
|  | 	"k8s.io/kubernetes/pkg/features" | ||||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -53,6 +56,10 @@ var ( | |||||||
| 	UnscheduledPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodDelete"} | 	UnscheduledPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodDelete"} | ||||||
| 	// AssignedPodDelete is the event when an assigned pod is deleted. | 	// AssignedPodDelete is the event when an assigned pod is deleted. | ||||||
| 	AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"} | 	AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"} | ||||||
|  | 	// PodRequestChange is the event when a pod's resource request is changed. | ||||||
|  | 	PodRequestChange = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodRequest, Label: "PodRequestChange"} | ||||||
|  | 	// PodLabelChange is the event when a pod's label is changed. | ||||||
|  | 	PodLabelChange = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodLabel, Label: "PodLabelChange"} | ||||||
| 	// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. | 	// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. | ||||||
| 	NodeSpecUnschedulableChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"} | 	NodeSpecUnschedulableChange = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"} | ||||||
| 	// NodeAllocatableChange is the event when node allocatable is changed. | 	// NodeAllocatableChange is the event when node allocatable is changed. | ||||||
| @@ -95,6 +102,44 @@ var ( | |||||||
| 	UnschedulableTimeout = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "UnschedulableTimeout"} | 	UnschedulableTimeout = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "UnschedulableTimeout"} | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s). | ||||||
|  | // Once we add other Pod update events, we should update this function as well. | ||||||
|  | func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []framework.ClusterEvent) { | ||||||
|  | 	podChangeExtracters := []podChangeExtractor{ | ||||||
|  | 		extractPodLabelsChange, | ||||||
|  | 		extractPodResourceRequestChange, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, fn := range podChangeExtracters { | ||||||
|  | 		if event := fn(newPod, oldPod); event != nil { | ||||||
|  | 			events = append(events, *event) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(events) == 0 { | ||||||
|  | 		events = append(events, AssignedPodUpdate) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type podChangeExtractor func(newPod *v1.Pod, oldPod *v1.Pod) *framework.ClusterEvent | ||||||
|  |  | ||||||
|  | func extractPodResourceRequestChange(newPod, oldPod *v1.Pod) *framework.ClusterEvent { | ||||||
|  | 	opt := resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling)} | ||||||
|  | 	if !equality.Semantic.DeepEqual(resource.PodRequests(newPod, opt), resource.PodRequests(oldPod, opt)) { | ||||||
|  | 		return &PodRequestChange | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func extractPodLabelsChange(newPod *v1.Pod, oldPod *v1.Pod) *framework.ClusterEvent { | ||||||
|  | 	if isLabelChanged(newPod.GetLabels(), oldPod.GetLabels()) { | ||||||
|  | 		return &PodLabelChange | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s). | // NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s). | ||||||
| func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []framework.ClusterEvent) { | func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []framework.ClusterEvent) { | ||||||
| 	nodeChangeExtracters := []nodeChangeExtractor{ | 	nodeChangeExtracters := []nodeChangeExtractor{ | ||||||
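As the fallback branch above shows, an update that none of the extractors recognize still yields the generic AssignedPodUpdate, so no assigned-Pod update is silently dropped. A minimal sketch (illustration only; the internal queue package is not importable outside the k8s.io/kubernetes module):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)

func main() {
	oldPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"touched": "false"}}}

	// Neither the labels nor the requests changed, so no specific extractor fires
	// and the function falls back to the generic AssignedPodUpdate event.
	newPod := oldPod.DeepCopy()
	newPod.Annotations["touched"] = "true"

	for _, e := range queue.PodSchedulingPropertiesChange(newPod, oldPod) {
		fmt.Println(e.Label) // "AssignedPodUpdate"
	}
}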
| @@ -124,12 +169,16 @@ func extractNodeAllocatableChange(newNode *v1.Node, oldNode *v1.Node) *framework | |||||||
| } | } | ||||||
|  |  | ||||||
| func extractNodeLabelsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { | func extractNodeLabelsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { | ||||||
| 	if !equality.Semantic.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) { | 	if isLabelChanged(newNode.GetLabels(), oldNode.GetLabels()) { | ||||||
| 		return &NodeLabelChange | 		return &NodeLabelChange | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func isLabelChanged(newLabels map[string]string, oldLabels map[string]string) bool { | ||||||
|  | 	return !equality.Semantic.DeepEqual(newLabels, oldLabels) | ||||||
|  | } | ||||||
|  |  | ||||||
| func extractNodeTaintsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { | func extractNodeTaintsChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent { | ||||||
| 	if !equality.Semantic.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) { | 	if !equality.Semantic.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints) { | ||||||
| 		return &NodeTaintChange | 		return &NodeTaintChange | ||||||
|   | |||||||
| @@ -270,3 +270,108 @@ func TestNodeSchedulingPropertiesChange(t *testing.T) { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func Test_podSchedulingPropertiesChange(t *testing.T) { | ||||||
|  | 	podWithBigRequest := &v1.Pod{ | ||||||
|  | 		Spec: v1.PodSpec{ | ||||||
|  | 			Containers: []v1.Container{ | ||||||
|  | 				{ | ||||||
|  | 					Name: "app", | ||||||
|  | 					Resources: v1.ResourceRequirements{ | ||||||
|  | 						Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")}, | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		Status: v1.PodStatus{ | ||||||
|  | 			ContainerStatuses: []v1.ContainerStatus{ | ||||||
|  | 				{ | ||||||
|  | 					Name:               "app", | ||||||
|  | 					AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	podWithBigRequestAndLabel := &v1.Pod{ | ||||||
|  | 		ObjectMeta: metav1.ObjectMeta{ | ||||||
|  | 			Labels: map[string]string{"foo": "bar"}, | ||||||
|  | 		}, | ||||||
|  | 		Spec: v1.PodSpec{ | ||||||
|  | 			Containers: []v1.Container{ | ||||||
|  | 				{ | ||||||
|  | 					Name: "app", | ||||||
|  | 					Resources: v1.ResourceRequirements{ | ||||||
|  | 						Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")}, | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		Status: v1.PodStatus{ | ||||||
|  | 			ContainerStatuses: []v1.ContainerStatus{ | ||||||
|  | 				{ | ||||||
|  | 					Name:               "app", | ||||||
|  | 					AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("101m")}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	podWithSmallRequest := &v1.Pod{ | ||||||
|  | 		Spec: v1.PodSpec{ | ||||||
|  | 			Containers: []v1.Container{ | ||||||
|  | 				{ | ||||||
|  | 					Name: "app", | ||||||
|  | 					Resources: v1.ResourceRequirements{ | ||||||
|  | 						Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}, | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		Status: v1.PodStatus{ | ||||||
|  | 			ContainerStatuses: []v1.ContainerStatus{ | ||||||
|  | 				{ | ||||||
|  | 					Name:               "app", | ||||||
|  | 					AllocatedResources: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name   string | ||||||
|  | 		newPod *v1.Pod | ||||||
|  | 		oldPod *v1.Pod | ||||||
|  | 		want   []framework.ClusterEvent | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name:   "only label is updated", | ||||||
|  | 			newPod: st.MakePod().Label("foo", "bar").Obj(), | ||||||
|  | 			oldPod: st.MakePod().Label("foo", "bar2").Obj(), | ||||||
|  | 			want:   []framework.ClusterEvent{PodLabelChange}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name:   "only pod's resource request is updated", | ||||||
|  | 			oldPod: podWithSmallRequest, | ||||||
|  | 			newPod: podWithBigRequest, | ||||||
|  | 			want:   []framework.ClusterEvent{PodRequestChange}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name:   "both pod's resource request and label are updated", | ||||||
|  | 			oldPod: podWithSmallRequest, | ||||||
|  | 			newPod: podWithBigRequestAndLabel, | ||||||
|  | 			want:   []framework.ClusterEvent{PodLabelChange, PodRequestChange}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name:   "untracked properties of the pod are updated", | ||||||
|  | 			newPod: st.MakePod().Annotation("foo", "bar").Obj(), | ||||||
|  | 			oldPod: st.MakePod().Annotation("foo", "bar2").Obj(), | ||||||
|  | 			want:   []framework.ClusterEvent{AssignedPodUpdate}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, tt := range tests { | ||||||
|  | 		t.Run(tt.name, func(t *testing.T) { | ||||||
|  | 			got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod) | ||||||
|  | 			if diff := cmp.Diff(tt.want, got); diff != "" { | ||||||
|  | 				t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff) | ||||||
|  | 			} | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -120,7 +120,7 @@ type SchedulingQueue interface { | |||||||
| 	// See https://github.com/kubernetes/kubernetes/issues/110175 | 	// See https://github.com/kubernetes/kubernetes/issues/110175 | ||||||
| 	MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) | 	MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) | ||||||
| 	AssignedPodAdded(logger klog.Logger, pod *v1.Pod) | 	AssignedPodAdded(logger klog.Logger, pod *v1.Pod) | ||||||
| 	AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) | 	AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) | ||||||
| 	PendingPods() ([]*v1.Pod, string) | 	PendingPods() ([]*v1.Pod, string) | ||||||
| 	PodsInActiveQ() []*v1.Pod | 	PodsInActiveQ() []*v1.Pod | ||||||
| 	// Close closes the SchedulingQueue so that the goroutine which is | 	// Close closes the SchedulingQueue so that the goroutine which is | ||||||
| @@ -438,6 +438,7 @@ const ( | |||||||
| // isEventOfInterest returns true if the event is of interest by some plugins. | // isEventOfInterest returns true if the event is of interest by some plugins. | ||||||
| func (p *PriorityQueue) isEventOfInterest(logger klog.Logger, event framework.ClusterEvent) bool { | func (p *PriorityQueue) isEventOfInterest(logger klog.Logger, event framework.ClusterEvent) bool { | ||||||
| 	if event.IsWildCard() { | 	if event.IsWildCard() { | ||||||
|  | 		// A wildcard event moves Pods that were rejected by any plugin. | ||||||
| 		return true | 		return true | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -1086,15 +1087,21 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error | |||||||
| 			// whether the update may make the pods schedulable. | 			// whether the update may make the pods schedulable. | ||||||
| 			// Plugins have to implement a QueueingHint for Pod/Update event | 			// Plugins have to implement a QueueingHint for Pod/Update event | ||||||
| 			// if the rejection from them could be resolved by updating unscheduled Pods itself. | 			// if the rejection from them could be resolved by updating unscheduled Pods itself. | ||||||
| 			hint := p.isPodWorthRequeuing(logger, pInfo, UnscheduledPodUpdate, oldPod, newPod) |  | ||||||
| 			queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, UnscheduledPodUpdate.Label) | 			events := PodSchedulingPropertiesChange(newPod, oldPod) | ||||||
| 			if queue != unschedulablePods { | 			for _, evt := range events { | ||||||
| 				logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", PodUpdate, "queue", queue) | 				hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod) | ||||||
| 				p.unschedulablePods.delete(usPodInfo.Pod, gated) | 				queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, UnscheduledPodUpdate.Label) | ||||||
| 			} | 				if queue != unschedulablePods { | ||||||
| 			if queue == activeQ { | 					logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", PodUpdate, "queue", queue) | ||||||
| 				p.cond.Broadcast() | 					p.unschedulablePods.delete(usPodInfo.Pod, gated) | ||||||
|  | 				} | ||||||
|  | 				if queue == activeQ { | ||||||
|  | 					p.cond.Broadcast() | ||||||
|  | 					break | ||||||
|  | 				} | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		if isPodUpdated(oldPod, newPod) { | 		if isPodUpdated(oldPod, newPod) { | ||||||
| @@ -1158,32 +1165,18 @@ func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) { | |||||||
| 	p.lock.Unlock() | 	p.lock.Unlock() | ||||||
| } | } | ||||||
|  |  | ||||||
| // isPodResourcesResizedDown returns true if a pod CPU and/or memory resize request has been |  | ||||||
| // admitted by kubelet, is 'InProgress', and results in a net sizing down of updated resources. |  | ||||||
| // It returns false if either CPU or memory resource is net resized up, or if no resize is in progress. |  | ||||||
| func isPodResourcesResizedDown(pod *v1.Pod) bool { |  | ||||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { |  | ||||||
| 		// TODO(vinaykul,wangchen615,InPlacePodVerticalScaling): Fix this to determine when a |  | ||||||
| 		// pod is truly resized down (might need oldPod if we cannot determine from Status alone) |  | ||||||
| 		if pod.Status.Resize == v1.PodResizeStatusInProgress { |  | ||||||
| 			return true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return false |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // AssignedPodUpdated is called when a bound pod is updated. Change of labels | // AssignedPodUpdated is called when a bound pod is updated. Change of labels | ||||||
| // may make pending pods with matching affinity terms schedulable. | // may make pending pods with matching affinity terms schedulable. | ||||||
| func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) { | func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod, event framework.ClusterEvent) { | ||||||
| 	p.lock.Lock() | 	p.lock.Lock() | ||||||
| 	if isPodResourcesResizedDown(newPod) { | 	if event.Resource == framework.Pod && event.ActionType&framework.UpdatePodRequest != 0 { | ||||||
| 		// In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm | 		// In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm | ||||||
| 		// because Pod related events may make Pods that were rejected by NodeResourceFit schedulable. | 		// because Pod related events may make Pods that were rejected by NodeResourceFit schedulable. | ||||||
| 		p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil) | 		p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil) | ||||||
| 	} else { | 	} else { | ||||||
| 		// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm | 		// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm | ||||||
| 		// because Pod related events only make Pods rejected by cross topology term schedulable. | 		// because Pod related events only make Pods rejected by cross topology term schedulable. | ||||||
| 		p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod) | 		p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), event, oldPod, newPod) | ||||||
| 	} | 	} | ||||||
| 	p.lock.Unlock() | 	p.lock.Unlock() | ||||||
| } | } | ||||||
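With the specific event now threaded through, the branch above reduces to a single bit test on the event's ActionType. A quick illustration (not from the patch) of which branch each new Pod event takes:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)

func main() {
	// PodRequestChange carries the UpdatePodRequest bit, so the queue skips the
	// cross-topology pre-filter and moves every unschedulable Pod.
	fmt.Println(queue.PodRequestChange.ActionType&framework.UpdatePodRequest != 0) // true

	// PodLabelChange does not, so only Pods with matching cross-topology terms are moved.
	fmt.Println(queue.PodLabelChange.ActionType&framework.UpdatePodRequest != 0) // false
}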
|   | |||||||
| @@ -788,9 +788,10 @@ func Test_buildQueueingHintMap(t *testing.T) { | |||||||
| // Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap. | // Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap. | ||||||
| func Test_UnionedGVKs(t *testing.T) { | func Test_UnionedGVKs(t *testing.T) { | ||||||
| 	tests := []struct { | 	tests := []struct { | ||||||
| 		name    string | 		name                            string | ||||||
| 		plugins schedulerapi.PluginSet | 		plugins                         schedulerapi.PluginSet | ||||||
| 		want    map[framework.GVK]framework.ActionType | 		want                            map[framework.GVK]framework.ActionType | ||||||
|  | 		enableInPlacePodVerticalScaling bool | ||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			name: "filter without EnqueueExtensions plugin", | 			name: "filter without EnqueueExtensions plugin", | ||||||
| @@ -875,10 +876,10 @@ func Test_UnionedGVKs(t *testing.T) { | |||||||
| 			want: map[framework.GVK]framework.ActionType{}, | 			want: map[framework.GVK]framework.ActionType{}, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			name:    "plugins with default profile", | 			name:    "plugins with default profile (InPlacePodVerticalScaling: disabled)", | ||||||
| 			plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, | 			plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, | ||||||
| 			want: map[framework.GVK]framework.ActionType{ | 			want: map[framework.GVK]framework.ActionType{ | ||||||
| 				framework.Pod:                   framework.All, | 				framework.Pod:                   framework.Add | framework.UpdatePodLabel | framework.Delete, | ||||||
| 				framework.Node:                  framework.All, | 				framework.Node:                  framework.All, | ||||||
| 				framework.CSINode:               framework.All - framework.Delete, | 				framework.CSINode:               framework.All - framework.Delete, | ||||||
| 				framework.CSIDriver:             framework.All - framework.Delete, | 				framework.CSIDriver:             framework.All - framework.Delete, | ||||||
| @@ -888,9 +889,26 @@ func Test_UnionedGVKs(t *testing.T) { | |||||||
| 				framework.StorageClass:          framework.All - framework.Delete, | 				framework.StorageClass:          framework.All - framework.Delete, | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name:    "plugins with default profile (InPlacePodVerticalScaling: enabled)", | ||||||
|  | 			plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, | ||||||
|  | 			want: map[framework.GVK]framework.ActionType{ | ||||||
|  | 				framework.Pod:                   framework.Add | framework.UpdatePodLabel | framework.UpdatePodRequest | framework.Delete, | ||||||
|  | 				framework.Node:                  framework.All, | ||||||
|  | 				framework.CSINode:               framework.All - framework.Delete, | ||||||
|  | 				framework.CSIDriver:             framework.All - framework.Delete, | ||||||
|  | 				framework.CSIStorageCapacity:    framework.All - framework.Delete, | ||||||
|  | 				framework.PersistentVolume:      framework.All - framework.Delete, | ||||||
|  | 				framework.PersistentVolumeClaim: framework.All - framework.Delete, | ||||||
|  | 				framework.StorageClass:          framework.All - framework.Delete, | ||||||
|  | 			}, | ||||||
|  | 			enableInPlacePodVerticalScaling: true, | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
| 	for _, tt := range tests { | 	for _, tt := range tests { | ||||||
| 		t.Run(tt.name, func(t *testing.T) { | 		t.Run(tt.name, func(t *testing.T) { | ||||||
|  | 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, tt.enableInPlacePodVerticalScaling) | ||||||
|  |  | ||||||
| 			_, ctx := ktesting.NewTestContext(t) | 			_, ctx := ktesting.NewTestContext(t) | ||||||
| 			ctx, cancel := context.WithCancel(ctx) | 			ctx, cancel := context.WithCancel(ctx) | ||||||
| 			defer cancel() | 			defer cancel() | ||||||
|   | |||||||