Benchmark job with backoff limit per index
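This change turns BenchmarkLargeIndexedJob into table-driven cases and adds BenchmarkLargeFailureHandling, so Indexed Jobs are benchmarked both with the global backoffLimit and with the JobBackoffLimitPerIndex feature gate enabled. For orientation, the Job shape the benchmarks create looks roughly like the following; a minimal compilable sketch with illustrative sizes (the real cases use 10 and 100 pods):

	package main

	import (
		"fmt"

		batchv1 "k8s.io/api/batch/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/utils/ptr"
	)

	func main() {
		mode := batchv1.IndexedCompletion
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{Name: "npods-10-0-true"},
			Spec: batchv1.JobSpec{
				Parallelism:    ptr.To[int32](10),
				Completions:    ptr.To[int32](10),
				CompletionMode: &mode,
				// nil leaves the per-index limit off, so only the global
				// BackoffLimit applies; the benchmark cases exercise both.
				BackoffLimitPerIndex: ptr.To[int32](1),
			},
		}
		fmt.Printf("%s: %d indexes\n", job.Name, *job.Spec.Completions)
	}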
@@ -2097,25 +2097,53 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 		Steps:    30,
 		Cap:      5 * time.Minute,
 	}
+	cases := map[string]struct {
+		nPods                int32
+		backoffLimitPerIndex *int32
+	}{
+		"regular indexed job without failures; size=10": {
+			nPods: 10,
+		},
+		"job with backoffLimitPerIndex without failures; size=10": {
+			nPods:                10,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+		"regular indexed job without failures; size=100": {
+			nPods: 100,
+		},
+		"job with backoffLimitPerIndex without failures; size=100": {
+			nPods:                100,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+	}
 	mode := batchv1.IndexedCompletion
-	for _, nPods := range []int32{1000, 10_000} {
-		b.Run(fmt.Sprintf("nPods=%d", nPods), func(b *testing.B) {
+	for name, tc := range cases {
+		b.Run(name, func(b *testing.B) {
+			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
+			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
 			b.ResetTimer()
 			for n := 0; n < b.N; n++ {
+				b.StartTimer()
 				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
 					ObjectMeta: metav1.ObjectMeta{
-						Name: fmt.Sprintf("npods-%d-%d", nPods, n),
+						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
 					},
 					Spec: batchv1.JobSpec{
-						Parallelism:    ptr.To(nPods),
-						Completions:    ptr.To(nPods),
-						CompletionMode: &mode,
+						Parallelism:          ptr.To(tc.nPods),
+						Completions:          ptr.To(tc.nPods),
+						CompletionMode:       &mode,
+						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
 					},
 				})
 				if err != nil {
 					b.Fatalf("Failed to create Job: %v", err)
 				}
-				remaining := int(nPods)
+				b.Cleanup(func() {
+					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
+						b.Fatalf("Failed cleanup: %v", err)
+					}
+				})
+				remaining := int(tc.nPods)
 				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
 					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
 						remaining -= succ
@@ -2127,38 +2155,134 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
 				}
 				validateJobSucceeded(ctx, b, clientSet, jobObj)
-
-				// Cleanup Pods and Job.
 				b.StopTimer()
-				// Clean up pods in pages, because DeleteCollection might timeout.
-				// #90743
-				for {
-					pods, err := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{Limit: 1})
-					if err != nil {
-						b.Fatalf("Failed to list Pods for cleanup: %v", err)
-					}
-					if len(pods.Items) == 0 {
-						break
-					}
-					err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
-						metav1.DeleteOptions{},
-						metav1.ListOptions{
-							Limit: 1000,
-						})
-					if err != nil {
-						b.Fatalf("Failed to cleanup Pods: %v", err)
-					}
-				}
-				err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
-				if err != nil {
-					b.Fatalf("Failed to cleanup Job: %v", err)
-				}
-				b.StartTimer()
 			}
 		})
 	}
 }
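The per-iteration deletion loop that used to run between b.StopTimer() and b.StartTimer() is gone; teardown is now registered with b.Cleanup and runs after the benchmark returns, outside the measured region. The timer discipline in isolation, as a sketch with hypothetical measuredSetup/use helpers rather than the benchmark itself:

	package bench

	import "testing"

	type resource struct{}

	func measuredSetup() resource { return resource{} }
	func (resource) release()     {}
	func use(resource)            {}

	func BenchmarkPattern(b *testing.B) {
		b.ResetTimer()
		for n := 0; n < b.N; n++ {
			b.StartTimer()
			res := measuredSetup() // timed, like the Job creation above
			use(res)               // timed, like waiting for the Job to finish
			b.StopTimer()
			// Cleanup funcs stack up across iterations and run after the
			// benchmark function returns, outside the measured region.
			b.Cleanup(res.release)
		}
	}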
+
+// BenchmarkLargeFailureHandling benchmarks the handling of numerous pod failures
+// of an Indexed Job. We set minimal backoff delay to make the job controller
+// performance comparable for indexed jobs with global backoffLimit, and those
+// with backoffLimit per-index, despite different patterns of handling failures.
+func BenchmarkLargeFailureHandling(b *testing.B) {
+	b.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
+	b.Cleanup(setDurationDuringTest(&jobcontroller.MaxJobPodFailureBackOff, fastPodFailureBackoff))
+	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
+	restConfig.QPS = 100
+	restConfig.Burst = 100
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
+	defer cancel()
+	backoff := wait.Backoff{
+		Duration: time.Second,
+		Factor:   1.5,
+		Steps:    30,
+		Cap:      5 * time.Minute,
+	}
+	cases := map[string]struct {
+		nPods                int32
+		backoffLimitPerIndex *int32
+		customTimeout        *time.Duration
+	}{
+		"regular indexed job with failures; size=10": {
+			nPods: 10,
+		},
+		"job with backoffLimitPerIndex with failures; size=10": {
+			nPods:                10,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+		"regular indexed job with failures; size=100": {
+			nPods: 100,
+		},
+		"job with backoffLimitPerIndex with failures; size=100": {
+			nPods:                100,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+	}
+	mode := batchv1.IndexedCompletion
+	for name, tc := range cases {
+		b.Run(name, func(b *testing.B) {
+			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
+			timeout := ptr.Deref(tc.customTimeout, wait.ForeverTestTimeout)
+			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
+			b.ResetTimer()
+			for n := 0; n < b.N; n++ {
+				b.StopTimer()
+				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
+					},
+					Spec: batchv1.JobSpec{
+						Parallelism:          ptr.To(tc.nPods),
+						Completions:          ptr.To(tc.nPods),
+						CompletionMode:       &mode,
+						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
+						BackoffLimit:         ptr.To(tc.nPods),
+					},
+				})
+				if err != nil {
+					b.Fatalf("Failed to create Job: %v", err)
+				}
+				b.Cleanup(func() {
+					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
+						b.Fatalf("Failed cleanup: %v", err)
+					}
+				})
+				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
+					Active:      int(tc.nPods),
+					Ready:       ptr.To[int32](0),
+					Terminating: ptr.To[int32](0),
+				}, timeout)
+
+				b.StartTimer()
+				remaining := int(tc.nPods)
+				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
+					if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
+						remaining -= fail
+						b.Logf("Transient failure failing pods: %v", err)
+						return false, nil
+					}
+					return true, nil
+				}); err != nil {
+					b.Fatalf("Could not fail the remaining %d pods: %v", remaining, err)
+				}
+				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
+					Active:      int(tc.nPods),
+					Ready:       ptr.To[int32](0),
+					Failed:      int(tc.nPods),
+					Terminating: ptr.To[int32](0),
+				}, timeout)
+				b.StopTimer()
+			}
+		})
+	}
+}
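Both benchmarks drive pod phase changes through wait.ExponentialBackoff so transient apiserver errors are retried rather than failing the run: the condition function returns (false, nil) to ask for another attempt and (true, nil) when done. Stripped of the test plumbing, the retry shape is the following sketch, where flakyOperation is a hypothetical stand-in for setJobPodsPhase:

	package main

	import (
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	// flakyOperation is a hypothetical stand-in for setJobPodsPhase.
	func flakyOperation() error { return nil }

	func main() {
		backoff := wait.Backoff{
			Duration: time.Second,     // first delay
			Factor:   1.5,             // multiply the delay each step
			Steps:    30,              // give up after 30 attempts
			Cap:      5 * time.Minute, // never wait longer than this
		}
		err := wait.ExponentialBackoff(backoff, func() (bool, error) {
			if err := flakyOperation(); err != nil {
				return false, nil // transient failure: retry after the next delay
			}
			return true, nil // done
		})
		fmt.Println("result:", err)
	}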
+
+// cleanUp deletes all pods and the job
+func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error {
+	// Clean up pods in pages, because DeleteCollection might timeout.
+	// #90743
+	for {
+		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{Limit: 1})
+		if err != nil {
+			return err
+		}
+		if len(pods.Items) == 0 {
+			break
+		}
+		err = clientSet.CoreV1().Pods(jobObj.Namespace).DeleteCollection(ctx,
+			metav1.DeleteOptions{},
+			metav1.ListOptions{
+				Limit: 1000,
+			})
+		if err != nil {
+			return err
+		}
+	}
+	return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
+}
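cleanUp pages through pod deletion because a single DeleteCollection over a large namespace can time out (#90743). One possible way to exercise it without a cluster is against a fake clientset; a sketch that assumes the cleanUp helper above is in the same package:

	package main

	import (
		"context"
		"fmt"

		batchv1 "k8s.io/api/batch/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/kubernetes/fake"
	)

	func main() {
		ctx := context.Background()
		// A fake clientset pre-loaded with one Job and no pods.
		clientSet := fake.NewSimpleClientset(&batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "ns"},
		})
		jobObj, err := clientSet.BatchV1().Jobs("ns").Get(ctx, "demo", metav1.GetOptions{})
		if err != nil {
			panic(err)
		}
		// Deletes any remaining pods in pages, then the Job itself.
		fmt.Println(cleanUp(ctx, clientSet, jobObj))
	}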
+
 func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
 		t.Run(string(policy), func(t *testing.T) {
@@ -2617,10 +2741,15 @@ type podsByStatus struct {
 	Terminating *int32
 }
 
-func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+func validateJobsPodsStatusOnly(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+	t.Helper()
+	validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, desired, wait.ForeverTestTimeout)
+}
+
+func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, timeout time.Duration) {
 	t.Helper()
 	var actualCounts podsByStatus
-	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, waitInterval, timeout, true, func(ctx context.Context) (bool, error) {
 		updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
 		if err != nil {
 			t.Fatalf("Failed to get updated Job: %v", err)
@@ -2638,7 +2767,8 @@ func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
 		t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
 	}
 }
-func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+
+func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
 	t.Helper()
 	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
 	var active []*v1.Pod
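The validate* helpers now take testing.TB, the interface implemented by both *testing.T and *testing.B, which is what lets BenchmarkLargeFailureHandling reuse assertions written for tests. The pattern in miniature; a sketch with a hypothetical requireEqual helper:

	package example

	import "testing"

	// requireEqual works from both tests and benchmarks because it only
	// depends on the testing.TB interface.
	func requireEqual(t testing.TB, want, got int) {
		t.Helper()
		if want != got {
			t.Fatalf("want %d, got %d", want, got)
		}
	}

	func TestUsesHelper(t *testing.T)      { requireEqual(t, 2, 1+1) }
	func BenchmarkUsesHelper(b *testing.B) { requireEqual(b, 2, 1+1) }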
 