Merge pull request #124630 from carlory/fix-123731
DRA: scheduler: index claim and class parameters to simplify lookup
This commit is contained in:
		| @@ -32,7 +32,6 @@ import ( | ||||
| 	apiequality "k8s.io/apimachinery/pkg/api/equality" | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| @@ -40,6 +39,7 @@ import ( | ||||
| 	resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2" | ||||
| 	"k8s.io/client-go/kubernetes" | ||||
| 	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" | ||||
| 	"k8s.io/client-go/tools/cache" | ||||
| 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity" | ||||
| 	"k8s.io/dynamic-resource-allocation/resourceclaim" | ||||
| 	"k8s.io/klog/v2" | ||||
| @@ -56,6 +56,10 @@ const ( | ||||
| 	Name = names.DynamicResources | ||||
|  | ||||
| 	stateKey framework.StateKey = Name | ||||
|  | ||||
| 	// generatedFromIndex is the lookup name for the index function | ||||
| 	// which indexes by other resource which generated the parameters object. | ||||
| 	generatedFromIndex = "generated-from-index" | ||||
| ) | ||||
|  | ||||
| // The state is initialized in PreFilter phase. Because we save the pointer in | ||||
| @@ -280,6 +284,13 @@ type dynamicResources struct { | ||||
| 	resourceSliceLister        resourcev1alpha2listers.ResourceSliceLister | ||||
| 	claimNameLookup            *resourceclaim.Lookup | ||||
|  | ||||
| 	// claimParametersIndexer has the common claimParametersGeneratedFrom indexer installed to | ||||
| 	// limit iteration over claimParameters to those of interest. | ||||
| 	claimParametersIndexer cache.Indexer | ||||
| 	// classParametersIndexer has the common classParametersGeneratedFrom indexer installed to | ||||
| 	// limit iteration over classParameters to those of interest. | ||||
| 	classParametersIndexer cache.Indexer | ||||
|  | ||||
| 	// claimAssumeCache enables temporarily storing a newer claim object | ||||
| 	// while the scheduler has allocated it and the corresponding object | ||||
| 	// update from the apiserver has not been processed by the claim | ||||
| @@ -352,15 +363,58 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe | ||||
| 		classLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), | ||||
| 		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), | ||||
| 		claimParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), | ||||
| 		claimParametersIndexer:     fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Informer().GetIndexer(), | ||||
| 		classParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), | ||||
| 		classParametersIndexer:     fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Informer().GetIndexer(), | ||||
| 		resourceSliceLister:        fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), | ||||
| 		claimNameLookup:            resourceclaim.NewNameLookup(fh.ClientSet()), | ||||
| 		claimAssumeCache:           assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), | ||||
| 	} | ||||
|  | ||||
| 	if err := pl.claimParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: claimParametersGeneratedFromIndexFunc}); err != nil { | ||||
| 		return nil, fmt.Errorf("add claim parameters cache indexer: %w", err) | ||||
| 	} | ||||
| 	if err := pl.classParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: classParametersGeneratedFromIndexFunc}); err != nil { | ||||
| 		return nil, fmt.Errorf("add class parameters cache indexer: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return pl, nil | ||||
| } | ||||
|  | ||||
| func claimParametersReferenceKeyFunc(ref *resourcev1alpha2.ResourceClaimParametersReference) string { | ||||
| 	return ref.APIGroup + "/" + ref.Kind + "/" + ref.Name | ||||
| } | ||||
|  | ||||
| // claimParametersGeneratedFromIndexFunc is an index function that returns other resource keys | ||||
| // (= apiGroup/kind/name) for ResourceClaimParametersReference in a given claim parameters. | ||||
| func claimParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) { | ||||
| 	parameters, ok := obj.(*resourcev1alpha2.ResourceClaimParameters) | ||||
| 	if !ok { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	if parameters.GeneratedFrom == nil { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	return []string{claimParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil | ||||
| } | ||||
|  | ||||
| func classParametersReferenceKeyFunc(ref *resourcev1alpha2.ResourceClassParametersReference) string { | ||||
| 	return ref.APIGroup + "/" + ref.Kind + "/" + ref.Namespace + "/" + ref.Name | ||||
| } | ||||
|  | ||||
| // classParametersGeneratedFromIndexFunc is an index function that returns other resource keys | ||||
| // (= apiGroup/kind/namespace/name) for ResourceClassParametersReference in a given class parameters. | ||||
| func classParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) { | ||||
| 	parameters, ok := obj.(*resourcev1alpha2.ResourceClassParameters) | ||||
| 	if !ok { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	if parameters.GeneratedFrom == nil { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	return []string{classParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil | ||||
| } | ||||
|  | ||||
| var _ framework.PreEnqueuePlugin = &dynamicResources{} | ||||
| var _ framework.PreFilterPlugin = &dynamicResources{} | ||||
| var _ framework.FilterPlugin = &dynamicResources{} | ||||
| @@ -987,23 +1041,22 @@ func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *res | ||||
| 		return parameters, nil | ||||
| 	} | ||||
|  | ||||
| 	// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer | ||||
| 	allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything()) | ||||
| 	objs, err := pl.classParametersIndexer.ByIndex(generatedFromIndex, classParametersReferenceKeyFunc(class.ParametersRef)) | ||||
| 	if err != nil { | ||||
| 		return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err)) | ||||
| 	} | ||||
| 	for _, parameters := range allParameters { | ||||
| 		if parameters.GeneratedFrom == nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup && | ||||
| 			parameters.GeneratedFrom.Kind == class.ParametersRef.Kind && | ||||
| 			parameters.GeneratedFrom.Name == class.ParametersRef.Name && | ||||
| 			parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace { | ||||
| 			return parameters, nil | ||||
| 	switch len(objs) { | ||||
| 	case 0: | ||||
| 		return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name))) | ||||
| 	case 1: | ||||
| 		parameters, ok := objs[0].(*resourcev1alpha2.ResourceClassParameters) | ||||
| 		if !ok { | ||||
| 			return nil, statusError(logger, fmt.Errorf("unexpected object in class parameters index: %T", objs[0])) | ||||
| 		} | ||||
| 		return parameters, nil | ||||
| 	default: | ||||
| 		return nil, statusError(logger, fmt.Errorf("multiple generated class parameters for %s.%s %s found: %s", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name), klog.KObjSlice(objs))) | ||||
| 	} | ||||
| 	return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name))) | ||||
| } | ||||
|  | ||||
| func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) { | ||||
| @@ -1045,22 +1098,22 @@ func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *res | ||||
| 		return parameters, nil | ||||
| 	} | ||||
|  | ||||
| 	// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer | ||||
| 	allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything()) | ||||
| 	objs, err := pl.claimParametersIndexer.ByIndex(generatedFromIndex, claimParametersReferenceKeyFunc(claim.Spec.ParametersRef)) | ||||
| 	if err != nil { | ||||
| 		return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err)) | ||||
| 	} | ||||
| 	for _, parameters := range allParameters { | ||||
| 		if parameters.GeneratedFrom == nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup && | ||||
| 			parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind && | ||||
| 			parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name { | ||||
| 			return parameters, nil | ||||
| 	switch len(objs) { | ||||
| 	case 0: | ||||
| 		return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) | ||||
| 	case 1: | ||||
| 		parameters, ok := objs[0].(*resourcev1alpha2.ResourceClaimParameters) | ||||
| 		if !ok { | ||||
| 			return nil, statusError(logger, fmt.Errorf("unexpected object in claim parameters index: %T", objs[0])) | ||||
| 		} | ||||
| 		return parameters, nil | ||||
| 	default: | ||||
| 		return nil, statusError(logger, fmt.Errorf("multiple generated claim parameters for %s.%s %s found: %s", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), klog.KObjSlice(objs))) | ||||
| 	} | ||||
| 	return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) | ||||
| } | ||||
|  | ||||
| // PreFilterExtensions returns prefilter extensions, pod add and remove. | ||||
|   | ||||
| @@ -659,7 +659,7 @@ func TestPlugin(t *testing.T) { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams}, | ||||
| 			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams}, | ||||
| 			objs:    []apiruntime.Object{claimParameters /* classParameters, */, workerNodeSlice}, | ||||
| 			objs:    []apiruntime.Object{claimParameters, workerNodeSlice}, | ||||
| 			want: want{ | ||||
| 				prefilter: result{ | ||||
| 					status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `class parameters default/my-resource-class not found`), | ||||
| @@ -674,7 +674,7 @@ func TestPlugin(t *testing.T) { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams}, | ||||
| 			classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams}, | ||||
| 			objs:    []apiruntime.Object{ /* claimParameters, */ classParameters, workerNodeSlice}, | ||||
| 			objs:    []apiruntime.Object{classParameters, workerNodeSlice}, | ||||
| 			want: want{ | ||||
| 				prefilter: result{ | ||||
| 					status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `claim parameters default/my-pod-my-resource not found`), | ||||
| @@ -685,6 +685,66 @@ func TestPlugin(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		"missing-translated-class-parameters": { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, | ||||
| 			classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, | ||||
| 			objs:    []apiruntime.Object{claimParameters, workerNodeSlice}, | ||||
| 			want: want{ | ||||
| 				prefilter: result{ | ||||
| 					status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `generated class parameters for ResourceClassParameters.example.com default/my-resource-class not found`), | ||||
| 				}, | ||||
| 				postfilter: result{ | ||||
| 					status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		"missing-translated-claim-parameters": { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, | ||||
| 			classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, | ||||
| 			objs:    []apiruntime.Object{classParameters, workerNodeSlice}, | ||||
| 			want: want{ | ||||
| 				prefilter: result{ | ||||
| 					status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `generated claim parameters for ResourceClaimParameters.example.com default/my-pod-my-resource not found`), | ||||
| 				}, | ||||
| 				postfilter: result{ | ||||
| 					status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		"too-many-translated-class-parameters": { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, | ||||
| 			classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, | ||||
| 			objs:    []apiruntime.Object{claimParameters, classParameters, st.FromClassParameters(classParameters).Name("other").Obj() /* too many */, workerNodeSlice}, | ||||
| 			want: want{ | ||||
| 				prefilter: result{ | ||||
| 					status: framework.AsStatus(errors.New(`multiple generated class parameters for ResourceClassParameters.example.com my-resource-class found: [default/my-resource-class default/other]`)), | ||||
| 				}, | ||||
| 				postfilter: result{ | ||||
| 					status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		"too-many-translated-claim-parameters": { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, | ||||
| 			classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, | ||||
| 			objs:    []apiruntime.Object{claimParameters, st.FromClaimParameters(claimParameters).Name("other").Obj() /* too many */, classParameters, workerNodeSlice}, | ||||
| 			want: want{ | ||||
| 				prefilter: result{ | ||||
| 					status: framework.AsStatus(errors.New(`multiple generated claim parameters for ResourceClaimParameters.example.com default/my-pod-my-resource found: [default/my-pod-my-resource default/other]`)), | ||||
| 				}, | ||||
| 				postfilter: result{ | ||||
| 					status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		"claim-parameters-CEL-runtime-error": { | ||||
| 			pod:     podWithClaimName, | ||||
| 			claims:  []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams}, | ||||
|   | ||||
| @@ -1154,6 +1154,11 @@ func MakeClaimParameters() *ClaimParametersWrapper { | ||||
| 	return &ClaimParametersWrapper{} | ||||
| } | ||||
|  | ||||
| // FromClaimParameters creates a ResourceClaimParameters wrapper from an existing object. | ||||
| func FromClaimParameters(other *resourcev1alpha2.ResourceClaimParameters) *ClaimParametersWrapper { | ||||
| 	return &ClaimParametersWrapper{*other.DeepCopy()} | ||||
| } | ||||
|  | ||||
| func (wrapper *ClaimParametersWrapper) Obj() *resourcev1alpha2.ResourceClaimParameters { | ||||
| 	return &wrapper.ResourceClaimParameters | ||||
| } | ||||
| @@ -1209,6 +1214,11 @@ func MakeClassParameters() *ClassParametersWrapper { | ||||
| 	return &ClassParametersWrapper{} | ||||
| } | ||||
|  | ||||
| // FromClassParameters creates a ResourceClassParameters wrapper from an existing object. | ||||
| func FromClassParameters(other *resourcev1alpha2.ResourceClassParameters) *ClassParametersWrapper { | ||||
| 	return &ClassParametersWrapper{*other.DeepCopy()} | ||||
| } | ||||
|  | ||||
| func (wrapper *ClassParametersWrapper) Obj() *resourcev1alpha2.ResourceClassParameters { | ||||
| 	return &wrapper.ResourceClassParameters | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot