diff --git a/test/integration/scheduler_perf/create.go b/test/integration/scheduler_perf/create.go index a2fcd296542..b80adc18d78 100644 --- a/test/integration/scheduler_perf/create.go +++ b/test/integration/scheduler_perf/create.go @@ -19,10 +19,10 @@ package benchmark import ( "context" "fmt" - "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/utils/ktesting" ) // createOp defines an op where some object gets created from a template. @@ -69,14 +69,14 @@ func (cro *createOp[T, P]) requiredNamespaces() []string { return []string{cro.Namespace} } -func (cro *createOp[T, P]) run(ctx context.Context, tb testing.TB, client clientset.Interface) { +func (cro *createOp[T, P]) run(tCtx ktesting.TContext) { var obj *T var p P if err := getSpecFromFile(&cro.TemplatePath, &obj); err != nil { - tb.Fatalf("parsing %s %q: %v", p.Name(), cro.TemplatePath, err) + tCtx.Fatalf("parsing %s %q: %v", p.Name(), cro.TemplatePath, err) } - if _, err := p.CreateCall(client, cro.Namespace)(ctx, obj, metav1.CreateOptions{}); err != nil { - tb.Fatalf("create %s: %v", p.Name(), err) + if _, err := p.CreateCall(tCtx.Client(), cro.Namespace)(tCtx, obj, metav1.CreateOptions{}); err != nil { + tCtx.Fatalf("create %s: %v", p.Name(), err) } } diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index eef0b2c1abd..f8d96aedd85 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -21,7 +21,6 @@ import ( "fmt" "path/filepath" "sync" - "testing" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,6 +28,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app" + "k8s.io/kubernetes/test/utils/ktesting" ) // createResourceClaimsOp defines an op where resource claims are created. 
@@ -82,18 +82,18 @@ func (op *createResourceClaimsOp) requiredNamespaces() []string { return []string{op.Namespace} } -func (op *createResourceClaimsOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) { - tb.Logf("creating %d claims in namespace %q", op.Count, op.Namespace) +func (op *createResourceClaimsOp) run(tCtx ktesting.TContext) { + tCtx.Logf("creating %d claims in namespace %q", op.Count, op.Namespace) var claimTemplate *resourcev1alpha2.ResourceClaim if err := getSpecFromFile(&op.TemplatePath, &claimTemplate); err != nil { - tb.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err) + tCtx.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err) } var createErr error var mutex sync.Mutex create := func(i int) { err := func() error { - if _, err := clientset.ResourceV1alpha2().ResourceClaims(op.Namespace).Create(ctx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil { + if _, err := tCtx.Client().ResourceV1alpha2().ResourceClaims(op.Namespace).Create(tCtx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil { return fmt.Errorf("create claim: %v", err) } return nil @@ -109,9 +109,9 @@ func (op *createResourceClaimsOp) run(ctx context.Context, tb testing.TB, client if workers > 30 { workers = 30 } - workqueue.ParallelizeUntil(ctx, workers, op.Count, create) + workqueue.ParallelizeUntil(tCtx, workers, op.Count, create) if createErr != nil { - tb.Fatal(createErr.Error()) + tCtx.Fatal(createErr.Error()) } } @@ -186,8 +186,8 @@ func (op *createResourceDriverOp) patchParams(w *workload) (realOp, error) { func (op *createResourceDriverOp) requiredNamespaces() []string { return nil } -func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, clientset clientset.Interface) { - tb.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes) +func (op *createResourceDriverOp) run(tCtx ktesting.TContext) { + tCtx.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes) // Start the controller side of the DRA test driver such that it simulates // per-node resources. @@ -197,22 +197,22 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client MaxAllocations: op.MaxClaimsPerNode, } - nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{}) if err != nil { - tb.Fatalf("list nodes: %v", err) + tCtx.Fatalf("list nodes: %v", err) } for _, node := range nodes.Items { match, err := filepath.Match(op.Nodes, node.Name) if err != nil { - tb.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err) + tCtx.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err) } if match { resources.Nodes = append(resources.Nodes, node.Name) } } - controller := draapp.NewController(clientset, resources) - ctx, cancel := context.WithCancel(ctx) + controller := draapp.NewController(tCtx.Client(), resources) + ctx, cancel := context.WithCancel(tCtx) var wg sync.WaitGroup wg.Add(1) go func() { @@ -220,11 +220,11 @@ func (op *createResourceDriverOp) run(ctx context.Context, tb testing.TB, client ctx := klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), op.DriverName)) controller.Run(ctx, 5 /* workers */) }() - tb.Cleanup(func() { - tb.Logf("stopping resource driver %q", op.DriverName) + tCtx.Cleanup(func() { + tCtx.Logf("stopping resource driver %q", op.DriverName) // We must cancel before waiting. 
cancel() wg.Wait() - tb.Logf("stopped resource driver %q", op.DriverName) + tCtx.Logf("stopped resource driver %q", op.DriverName) }) } diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 2673a3e1a74..37eeb5c0a62 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -59,6 +59,7 @@ import ( "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/utils" "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/kubernetes/test/utils/ktesting/initoption" "sigs.k8s.io/yaml" ) @@ -293,7 +294,7 @@ type runnableOp interface { // before running the operation. requiredNamespaces() []string // run executes the steps provided by the operation. - run(context.Context, testing.TB, clientset.Interface) + run(ktesting.TContext) } func isValidParameterizable(val string) bool { @@ -674,6 +675,7 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr if !enabled(*perfSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) { b.Skipf("disabled by label filter %q", *perfSchedulingLabelFilter) } + tCtx := ktesting.Init(b, initoption.PerTestOutput(*useTestingLog)) // Ensure that there are no leaked // goroutines. They could influence @@ -684,28 +686,19 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr // quit *before* restoring klog settings. framework.GoleakCheck(b) - ctx := context.Background() - - if *useTestingLog { - // In addition to redirection klog - // output, also enable contextual - // logging. - _, ctx = ktesting.NewTestContext(b) - } - // Now that we are ready to run, start // etcd. framework.StartEtcd(b, output) // 30 minutes should be plenty enough even for the 5000-node tests. - ctx, cancel := context.WithTimeout(ctx, 30*time.Minute) - b.Cleanup(cancel) + timeout := 30 * time.Minute + tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout)) for feature, flag := range tc.FeatureGates { defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() } - informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry) - results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false) + informerFactory, tCtx := setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry) + results := runWorkload(tCtx, tc, w, informerFactory, false) dataItems.DataItems = append(dataItems.DataItems, results...) 
if len(results) > 0 { @@ -771,7 +764,7 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk) } -func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op { +func unrollWorkloadTemplate(tb ktesting.TB, wt []op, w *workload) []op { var unrolled []op for opIndex, o := range wt { realOp, err := o.realOp.patchParams(w) @@ -794,23 +787,23 @@ func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op { return unrolled } -func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { +func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) { var cfg *config.KubeSchedulerConfiguration var err error if configPath != "" { cfg, err = loadSchedulerConfig(configPath) if err != nil { - tb.Fatalf("error loading scheduler config file: %v", err) + tCtx.Fatalf("error loading scheduler config file: %v", err) } if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil { - tb.Fatalf("validate scheduler config file failed: %v", err) + tCtx.Fatalf("validate scheduler config file failed: %v", err) } } - return mustSetupCluster(ctx, tb, cfg, featureGates, outOfTreePluginRegistry) + return mustSetupCluster(tCtx, cfg, featureGates, outOfTreePluginRegistry) } -func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem { - b, benchmarking := tb.(*testing.B) +func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, cleanup bool) []DataItem { + b, benchmarking := tCtx.TB().(*testing.B) if benchmarking { start := time.Now() b.Cleanup(func() { @@ -839,10 +832,10 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, podInformer := informerFactory.Core().V1().Pods() // Everything else started by this function gets stopped before it returns. - ctx, cancel := context.WithCancel(ctx) + tCtx = ktesting.WithCancel(tCtx) var wg sync.WaitGroup defer wg.Wait() - defer cancel() + defer tCtx.Cancel("workload is done") var mu sync.Mutex var dataItems []DataItem @@ -853,48 +846,48 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, if cleanup { // This must run before controllers get shut down. 
- defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace) + defer cleanupWorkload(tCtx, tc, numPodsScheduledPerNamespace) } - for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) { + for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) { realOp, err := op.realOp.patchParams(w) if err != nil { - tb.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } select { - case <-ctx.Done(): - tb.Fatalf("op %d: %v", opIndex, ctx.Err()) + case <-tCtx.Done(): + tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx)) default: } switch concreteOp := realOp.(type) { case *createNodesOp: - nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client) + nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client()) if err != nil { - tb.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } - if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil { - tb.Fatalf("op %d: %v", opIndex, err) + if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) } if cleanup { defer func() { - if err := nodePreparer.CleanupNodes(ctx); err != nil { - tb.Fatalf("failed to clean up nodes, error: %v", err) + if err := nodePreparer.CleanupNodes(tCtx); err != nil { + tCtx.Fatalf("failed to clean up nodes, error: %v", err) } }() } nextNodeIndex += concreteOp.Count case *createNamespacesOp: - nsPreparer, err := newNamespacePreparer(concreteOp, client, tb) + nsPreparer, err := newNamespacePreparer(tCtx, concreteOp) if err != nil { - tb.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } - if err := nsPreparer.prepare(ctx); err != nil { - err2 := nsPreparer.cleanup(ctx) + if err := nsPreparer.prepare(tCtx); err != nil { + err2 := nsPreparer.cleanup(tCtx) if err2 != nil { err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2) } - tb.Fatalf("op %d: %v", opIndex, err) + tCtx.Fatalf("op %d: %v", opIndex, err) } for _, n := range nsPreparer.namespaces() { if _, ok := numPodsScheduledPerNamespace[n]; ok { @@ -911,7 +904,7 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, if concreteOp.Namespace != nil { namespace = *concreteOp.Namespace } - createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) if concreteOp.PodTemplatePath == nil { concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath } @@ -919,18 +912,17 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, // This needs a separate context and wait group because // the code below needs to be sure that the goroutines // are stopped. - var collectorCtx context.Context - var collectorCancel func() + var collectorCtx ktesting.TContext var collectorWG sync.WaitGroup defer collectorWG.Wait() if concreteOp.CollectMetrics { - collectorCtx, collectorCancel = context.WithCancel(ctx) - defer collectorCancel() - name := tb.Name() + collectorCtx = ktesting.WithCancel(tCtx) + defer collectorCtx.Cancel("cleaning up") + name := tCtx.Name() // The first part is the same for each work load, therefore we can strip it. 
name = name[strings.Index(name, "/")+1:] - collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin) + collectors = getTestDataCollectors(collectorCtx, podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin) for _, collector := range collectors { // Need loop-local variable for function below. collector := collector @@ -941,23 +933,23 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, }() } } - if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil { - tb.Fatalf("op %d: %v", opIndex, err) + if err := createPods(tCtx, namespace, concreteOp); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) } if concreteOp.SkipWaitToCompletion { // Only record those namespaces that may potentially require barriers // in the future. numPodsScheduledPerNamespace[namespace] += concreteOp.Count } else { - if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil { - tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil { + tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if concreteOp.CollectMetrics { // CollectMetrics and SkipWaitToCompletion can never be true at the // same time, so if we're here, it means that all pods have been // scheduled. - collectorCancel() + collectorCtx.Cancel("collecting metrics, collector must stop first") collectorWG.Wait() mu.Lock() for _, collector := range collectors { @@ -980,11 +972,11 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, } else { namespace = fmt.Sprintf("namespace-%d", opIndex) } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery())) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery())) // Ensure the namespace exists. nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} - if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { + tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -992,31 +984,31 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, for i, path := range concreteOp.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. 
var dynRes dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - dynRes = dynClient.Resource(gvr).Namespace(namespace) + dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace) } else { - dynRes = dynClient.Resource(gvr) + dynRes = tCtx.Dynamic().Resource(gvr) } churnFns = append(churnFns, func(name string) string { if name != "" { - if err := dynRes.Delete(ctx, name, metav1.DeleteOptions{}); err != nil { - tb.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) + if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil { + tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err) } return "" } - live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{}) + live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{}) if err != nil { return "" } @@ -1047,7 +1039,7 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, churnFns[i]("") } count++ - case <-ctx.Done(): + case <-tCtx.Done(): return } } @@ -1070,7 +1062,7 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number]) } count++ - case <-ctx.Done(): + case <-tCtx.Done(): return } } @@ -1080,11 +1072,11 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, case *barrierOp: for _, namespace := range concreteOp.Namespaces { if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { - tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } - if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - tb.Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods // pending scheduling in the namespaces that we just blocked on. @@ -1098,25 +1090,25 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, case *sleepOp: select { - case <-ctx.Done(): + case <-tCtx.Done(): case <-time.After(concreteOp.Duration): } default: runable, ok := concreteOp.(runnableOp) if !ok { - tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp) + tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace) } - runable.run(ctx, tb, client) + runable.run(tCtx) } } // check unused params and inform users unusedParams := w.unusedParams() if len(unusedParams) != 0 { - tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) + tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) } // Some tests have unschedulable pods. Do not add an implicit barrier at the @@ -1133,17 +1125,17 @@ func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, // // Calling cleanupWorkload can be skipped if it is known that the next workload // will run with a fresh etcd instance. 
-func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) { +func cleanupWorkload(tCtx ktesting.TContext, tc *testCase, numPodsScheduledPerNamespace map[string]int) { deleteNow := *metav1.NewDeleteOptions(0) for namespace := range numPodsScheduledPerNamespace { // Pods have to be deleted explicitly, with no grace period. Normally // kubelet will set the DeletionGracePeriodSeconds to zero when it's okay // to remove a deleted pod, but we don't run kubelet... - if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil { - tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err) + if err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, deleteNow, metav1.ListOptions{}); err != nil { + tCtx.Fatalf("failed to delete pods in namespace %q: %v", namespace, err) } - if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil { - tb.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err) + if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, namespace, deleteNow); err != nil { + tCtx.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err) } } @@ -1151,8 +1143,8 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl // actually removing a namespace can take some time (garbage collecting // other generated object like secrets, etc.) and we don't want to // start the next workloads while that cleanup is still going on. - if err := wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) { - namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err := wait.PollUntilContextTimeout(tCtx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) { + namespaces, err := tCtx.Client().CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) if err != nil { return false, err } @@ -1165,33 +1157,33 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl // All namespaces gone. return true, nil }); err != nil { - tb.Fatalf("failed while waiting for namespace removal: %v", err) + tCtx.Fatalf("failed while waiting for namespace removal: %v", err) } } -func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { +func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) { if _, ok := (*podsPerNamespace)[namespace]; !ok { // The namespace has not created yet. // So, create that and register it. 
- _, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) + _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) if err != nil { - tb.Fatalf("failed to create namespace for Pod: %v", namespace) + tCtx.Fatalf("failed to create namespace for Pod: %v", namespace) } (*podsPerNamespace)[namespace] = 0 } } type testDataCollector interface { - run(ctx context.Context) + run(tCtx ktesting.TContext) collect() []DataItem } -func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { +func getTestDataCollectors(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { if mcc == nil { mcc = &defaultMetricsCollectorConfig } return []testDataCollector{ - newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin), + newThroughputCollector(tCtx, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin), newMetricsCollector(mcc, map[string]string{"Name": name}), } } @@ -1224,25 +1216,25 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte ), nil } -func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { +func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error { strategy, err := getPodStrategy(cpo) if err != nil { return err } - tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace) + tCtx.Logf("creating %d pods in namespace %q", cpo.Count, namespace) config := testutils.NewTestPodCreatorConfig() config.AddStrategy(namespace, cpo.Count, strategy) - podCreator := testutils.NewTestPodCreator(clientset, config) - return podCreator.CreatePods(ctx) + podCreator := testutils.NewTestPodCreator(tCtx.Client(), config) + return podCreator.CreatePods(tCtx) } // waitUntilPodsScheduledInNamespace blocks until all pods in the given // namespace are scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. 
-func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { +func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { var pendingPod *v1.Pod - err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { select { case <-ctx.Done(): return true, ctx.Err() @@ -1253,10 +1245,10 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podIn return false, err } if len(scheduled) >= wantCount { - tb.Logf("scheduling succeed") + tCtx.Logf("scheduling succeed") return true, nil } - tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) + tCtx.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) if len(unscheduled) > 0 { pendingPod = unscheduled[0] } else { @@ -1273,7 +1265,7 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podIn // waitUntilPodsScheduled blocks until the all pods in the given namespaces are // scheduled. -func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { +func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { // If unspecified, default to all known namespaces. if len(namespaces) == 0 { for namespace := range numPodsScheduledPerNamespace { @@ -1282,15 +1274,15 @@ func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer core } for _, namespace := range namespaces { select { - case <-ctx.Done(): - return ctx.Err() + case <-tCtx.Done(): + return context.Cause(tCtx) default: } wantCount, ok := numPodsScheduledPerNamespace[namespace] if !ok { return fmt.Errorf("unknown namespace %s", namespace) } - if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil { + if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, wantCount); err != nil { return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) } } @@ -1440,14 +1432,12 @@ func getCustomVolumeFactory(pvTemplate *v1.PersistentVolume) func(id int) *v1.Pe // namespacePreparer holds configuration information for the test namespace preparer. type namespacePreparer struct { - client clientset.Interface count int prefix string spec *v1.Namespace - tb testing.TB } -func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) { +func newNamespacePreparer(tCtx ktesting.TContext, cno *createNamespacesOp) (*namespacePreparer, error) { ns := &v1.Namespace{} if cno.NamespaceTemplatePath != nil { if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil { @@ -1456,11 +1446,9 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface } return &namespacePreparer{ - client: clientset, count: cno.Count, prefix: cno.Prefix, spec: ns, - tb: tb, }, nil } @@ -1474,17 +1462,17 @@ func (p *namespacePreparer) namespaces() []string { } // prepare creates the namespaces. 
-func (p *namespacePreparer) prepare(ctx context.Context) error { +func (p *namespacePreparer) prepare(tCtx ktesting.TContext) error { base := &v1.Namespace{} if p.spec != nil { base = p.spec } - p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) + tCtx.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) for i := 0; i < p.count; i++ { n := base.DeepCopy() n.Name = fmt.Sprintf("%s-%d", p.prefix, i) if err := testutils.RetryWithExponentialBackOff(func() (bool, error) { - _, err := p.client.CoreV1().Namespaces().Create(ctx, n, metav1.CreateOptions{}) + _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, n, metav1.CreateOptions{}) return err == nil || apierrors.IsAlreadyExists(err), nil }); err != nil { return err @@ -1494,12 +1482,12 @@ func (p *namespacePreparer) prepare(ctx context.Context) error { } // cleanup deletes existing test namespaces. -func (p *namespacePreparer) cleanup(ctx context.Context) error { +func (p *namespacePreparer) cleanup(tCtx ktesting.TContext) error { var errRet error for i := 0; i < p.count; i++ { n := fmt.Sprintf("%s-%d", p.prefix, i) - if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil { - p.tb.Errorf("Deleting Namespace: %v", err) + if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, n, metav1.DeleteOptions{}); err != nil { + tCtx.Errorf("Deleting Namespace: %v", err) errRet = err } } diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go index e28ec5307a5..dacc201f8e4 100644 --- a/test/integration/scheduler_perf/scheduler_test.go +++ b/test/integration/scheduler_perf/scheduler_test.go @@ -17,7 +17,6 @@ limitations under the License. package benchmark import ( - "context" "testing" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -70,16 +69,15 @@ func TestScheduling(t *testing.T) { for _, config := range configs { // Not a sub test because we don't have a good name for it. func() { - _, ctx := ktesting.NewTestContext(t) + tCtx := ktesting.Init(t) + // No timeout here because the `go test -timeout` will ensure that // the test doesn't get stuck forever. - ctx, cancel := context.WithCancel(ctx) - defer cancel() for feature, flag := range config.featureGates { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)() } - informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates, nil) + informerFactory, tCtx := setupClusterForWorkload(tCtx, config.schedulerConfigPath, config.featureGates, nil) for _, tc := range testCases { if !config.equals(tc) { @@ -93,8 +91,8 @@ func TestScheduling(t *testing.T) { if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) 
{ t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter) } - _, ctx := ktesting.NewTestContext(t) - runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true) + tCtx := ktesting.WithTB(tCtx, t) + runWorkload(tCtx, tc, w, informerFactory, true) }) } }) diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 5f3c4a3f9df..5a359d151f1 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -18,7 +18,6 @@ package benchmark import ( "bytes" - "context" "encoding/json" "flag" "fmt" @@ -26,26 +25,22 @@ import ( "os" "path" "sort" - "testing" + "strings" "time" v1 "k8s.io/api/core/v1" - resourcev1alpha2 "k8s.io/api/resource/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" - clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/featuregate" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" - "k8s.io/kubernetes/cmd/kube-apiserver/app/options" + apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" @@ -53,6 +48,7 @@ import ( "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/util" testutils "k8s.io/kubernetes/test/utils" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -83,33 +79,34 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { +func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) { // Run API server with minimimal logging by default. Can be raised with -v. framework.MinVerbosity = 0 - _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{ - ModifyServerRunOptions: func(opts *options.ServerRunOptions) { - // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. - opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"} - - // Enable DRA API group. - if enabledFeatures[features.DynamicResourceAllocation] { - opts.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{ - resourcev1alpha2.SchemeGroupVersion.String(): "true", - } - } - }, - }) - tb.Cleanup(tearDownFn) + // No alpha APIs (overrides api/all=true in https://github.com/kubernetes/kubernetes/blob/d647d19f6aef811bace300eec96a67644ff303d4/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go#L136), + // except for DRA API group when needed. 
+ runtimeConfig := []string{"api/alpha=false"} + if enabledFeatures[features.DynamicResourceAllocation] { + runtimeConfig = append(runtimeConfig, "resource.k8s.io/v1alpha2=true") + } + customFlags := []string{ + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + "--disable-admission-plugins=ServiceAccount,TaintNodesByCondition,Priority", + "--runtime-config=" + strings.Join(runtimeConfig, ","), + } + server, err := apiservertesting.StartTestServer(tCtx, apiservertesting.NewDefaultTestServerOptions(), customFlags, framework.SharedEtcd()) + if err != nil { + tCtx.Fatalf("start apiserver: %v", err) + } + tCtx.Cleanup(server.TearDownFn) // Cleanup will be in reverse order: first the clients get cancelled, - // then the apiserver is torn down. - ctx, cancel := context.WithCancel(ctx) - tb.Cleanup(cancel) + // then the apiserver is torn down via the automatic cancelation of + // tCtx. // TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to // support this when there is any testcase that depends on such configuration. - cfg := restclient.CopyConfig(kubeConfig) + cfg := restclient.CopyConfig(server.ClientConfig) cfg.QPS = 5000.0 cfg.Burst = 5000 @@ -118,34 +115,33 @@ func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSch var err error config, err = newDefaultComponentConfig() if err != nil { - tb.Fatalf("Error creating default component config: %v", err) + tCtx.Fatalf("Error creating default component config: %v", err) } } - client := clientset.NewForConfigOrDie(cfg) - dynClient := dynamic.NewForConfigOrDie(cfg) + tCtx = ktesting.WithRESTConfig(tCtx, cfg) // Not all config options will be effective but only those mostly related with scheduler performance will // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. - _, informerFactory := util.StartScheduler(ctx, client, cfg, config, outOfTreePluginRegistry) - util.StartFakePVController(ctx, client, informerFactory) - runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory) - runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory) + _, informerFactory := util.StartScheduler(tCtx, tCtx.Client(), cfg, config, outOfTreePluginRegistry) + util.StartFakePVController(tCtx, tCtx.Client(), informerFactory) + runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory) + runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory) runResourceClaimController := func() {} if enabledFeatures[features.DynamicResourceAllocation] { // Testing of DRA with inline resource claims depends on this // controller for creating and removing ResourceClaims. - runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory) + runResourceClaimController = util.CreateResourceClaimController(tCtx, tCtx, tCtx.Client(), informerFactory) } - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) + informerFactory.Start(tCtx.Done()) + informerFactory.WaitForCacheSync(tCtx.Done()) go runGC() go runNS() go runResourceClaimController() - return informerFactory, client, dynClient + return informerFactory, tCtx } // Returns the list of scheduled and unscheduled pods in the specified namespaces. 
@@ -268,7 +264,7 @@ func newMetricsCollector(config *metricsCollectorConfig, labels map[string]strin } } -func (*metricsCollector) run(ctx context.Context) { +func (*metricsCollector) run(tCtx ktesting.TContext) { // metricCollector doesn't need to start before the tests, so nothing to do here. } @@ -342,7 +338,6 @@ func collectHistogramVec(metric string, labels map[string]string, lvMap map[stri } type throughputCollector struct { - tb testing.TB podInformer coreinformers.PodInformer schedulingThroughputs []float64 labels map[string]string @@ -350,9 +345,8 @@ type throughputCollector struct { errorMargin float64 } -func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector { +func newThroughputCollector(tb ktesting.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector { return &throughputCollector{ - tb: tb, podInformer: podInformer, labels: labels, namespaces: namespaces, @@ -360,7 +354,7 @@ func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer } } -func (tc *throughputCollector) run(ctx context.Context) { +func (tc *throughputCollector) run(tCtx ktesting.TContext) { podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...) if err != nil { klog.Fatalf("%v", err) @@ -374,7 +368,7 @@ func (tc *throughputCollector) run(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-tCtx.Done(): return case <-ticker.C: now := time.Now() @@ -419,7 +413,7 @@ func (tc *throughputCollector) run(ctx context.Context) { errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100 if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin { // This might affect the result, report it. - tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin) + tCtx.Errorf("ERROR: Expected throughput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin) } // To keep percentiles accurate, we have to record multiple samples with the same diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 13e34f4cfb2..c555b637db0 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -128,7 +128,7 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf return sched, informerFactory } -func CreateResourceClaimController(ctx context.Context, tb testing.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() { +func CreateResourceClaimController(ctx context.Context, tb ktesting.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() { podInformer := informerFactory.Core().V1().Pods() schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts() claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims() @@ -190,7 +190,7 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i // CreateGCController creates a garbage controller and returns a run function // for it. The informer factory needs to be started before invoking that // function. 
-func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { +func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { restclient.AddUserAgent(&restConfig, "gc-controller") clientSet := clientset.NewForConfigOrDie(&restConfig) metadataClient, err := metadata.NewForConfig(&restConfig) @@ -227,7 +227,7 @@ func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclien // CreateNamespaceController creates a namespace controller and returns a run // function for it. The informer factory needs to be started before invoking // that function. -func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { +func CreateNamespaceController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { restclient.AddUserAgent(&restConfig, "namespace-controller") clientSet := clientset.NewForConfigOrDie(&restConfig) metadataClient, err := metadata.NewForConfig(&restConfig)
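For reference, a minimal sketch of how an op reads after this migration. It is not part of the patch: the exampleOp type and its Namespace field are hypothetical, while the single-argument run(tCtx ktesting.TContext) signature and the tCtx.Client(), tCtx.Logf() and tCtx.Fatalf() calls are the ones introduced in the hunks above.

package benchmark

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/test/utils/ktesting"
)

// exampleOp is a hypothetical op, used only to illustrate the new pattern:
// instead of passing context.Context, testing.TB and clientset.Interface
// separately, everything travels in a single ktesting.TContext.
type exampleOp struct {
	Namespace string
}

func (op *exampleOp) requiredNamespaces() []string {
	return []string{op.Namespace}
}

// run uses tCtx both as the context.Context for API calls and as the
// TB-like sink for logging and failing the test, and obtains the client
// from tCtx.Client() instead of a separate parameter.
func (op *exampleOp) run(tCtx ktesting.TContext) {
	pods, err := tCtx.Client().CoreV1().Pods(op.Namespace).List(tCtx, metav1.ListOptions{})
	if err != nil {
		tCtx.Fatalf("list pods in namespace %q: %v", op.Namespace, err)
	}
	tCtx.Logf("namespace %q currently has %d pods", op.Namespace, len(pods.Items))
}

A real op would additionally have to implement the rest of the realOp interface (for example patchParams, which the workload loop calls above); the sketch only covers the two methods whose signatures this patch touches.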