Merge pull request #118759 from mimowo/dont-apibackoff-on-pod-failures
Do not bump API requests backoff in the Job controller due to pod failures
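For context on the mechanism behind this change, here is a minimal, standalone sketch of the client-go workqueue pattern that the removed comment ("returning an error will re-enqueue Job after the backoff period") relies on. It is not the controller's actual code; the syncStub helper and the example key are hypothetical. Returning an error from a sync handler typically leads to AddRateLimited, so every returned error grows the per-key requeue counter, while a successful sync calls Forget and resets it.

package main

import (
	"errors"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// syncStub stands in for a controller sync handler (hypothetical).
func syncStub(fail bool) error {
	if fail {
		return errors.New("sync failed")
	}
	return nil
}

func main() {
	// Rate-limited work queue, as used by controller work loops.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	key := "default/my-job"

	// A failing sync is re-added with rate limiting, so the per-key
	// backoff counter grows with every returned error.
	if err := syncStub(true); err != nil {
		queue.AddRateLimited(key)
	}
	fmt.Println("requeues after a failed sync:", queue.NumRequeues(key)) // 1

	// A successful sync calls Forget, which resets the counter (and the
	// associated backoff) for that key.
	if err := syncStub(false); err == nil {
		queue.Forget(key)
	}
	fmt.Println("requeues after a successful sync:", queue.NumRequeues(key)) // 0
}

With that in mind, the hunks below drop the error return that was issued purely because new pod failures were observed, so pod failures alone no longer feed this API-level backoff.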
@@ -905,12 +905,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
		return fmt.Errorf("tracking status: %w", err)
	}

	jobFinished := IsJobFinished(&job)
	if jobHasNewFailure && !jobFinished {
		// returning an error will re-enqueue Job after the backoff period
		return fmt.Errorf("failed pod(s) detected for job key %q", key)
	}

	return manageJobErr
}

@@ -854,10 +854,6 @@ func TestControllerSyncJob(t *testing.T) {
				if err == nil {
					t.Error("Syncing jobs expected to return error on podControl exception")
				}
			} else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
				if err == nil {
					t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
				}
			} else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit {
				if err == nil {
					t.Error("Syncing jobs expected to return error when reached the podControl limit")
@@ -1704,7 +1700,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
		failedPods    int

		// expectations
		expectedForGetKey       bool
		expectedDeletions       int32
		expectedActive          int32
		expectedSucceeded       int32
@@ -1719,7 +1714,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
			startTime:               15,
			backoffLimit:            6,
			activePods:              1,
			expectedForGetKey:       false,
			expectedDeletions:       1,
			expectedFailed:          1,
			expectedCondition:       batch.JobFailed,
@@ -1733,7 +1727,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
			backoffLimit:            6,
			activePods:              1,
			succeededPods:           1,
			expectedForGetKey:       true,
			expectedDeletions:       1,
			expectedSucceeded:       1,
			expectedFailed:          1,
@@ -1746,7 +1739,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
			activeDeadlineSeconds:   10,
			startTime:               10,
			backoffLimit:            6,
			expectedForGetKey:       false,
			expectedCondition:       batch.JobFailed,
			expectedConditionReason: "DeadlineExceeded",
		},
@@ -1756,7 +1748,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
			activeDeadlineSeconds:   1,
			startTime:               10,
			failedPods:              1,
			expectedForGetKey:       false,
			expectedFailed:          1,
			expectedCondition:       batch.JobFailed,
			expectedConditionReason: "BackoffLimitExceeded",
@@ -1768,7 +1759,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
			activeDeadlineSeconds:   10,
			startTime:               15,
			backoffLimit:            6,
			expectedForGetKey:       true,
			expectedCondition:       batch.JobSuspended,
			expectedConditionReason: "JobSuspended",
		},
@@ -3898,80 +3888,38 @@ func bumpResourceVersion(obj metav1.Object) {
	obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}

type pods struct {
	pending int
	active  int
	succeed int
	failed  int
}

func TestJobBackoffReset(t *testing.T) {
func TestJobApiBackoffReset(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)
	testCases := map[string]struct {
		// job setup
		parallelism  int32
		completions  int32
		backoffLimit int32

		// pod setup - each row is additive!
		pods []pods
	}{
		"parallelism=1": {
			1, 2, 1,
			[]pods{
				{0, 1, 0, 1},
				{0, 0, 1, 0},
			},
		},
		"parallelism=2 (just failure)": {
			2, 2, 1,
			[]pods{
				{0, 2, 0, 1},
				{0, 0, 1, 0},
			},
		},
	}

	for name, tc := range testCases {
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
		defer func() { DefaultJobApiBackOff = 1 * time.Second }()
		DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl
	manager.podStoreSynced = alwaysReady
	manager.jobStoreSynced = alwaysReady
		var actual *batch.Job
	manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
			actual = job
		return job, nil
	}

		// job & pods setup
		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
	job := newJob(1, 1, 2, batch.NonIndexedCompletion)
	key := testutil.GetKey(job, t)
	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()

		setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0)
	// error returned make the key requeued
	fakePodControl.Err = errors.New("Controller error")
	manager.queue.Add(key)
	manager.processNextWorkItem(context.TODO())
	retries := manager.queue.NumRequeues(key)
	if retries != 1 {
			t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
		t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
	}

		job = actual
		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
		setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0)
	// the queue is emptied on success
	fakePodControl.Err = nil
	manager.processNextWorkItem(context.TODO())
	retries = manager.queue.NumRequeues(key)
	if retries != 0 {
			t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
		}
		if getCondition(actual, batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded") {
			t.Errorf("%s: unexpected job failure", name)
		}
		t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries)
	}
}

@@ -4066,7 +4014,6 @@ func TestJobBackoffForOnFailure(t *testing.T) {
		suspend      bool

		// pod setup
		jobKeyForget  bool
		restartCounts []int32
		podPhase      v1.PodPhase

@@ -4078,57 +4025,57 @@ func TestJobBackoffForOnFailure(t *testing.T) {
		expectedConditionReason string
	}{
		"backoffLimit 0 should have 1 pod active": {
			1, 1, 0, false,
			1, 1, 0,
			false, []int32{0}, v1.PodRunning,
			1, 0, 0, nil, "",
		},
		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
			1, 1, 1, false,
			1, 1, 1,
			false, []int32{0}, v1.PodRunning,
			1, 0, 0, nil, "",
		},
		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
			1, 1, 1, false,
			1, 1, 1,
			false, []int32{1}, v1.PodRunning,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
			1, 1, 1, false,
			1, 1, 1,
			false, []int32{1}, v1.PodPending,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podRunning - single pod": {
			1, 5, 2, false,
			1, 5, 2,
			false, []int32{2}, v1.PodRunning,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podPending - single pod": {
			1, 5, 2, false,
			1, 5, 2,
			false, []int32{2}, v1.PodPending,
			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podRunning - multiple pods": {
			2, 5, 2, false,
			2, 5, 2,
			false, []int32{1, 1}, v1.PodRunning,
			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"too many job failures with podPending - multiple pods": {
			2, 5, 2, false,
			2, 5, 2,
			false, []int32{1, 1}, v1.PodPending,
			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"not enough failures": {
			2, 5, 3, false,
			2, 5, 3,
			false, []int32{1, 1}, v1.PodRunning,
			2, 0, 0, nil, "",
		},
		"suspending a job": {
			2, 4, 6, true,
			2, 4, 6,
			true, []int32{1, 1}, v1.PodRunning,
			0, 0, 0, &jobConditionSuspended, "JobSuspended",
		},
		"finshed job": {
			2, 4, 6, true,
			2, 4, 6,
			true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
			0, 4, 0, &jobConditionComplete, "",
		},
@@ -4200,8 +4147,6 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
		failedPods      int

		// expectations
		isExpectingAnError      bool
		jobKeyForget            bool
		expectedActive          int32
		expectedSucceeded       int32
		expectedFailed          int32
@@ -4211,27 +4156,27 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
		"not enough failures with backoffLimit 0 - single pod": {
			1, 1, 0,
			v1.PodRunning, 1, 0,
			false, false, 1, 0, 0, nil, "",
			1, 0, 0, nil, "",
		},
		"not enough failures with backoffLimit 1 - single pod": {
			1, 1, 1,
			"", 0, 1,
			true, false, 1, 0, 1, nil, "",
			1, 0, 1, nil, "",
		},
		"too many failures with backoffLimit 1 - single pod": {
			1, 1, 1,
			"", 0, 2,
			false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
		},
		"not enough failures with backoffLimit 6 - multiple pods": {
			2, 2, 6,
			v1.PodRunning, 1, 6,
			true, false, 2, 0, 6, nil, "",
			2, 0, 6, nil, "",
		},
		"too many failures with backoffLimit 6 - multiple pods": {
			2, 2, 6,
			"", 0, 7,
			false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
			0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
		},
	}

@@ -4267,9 +4212,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {

			// run
			err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))

			if (err != nil) != tc.isExpectingAnError {
				t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError)
			if err != nil {
				t.Fatalf("unexpected error syncing job: %#v\n", err)
			}
			// validate status
			if actual.Status.Active != tc.expectedActive {
@@ -4490,23 +4434,6 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
	}
}

// hasValidFailingPods checks if there exists failed pods with valid index.
func hasValidFailingPods(status []indexPhase, completions int) bool {
	for _, s := range status {
		ix, err := strconv.Atoi(s.Index)
		if err != nil {
			continue
		}
		if ix < 0 || ix >= completions {
			continue
		}
		if s.Phase == v1.PodFailed {
			return true
		}
	}
	return false
}

type podBuilder struct {
	*v1.Pod
}
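As a rough illustration of how that API backoff grows, the sketch below uses client-go's per-item exponential failure rate limiter with a 1-second base delay, matching the 1 * time.Second default visible in the test above; the 60-second cap and the choice of this particular limiter are assumptions for illustration, not necessarily the Job controller's exact configuration.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Per-item exponential backoff: the delay doubles on every failure of
	// the same key, starting at 1s and capped at 60s (assumed values).
	limiter := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 60*time.Second)

	key := "default/my-job"
	for i := 1; i <= 8; i++ {
		// When records one more failure for the key and returns the delay
		// before the next retry: 1s, 2s, 4s, 8s, 16s, 32s, 1m0s, 1m0s.
		fmt.Printf("failure %d -> requeue in %v\n", i, limiter.When(key))
	}

	// Forget clears the failure history so the next retry starts at 1s again.
	limiter.Forget(key)
}

Because each returned error pushes the controller's own retries further out, counting pod failures toward this backoff made the Job controller progressively slower to reconcile even when its API calls were succeeding, which is what the change above avoids.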