podtopologyspread: scheduler queueing hints
@@ -19,18 +19,22 @@ package podtopologyspread
 import (
     "context"
     "fmt"
+    "reflect"
 
     v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/informers"
     appslisters "k8s.io/client-go/listers/apps/v1"
     corelisters "k8s.io/client-go/listers/core/v1"
+    "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
     "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
+    "k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 const (
@@ -141,9 +145,137 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint
         // an unschedulable Pod schedulable.
         // - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints,
         // deleting an existing Pod may make it schedulable.
-        {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
-        // Node add|delete|updateLabel maybe lead an topology key changed,
+        {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
+        // Node add|delete|update may lead to a topology key change,
         // and make these pods in scheduling schedulable or unschedulable.
-        {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel}},
+        {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
     }
 }
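For illustration (not part of the diff): a QueueingHintFn is just a function with the shape below. The scheduler calls it with the unschedulable pod and the event's old/new objects, and requeues the pod only when the hint returns framework.Queue. The demoHint name and its queue-on-any-bound-pod rule are invented for this sketch; the framework types are real, assuming the snippet is built inside the kubernetes repo where pkg/scheduler/framework is importable.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// demoHint is a hypothetical hint with the framework.QueueingHintFn shape:
// it queues the pod whenever the event's new object is a pod already bound to a node.
func demoHint(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
    if p, ok := newObj.(*v1.Pod); ok && p.Spec.NodeName != "" {
        return framework.Queue, nil
    }
    return framework.QueueSkip, nil
}

func main() {
    hint, _ := demoHint(klog.Background(), &v1.Pod{}, nil, &v1.Pod{Spec: v1.PodSpec{NodeName: "node-1"}})
    fmt.Println(hint == framework.Queue) // true: the event may make the pod schedulable
}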
+
+func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool {
+    return incomingPod.Spec.NodeName != "" && incomingPod.Namespace == podWithSpreading.Namespace
+}
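A quick sketch of the helper's semantics with hand-built pods: only a pod that is already bound to a node and lives in the target pod's namespace can affect the target's spreading. The helper body is inlined so the snippet is self-contained; pod, node, and namespace names are invented.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Inlined copy of the helper so this snippet compiles on its own.
func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool {
    return incomingPod.Spec.NodeName != "" && incomingPod.Namespace == podWithSpreading.Namespace
}

func main() {
    target := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "target", Namespace: "ns-a"}}
    scheduled := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "web-1", Namespace: "ns-a"},
        Spec:       v1.PodSpec{NodeName: "node-1"},
    }
    pending := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web-2", Namespace: "ns-a"}}
    otherNS := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "web-3", Namespace: "ns-b"},
        Spec:       v1.PodSpec{NodeName: "node-1"},
    }

    fmt.Println(involvedInTopologySpreading(scheduled, target)) // true
    fmt.Println(involvedInTopologySpreading(pending, target))   // false: not bound to a node yet
    fmt.Println(involvedInTopologySpreading(otherNS, target))   // false: different namespace
}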
+
+func (pl *PodTopologySpread) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+    originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
+    if err != nil {
+        return framework.Queue, err
+    }
+
+    if (modifiedPod != nil && !involvedInTopologySpreading(modifiedPod, pod)) || (originalPod != nil && !involvedInTopologySpreading(originalPod, pod)) {
+        logger.V(5).Info("the added/updated/deleted pod is unscheduled or in a different namespace from the target pod, so it doesn't make the target pod schedulable",
+            "pod", klog.KObj(pod), "originalPod", klog.KObj(originalPod))
+        return framework.QueueSkip, nil
+    }
+
+    constraints, err := pl.getConstraints(pod)
+    if err != nil {
+        return framework.Queue, err
+    }
+
+    // Pod is modified. Return Queue when the label(s) matching topologySpread's selector is added, changed, or deleted.
+    if modifiedPod != nil && originalPod != nil {
+        if reflect.DeepEqual(modifiedPod.Labels, originalPod.Labels) {
+            logger.V(5).Info("the updated pod's labels are unchanged, so it doesn't make the target pod schedulable",
+                "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
+            return framework.QueueSkip, nil
+        }
+        for _, c := range constraints {
+            if c.Selector.Matches(labels.Set(originalPod.Labels)) != c.Selector.Matches(labels.Set(modifiedPod.Labels)) {
+                // This modification makes this Pod match (or no longer match) this constraint,
+                // so the scheduling result of topology spread may change.
+                logger.V(5).Info("a scheduled pod's label was updated and it makes the updated pod match or unmatch the pod's topology spread constraints",
+                    "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
+                return framework.Queue, nil
+            }
+        }
+        // This label modification doesn't change whether this Pod matches the selector of any constraint.
+        logger.V(5).Info("a scheduled pod's label was updated, but the change is unrelated to the pod's topology spread constraints",
+            "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
+        return framework.QueueSkip, nil
+    }
+
+    // Pod is added. Return Queue when the added Pod has a label that matches with topologySpread's selector.
+    if modifiedPod != nil {
+        if podLabelsMatchSpreadConstraints(constraints, modifiedPod.Labels) {
+            logger.V(5).Info("a scheduled pod was created and it matches with the pod's topology spread constraints",
+                "pod", klog.KObj(pod), "createdPod", klog.KObj(modifiedPod))
+            return framework.Queue, nil
+        }
+        logger.V(5).Info("a scheduled pod was created, but it doesn't match the pod's topology spread constraints",
+            "pod", klog.KObj(pod), "createdPod", klog.KObj(modifiedPod))
+        return framework.QueueSkip, nil
+    }
+
+    // Pod is deleted. Return Queue when the deleted Pod has a label that matches with topologySpread's selector.
+    if podLabelsMatchSpreadConstraints(constraints, originalPod.Labels) {
+        logger.V(5).Info("a scheduled pod which matches the pod's topology spread constraints was deleted, and the pod may be schedulable now",
+            "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
+        return framework.Queue, nil
+    }
+    logger.V(5).Info("a scheduled pod was deleted, but it's unrelated to the pod's topology spread constraints",
+        "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
+
+    return framework.QueueSkip, nil
+}
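The core of the update branch above is the "match result flipped" test. A minimal standalone sketch of that test, with an invented selector and label sets standing in for c.Selector and the pods' labels:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Stand-in for one constraint's selector (c.Selector in the plugin).
    selector := labels.SelectorFromSet(labels.Set{"app": "web"})

    original := labels.Set{"app": "db"}  // labels before the update
    modified := labels.Set{"app": "web"} // labels after the update

    // Queue only when the match result flips, mirroring
    // c.Selector.Matches(original) != c.Selector.Matches(modified).
    flipped := selector.Matches(original) != selector.Matches(modified)
    fmt.Println(flipped) // true -> framework.Queue
}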
+
+// getConstraints extracts topologySpreadConstraint(s) from the Pod spec.
+// If the Pod doesn't have any topologySpreadConstraint, it returns default constraints.
+func (pl *PodTopologySpread) getConstraints(pod *v1.Pod) ([]topologySpreadConstraint, error) {
+    var constraints []topologySpreadConstraint
+    var err error
+    if len(pod.Spec.TopologySpreadConstraints) > 0 {
+        // We have feature gating in the APIServer to strip the spec,
+        // so we don't need to re-check the feature gate here; just check the length of the constraints.
+        constraints, err = pl.filterTopologySpreadConstraints(
+            pod.Spec.TopologySpreadConstraints,
+            pod.Labels,
+            v1.DoNotSchedule,
+        )
+        if err != nil {
+            return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
+        }
+    } else {
+        constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
+        if err != nil {
+            return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
+        }
+    }
+    return constraints, nil
+}
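For reference, a hand-built pod that would take the first branch of getConstraints (it carries one hard, DoNotSchedule constraint); the names and labels are invented for the sketch:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // One hard constraint: spread "app=web" pods across zones with max skew 1.
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "web-0", Labels: map[string]string{"app": "web"}},
        Spec: v1.PodSpec{
            TopologySpreadConstraints: []v1.TopologySpreadConstraint{{
                MaxSkew:           1,
                TopologyKey:       "topology.kubernetes.io/zone",
                WhenUnsatisfiable: v1.DoNotSchedule,
                LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
            }},
        },
    }
    // Non-empty spec constraints -> getConstraints filters these instead of
    // falling back to buildDefaultConstraints.
    fmt.Println(len(pod.Spec.TopologySpreadConstraints)) // 1
}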
+
+func (pl *PodTopologySpread) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+    originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
+    if err != nil {
+        return framework.Queue, err
+    }
+
+    constraints, err := pl.getConstraints(pod)
+    if err != nil {
+        return framework.Queue, err
+    }
+
+    // framework.Add/framework.Update: return Queue when the node has a topologyKey in its labels, else return QueueSkip.
+    //
+    // TODO: we can filter out node update events in a more fine-grained way once preCheck is completely removed.
+    // See: https://github.com/kubernetes/kubernetes/issues/110175
+    if modifiedNode != nil {
+        if !nodeLabelsMatchSpreadConstraints(modifiedNode.Labels, constraints) {
+            logger.V(5).Info("the created/updated node doesn't match pod topology spread constraints",
+                "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
+            return framework.QueueSkip, nil
+        }
+        logger.V(5).Info("a node that matches topology spread constraints was created/updated, and the pod may be schedulable now",
+            "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
+        return framework.Queue, nil
+    }
+
+    // framework.Delete: return Queue when the node has a topologyKey in its labels, else return QueueSkip.
+    if !nodeLabelsMatchSpreadConstraints(originalNode.Labels, constraints) {
+        logger.V(5).Info("the deleted node doesn't match pod topology spread constraints", "pod", klog.KObj(pod), "node", klog.KObj(originalNode))
+        return framework.QueueSkip, nil
+    }
+    logger.V(5).Info("a node that matches topology spread constraints was deleted, and the pod may be schedulable now",
+        "pod", klog.KObj(pod), "node", klog.KObj(originalNode))
+    return framework.Queue, nil
+}
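nodeLabelsMatchSpreadConstraints is not shown in this diff; as used here, it checks that the node carries every topology key named by the pod's constraints. A standalone sketch of that check, with a hypothetical nodeHasAllTopologyKeys helper standing in for the plugin's function:

package main

import "fmt"

// nodeHasAllTopologyKeys is a stand-in for nodeLabelsMatchSpreadConstraints:
// a node is relevant to the hint only if its labels contain every topology
// key referenced by the pod's spread constraints.
func nodeHasAllTopologyKeys(nodeLabels map[string]string, topologyKeys []string) bool {
    for _, k := range topologyKeys {
        if _, ok := nodeLabels[k]; !ok {
            return false
        }
    }
    return true
}

func main() {
    node := map[string]string{"topology.kubernetes.io/zone": "zone-a"}
    fmt.Println(nodeHasAllTopologyKeys(node, []string{"topology.kubernetes.io/zone"})) // true -> Queue
    fmt.Println(nodeHasAllTopologyKeys(node, []string{"kubernetes.io/hostname"}))      // false -> QueueSkip
}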