scheduler perf: add DynamicResourceAllocation test cases

The default scheduler configuration must be based on the v1 API, where the
plugin is enabled by default. Then, if (and only if) a test sets the
DynamicResourceAllocation feature gate, the corresponding API group also gets
enabled.
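
Conceptually, the gating amounts to the following minimal, self-contained Go
sketch (toy names only; the real change further down uses
opts.APIEnablement.RuntimeConfig with a cliflag.ConfigurationMap inside
mustSetupScheduler):

package main

import "fmt"

// Toy stand-ins for the feature gate map and the apiserver runtime config;
// the map types are illustrative, not the real Kubernetes types.
func main() {
	enabledFeatures := map[string]bool{"DynamicResourceAllocation": true}
	runtimeConfig := map[string]string{}
	if enabledFeatures["DynamicResourceAllocation"] {
		// Serve the API group that the DRA plugin and controllers need.
		runtimeConfig["resource.k8s.io/v1alpha2"] = "true"
	}
	fmt.Println(runtimeConfig)
}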

If needed, the normal dynamic resource claim controller gets started to create
ResourceClaims from ResourceClaimTemplates.

Without the upcoming optimizations in the scheduler, scheduling with dynamic
resources is fairly slow: the new test cases take around 15 minutes of
wall-clock time on my desktop.
Patrick Ohly 2023-01-24 17:45:57 +01:00
parent 47f1bd9f80
commit 034528a9f0
13 changed files with 518 additions and 15 deletions

View File

@@ -0,0 +1,7 @@
rules:
  # test/integration should not use test/e2e, but reusing the
  # DRA test driver for the simulated cluster during scheduling
  # tests is fine.
  - selectorRegexp: k8s[.]io/kubernetes/test/e2e
    allowedPrefixes:
      - k8s.io/kubernetes/test/e2e/dra/test-driver

View File

@@ -0,0 +1,7 @@
apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClaimTemplate
metadata:
  name: claim-template
spec:
  spec:
    resourceClassName: scheduler-performance

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Node
metadata:
  # The name is relevant for selecting whether the driver has resources for this node.
  generateName: scheduler-perf-dra-
spec: {}
status:
  capacity:
    pods: "110"
    cpu: "4"
    memory: 32Gi
  conditions:
  - status: "True"
    type: Ready
  phase: Running

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  generateName: test-dra
spec:
  containers:
  - image: registry.k8s.io/pause:3.9
    name: pause
    resources:
      claims:
      - name: resource
  resourceClaims:
  - name: resource
    source:
      resourceClaimTemplateName: test-claim-template

View File

@@ -0,0 +1,7 @@
apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClaimTemplate
metadata:
  name: test-claim-template
spec:
  spec:
    resourceClassName: test-class

View File

@@ -0,0 +1,5 @@
apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClass
metadata:
  name: test-class
driverName: test-driver.cdi.k8s.io
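
For illustration, the same object could also be created programmatically with
the v1alpha2 clientset from this tree (a hedged sketch; the clientset setup is
omitted and the helper name createTestClass is made up):

package benchmark

import (
	"context"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// createTestClass creates the same ResourceClass as the YAML file above.
func createTestClass(ctx context.Context, client clientset.Interface) error {
	class := &resourcev1alpha2.ResourceClass{
		ObjectMeta: metav1.ObjectMeta{Name: "test-class"},
		DriverName: "test-driver.cdi.k8s.io",
	}
	_, err := client.ResourceV1alpha2().ResourceClasses().Create(ctx, class, metav1.CreateOptions{})
	return err
}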

View File

@@ -0,0 +1,5 @@
apiVersion: resource.k8s.io/v1alpha2
kind: ResourceClass
metadata:
  name: scheduler-performance
driverName: test-driver.cdi.k8s.io

View File

@@ -664,3 +664,66 @@
      taintNodes: 1000
      normalNodes: 4000
      measurePods: 4000
+# SchedulingWithResourceClaimTemplate uses a ResourceClaimTemplate
+# and dynamically created ResourceClaim instances for each pod.
+- name: SchedulingWithResourceClaimTemplate
+  featureGates:
+    DynamicResourceAllocation: true
+  workloadTemplate:
+  - opcode: createNodes
+    countParam: $nodesWithoutDRA
+  - opcode: createNodes
+    nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml
+    countParam: $nodesWithDRA
+  - opcode: createResourceDriver
+    driverName: test-driver.cdi.k8s.io
+    nodes: scheduler-perf-dra-*
+    maxClaimsPerNodeParam: $maxClaimsPerNode
+  - opcode: createResourceClass
+    templatePath: config/dra/resourceclass.yaml
+  - opcode: createResourceClaimTemplate
+    templatePath: config/dra/resourceclaimtemplate.yaml
+    namespace: init
+  - opcode: createPods
+    namespace: init
+    countParam: $initPods
+    podTemplatePath: config/dra/pod-with-claim-template.yaml
+  - opcode: createResourceClaimTemplate
+    templatePath: config/dra/resourceclaimtemplate.yaml
+    namespace: test
+  - opcode: createPods
+    namespace: test
+    countParam: $measurePods
+    podTemplatePath: config/dra/pod-with-claim-template.yaml
+    collectMetrics: true
+  workloads:
+  - name: fast
+    params:
+      # This testcase runs through all code paths without
+      # taking too long overall.
+      nodesWithDRA: 1
+      nodesWithoutDRA: 1
+      initPods: 0
+      measurePods: 10
+      maxClaimsPerNode: 10
+  - name: 2000pods_100nodes
+    labels: [performance, fast]
+    params:
+      # In this testcase, the number of nodes is smaller
+      # than the limit for the PodScheduling slices.
+      nodesWithDRA: 100
+      nodesWithoutDRA: 0
+      initPods: 1000
+      measurePods: 1000
+      maxClaimsPerNode: 20
+  - name: 2000pods_200nodes
+    params:
+      # In this testcase, the driver and scheduler must
+      # truncate the PotentialNodes and UnsuitableNodes
+      # slices.
+      nodesWithDRA: 200
+      nodesWithoutDRA: 0
+      initPods: 1000
+      measurePods: 1000
+      maxClaimsPerNode: 10
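
The $-prefixed values (countParam, maxClaimsPerNodeParam) are resolved against
each workload's params map by the patchParams methods of the new ops, shown
further down. A minimal, self-contained Go sketch of that lookup (names are
illustrative; the real code lives in patchParams/isValidParameterizable):

package main

import (
	"fmt"
	"strings"
)

// resolveParam mimics how a "$foo" reference is replaced by params["foo"].
func resolveParam(params map[string]int, ref string) (int, error) {
	if !strings.HasPrefix(ref, "$") {
		return 0, fmt.Errorf("%q is not parameterizable", ref)
	}
	value, ok := params[ref[1:]] // strip the leading "$"
	if !ok {
		return 0, fmt.Errorf("no such param %q", ref)
	}
	return value, nil
}

func main() {
	params := map[string]int{"measurePods": 10, "maxClaimsPerNode": 10}
	fmt.Println(resolveParam(params, "$measurePods")) // 10 <nil>
}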

View File

@@ -0,0 +1,89 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmark

import (
	"context"
	"fmt"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// createOp defines an op where some object gets created from a template.
// Everything specific for that object (create call, op code, names) gets
// provided through a type.
type createOp[T interface{}, P createOpType[T]] struct {
	// Must match createOpType.Opcode().
	Opcode operationCode

	// Namespace the object should be created in. Must be empty for cluster-scoped objects.
	Namespace string

	// Path to spec file describing the object to create.
	TemplatePath string
}

func (cro *createOp[T, P]) isValid(allowParameterization bool) error {
	var p P
	if cro.Opcode != p.Opcode() {
		return fmt.Errorf("invalid opcode %q; expected %q", cro.Opcode, p.Opcode())
	}
	if p.Namespaced() && cro.Namespace == "" {
		return fmt.Errorf("Namespace must be set")
	}
	if !p.Namespaced() && cro.Namespace != "" {
		return fmt.Errorf("Namespace must not be set")
	}
	if cro.TemplatePath == "" {
		return fmt.Errorf("TemplatePath must be set")
	}
	return nil
}

func (cro *createOp[T, P]) collectsMetrics() bool {
	return false
}

func (cro *createOp[T, P]) patchParams(w *workload) (realOp, error) {
	return cro, cro.isValid(false)
}

func (cro *createOp[T, P]) requiredNamespaces() []string {
	if cro.Namespace == "" {
		return nil
	}
	return []string{cro.Namespace}
}

func (cro *createOp[T, P]) run(ctx context.Context, tb testing.TB, client clientset.Interface) {
	var obj *T
	var p P
	if err := getSpecFromFile(&cro.TemplatePath, &obj); err != nil {
		tb.Fatalf("parsing %s %q: %v", p.Name(), cro.TemplatePath, err)
	}
	if _, err := p.CreateCall(client, cro.Namespace)(ctx, obj, metav1.CreateOptions{}); err != nil {
		tb.Fatalf("create %s: %v", p.Name(), err)
	}
}

// createOpType provides type-specific values for the generic createOp.
type createOpType[T interface{}] interface {
	Opcode() operationCode
	Name() string
	Namespaced() bool
	CreateCall(client clientset.Interface, namespace string) func(context.Context, *T, metav1.CreateOptions) (*T, error)
}
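
With this generic op in place, supporting another cluster-scoped object type
would only need a small createOpType implementation plus a registration in
UnmarshalJSON. A hypothetical example (not part of this commit; the opcode
string and type name are made up) for PriorityClasses, which would live in
package benchmark next to the code above:

import (
	"context"

	schedulingv1 "k8s.io/api/scheduling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// createPriorityClassOpType customizes createOp for creating a PriorityClass.
type createPriorityClassOpType struct{}

func (createPriorityClassOpType) Opcode() operationCode { return "createPriorityClasses" }
func (createPriorityClassOpType) Name() string          { return "PriorityClass" }
func (createPriorityClassOpType) Namespaced() bool      { return false }
func (createPriorityClassOpType) CreateCall(client clientset.Interface, namespace string) func(context.Context, *schedulingv1.PriorityClass, metav1.CreateOptions) (*schedulingv1.PriorityClass, error) {
	return client.SchedulingV1().PriorityClasses().Create
}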

View File

@@ -0,0 +1,229 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmark

import (
	"context"
	"fmt"
	"path/filepath"
	"sync"
	"testing"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
)

// createResourceClaimsOp defines an op where resource claims are created.
type createResourceClaimsOp struct {
	// Must be createResourceClaimsOpcode.
	Opcode operationCode
	// Number of claims to create. Parameterizable through CountParam.
	Count int
	// Template parameter for Count.
	CountParam string
	// Namespace the claims should be created in.
	Namespace string
	// Path to spec file describing the claims to create.
	TemplatePath string
}

var _ realOp = &createResourceClaimsOp{}
var _ runnableOp = &createResourceClaimsOp{}

func (op *createResourceClaimsOp) isValid(allowParameterization bool) error {
	if op.Opcode != createResourceClaimsOpcode {
		return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceClaimsOpcode)
	}
	if !isValidCount(allowParameterization, op.Count, op.CountParam) {
		return fmt.Errorf("invalid Count=%d / CountParam=%q", op.Count, op.CountParam)
	}
	if op.Namespace == "" {
		return fmt.Errorf("Namespace must be set")
	}
	if op.TemplatePath == "" {
		return fmt.Errorf("TemplatePath must be set")
	}
	return nil
}

func (op *createResourceClaimsOp) collectsMetrics() bool {
	return false
}

func (op *createResourceClaimsOp) patchParams(w *workload) (realOp, error) {
	if op.CountParam != "" {
		var err error
		op.Count, err = w.Params.get(op.CountParam[1:])
		if err != nil {
			return nil, err
		}
	}
	return op, op.isValid(false)
}

func (op *createResourceClaimsOp) requiredNamespaces() []string {
	return []string{op.Namespace}
}

func (op *createResourceClaimsOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) {
	tb.Logf("creating %d claims in namespace %q", op.Count, op.Namespace)

	var claimTemplate *resourcev1alpha2.ResourceClaim
	if err := getSpecFromFile(&op.TemplatePath, &claimTemplate); err != nil {
		tb.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err)
	}
	var createErr error
	var mutex sync.Mutex
	create := func(i int) {
		err := func() error {
			if _, err := clientset.ResourceV1alpha2().ResourceClaims(op.Namespace).Create(ctx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil {
				return fmt.Errorf("create claim: %v", err)
			}
			return nil
		}()
		if err != nil {
			mutex.Lock()
			defer mutex.Unlock()
			createErr = err
		}
	}

	workers := op.Count
	if workers > 30 {
		workers = 30
	}
	workqueue.ParallelizeUntil(ctx, workers, op.Count, create)
	if createErr != nil {
		tb.Fatal(createErr.Error())
	}
}

// createResourceClassOpType customizes createOp for creating a ResourceClass.
type createResourceClassOpType struct{}

func (c createResourceClassOpType) Opcode() operationCode { return createResourceClassOpcode }
func (c createResourceClassOpType) Name() string          { return "ResourceClass" }
func (c createResourceClassOpType) Namespaced() bool      { return false }
func (c createResourceClassOpType) CreateCall(client clientset.Interface, namespace string) func(context.Context, *resourcev1alpha2.ResourceClass, metav1.CreateOptions) (*resourcev1alpha2.ResourceClass, error) {
	return client.ResourceV1alpha2().ResourceClasses().Create
}

// createResourceClaimTemplateOpType customizes createOp for creating a ResourceClaimTemplate.
type createResourceClaimTemplateOpType struct{}

func (c createResourceClaimTemplateOpType) Opcode() operationCode {
	return createResourceClaimTemplateOpcode
}
func (c createResourceClaimTemplateOpType) Name() string     { return "ResourceClaimTemplate" }
func (c createResourceClaimTemplateOpType) Namespaced() bool { return true }
func (c createResourceClaimTemplateOpType) CreateCall(client clientset.Interface, namespace string) func(context.Context, *resourcev1alpha2.ResourceClaimTemplate, metav1.CreateOptions) (*resourcev1alpha2.ResourceClaimTemplate, error) {
	return client.ResourceV1alpha2().ResourceClaimTemplates(namespace).Create
}

// createResourceDriverOp defines an op where a DRA test driver is started for
// a set of nodes and stopped again when the test is done.
type createResourceDriverOp struct {
	// Must be createResourceDriverOpcode.
	Opcode operationCode
	// Name of the driver, used to reference it in a resource class.
	DriverName string
	// Number of claims to allow per node. Parameterizable through MaxClaimsPerNodeParam.
	MaxClaimsPerNode int
	// Template parameter for MaxClaimsPerNode.
	MaxClaimsPerNodeParam string
	// Nodes matching this glob pattern have resources managed by the driver.
	Nodes string
}

var _ realOp = &createResourceDriverOp{}
var _ runnableOp = &createResourceDriverOp{}

func (op *createResourceDriverOp) isValid(allowParameterization bool) error {
	if op.Opcode != createResourceDriverOpcode {
		return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceDriverOpcode)
	}
	if !isValidCount(allowParameterization, op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) {
		return fmt.Errorf("invalid MaxClaimsPerNode=%d / MaxClaimsPerNodeParam=%q", op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam)
	}
	if op.DriverName == "" {
		return fmt.Errorf("DriverName must be set")
	}
	if op.Nodes == "" {
		return fmt.Errorf("Nodes must be set")
	}
	return nil
}

func (op *createResourceDriverOp) collectsMetrics() bool {
	return false
}

func (op *createResourceDriverOp) patchParams(w *workload) (realOp, error) {
	if op.MaxClaimsPerNodeParam != "" {
		var err error
		op.MaxClaimsPerNode, err = w.Params.get(op.MaxClaimsPerNodeParam[1:])
		if err != nil {
			return nil, err
		}
	}
	return op, op.isValid(false)
}

func (op *createResourceDriverOp) requiredNamespaces() []string { return nil }

func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) {
	tb.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes)

	// Start the controller side of the DRA test driver such that it simulates
	// per-node resources.
	resources := draapp.Resources{
		NodeLocal:      true,
		MaxAllocations: op.MaxClaimsPerNode,
	}

	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		tb.Fatalf("list nodes: %v", err)
	}
	for _, node := range nodes.Items {
		match, err := filepath.Match(op.Nodes, node.Name)
		if err != nil {
			tb.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err)
		}
		if match {
			resources.Nodes = append(resources.Nodes, node.Name)
		}
	}

	controller := draapp.NewController(clientset, op.DriverName, resources)
	ctx, cancel := context.WithCancel(ctx)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ctx := klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "DRA test driver"))
		controller.Run(ctx, 5 /* workers */)
	}()
	tb.Cleanup(func() {
		tb.Logf("stopping resource driver %q", op.DriverName)
		// We must cancel before waiting.
		cancel()
		wg.Wait()
		tb.Logf("stopped resource driver %q", op.DriverName)
	})
}
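
Note that the Nodes field is matched with filepath.Match, so the pattern
language is shell-style globbing, not a regexp. A runnable sketch of how
scheduler-perf-dra-* selects nodes (node names are examples):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	pattern := "scheduler-perf-dra-*"
	for _, name := range []string{"scheduler-perf-dra-abc12", "node-1"} {
		match, err := filepath.Match(pattern, name)
		fmt.Println(name, match, err) // true for the first name, false for the second
	}
}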

View File

@@ -32,6 +32,7 @@ import (
	"time"

	v1 "k8s.io/api/core/v1"
+	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -59,13 +60,17 @@ import (
type operationCode string

const (
-	createNodesOpcode      operationCode = "createNodes"
-	createNamespacesOpcode operationCode = "createNamespaces"
-	createPodsOpcode       operationCode = "createPods"
-	createPodSetsOpcode    operationCode = "createPodSets"
-	churnOpcode            operationCode = "churn"
-	barrierOpcode          operationCode = "barrier"
-	sleepOpcode            operationCode = "sleep"
+	createNodesOpcode                 operationCode = "createNodes"
+	createNamespacesOpcode            operationCode = "createNamespaces"
+	createPodsOpcode                  operationCode = "createPods"
+	createPodSetsOpcode               operationCode = "createPodSets"
+	createResourceClaimsOpcode        operationCode = "createResourceClaims"
+	createResourceClaimTemplateOpcode operationCode = "createResourceClaimTemplate"
+	createResourceClassOpcode         operationCode = "createResourceClass"
+	createResourceDriverOpcode        operationCode = "createResourceDriver"
+	churnOpcode                       operationCode = "churn"
+	barrierOpcode                     operationCode = "barrier"
+	sleepOpcode                       operationCode = "sleep"
)
const (
@@ -227,6 +232,10 @@ func (op *op) UnmarshalJSON(b []byte) error {
		&createNamespacesOp{},
		&createPodsOp{},
		&createPodSetsOp{},
+		&createResourceClaimsOp{},
+		&createOp[resourcev1alpha2.ResourceClaimTemplate, createResourceClaimTemplateOpType]{},
+		&createOp[resourcev1alpha2.ResourceClass, createResourceClassOpType]{},
+		&createResourceDriverOp{},
		&churnOp{},
		&barrierOp{},
		&sleepOp{},
@@ -265,6 +274,18 @@ type realOp interface {
	patchParams(w *workload) (realOp, error)
}

+// runnableOp is an interface implemented by some operations. It makes it possible
+// to execute the operation without having to add separate code into runWorkload.
+type runnableOp interface {
+	realOp
+
+	// requiredNamespaces returns all namespaces that runWorkload must create
+	// before running the operation.
+	requiredNamespaces() []string
+	// run executes the steps provided by the operation.
+	run(context.Context, testing.TB, clientset.Interface)
+}
+
func isValidParameterizable(val string) bool {
	return strings.HasPrefix(val, "$")
}
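
For reference, the smallest op satisfying this interface would look roughly
like the following (hypothetical, not part of this commit; it assumes the
isValid/collectsMetrics/patchParams methods that the ops in this package
already implement for realOp):

// noopOp is a hypothetical runnableOp that does nothing when run.
type noopOp struct {
	Opcode operationCode
}

func (op *noopOp) isValid(allowParameterization bool) error { return nil }
func (op *noopOp) collectsMetrics() bool                    { return false }
func (op *noopOp) patchParams(w *workload) (realOp, error)  { return op, nil }
func (op *noopOp) requiredNamespaces() []string             { return nil }
func (op *noopOp) run(ctx context.Context, tb testing.TB, client clientset.Interface) {
	tb.Log("noop op executed")
}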
@@ -760,7 +781,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
			b.Fatalf("validate scheduler config file failed: %v", err)
		}
	}
-	informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg)
+	informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates)

	// Additional informers needed for testing. The pod informer was
	// already created before (scheduler.NewInformerFactory) and the
@@ -1014,7 +1035,14 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
			case <-time.After(concreteOp.Duration):
			}
		default:
-			b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+			runnable, ok := concreteOp.(runnableOp)
+			if !ok {
+				b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+			}
+			for _, namespace := range runnable.requiredNamespaces() {
+				createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace)
+			}
+			runnable.run(ctx, b, client)
		}
	}

View File

@@ -30,6 +30,7 @@ import (
	"time"

	v1 "k8s.io/api/core/v1"
+	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
@@ -38,11 +39,14 @@ import (
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
+	cliflag "k8s.io/component-base/cli/flag"
+	"k8s.io/component-base/featuregate"
	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/testutil"
	"k8s.io/klog/v2"
	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
+	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
	"k8s.io/kubernetes/test/integration/framework"
@@ -76,7 +80,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
// remove resources after finished.
// Notes on rate limiter:
//   - client rate limit is set to 5000.
-func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
+func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
	// Run API server with minimal logging by default. Can be raised with -v.
	framework.MinVerbosity = 0
@@ -84,6 +88,13 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
			// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
			opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}
+
+			// Enable DRA API group.
+			if enabledFeatures[features.DynamicResourceAllocation] {
+				opts.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
+					resourcev1alpha2.SchemeGroupVersion.String(): "true",
+				}
+			}
		},
	})
	b.Cleanup(tearDownFn)
@@ -114,7 +125,18 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
	// Not all config options will be effective but only those mostly related with scheduler performance will
	// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
	_, informerFactory := util.StartScheduler(ctx, client, cfg, config)
-	util.StartFakePVController(ctx, client)
+	util.StartFakePVController(ctx, client, informerFactory)
+
+	runResourceClaimController := func() {}
+	if enabledFeatures[features.DynamicResourceAllocation] {
+		// Testing of DRA with inline resource claims depends on this
+		// controller for creating and removing ResourceClaims.
+		runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory)
+	}
+
+	informerFactory.Start(ctx.Done())
+	informerFactory.WaitForCacheSync(ctx.Done())
+	go runResourceClaimController()

	return informerFactory, client, dynClient
}

View File

@@ -47,6 +47,7 @@ import (
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller/disruption"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/scheduler"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -102,10 +103,22 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
	return sched, informerFactory
}

+func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
+	podInformer := informerFactory.Core().V1().Pods()
+	claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
+	claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
+	claimController, err := resourceclaim.NewController(clientSet, podInformer, claimInformer, claimTemplateInformer)
+	if err != nil {
+		tb.Fatalf("Error creating claim controller: %v", err)
+	}
+	return func() {
+		go claimController.Run(ctx, 5 /* workers */)
+	}
+}
+
// StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding.
// TODO(mborsz): Use a real PV controller here.
-func StartFakePVController(ctx context.Context, clientSet clientset.Interface) {
-	informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
+func StartFakePVController(ctx context.Context, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) {
	pvInformer := informerFactory.Core().V1().PersistentVolumes()

	syncPV := func(obj *v1.PersistentVolume) {
@@ -137,8 +150,6 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface) {
			syncPV(obj.(*v1.PersistentVolume))
		},
	})
-
-	informerFactory.Start(ctx.Done())
}

// TestContext stores necessary context info.