api: resource.k8s.io PodScheduling -> PodSchedulingContext
The name "PodScheduling" was unusual because in contrast to most other names, it was impossible to put an article in front of it. Now PodSchedulingContext is used instead.
@@ -74,14 +74,14 @@ type stateData struct {
 // protected by the mutex. Used by PostFilter.
 unavailableClaims sets.Int

-// A pointer to the PodScheduling object for the pod, if one exists.
+// A pointer to the PodSchedulingContext object for the pod, if one exists.
 // Gets set on demand.
 //
 // Conceptually, this object belongs into the scheduler framework
 // where it might get shared by different plugins. But in practice,
 // it is currently only used by dynamic provisioning and thus
 // managed entirely here.
-podScheduling *resourcev1alpha2.PodScheduling
+schedulingCtx *resourcev1alpha2.PodSchedulingContext

 // podSchedulingDirty is true if the current copy was locally modified.
 podSchedulingDirty bool
@@ -112,23 +112,23 @@ func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.
 return nil
 }

-// initializePodScheduling can be called concurrently. It returns an existing PodScheduling
+// initializePodSchedulingContext can be called concurrently. It returns an existing PodSchedulingContext
 // object if there is one already, retrieves one if not, or as a last resort creates
 // one from scratch.
-func (d *stateData) initializePodScheduling(ctx context.Context, pod *v1.Pod, podSchedulingLister resourcev1alpha2listers.PodSchedulingLister) (*resourcev1alpha2.PodScheduling, error) {
-// TODO (#113701): check if this mutex locking can be avoided by calling initializePodScheduling during PreFilter.
+func (d *stateData) initializePodSchedulingContexts(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) (*resourcev1alpha2.PodSchedulingContext, error) {
+// TODO (#113701): check if this mutex locking can be avoided by calling initializePodSchedulingContext during PreFilter.
 d.mutex.Lock()
 defer d.mutex.Unlock()

-if d.podScheduling != nil {
-return d.podScheduling, nil
+if d.schedulingCtx != nil {
+return d.schedulingCtx, nil
 }

-podScheduling, err := podSchedulingLister.PodSchedulings(pod.Namespace).Get(pod.Name)
+schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
 switch {
 case apierrors.IsNotFound(err):
 controller := true
-podScheduling = &resourcev1alpha2.PodScheduling{
+schedulingCtx = &resourcev1alpha2.PodSchedulingContext{
 ObjectMeta: metav1.ObjectMeta{
 Name: pod.Name,
 Namespace: pod.Namespace,
@@ -148,56 +148,56 @@ func (d *stateData) initializePodScheduling(ctx context.Context, pod *v1.Pod, po
 return nil, err
 default:
 // We have an object, but it might be obsolete.
-if !metav1.IsControlledBy(podScheduling, pod) {
-return nil, fmt.Errorf("PodScheduling object with UID %s is not owned by Pod %s/%s", podScheduling.UID, pod.Namespace, pod.Name)
+if !metav1.IsControlledBy(schedulingCtx, pod) {
+return nil, fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
 }
 }
-d.podScheduling = podScheduling
-return podScheduling, err
+d.schedulingCtx = schedulingCtx
+return schedulingCtx, err
 }

-// publishPodScheduling creates or updates the PodScheduling object.
-func (d *stateData) publishPodScheduling(ctx context.Context, clientset kubernetes.Interface, podScheduling *resourcev1alpha2.PodScheduling) error {
+// publishPodSchedulingContext creates or updates the PodSchedulingContext object.
+func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset kubernetes.Interface, schedulingCtx *resourcev1alpha2.PodSchedulingContext) error {
 d.mutex.Lock()
 defer d.mutex.Unlock()

 var err error
 logger := klog.FromContext(ctx)
-msg := "Updating PodScheduling"
-if podScheduling.UID == "" {
-msg = "Creating PodScheduling"
+msg := "Updating PodSchedulingContext"
+if schedulingCtx.UID == "" {
+msg = "Creating PodSchedulingContext"
 }
 if loggerV := logger.V(6); loggerV.Enabled() {
 // At a high enough log level, dump the entire object.
-loggerV.Info(msg, "podschedulingDump", podScheduling)
+loggerV.Info(msg, "podSchedulingCtxDump", schedulingCtx)
 } else {
-logger.V(5).Info(msg, "podscheduling", klog.KObj(podScheduling))
+logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx))
 }
-if podScheduling.UID == "" {
-podScheduling, err = clientset.ResourceV1alpha2().PodSchedulings(podScheduling.Namespace).Create(ctx, podScheduling, metav1.CreateOptions{})
+if schedulingCtx.UID == "" {
+schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
 } else {
 // TODO (#113700): patch here to avoid racing with drivers which update the status.
-podScheduling, err = clientset.ResourceV1alpha2().PodSchedulings(podScheduling.Namespace).Update(ctx, podScheduling, metav1.UpdateOptions{})
+schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
 }
 if err != nil {
 return err
 }
-d.podScheduling = podScheduling
+d.schedulingCtx = schedulingCtx
 d.podSchedulingDirty = false
 return nil
 }

-// storePodScheduling replaces the pod scheduling object in the state.
-func (d *stateData) storePodScheduling(podScheduling *resourcev1alpha2.PodScheduling) {
+// storePodSchedulingContext replaces the pod schedulingCtx object in the state.
+func (d *stateData) storePodSchedulingContexts(schedulingCtx *resourcev1alpha2.PodSchedulingContext) {
 d.mutex.Lock()
 defer d.mutex.Unlock()

-d.podScheduling = podScheduling
+d.schedulingCtx = schedulingCtx
 d.podSchedulingDirty = true
 }

-func statusForClaim(podScheduling *resourcev1alpha2.PodScheduling, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus {
-for _, status := range podScheduling.Status.ResourceClaims {
+func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus {
+for _, status := range schedulingCtx.Status.ResourceClaims {
 if status.Name == podClaimName {
 return &status
 }
@@ -207,11 +207,11 @@ func statusForClaim(podScheduling *resourcev1alpha2.PodScheduling, podClaimName

 // dynamicResources is a plugin that ensures that ResourceClaims are allocated.
 type dynamicResources struct {
-enabled             bool
-clientset           kubernetes.Interface
-claimLister         resourcev1alpha2listers.ResourceClaimLister
-classLister         resourcev1alpha2listers.ResourceClassLister
-podSchedulingLister resourcev1alpha2listers.PodSchedulingLister
+enabled                    bool
+clientset                  kubernetes.Interface
+claimLister                resourcev1alpha2listers.ResourceClaimLister
+classLister                resourcev1alpha2listers.ResourceClassLister
+podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
 }

 // New initializes a new plugin and returns it.
@@ -222,11 +222,11 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram
 }

 return &dynamicResources{
-enabled:             true,
-clientset:           fh.ClientSet(),
-claimLister:         fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
-classLister:         fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
-podSchedulingLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulings().Lister(),
+enabled:                    true,
+clientset:                  fh.ClientSet(),
+claimLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
+classLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
+podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
 }, nil
 }

@@ -257,7 +257,7 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEvent {
 // may be schedulable.
 // TODO (#113702): can we change this so that such an event does not trigger *all* pods?
 // Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70
-{Resource: framework.PodScheduling, ActionType: framework.Add | framework.Update},
+{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update},
 // A resource might depend on node labels for topology filtering.
 // A new or updated node may make pods schedulable.
 {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
@@ -436,11 +436,11 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
 }

 // Now we need information from drivers.
-podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister)
+schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
 if err != nil {
 return statusError(logger, err)
 }
-status := statusForClaim(podScheduling, pod.Spec.ResourceClaims[index].Name)
+status := statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name)
 if status != nil {
 for _, unsuitableNode := range status.UnsuitableNodes {
 if node.Name == unsuitableNode {
@@ -530,7 +530,7 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
 }

 logger := klog.FromContext(ctx)
-podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister)
+schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
 if err != nil {
 return statusError(logger, err)
 }
@@ -540,22 +540,22 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
 pending = true
 }
 }
-if pending && !haveAllNodes(podScheduling.Spec.PotentialNodes, nodes) {
+if pending && !haveAllNodes(schedulingCtx.Spec.PotentialNodes, nodes) {
 // Remember the potential nodes. The object will get created or
 // updated in Reserve. This is both an optimization and
 // covers the case that PreScore doesn't get called when there
 // is only a single node.
 logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
-podScheduling = podScheduling.DeepCopy()
+schedulingCtx = schedulingCtx.DeepCopy()
 numNodes := len(nodes)
 if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
 numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
 }
-podScheduling.Spec.PotentialNodes = make([]string, 0, numNodes)
+schedulingCtx.Spec.PotentialNodes = make([]string, 0, numNodes)
 if numNodes == len(nodes) {
 // Copy all node names.
 for _, node := range nodes {
-podScheduling.Spec.PotentialNodes = append(podScheduling.Spec.PotentialNodes, node.Name)
+schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, node.Name)
 }
 } else {
 // Select a random subset of the nodes to comply with
@@ -567,14 +567,14 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
 nodeNames[node.Name] = struct{}{}
 }
 for nodeName := range nodeNames {
-if len(podScheduling.Spec.PotentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
+if len(schedulingCtx.Spec.PotentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
 break
 }
-podScheduling.Spec.PotentialNodes = append(podScheduling.Spec.PotentialNodes, nodeName)
+schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, nodeName)
 }
 }
-sort.Strings(podScheduling.Spec.PotentialNodes)
-state.storePodScheduling(podScheduling)
+sort.Strings(schedulingCtx.Spec.PotentialNodes)
+state.storePodSchedulingContexts(schedulingCtx)
 }
 logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", nodes)
 return nil
@@ -614,7 +614,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 numDelayedAllocationPending := 0
 numClaimsWithStatusInfo := 0
 logger := klog.FromContext(ctx)
-podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister)
+schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
 if err != nil {
 return statusError(logger, err)
 }
@@ -639,7 +639,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 return statusError(logger, err)
 }
 // If we get here, we know that reserving the claim for
-// the pod worked and we can proceed with scheduling
+// the pod worked and we can proceed with schedulingCtx
 // it.
 } else {
 // Must be delayed allocation.
@@ -647,7 +647,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat

 // Did the driver provide information that steered node
 // selection towards a node that it can support?
-if statusForClaim(podScheduling, pod.Spec.ResourceClaims[index].Name) != nil {
+if statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
 numClaimsWithStatusInfo++
 }
 }
@@ -659,13 +659,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 }

 podSchedulingDirty := state.podSchedulingDirty
-if len(podScheduling.Spec.PotentialNodes) == 0 {
+if len(schedulingCtx.Spec.PotentialNodes) == 0 {
 // PreScore was not called, probably because there was
 // only one candidate. We need to ask whether that
 // node is suitable, otherwise the scheduler will pick
 // it forever even when it cannot satisfy the claim.
-podScheduling = podScheduling.DeepCopy()
-podScheduling.Spec.PotentialNodes = []string{nodeName}
+schedulingCtx = schedulingCtx.DeepCopy()
+schedulingCtx.Spec.PotentialNodes = []string{nodeName}
 logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
 podSchedulingDirty = true
 }
@@ -675,16 +675,16 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 // the driver yet. Otherwise we wait for information before blindly
 // making a decision that might have to be reversed later.
 if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending {
-podScheduling = podScheduling.DeepCopy()
+schedulingCtx = schedulingCtx.DeepCopy()
 // TODO: can we increase the chance that the scheduler picks
 // the same node as before when allocation is on-going,
 // assuming that that node still fits the pod? Picking a
 // different node may lead to some claims being allocated for
 // one node and others for another, which then would have to be
 // resolved with deallocation.
-podScheduling.Spec.SelectedNode = nodeName
+schedulingCtx.Spec.SelectedNode = nodeName
 logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
-if err := state.publishPodScheduling(ctx, pl.clientset, podScheduling); err != nil {
+if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil {
 return statusError(logger, err)
 }
 return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
@@ -692,14 +692,14 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat

 // May have been modified earlier in PreScore or above.
 if podSchedulingDirty {
-if err := state.publishPodScheduling(ctx, pl.clientset, podScheduling); err != nil {
+if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil {
 return statusError(logger, err)
 }
 }

 // More than one pending claim and not enough information about all of them.
 //
-// TODO: can or should we ensure that scheduling gets aborted while
+// TODO: can or should we ensure that schedulingCtx gets aborted while
 // waiting for resources *before* triggering delayed volume
 // provisioning? On the one hand, volume provisioning is currently
 // irreversible, so it better should come last. On the other hand,
@@ -737,7 +737,7 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
 claim.Status.ReservedFor = reservedFor
 logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim))
 if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil {
-// We will get here again when pod scheduling
+// We will get here again when pod schedulingCtx
 // is retried.
 logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
 }
@@ -746,7 +746,7 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
 }

 // PostBind is called after a pod is successfully bound to a node. Now we are
-// sure that a PodScheduling object, if it exists, is definitely not going to
+// sure that a PodSchedulingContext object, if it exists, is definitely not going to
 // be needed anymore and can delete it. This is a one-shot thing, there won't
 // be any retries. This is okay because it should usually work and in those
 // cases where it doesn't, the garbage collector will eventually clean up.
@@ -762,19 +762,19 @@ func (pl *dynamicResources) PostBind(ctx context.Context, cs *framework.CycleSta
 return
 }

-// We cannot know for sure whether the PodScheduling object exists. We
-// might have created it in the previous pod scheduling cycle and not
+// We cannot know for sure whether the PodSchedulingContext object exists. We
+// might have created it in the previous pod schedulingCtx cycle and not
 // have it in our informer cache yet. Let's try to delete, just to be
 // on the safe side.
 logger := klog.FromContext(ctx)
-err = pl.clientset.ResourceV1alpha2().PodSchedulings(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+err = pl.clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 switch {
 case apierrors.IsNotFound(err):
-logger.V(5).Info("no PodScheduling object to delete")
+logger.V(5).Info("no PodSchedulingContext object to delete")
 case err != nil:
-logger.Error(err, "delete PodScheduling")
+logger.Error(err, "delete PodSchedulingContext")
 default:
-logger.V(5).Info("PodScheduling object deleted")
+logger.V(5).Info("PodSchedulingContext object deleted")
 }
 }

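To show how the renamed status fields are consumed, here is a short hedged sketch in the spirit of statusForClaim above: the scheduler publishes Spec.PotentialNodes and Spec.SelectedNode, and a resource driver answers through Status.ResourceClaims, whose UnsuitableNodes entries the Filter code checks. The function name unsuitableNodesForClaim and the package clause are illustrative; the type and field names come from the diff.

package example

import (
    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
)

// unsuitableNodesForClaim looks up the driver-provided scheduling status for
// one pod claim, mirroring statusForClaim in the diff above, and returns the
// nodes the driver reported as unsuitable (nil if the driver has not
// responded for that claim yet).
func unsuitableNodesForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) []string {
    for _, status := range schedulingCtx.Status.ResourceClaims {
        if status.Name == podClaimName {
            return status.UnsuitableNodes
        }
    }
    return nil
}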