DRA scheduler: adapt to v1alpha3 API

The structured parameter allocation logic was written from scratch in
staging/src/k8s.io/dynamic-resource-allocation/structured, where it might be
useful for out-of-tree components.

Besides the new API and the new features (amount, admin access), it now
supports backtracking when the initial device selection doesn't lead to a
complete allocation of all claims.

Co-authored-by: Ed Bartosh <eduard.bartosh@intel.com>
Co-authored-by: John Belamaric <jbelamaric@google.com>
Patrick Ohly
2024-07-11 16:42:51 +02:00
parent 0fc78b9bcc
commit 599fe605f9
31 changed files with 2472 additions and 3115 deletions
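
To make the new control flow concrete, here is a minimal sketch (not part of this commit) of how the plugin drives the rewritten allocator during Filter, based on the calls that appear in the diff below: structured.NewAllocator builds an allocator for the pending claims, Allocate attempts an all-or-nothing allocation for one candidate node, and ClaimsToAllocate reports which claims were requested. The filterNode helper name and its error handling are illustrative, and the Allocate signature (context plus *v1.Node) is assumed from its use in Filter.

package sketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/dynamic-resource-allocation/structured"
)

// filterNode (hypothetical helper) tries to allocate every pending claim on one
// candidate node. Allocate is all-or-nothing: a partial result means the node
// cannot satisfy the pod, while a full result can be cached so that Reserve
// publishes the AllocationResults later without recomputing them.
func filterNode(ctx context.Context, allocator *structured.Allocator, node *v1.Node) ([]*resourceapi.AllocationResult, error) {
	results, err := allocator.Allocate(ctx, node)
	if err != nil {
		// Indicates a broken claim or class (for example a bad CEL expression);
		// the plugin aborts scheduling of the pod in that case.
		return nil, err
	}
	if len(results) != len(allocator.ClaimsToAllocate()) {
		// Not every claim could be satisfied on this node.
		return nil, fmt.Errorf("node %s: cannot allocate all claims", node.Name)
	}
	return results, nil
}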


@@ -38,10 +38,10 @@ import (
resourceapiapply "k8s.io/client-go/applyconfigurations/resource/v1alpha3"
"k8s.io/client-go/kubernetes"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -56,10 +56,6 @@ const (
Name = names.DynamicResources
stateKey framework.StateKey = Name
// generatedFromIndex is the lookup name for the index function
// which indexes by other resource which generated the parameters object.
generatedFromIndex = "generated-from-index"
)
// The state is initialized in PreFilter phase. Because we save the pointer in
@@ -82,9 +78,8 @@ type stateData struct {
// (if one exists) and the changes made to it.
podSchedulingState podSchedulingState
// resourceModel contains the information about available and allocated resources when using
// structured parameters and the pod needs this information.
resources resources
// Allocator handles claims with structured parameters.
allocator *structured.Allocator
// mutex must be locked while accessing any of the fields below.
mutex sync.Mutex
@@ -99,6 +94,9 @@ type stateData struct {
unavailableClaims sets.Set[int]
informationsForClaim []informationForClaim
// nodeAllocations caches the result of Filter for the nodes.
nodeAllocations map[string][]*resourceapi.AllocationResult
}
func (d *stateData) Clone() framework.StateData {
@@ -106,24 +104,20 @@ func (d *stateData) Clone() framework.StateData {
}
type informationForClaim struct {
// The availableOnNode node filter of the claim converted from the
// v1 API to nodeaffinity.NodeSelector by PreFilter for repeated
// evaluation in Filter. Nil for claim which don't have it.
availableOnNode *nodeaffinity.NodeSelector
// Node selectors based on the claim status (single entry, key is empty) if allocated,
// otherwise the device class AvailableOnNodes selectors (potentially multiple entries,
// key is the device class name).
availableOnNodes map[string]*nodeaffinity.NodeSelector
// The status of the claim obtained from the
// schedulingCtx by PreFilter for repeated
// evaluation in Filter. Nil for claims which don't have it.
status *resourceapi.ResourceClaimSchedulingStatus
// structuredParameters is true if the claim is handled via the builtin
// controller.
structuredParameters bool
controller *claimController
// Set by Reserved, published by PreBind.
allocation *resourceapi.AllocationResult
allocationDriverName string
allocation *resourceapi.AllocationResult
}
type podSchedulingState struct {
@@ -276,19 +270,9 @@ type dynamicResources struct {
enabled bool
fh framework.Handle
clientset kubernetes.Interface
classLister resourcelisters.ResourceClassLister
classLister resourcelisters.DeviceClassLister
podSchedulingContextLister resourcelisters.PodSchedulingContextLister
claimParametersLister resourcelisters.ResourceClaimParametersLister
classParametersLister resourcelisters.ResourceClassParametersLister
resourceSliceLister resourcelisters.ResourceSliceLister
claimNameLookup *resourceclaim.Lookup
// claimParametersIndexer has the common claimParametersGeneratedFrom indexer installed to
// limit iteration over claimParameters to those of interest.
claimParametersIndexer cache.Indexer
// classParametersIndexer has the common classParametersGeneratedFrom indexer installed to
// limit iteration over classParameters to those of interest.
classParametersIndexer cache.Indexer
sliceLister resourcelisters.ResourceSliceLister
// claimAssumeCache enables temporarily storing a newer claim object
// while the scheduler has allocated it and the corresponding object
@@ -357,61 +341,15 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
enabled: true,
fh: fh,
clientset: fh.ClientSet(),
classLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceClasses().Lister(),
classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha3().PodSchedulingContexts().Lister(),
claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceClaimParameters().Lister(),
claimParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha3().ResourceClaimParameters().Informer().GetIndexer(),
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceClassParameters().Lister(),
classParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha3().ResourceClassParameters().Informer().GetIndexer(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
claimAssumeCache: fh.ResourceClaimCache(),
}
if err := pl.claimParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: claimParametersGeneratedFromIndexFunc}); err != nil {
return nil, fmt.Errorf("add claim parameters cache indexer: %w", err)
}
if err := pl.classParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: classParametersGeneratedFromIndexFunc}); err != nil {
return nil, fmt.Errorf("add class parameters cache indexer: %w", err)
}
return pl, nil
}
func claimParametersReferenceKeyFunc(namespace string, ref *resourceapi.ResourceClaimParametersReference) string {
return ref.APIGroup + "/" + ref.Kind + "/" + namespace + "/" + ref.Name
}
// claimParametersGeneratedFromIndexFunc is an index function that returns other resource keys
// (= apiGroup/kind/namespace/name) for ResourceClaimParametersReference in a given claim parameters.
func claimParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) {
parameters, ok := obj.(*resourceapi.ResourceClaimParameters)
if !ok {
return nil, nil
}
if parameters.GeneratedFrom == nil {
return nil, nil
}
return []string{claimParametersReferenceKeyFunc(parameters.Namespace, parameters.GeneratedFrom)}, nil
}
func classParametersReferenceKeyFunc(ref *resourceapi.ResourceClassParametersReference) string {
return ref.APIGroup + "/" + ref.Kind + "/" + ref.Namespace + "/" + ref.Name
}
// classParametersGeneratedFromIndexFunc is an index function that returns other resource keys
// (= apiGroup/kind/namespace/name) for ResourceClassParametersReference in a given class parameters.
func classParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) {
parameters, ok := obj.(*resourceapi.ResourceClassParameters)
if !ok {
return nil, nil
}
if parameters.GeneratedFrom == nil {
return nil, nil
}
return []string{classParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil
}
var _ framework.PreEnqueuePlugin = &dynamicResources{}
var _ framework.PreFilterPlugin = &dynamicResources{}
var _ framework.FilterPlugin = &dynamicResources{}
@@ -435,11 +373,6 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
}
events := []framework.ClusterEventWithHint{
// Changes for claim or class parameters creation may make pods
// schedulable which depend on claims using those parameters.
{Event: framework.ClusterEvent{Resource: framework.ResourceClaimParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimParametersChange},
{Event: framework.ClusterEvent{Resource: framework.ResourceClassParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClassParametersChange},
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
// When a driver has provided additional information, a pod waiting for that information
@@ -458,7 +391,7 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
// See: https://github.com/kubernetes/kubernetes/issues/110175
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
// A pod might be waiting for a class to get created or modified.
{Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
}
return events, nil
}
@@ -473,149 +406,6 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status
return nil
}
// isSchedulableAfterClaimParametersChange is invoked for add and update claim parameters events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen. The delete claim event will not invoke it, so newObj will never be nil.
func (pl *dynamicResources) isSchedulableAfterClaimParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalParameters, modifiedParameters, err := schedutil.As[*resourceapi.ResourceClaimParameters](oldObj, newObj)
if err != nil {
// Shouldn't happen.
return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimParametersChange: %w", err)
}
usesParameters := false
if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourceapi.ResourceClaim) {
ref := claim.Spec.ParametersRef
if ref == nil {
return
}
// Using in-tree parameters directly?
if ref.APIGroup == resourceapi.SchemeGroupVersion.Group &&
ref.Kind == "ResourceClaimParameters" {
if modifiedParameters.Name == ref.Name {
usesParameters = true
}
return
}
// Need to look for translated parameters.
generatedFrom := modifiedParameters.GeneratedFrom
if generatedFrom == nil {
return
}
if generatedFrom.APIGroup == ref.APIGroup &&
generatedFrom.Kind == ref.Kind &&
generatedFrom.Name == ref.Name {
usesParameters = true
}
}); err != nil {
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedParameters), "reason", err.Error())
return framework.QueueSkip, nil
}
if !usesParameters {
// This were not the parameters the pod was waiting for.
logger.V(6).Info("unrelated claim parameters got modified", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
if originalParameters == nil {
logger.V(4).Info("claim parameters for pod got created", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// Modifications may or may not be relevant. If the entire
// requests are as before, then something else must have changed
// and we don't care.
if apiequality.Semantic.DeepEqual(&originalParameters.DriverRequests, &modifiedParameters.DriverRequests) {
logger.V(6).Info("claim parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
logger.V(4).Info("requests in claim parameters for pod got updated", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// isSchedulableAfterClassParametersChange is invoked for add and update class parameters events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen. The delete class event will not invoke it, so newObj will never be nil.
func (pl *dynamicResources) isSchedulableAfterClassParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalParameters, modifiedParameters, err := schedutil.As[*resourceapi.ResourceClassParameters](oldObj, newObj)
if err != nil {
// Shouldn't happen.
return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClassParametersChange: %w", err)
}
usesParameters := false
if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourceapi.ResourceClaim) {
class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
if err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "look up resource class")
}
return
}
ref := class.ParametersRef
if ref == nil {
return
}
// Using in-tree parameters directly?
if ref.APIGroup == resourceapi.SchemeGroupVersion.Group &&
ref.Kind == "ResourceClassParameters" {
if modifiedParameters.Name == ref.Name {
usesParameters = true
}
return
}
// Need to look for translated parameters.
generatedFrom := modifiedParameters.GeneratedFrom
if generatedFrom == nil {
return
}
if generatedFrom.APIGroup == ref.APIGroup &&
generatedFrom.Kind == ref.Kind &&
generatedFrom.Name == ref.Name {
usesParameters = true
}
}); err != nil {
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters), "reason", err.Error())
return framework.QueueSkip, nil
}
if !usesParameters {
// This were not the parameters the pod was waiting for.
logger.V(6).Info("unrelated class parameters got modified", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
if originalParameters == nil {
logger.V(4).Info("class parameters for pod got created", "pod", klog.KObj(pod), "class", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// Modifications may or may not be relevant. If the entire
// requests are as before, then something else must have changed
// and we don't care.
if apiequality.Semantic.DeepEqual(&originalParameters.Filters, &modifiedParameters.Filters) {
logger.V(6).Info("class parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
logger.V(4).Info("filters in class parameters for pod got updated", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// isSchedulableAfterClaimChange is invoked for add and update claim events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
@@ -641,7 +431,8 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
}
if originalClaim != nil &&
resourceclaim.IsAllocatedWithStructuredParameters(originalClaim) &&
originalClaim.Status.Allocation != nil &&
originalClaim.Status.Allocation.Controller == "" &&
modifiedClaim.Status.Allocation == nil {
// A claim with structured parameters was deallocated. This might have made
// resources available for other pods.
@@ -823,7 +614,7 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourceapi.Resou
// It calls an optional handler for those claims that it finds.
func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourceapi.ResourceClaim)) error {
for _, resource := range pod.Spec.ResourceClaims {
claimName, mustCheckOwner, err := pl.claimNameLookup.Name(pod, &resource)
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource)
if err != nil {
return err
}
@@ -892,8 +683,10 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, statusError(logger, err)
}
// All claims which the scheduler needs to allocate itself.
allocateClaims := make([]*resourceapi.ResourceClaim, 0, len(claims))
s.informationsForClaim = make([]informationForClaim, len(claims))
needResourceInformation := false
for index, claim := range claims {
if claim.Status.DeallocationRequested {
// This will get resolved by the resource driver.
@@ -907,44 +700,19 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
}
if claim.Status.Allocation != nil {
if claim.Status.Allocation.AvailableOnNodes != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes)
s.informationsForClaim[index].structuredParameters = claim.Status.Allocation.Controller == ""
if claim.Status.Allocation.NodeSelector != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.NodeSelector)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].availableOnNode = nodeSelector
s.informationsForClaim[index].availableOnNodes = map[string]*nodeaffinity.NodeSelector{"": nodeSelector}
}
// The claim was allocated by the scheduler if it has the finalizer that is
// reserved for Kubernetes.
s.informationsForClaim[index].structuredParameters = slices.Contains(claim.Finalizers, resourceapi.Finalizer)
} else {
// The ResourceClass might have a node filter. This is
// useful for trimming the initial set of potential
// nodes before we ask the driver(s) for information
// about the specific pod.
class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
if err != nil {
// If the class cannot be retrieved, allocation cannot proceed.
if apierrors.IsNotFound(err) {
// Here we mark the pod as "unschedulable", so it'll sleep in
// the unscheduleable queue until a ResourceClass event occurs.
return nil, statusUnschedulable(logger, fmt.Sprintf("resource class %s does not exist", claim.Spec.ResourceClassName))
}
// Other error, retry with backoff.
return nil, statusError(logger, fmt.Errorf("look up resource class: %v", err))
}
if class.SuitableNodes != nil {
selector, err := nodeaffinity.NewNodeSelector(class.SuitableNodes)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].availableOnNode = selector
}
s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)
if class.StructuredParameters != nil && *class.StructuredParameters {
s.informationsForClaim[index].structuredParameters = true
structuredParameters := claim.Spec.Controller == ""
s.informationsForClaim[index].structuredParameters = structuredParameters
if structuredParameters {
allocateClaims = append(allocateClaims, claim)
// Allocation in flight? Better wait for that
// to finish, see inFlightAllocations
@@ -952,164 +720,93 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
if _, found := pl.inFlightAllocations.Load(claim.UID); found {
return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
}
} else {
s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)
}
// We need the claim and class parameters. If
// they don't exist yet, the pod has to wait.
//
// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
// check this already in foreachPodResourceClaim, together with setting up informationsForClaim.
// Then PreEnqueue will also check for existence of parameters.
classParameters, claimParameters, status := pl.lookupParameters(logger, class, claim)
if status != nil {
return nil, status
// Check all requests and device classes. If a class
// does not exist, scheduling cannot proceed, no matter
// how the claim is being allocated.
//
// When using a control plane controller, a class might
// have a node filter. This is useful for trimming the
// initial set of potential nodes before we ask the
// driver(s) for information about the specific pod.
for _, request := range claim.Spec.Devices.Requests {
if request.DeviceClassName == "" {
return nil, statusError(logger, fmt.Errorf("request %s: unsupported request type", request.Name))
}
controller, err := newClaimController(logger, class, classParameters, claimParameters)
class, err := pl.classLister.Get(request.DeviceClassName)
if err != nil {
return nil, statusError(logger, err)
// If the class cannot be retrieved, allocation cannot proceed.
if apierrors.IsNotFound(err) {
// Here we mark the pod as "unschedulable", so it'll sleep in
// the unschedulable queue until a DeviceClass event occurs.
return nil, statusUnschedulable(logger, fmt.Sprintf("request %s: device class %s does not exist", request.Name, request.DeviceClassName))
}
// Other error, retry with backoff.
return nil, statusError(logger, fmt.Errorf("request %s: look up device class: %w", request.Name, err))
}
if class.Spec.SuitableNodes != nil && !structuredParameters {
selector, err := nodeaffinity.NewNodeSelector(class.Spec.SuitableNodes)
if err != nil {
return nil, statusError(logger, err)
}
if s.informationsForClaim[index].availableOnNodes == nil {
s.informationsForClaim[index].availableOnNodes = make(map[string]*nodeaffinity.NodeSelector)
}
s.informationsForClaim[index].availableOnNodes[class.Name] = selector
}
s.informationsForClaim[index].controller = controller
needResourceInformation = true
}
}
}
if needResourceInformation {
if len(allocateClaims) > 0 {
logger.V(5).Info("Preparing allocation with structured parameters", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(allocateClaims))
// Doing this over and over again for each pod could be avoided
// by parsing once when creating the plugin and then updating
// that state in informer callbacks. But that would cause
// problems for using the plugin in the Cluster Autoscaler. If
// this step here turns out to be expensive, we may have to
// maintain and update state more persistently.
// by setting the allocator up once and then keeping it up-to-date
// as changes are observed.
//
// But that would cause problems for using the plugin in the
// Cluster Autoscaler. If this step here turns out to be
// expensive, we may have to maintain and update state more
// persistently.
//
// Claims are treated as "allocated" if they are in the assume cache
// or currently their allocation is in-flight.
resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache, &pl.inFlightAllocations)
logger.V(5).Info("Resource usage", "resources", klog.Format(resources))
allocator, err := structured.NewAllocator(ctx, allocateClaims, &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations}, pl.classLister, pl.sliceLister)
if err != nil {
return nil, statusError(logger, err)
}
s.resources = resources
s.allocator = allocator
s.nodeAllocations = make(map[string][]*resourceapi.AllocationResult)
}
s.claims = claims
return nil, nil
}
func (pl *dynamicResources) lookupParameters(logger klog.Logger, class *resourceapi.ResourceClass, claim *resourceapi.ResourceClaim) (classParameters *resourceapi.ResourceClassParameters, claimParameters *resourceapi.ResourceClaimParameters, status *framework.Status) {
classParameters, status = pl.lookupClassParameters(logger, class)
if status != nil {
return
}
claimParameters, status = pl.lookupClaimParameters(logger, class, claim)
return
type claimListerForAssumeCache struct {
assumeCache *assumecache.AssumeCache
inFlightAllocations *sync.Map
}
func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *resourceapi.ResourceClass) (*resourceapi.ResourceClassParameters, *framework.Status) {
defaultClassParameters := resourceapi.ResourceClassParameters{}
if class.ParametersRef == nil {
return &defaultClassParameters, nil
}
if class.ParametersRef.APIGroup == resourceapi.SchemeGroupVersion.Group &&
class.ParametersRef.Kind == "ResourceClassParameters" {
// Use the parameters which were referenced directly.
parameters, err := pl.classParametersLister.ResourceClassParameters(class.ParametersRef.Namespace).Get(class.ParametersRef.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, statusUnschedulable(logger, fmt.Sprintf("class parameters %s not found", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
}
return nil, statusError(logger, fmt.Errorf("get class parameters %s: %v", klog.KRef(class.Namespace, class.ParametersRef.Name), err))
func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
// Probably not worth adding an index for?
objs := cl.assumeCache.List(nil)
allocated := make([]*resourceapi.ResourceClaim, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if obj, ok := cl.inFlightAllocations.Load(claim.UID); ok {
claim = obj.(*resourceapi.ResourceClaim)
}
return parameters, nil
}
objs, err := pl.classParametersIndexer.ByIndex(generatedFromIndex, classParametersReferenceKeyFunc(class.ParametersRef))
if err != nil {
return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err))
}
switch len(objs) {
case 0:
return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
case 1:
parameters, ok := objs[0].(*resourceapi.ResourceClassParameters)
if !ok {
return nil, statusError(logger, fmt.Errorf("unexpected object in class parameters index: %T", objs[0]))
if claim.Status.Allocation != nil {
allocated = append(allocated, claim)
}
return parameters, nil
default:
sort.Slice(objs, func(i, j int) bool {
obj1, obj2 := objs[i].(*resourceapi.ResourceClassParameters), objs[j].(*resourceapi.ResourceClassParameters)
if obj1 == nil || obj2 == nil {
return false
}
return obj1.Name < obj2.Name
})
return nil, statusError(logger, fmt.Errorf("multiple generated class parameters for %s.%s %s found: %s", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name), klog.KObjSlice(objs)))
}
}
func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *resourceapi.ResourceClass, claim *resourceapi.ResourceClaim) (*resourceapi.ResourceClaimParameters, *framework.Status) {
defaultClaimParameters := resourceapi.ResourceClaimParameters{
DriverRequests: []resourceapi.DriverRequests{
{
DriverName: class.DriverName,
Requests: []resourceapi.ResourceRequest{
{
ResourceRequestModel: resourceapi.ResourceRequestModel{
// TODO: This only works because NamedResources is
// the only model currently implemented. We need to
// match the default to how the resources of this
// class are being advertized in a ResourceSlice.
NamedResources: &resourceapi.NamedResourcesRequest{
Selector: "true",
},
},
},
},
},
},
}
if claim.Spec.ParametersRef == nil {
return &defaultClaimParameters, nil
}
if claim.Spec.ParametersRef.APIGroup == resourceapi.SchemeGroupVersion.Group &&
claim.Spec.ParametersRef.Kind == "ResourceClaimParameters" {
// Use the parameters which were referenced directly.
parameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).Get(claim.Spec.ParametersRef.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, statusUnschedulable(logger, fmt.Sprintf("claim parameters %s not found", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
}
return nil, statusError(logger, fmt.Errorf("get claim parameters %s: %v", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), err))
}
return parameters, nil
}
objs, err := pl.claimParametersIndexer.ByIndex(generatedFromIndex, claimParametersReferenceKeyFunc(claim.Namespace, claim.Spec.ParametersRef))
if err != nil {
return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err))
}
switch len(objs) {
case 0:
return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
case 1:
parameters, ok := objs[0].(*resourceapi.ResourceClaimParameters)
if !ok {
return nil, statusError(logger, fmt.Errorf("unexpected object in claim parameters index: %T", objs[0]))
}
return parameters, nil
default:
sort.Slice(objs, func(i, j int) bool {
obj1, obj2 := objs[i].(*resourceapi.ResourceClaimParameters), objs[j].(*resourceapi.ResourceClaimParameters)
if obj1 == nil || obj2 == nil {
return false
}
return obj1.Name < obj2.Name
})
return nil, statusError(logger, fmt.Errorf("multiple generated claim parameters for %s.%s %s found: %s", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), klog.KObjSlice(objs)))
}
return allocated, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
@@ -1158,10 +855,11 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
if claim.Status.Allocation != nil {
if nodeSelector := state.informationsForClaim[index].availableOnNode; nodeSelector != nil {
for _, nodeSelector := range state.informationsForClaim[index].availableOnNodes {
if !nodeSelector.Match(node) {
logger.V(5).Info("AvailableOnNodes does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
unavailableClaims = append(unavailableClaims, index)
break
}
}
continue
@@ -1172,40 +870,61 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
}
if selector := state.informationsForClaim[index].availableOnNode; selector != nil {
if matches := selector.Match(node); !matches {
return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName)
for className, nodeSelector := range state.informationsForClaim[index].availableOnNodes {
if !nodeSelector.Match(node) {
return statusUnschedulable(logger, "excluded by device class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "deviceclass", klog.KRef("", className))
}
}
// Can the builtin controller tell us whether the node is suitable?
if state.informationsForClaim[index].structuredParameters {
suitable, err := state.informationsForClaim[index].controller.nodeIsSuitable(ctx, node.Name, state.resources)
if err != nil {
// An error indicates that something wasn't configured correctly, for example
// writing a CEL expression which doesn't handle a map lookup error. Normally
// this should never fail. We could return an error here, but then the pod
// would get retried. Instead we ignore the node.
return statusUnschedulable(logger, fmt.Sprintf("checking structured parameters failed: %v", err), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
}
if !suitable {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
}
} else {
if status := state.informationsForClaim[index].status; status != nil {
for _, unsuitableNode := range status.UnsuitableNodes {
if node.Name == unsuitableNode {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
}
// Use information from control plane controller?
if status := state.informationsForClaim[index].status; status != nil {
for _, unsuitableNode := range status.UnsuitableNodes {
if node.Name == unsuitableNode {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
}
}
}
}
// Use the allocator to check the node and cache the result in case the node is picked.
var allocations []*resourceapi.AllocationResult
if state.allocator != nil {
allocCtx := ctx
if loggerV := logger.V(5); loggerV.Enabled() {
allocCtx = klog.NewContext(allocCtx, klog.LoggerWithValues(logger, "node", klog.KObj(node)))
}
a, err := state.allocator.Allocate(allocCtx, node)
if err != nil {
// This should only fail if there is something wrong with the claim or class.
// Return an error to abort scheduling of it.
//
// This will cause retries. It would be slightly nicer to mark it as unschedulable
// *and* abort scheduling. Then only a cluster event for updating the claim or class
// with the broken CEL expression would trigger rescheduling.
//
// But we cannot do both. As this shouldn't occur often, aborting like this is
// better than the more complicated alternative (return Unschedulable here, remember
// the error, then raise it again later if needed).
return statusError(logger, err, "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaims", klog.KObjSlice(state.allocator.ClaimsToAllocate()))
}
// Check for exact length just to be sure. In practice this is all-or-nothing.
if len(a) != len(state.allocator.ClaimsToAllocate()) {
return statusUnschedulable(logger, "cannot allocate all claims", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaims", klog.KObjSlice(state.allocator.ClaimsToAllocate()))
}
// Reserve uses this information.
allocations = a
}
// Store information in state while holding the mutex.
if state.allocator != nil || len(unavailableClaims) > 0 {
state.mutex.Lock()
defer state.mutex.Unlock()
}
if len(unavailableClaims) > 0 {
// Remember all unavailable claims. This might be observed
// concurrently, so we have to lock the state before writing.
state.mutex.Lock()
defer state.mutex.Unlock()
if state.unavailableClaims == nil {
state.unavailableClaims = sets.New[int]()
@@ -1217,6 +936,10 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
return statusUnschedulable(logger, "resourceclaim not available on the node", "pod", klog.KObj(pod))
}
if state.allocator != nil {
state.nodeAllocations[node.Name] = allocations
}
return nil
}
@@ -1266,7 +989,6 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS
claim := claim.DeepCopy()
claim.Status.ReservedFor = nil
if clearAllocation {
claim.Status.DriverName = ""
claim.Status.Allocation = nil
} else {
claim.Status.DeallocationRequested = true
@@ -1303,7 +1025,7 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
pending := false
for index, claim := range state.claims {
if claim.Status.Allocation == nil &&
state.informationsForClaim[index].controller == nil {
!state.informationsForClaim[index].structuredParameters {
pending = true
break
}
@@ -1379,10 +1101,11 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
return nil
}
logger := klog.FromContext(ctx)
numDelayedAllocationPending := 0
numClaimsWithStatusInfo := 0
claimsWithBuiltinController := make([]int, 0, len(state.claims))
logger := klog.FromContext(ctx)
numClaimsWithAllocator := 0
for index, claim := range state.claims {
if claim.Status.Allocation != nil {
// Allocated, but perhaps not reserved yet. We checked in PreFilter that
@@ -1393,9 +1116,9 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
continue
}
// Do we have the builtin controller?
if state.informationsForClaim[index].controller != nil {
claimsWithBuiltinController = append(claimsWithBuiltinController, index)
// Do we use the allocator for it?
if state.informationsForClaim[index].structuredParameters {
numClaimsWithAllocator++
continue
}
@@ -1409,7 +1132,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
}
}
if numDelayedAllocationPending == 0 && len(claimsWithBuiltinController) == 0 {
if numDelayedAllocationPending == 0 && numClaimsWithAllocator == 0 {
// Nothing left to do.
return nil
}
@@ -1430,27 +1153,41 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
}
// Prepare allocation of claims handled by the scheduler.
for _, index := range claimsWithBuiltinController {
claim := state.claims[index]
driverName, allocation, err := state.informationsForClaim[index].controller.allocate(ctx, nodeName, state.resources)
if err != nil {
if state.allocator != nil {
// Entries in these two slices match each other.
claimsToAllocate := state.allocator.ClaimsToAllocate()
allocations, ok := state.nodeAllocations[nodeName]
if !ok {
// We checked before that the node is suitable. This shouldn't have failed,
// so treat this as an error.
return statusError(logger, fmt.Errorf("claim allocation failed unexpectedly: %v", err))
return statusError(logger, errors.New("claim allocation not found for node"))
}
state.informationsForClaim[index].allocation = allocation
state.informationsForClaim[index].allocationDriverName = driverName
// Strictly speaking, we don't need to store the full modified object.
// The allocation would be enough. The full object is useful for
// debugging and testing, so let's make it realistic.
claim = claim.DeepCopy()
if !slices.Contains(claim.Finalizers, resourceapi.Finalizer) {
claim.Finalizers = append(claim.Finalizers, resourceapi.Finalizer)
// Sanity check: do we have results for all pending claims?
if len(allocations) != len(claimsToAllocate) ||
len(allocations) != numClaimsWithAllocator {
return statusError(logger, fmt.Errorf("internal error, have %d allocations, %d claims to allocate, want %d claims", len(allocations), len(claimsToAllocate), numClaimsWithAllocator))
}
for i, claim := range claimsToAllocate {
index := slices.Index(state.claims, claim)
if index < 0 {
return statusError(logger, fmt.Errorf("internal error, claim %s with allocation not found", claim.Name))
}
allocation := allocations[i]
state.informationsForClaim[index].allocation = allocation
// Strictly speaking, we don't need to store the full modified object.
// The allocation would be enough. The full object is useful for
// debugging, testing and the allocator, so let's make it realistic.
claim = claim.DeepCopy()
if !slices.Contains(claim.Finalizers, resourceapi.Finalizer) {
claim.Finalizers = append(claim.Finalizers, resourceapi.Finalizer)
}
claim.Status.Allocation = allocation
pl.inFlightAllocations.Store(claim.UID, claim)
logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "allocation", klog.Format(allocation))
}
claim.Status.DriverName = driverName
claim.Status.Allocation = allocation
pl.inFlightAllocations.Store(claim.UID, claim)
logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", klog.Format(allocation))
}
// When there is only one pending resource, we can go ahead with
@@ -1460,8 +1197,8 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
//
// If all pending claims are handled with the builtin controller,
// there is no need for a PodSchedulingContext change.
if numDelayedAllocationPending == 1 && len(claimsWithBuiltinController) == 0 ||
numClaimsWithStatusInfo+len(claimsWithBuiltinController) == numDelayedAllocationPending && len(claimsWithBuiltinController) < numDelayedAllocationPending {
if numDelayedAllocationPending == 1 && numClaimsWithAllocator == 0 ||
numClaimsWithStatusInfo+numClaimsWithAllocator == numDelayedAllocationPending && numClaimsWithAllocator < numDelayedAllocationPending {
// TODO: can we increase the chance that the scheduler picks
// the same node as before when allocation is on-going,
// assuming that that node still fits the pod? Picking a
@@ -1530,7 +1267,7 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
for index, claim := range state.claims {
// If allocation was in-flight, then it's not anymore and we need to revert the
// claim object in the assume cache to what it was before.
if state.informationsForClaim[index].controller != nil {
if state.informationsForClaim[index].structuredParameters {
if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found {
pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
}
@@ -1661,8 +1398,6 @@ func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, ind
}
claim = updatedClaim
}
claim.Status.DriverName = state.informationsForClaim[index].allocationDriverName
claim.Status.Allocation = allocation
}