diff --git a/test/integration/scheduler_perf/.import-restrictions b/test/integration/scheduler_perf/.import-restrictions
new file mode 100644
index 00000000000..be861812e73
--- /dev/null
+++ b/test/integration/scheduler_perf/.import-restrictions
@@ -0,0 +1,7 @@
+rules:
+  # test/integration should not use test/e2e, but reusing the
+  # DRA test driver for the simulated cluster during scheduling
+  # tests is fine.
+  - selectorRegexp: k8s[.]io/kubernetes/test/e2e
+    allowedPrefixes:
+    - k8s.io/kubernetes/test/e2e/dra/test-driver
diff --git a/test/integration/scheduler_perf/config/dra/claim-template.yaml b/test/integration/scheduler_perf/config/dra/claim-template.yaml
new file mode 100644
index 00000000000..2ccc57e478c
--- /dev/null
+++ b/test/integration/scheduler_perf/config/dra/claim-template.yaml
@@ -0,0 +1,7 @@
+apiVersion: resource.k8s.io/v1alpha2
+kind: ResourceClaimTemplate
+metadata:
+  name: claim-template
+spec:
+  spec:
+    resourceClassName: scheduler-performance
diff --git a/test/integration/scheduler_perf/config/dra/node-with-dra-test-driver.yaml b/test/integration/scheduler_perf/config/dra/node-with-dra-test-driver.yaml
new file mode 100644
index 00000000000..b3fe7309070
--- /dev/null
+++ b/test/integration/scheduler_perf/config/dra/node-with-dra-test-driver.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Node
+metadata:
+  # The name is relevant for selecting whether the driver has resources for this node.
+  generateName: scheduler-perf-dra-
+spec: {}
+status:
+  capacity:
+    pods: "110"
+    cpu: "4"
+    memory: 32Gi
+  conditions:
+  - status: "True"
+    type: Ready
+  phase: Running
diff --git a/test/integration/scheduler_perf/config/dra/pod-with-claim-template.yaml b/test/integration/scheduler_perf/config/dra/pod-with-claim-template.yaml
new file mode 100644
index 00000000000..eb1dd879205
--- /dev/null
+++ b/test/integration/scheduler_perf/config/dra/pod-with-claim-template.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  generateName: test-dra
+spec:
+  containers:
+  - image: registry.k8s.io/pause:3.9
+    name: pause
+    resources:
+      claims:
+      - name: resource
+  resourceClaims:
+  - name: resource
+    source:
+      resourceClaimTemplateName: test-claim-template
diff --git a/test/integration/scheduler_perf/config/dra/resourceclaimtemplate.yaml b/test/integration/scheduler_perf/config/dra/resourceclaimtemplate.yaml
new file mode 100644
index 00000000000..d3da6879e76
--- /dev/null
+++ b/test/integration/scheduler_perf/config/dra/resourceclaimtemplate.yaml
@@ -0,0 +1,7 @@
+apiVersion: resource.k8s.io/v1alpha2
+kind: ResourceClaimTemplate
+metadata:
+  name: test-claim-template
+spec:
+  spec:
+    resourceClassName: test-class
diff --git a/test/integration/scheduler_perf/config/dra/resourceclass.yaml b/test/integration/scheduler_perf/config/dra/resourceclass.yaml
new file mode 100644
index 00000000000..d24aa82392f
--- /dev/null
+++ b/test/integration/scheduler_perf/config/dra/resourceclass.yaml
@@ -0,0 +1,5 @@
+apiVersion: resource.k8s.io/v1alpha2
+kind: ResourceClass
+metadata:
+  name: test-class
+driverName: test-driver.cdi.k8s.io
diff --git a/test/integration/scheduler_perf/config/dra/resourclass.yaml b/test/integration/scheduler_perf/config/dra/resourclass.yaml
new file mode 100644
index 00000000000..b87692e8f05
--- /dev/null
+++ b/test/integration/scheduler_perf/config/dra/resourclass.yaml
@@ -0,0 +1,5 @@
+apiVersion: resource.k8s.io/v1alpha2
+kind: ResourceClass
+metadata:
+  name: scheduler-performance
+driverName: test-driver.cdi.k8s.io
diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml
index 2ba1a518782..515d47975b8 100644
--- a/test/integration/scheduler_perf/config/performance-config.yaml
+++ b/test/integration/scheduler_perf/config/performance-config.yaml
@@ -664,3 +664,66 @@
       taintNodes: 1000
       normalNodes: 4000
       measurePods: 4000
+
+# SchedulingWithResourceClaimTemplate uses a ResourceClaimTemplate
+# and dynamically created ResourceClaim instances for each pod.
+- name: SchedulingWithResourceClaimTemplate
+  featureGates:
+    DynamicResourceAllocation: true
+  workloadTemplate:
+  - opcode: createNodes
+    countParam: $nodesWithoutDRA
+  - opcode: createNodes
+    nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml
+    countParam: $nodesWithDRA
+  - opcode: createResourceDriver
+    driverName: test-driver.cdi.k8s.io
+    nodes: scheduler-perf-dra-*
+    maxClaimsPerNodeParam: $maxClaimsPerNode
+  - opcode: createResourceClass
+    templatePath: config/dra/resourceclass.yaml
+  - opcode: createResourceClaimTemplate
+    templatePath: config/dra/resourceclaimtemplate.yaml
+    namespace: init
+  - opcode: createPods
+    namespace: init
+    countParam: $initPods
+    podTemplatePath: config/dra/pod-with-claim-template.yaml
+  - opcode: createResourceClaimTemplate
+    templatePath: config/dra/resourceclaimtemplate.yaml
+    namespace: test
+  - opcode: createPods
+    namespace: test
+    countParam: $measurePods
+    podTemplatePath: config/dra/pod-with-claim-template.yaml
+    collectMetrics: true
+  workloads:
+  - name: fast
+    params:
+      # This testcase runs through all code paths without
+      # taking too long overall.
+      nodesWithDRA: 1
+      nodesWithoutDRA: 1
+      initPods: 0
+      measurePods: 10
+      maxClaimsPerNode: 10
+  - name: 2000pods_100nodes
+    labels: [performance,fast]
+    params:
+      # In this testcase, the number of nodes is smaller
+      # than the limit for the PodScheduling slices.
+      nodesWithDRA: 100
+      nodesWithoutDRA: 0
+      initPods: 1000
+      measurePods: 1000
+      maxClaimsPerNode: 20
+  - name: 2000pods_200nodes
+    params:
+      # In this testcase, the driver and scheduler must
+      # truncate the PotentialNodes and UnsuitableNodes
+      # slices.
+      nodesWithDRA: 200
+      nodesWithoutDRA: 0
+      initPods: 1000
+      measurePods: 1000
+      maxClaimsPerNode: 10
diff --git a/test/integration/scheduler_perf/create.go b/test/integration/scheduler_perf/create.go
new file mode 100644
index 00000000000..a2fcd296542
--- /dev/null
+++ b/test/integration/scheduler_perf/create.go
@@ -0,0 +1,89 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package benchmark
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    clientset "k8s.io/client-go/kubernetes"
+)
+
+// createOp defines an op where some object gets created from a template.
+// Everything specific for that object (create call, op code, names) gets
+// provided through a type.
+type createOp[T interface{}, P createOpType[T]] struct {
+    // Must match createOpType.Opcode().
+    Opcode operationCode
+    // Namespace the object should be created in. Must be empty for cluster-scoped objects.
+    Namespace string
+    // Path to spec file describing the object to create.
+    TemplatePath string
+}
+
+func (cro *createOp[T, P]) isValid(allowParameterization bool) error {
+    var p P
+    if cro.Opcode != p.Opcode() {
+        return fmt.Errorf("invalid opcode %q; expected %q", cro.Opcode, p.Opcode())
+    }
+    if p.Namespaced() && cro.Namespace == "" {
+        return fmt.Errorf("Namespace must be set")
+    }
+    if !p.Namespaced() && cro.Namespace != "" {
+        return fmt.Errorf("Namespace must not be set")
+    }
+    if cro.TemplatePath == "" {
+        return fmt.Errorf("TemplatePath must be set")
+    }
+    return nil
+}
+
+func (cro *createOp[T, P]) collectsMetrics() bool {
+    return false
+}
+
+func (cro *createOp[T, P]) patchParams(w *workload) (realOp, error) {
+    return cro, cro.isValid(false)
+}
+
+func (cro *createOp[T, P]) requiredNamespaces() []string {
+    if cro.Namespace == "" {
+        return nil
+    }
+    return []string{cro.Namespace}
+}
+
+func (cro *createOp[T, P]) run(ctx context.Context, tb testing.TB, client clientset.Interface) {
+    var obj *T
+    var p P
+    if err := getSpecFromFile(&cro.TemplatePath, &obj); err != nil {
+        tb.Fatalf("parsing %s %q: %v", p.Name(), cro.TemplatePath, err)
+    }
+    if _, err := p.CreateCall(client, cro.Namespace)(ctx, obj, metav1.CreateOptions{}); err != nil {
+        tb.Fatalf("create %s: %v", p.Name(), err)
+    }
+}
+
+// createOpType provides type-specific values for the generic createOp.
+type createOpType[T interface{}] interface {
+    Opcode() operationCode
+    Name() string
+    Namespaced() bool
+    CreateCall(client clientset.Interface, namespace string) func(context.Context, *T, metav1.CreateOptions) (*T, error)
+}
diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go
new file mode 100644
index 00000000000..0c4b82328da
--- /dev/null
+++ b/test/integration/scheduler_perf/dra.go
@@ -0,0 +1,229 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package benchmark
+
+import (
+    "context"
+    "fmt"
+    "path/filepath"
+    "sync"
+    "testing"
+
+    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    clientset "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/util/workqueue"
+    "k8s.io/klog/v2"
+    draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
+)
+
+// createResourceClaimsOp defines an op where resource claims are created.
+type createResourceClaimsOp struct {
+    // Must be createResourceClaimsOpcode.
+    Opcode operationCode
+    // Number of claims to create. Parameterizable through CountParam.
+    Count int
+    // Template parameter for Count.
+    CountParam string
+    // Namespace the claims should be created in.
+    Namespace string
+    // Path to spec file describing the claims to create.
+    TemplatePath string
+}
+
+var _ realOp = &createResourceClaimsOp{}
+var _ runnableOp = &createResourceClaimsOp{}
+
+func (op *createResourceClaimsOp) isValid(allowParameterization bool) error {
+    if op.Opcode != createResourceClaimsOpcode {
+        return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceClaimsOpcode)
+    }
+    if !isValidCount(allowParameterization, op.Count, op.CountParam) {
+        return fmt.Errorf("invalid Count=%d / CountParam=%q", op.Count, op.CountParam)
+    }
+    if op.Namespace == "" {
+        return fmt.Errorf("Namespace must be set")
+    }
+    if op.TemplatePath == "" {
+        return fmt.Errorf("TemplatePath must be set")
+    }
+    return nil
+}
+
+func (op *createResourceClaimsOp) collectsMetrics() bool {
+    return false
+}
+func (op *createResourceClaimsOp) patchParams(w *workload) (realOp, error) {
+    if op.CountParam != "" {
+        var err error
+        op.Count, err = w.Params.get(op.CountParam[1:])
+        if err != nil {
+            return nil, err
+        }
+    }
+    return op, op.isValid(false)
+}
+
+func (op *createResourceClaimsOp) requiredNamespaces() []string {
+    return []string{op.Namespace}
+}
+
+func (op *createResourceClaimsOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) {
+    tb.Logf("creating %d claims in namespace %q", op.Count, op.Namespace)
+
+    var claimTemplate *resourcev1alpha2.ResourceClaim
+    if err := getSpecFromFile(&op.TemplatePath, &claimTemplate); err != nil {
+        tb.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err)
+    }
+    var createErr error
+    var mutex sync.Mutex
+    create := func(i int) {
+        err := func() error {
+            if _, err := clientset.ResourceV1alpha2().ResourceClaims(op.Namespace).Create(ctx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil {
+                return fmt.Errorf("create claim: %v", err)
+            }
+            return nil
+        }()
+        if err != nil {
+            mutex.Lock()
+            defer mutex.Unlock()
+            createErr = err
+        }
+    }
+
+    workers := op.Count
+    if workers > 30 {
+        workers = 30
+    }
+    workqueue.ParallelizeUntil(ctx, workers, op.Count, create)
+    if createErr != nil {
+        tb.Fatal(createErr.Error())
+    }
+}
+
+// createResourceClassOpType customizes createOp for creating a ResourceClass.
+type createResourceClassOpType struct{}
+
+func (c createResourceClassOpType) Opcode() operationCode { return createResourceClassOpcode }
+func (c createResourceClassOpType) Name() string          { return "ResourceClass" }
+func (c createResourceClassOpType) Namespaced() bool      { return false }
+func (c createResourceClassOpType) CreateCall(client clientset.Interface, namespace string) func(context.Context, *resourcev1alpha2.ResourceClass, metav1.CreateOptions) (*resourcev1alpha2.ResourceClass, error) {
+    return client.ResourceV1alpha2().ResourceClasses().Create
+}
+
+// createResourceClaimTemplateOpType customizes createOp for creating a ResourceClaimTemplate.
+type createResourceClaimTemplateOpType struct{}
+
+func (c createResourceClaimTemplateOpType) Opcode() operationCode {
+    return createResourceClaimTemplateOpcode
+}
+func (c createResourceClaimTemplateOpType) Name() string     { return "ResourceClaimTemplate" }
+func (c createResourceClaimTemplateOpType) Namespaced() bool { return true }
+func (c createResourceClaimTemplateOpType) CreateCall(client clientset.Interface, namespace string) func(context.Context, *resourcev1alpha2.ResourceClaimTemplate, metav1.CreateOptions) (*resourcev1alpha2.ResourceClaimTemplate, error) {
+    return client.ResourceV1alpha2().ResourceClaimTemplates(namespace).Create
+}
+
+// createResourceDriverOp defines an op where a DRA test driver is started for a set of nodes.
+type createResourceDriverOp struct {
+    // Must be createResourceDriverOpcode.
+    Opcode operationCode
+    // Name of the driver, used to reference it in a resource class.
+    DriverName string
+    // Number of claims to allow per node. Parameterizable through MaxClaimsPerNodeParam.
+    MaxClaimsPerNode int
+    // Template parameter for MaxClaimsPerNode.
+    MaxClaimsPerNodeParam string
+    // Nodes matching this glob pattern have resources managed by the driver.
+    Nodes string
+}
+
+var _ realOp = &createResourceDriverOp{}
+var _ runnableOp = &createResourceDriverOp{}
+
+func (op *createResourceDriverOp) isValid(allowParameterization bool) error {
+    if op.Opcode != createResourceDriverOpcode {
+        return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceDriverOpcode)
+    }
+    if !isValidCount(allowParameterization, op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) {
+        return fmt.Errorf("invalid MaxClaimsPerNode=%d / MaxClaimsPerNodeParam=%q", op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam)
+    }
+    if op.DriverName == "" {
+        return fmt.Errorf("DriverName must be set")
+    }
+    if op.Nodes == "" {
+        return fmt.Errorf("Nodes must be set")
+    }
+    return nil
+}
+
+func (op *createResourceDriverOp) collectsMetrics() bool {
+    return false
+}
+func (op *createResourceDriverOp) patchParams(w *workload) (realOp, error) {
+    if op.MaxClaimsPerNodeParam != "" {
+        var err error
+        op.MaxClaimsPerNode, err = w.Params.get(op.MaxClaimsPerNodeParam[1:])
+        if err != nil {
+            return nil, err
+        }
+    }
+    return op, op.isValid(false)
+}
+
+func (op *createResourceDriverOp) requiredNamespaces() []string { return nil }
+
+func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) {
+    tb.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes)
+
+    // Start the controller side of the DRA test driver such that it simulates
+    // per-node resources.
+    resources := draapp.Resources{
+        NodeLocal:      true,
+        MaxAllocations: op.MaxClaimsPerNode,
+    }
+
+    nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+    if err != nil {
+        tb.Fatalf("list nodes: %v", err)
+    }
+    for _, node := range nodes.Items {
+        match, err := filepath.Match(op.Nodes, node.Name)
+        if err != nil {
+            tb.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err)
+        }
+        if match {
+            resources.Nodes = append(resources.Nodes, node.Name)
+        }
+    }
+
+    controller := draapp.NewController(clientset, op.DriverName, resources)
+    ctx, cancel := context.WithCancel(ctx)
+    var wg sync.WaitGroup
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        ctx := klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "DRA test driver"))
+        controller.Run(ctx, 5 /* workers */)
+    }()
+    tb.Cleanup(func() {
+        tb.Logf("stopping resource driver %q", op.DriverName)
+        // We must cancel before waiting.
+        cancel()
+        wg.Wait()
+        tb.Logf("stopped resource driver %q", op.DriverName)
+    })
+}
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 1a06866d022..319352077eb 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -32,6 +32,7 @@ import (
     "time"

     v1 "k8s.io/api/core/v1"
+    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -59,13 +60,17 @@ import (
 type operationCode string

 const (
-    createNodesOpcode      operationCode = "createNodes"
-    createNamespacesOpcode operationCode = "createNamespaces"
-    createPodsOpcode       operationCode = "createPods"
-    createPodSetsOpcode    operationCode = "createPodSets"
-    churnOpcode            operationCode = "churn"
-    barrierOpcode          operationCode = "barrier"
-    sleepOpcode            operationCode = "sleep"
+    createNodesOpcode                 operationCode = "createNodes"
+    createNamespacesOpcode            operationCode = "createNamespaces"
+    createPodsOpcode                  operationCode = "createPods"
+    createPodSetsOpcode               operationCode = "createPodSets"
+    createResourceClaimsOpcode        operationCode = "createResourceClaims"
+    createResourceClaimTemplateOpcode operationCode = "createResourceClaimTemplate"
+    createResourceClassOpcode         operationCode = "createResourceClass"
+    createResourceDriverOpcode        operationCode = "createResourceDriver"
+    churnOpcode                       operationCode = "churn"
+    barrierOpcode                     operationCode = "barrier"
+    sleepOpcode                       operationCode = "sleep"
 )

 const (
@@ -227,6 +232,10 @@ func (op *op) UnmarshalJSON(b []byte) error {
         &createNamespacesOp{},
         &createPodsOp{},
         &createPodSetsOp{},
+        &createResourceClaimsOp{},
+        &createOp[resourcev1alpha2.ResourceClaimTemplate, createResourceClaimTemplateOpType]{},
+        &createOp[resourcev1alpha2.ResourceClass, createResourceClassOpType]{},
+        &createResourceDriverOp{},
         &churnOp{},
         &barrierOp{},
         &sleepOp{},
@@ -265,6 +274,18 @@ type realOp interface {
     patchParams(w *workload) (realOp, error)
 }

+// runnableOp is an interface implemented by some operations. It makes it possible
+// to execute the operation without having to add separate code into runWorkload.
+type runnableOp interface {
+    realOp
+
+    // requiredNamespaces returns all namespaces that runWorkload must create
+    // before running the operation.
+    requiredNamespaces() []string
+    // run executes the steps provided by the operation.
+    run(context.Context, testing.TB, clientset.Interface)
+}
+
 func isValidParameterizable(val string) bool {
     return strings.HasPrefix(val, "$")
 }
@@ -760,7 +781,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
             b.Fatalf("validate scheduler config file failed: %v", err)
         }
     }
-    informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg)
+    informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates)

     // Additional informers needed for testing. The pod informer was
     // already created before (scheduler.NewInformerFactory) and the
@@ -1014,7 +1035,14 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
             case <-time.After(concreteOp.Duration):
             }
         default:
-            b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+            runable, ok := concreteOp.(runnableOp)
+            if !ok {
+                b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+            }
+            for _, namespace := range runable.requiredNamespaces() {
+                createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace)
+            }
+            runable.run(ctx, b, client)
         }
     }

diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index dc191e2e157..04027b903eb 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -30,6 +30,7 @@ import (
     "time"

     v1 "k8s.io/api/core/v1"
+    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/util/sets"
@@ -38,11 +39,14 @@ import (
     coreinformers "k8s.io/client-go/informers/core/v1"
     clientset "k8s.io/client-go/kubernetes"
     restclient "k8s.io/client-go/rest"
+    cliflag "k8s.io/component-base/cli/flag"
+    "k8s.io/component-base/featuregate"
     "k8s.io/component-base/metrics/legacyregistry"
     "k8s.io/component-base/metrics/testutil"
     "k8s.io/klog/v2"
     kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
     "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
+    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
     kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
     "k8s.io/kubernetes/test/integration/framework"
@@ -76,7 +80,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
 // remove resources after finished.
 // Notes on rate limiter:
 //   - client rate limit is set to 5000.
-func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
+func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
     // Run API server with minimimal logging by default. Can be raised with -v.
     framework.MinVerbosity = 0

@@ -84,6 +88,13 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
         ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
             // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
             opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}
+
+            // Enable DRA API group.
+            if enabledFeatures[features.DynamicResourceAllocation] {
+                opts.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
+                    resourcev1alpha2.SchemeGroupVersion.String(): "true",
+                }
+            }
         },
     })
     b.Cleanup(tearDownFn)
@@ -114,7 +125,18 @@
     // Not all config options will be effective but only those mostly related with scheduler performance will
     // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
     _, informerFactory := util.StartScheduler(ctx, client, cfg, config)
-    util.StartFakePVController(ctx, client)
+    util.StartFakePVController(ctx, client, informerFactory)
+
+    runResourceClaimController := func() {}
+    if enabledFeatures[features.DynamicResourceAllocation] {
+        // Testing of DRA with inline resource claims depends on this
+        // controller for creating and removing ResourceClaims.
+        runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory)
+    }
+
+    informerFactory.Start(ctx.Done())
+    informerFactory.WaitForCacheSync(ctx.Done())
+    go runResourceClaimController()

     return informerFactory, client, dynClient
 }
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index 68442ab89e0..2b3cd7bbb83 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -47,6 +47,7 @@ import (
     "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
     podutil "k8s.io/kubernetes/pkg/api/v1/pod"
     "k8s.io/kubernetes/pkg/controller/disruption"
+    "k8s.io/kubernetes/pkg/controller/resourceclaim"
     "k8s.io/kubernetes/pkg/controlplane"
     "k8s.io/kubernetes/pkg/scheduler"
     kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -102,10 +103,22 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
     return sched, informerFactory
 }

+func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
+    podInformer := informerFactory.Core().V1().Pods()
+    claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
+    claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
+    claimController, err := resourceclaim.NewController(clientSet, podInformer, claimInformer, claimTemplateInformer)
+    if err != nil {
+        tb.Fatalf("Error creating claim controller: %v", err)
+    }
+    return func() {
+        go claimController.Run(ctx, 5 /* workers */)
+    }
+}
+
 // StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding.
 // TODO(mborsz): Use a real PV controller here.
-func StartFakePVController(ctx context.Context, clientSet clientset.Interface) {
-    informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
+func StartFakePVController(ctx context.Context, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) {
     pvInformer := informerFactory.Core().V1().PersistentVolumes()

     syncPV := func(obj *v1.PersistentVolume) {
@@ -137,8 +150,6 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface) {
             syncPV(obj.(*v1.PersistentVolume))
         },
     })
-
-    informerFactory.Start(ctx.Done())
 }

 // TestContext store necessary context info
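
Note: besides the generic createOp, the patch also adds a standalone createResourceClaims opcode that the shipped performance-config.yaml does not exercise yet. The sketch below is not part of the patch; it shows how a workload might use that opcode, assuming a hypothetical config/dra/claim.yaml that contains a plain ResourceClaim referencing the test-class resource class added above. Field names follow the createResourceClaimsOp struct.

# Hypothetical config/dra/claim.yaml (not part of the patch); generateName avoids
# name clashes because the op creates Count deep copies of the same object:
apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClaim
metadata:
  generateName: test-claim-
spec:
  resourceClassName: test-class

# Possible workload ops using it, sketched in the style of performance-config.yaml:
- opcode: createResourceClaims
  namespace: init
  countParam: $initClaims
  templatePath: config/dra/claim.yaml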