Graduate JobTrackingWithFinalizers to stable

Change-Id: Ifc749a85b1270c0155ac511b91d4681d53236820
This commit is contained in:
Aldo Culquicondor
2022-11-01 11:16:34 -04:00
parent c8a3657bde
commit 4948918155
15 changed files with 306 additions and 564 deletions

View File

@@ -71,8 +71,6 @@ type metricLabelsWithValue struct {
func TestMetricsOnSuccesses(t *testing.T) {
nonIndexedCompletion := batchv1.NonIndexedCompletion
indexedCompletion := batchv1.IndexedCompletion
wFinalizers := true
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
// setup the job controller
closeFn, restConfig, clientSet, ns := setup(t, "simple")
@@ -135,7 +133,7 @@ func TestMetricsOnSuccesses(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: int(*jobObj.Spec.Parallelism),
Ready: pointer.Int32(0),
}, wFinalizers)
})
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
@@ -150,7 +148,6 @@ func TestMetricsOnSuccesses(t *testing.T) {
}
func TestJobFinishedNumReasonMetric(t *testing.T) {
wFinalizers := true
// setup the job controller
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
@@ -257,7 +254,6 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
job_index := 0 // job index to avoid collisions between job names created by different test cases
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
resetMetrics()
// create a single job and wait for its completion
@@ -271,7 +267,7 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: int(*jobObj.Spec.Parallelism),
Ready: pointer.Int32(0),
}, wFinalizers)
})
op := func(p *v1.Pod) bool {
p.Status = tc.podStatus
@@ -375,8 +371,6 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
},
},
}
wFinalizers := true
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
closeFn, restConfig, cs, ns := setup(t, "simple")
defer closeFn()
@@ -400,7 +394,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
validateJobPodsStatus(ctx, t, cs, jobObj, podsByStatus{
Active: count,
Ready: pointer.Int32(0),
}, wFinalizers)
})
jobPods, err := getJobPods(ctx, t, cs, jobObj)
if err != nil {
@@ -624,70 +618,9 @@ func TestJobPodFailurePolicy(t *testing.T) {
},
}
for name, test := range testCases {
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
resetMetrics()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
}, wFinalizers)
op := func(p *v1.Pod) bool {
p.Status = test.podStatus
return true
}
if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
}
if test.restartController {
cancel()
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: test.wantActive,
Failed: test.wantFailed,
Ready: pointer.Int32(0),
}, wFinalizers)
if test.wantJobConditionType == batchv1.JobComplete {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
}
}
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
if wFinalizers && test.wantPodFailuresHandledByPolicyRuleMetric != nil {
validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
}
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
}
}
// TestNonParallelJob tests a Job that only executes one Pod. The test
// recreates the Job controller at some points to make sure a new controller
// is able to pick up where the previous one left off.
func TestNonParallelJob(t *testing.T) {
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
t.Run(name, func(t *testing.T) {
resetMetrics()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
@@ -696,71 +629,115 @@ func TestNonParallelJob(t *testing.T) {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
}, wFinalizers)
})
// Restarting controller.
cancel()
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
// Failed Pod is replaced.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
op := func(p *v1.Pod) bool {
p.Status = test.podStatus
return true
}
if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
}
if test.restartController {
cancel()
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 1,
Active: test.wantActive,
Failed: test.wantFailed,
Ready: pointer.Int32(0),
}, wFinalizers)
})
// Restarting controller.
cancel()
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
// No more Pods are created after the Pod succeeds.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
if test.wantJobConditionType == batchv1.JobComplete {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
}
}
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
if test.wantPodFailuresHandledByPolicyRuleMetric != nil {
validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 1,
Succeeded: 1,
Ready: pointer.Int32(0),
}, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
}
// TestNonParallelJob exercises a Job that only executes one Pod. The Job
// controller is stopped and started again between steps to check that a new
// controller instance is able to pick up from the state left behind.
func TestNonParallelJob(t *testing.T) {
	teardown, cfg, cs, namespace := setup(t, "simple")
	defer teardown()
	ctx, stop := startJobControllerAndWaitForCaches(cfg)
	// stop is reassigned on every controller restart, so the deferred closure
	// reads the variable at exit instead of binding its initial value.
	defer func() {
		stop()
	}()
	// restartController stops the running controller and starts a fresh one,
	// replacing both the context and its cancel function.
	restartController := func() {
		stop()
		ctx, stop = startJobControllerAndWaitForCaches(cfg)
	}

	job, err := createJobWithDefaults(ctx, cs, namespace.Name, &batchv1.Job{})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	if tracked := hasJobTrackingAnnotation(job); !tracked {
		t.Error("apiserver created job without tracking annotation")
	}
	wantInitial := podsByStatus{
		Active: 1,
		Ready:  pointer.Int32(0),
	}
	validateJobPodsStatus(ctx, t, cs, job, wantInitial)

	restartController()

	// Failed Pod is replaced.
	err, _ = setJobPodsPhase(ctx, cs, job, v1.PodFailed, 1)
	if err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	wantAfterFailure := podsByStatus{
		Active: 1,
		Failed: 1,
		Ready:  pointer.Int32(0),
	}
	validateJobPodsStatus(ctx, t, cs, job, wantAfterFailure)

	restartController()

	// No more Pods are created after the Pod succeeds.
	err, _ = setJobPodsPhase(ctx, cs, job, v1.PodSucceeded, 1)
	if err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobSucceeded(ctx, t, cs, job)
	wantFinal := podsByStatus{
		Failed:    1,
		Succeeded: 1,
		Ready:     pointer.Int32(0),
	}
	validateJobPodsStatus(ctx, t, cs, job, wantFinal)
	validateFinishedPodsNoFinalizer(ctx, t, cs, job)
}
func TestParallelJob(t *testing.T) {
cases := map[string]struct {
trackWithFinalizers bool
enableReadyPods bool
}{
"none": {},
"with finalizers": {
trackWithFinalizers: true,
},
"ready pods": {
enableReadyPods: true,
},
"all": {
trackWithFinalizers: true,
enableReadyPods: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
@@ -781,7 +758,7 @@ func TestParallelJob(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32Ptr(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Tracks ready pods, if enabled.
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
@@ -790,7 +767,7 @@ func TestParallelJob(t *testing.T) {
if tc.enableReadyPods {
*want.Ready = 2
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Failed Pods are replaced.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@@ -803,7 +780,7 @@ func TestParallelJob(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Once one Pod succeeds, no more Pods are created, even if some fail.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
@@ -816,7 +793,7 @@ func TestParallelJob(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
@@ -828,7 +805,7 @@ func TestParallelJob(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// No more Pods are created after remaining Pods succeed.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
@@ -841,7 +818,7 @@ func TestParallelJob(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
if tc.trackWithFinalizers {
validateTerminatedPodsTrackingFinalizerMetric(t, 7)
@@ -851,63 +828,57 @@ func TestParallelJob(t *testing.T) {
}
func TestParallelJobParallelism(t *testing.T) {
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
BackoffLimit: pointer.Int32(2),
Parallelism: pointer.Int32Ptr(5),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 5,
Ready: pointer.Int32(0),
}, wFinalizers)
// Reduce parallelism by a number greater than backoffLimit.
patch := []byte(`{"spec":{"parallelism":2}}`)
jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Updating Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Ready: pointer.Int32(0),
}, wFinalizers)
// Increase parallelism again.
patch = []byte(`{"spec":{"parallelism":4}}`)
jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Updating Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4,
Ready: pointer.Int32(0),
}, wFinalizers)
// Succeed Job
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Succeeded: 4,
Ready: pointer.Int32(0),
}, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
BackoffLimit: pointer.Int32(2),
Parallelism: pointer.Int32Ptr(5),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 5,
Ready: pointer.Int32(0),
})
// Reduce parallelism by a number greater than backoffLimit.
patch := []byte(`{"spec":{"parallelism":2}}`)
jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Updating Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Ready: pointer.Int32(0),
})
// Increase parallelism again.
patch = []byte(`{"spec":{"parallelism":4}}`)
jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
t.Fatalf("Updating Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4,
Ready: pointer.Int32(0),
})
// Succeed Job
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Succeeded: 4,
Ready: pointer.Int32(0),
})
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}
func TestParallelJobWithCompletions(t *testing.T) {
@@ -916,24 +887,15 @@ func TestParallelJobWithCompletions(t *testing.T) {
t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
cases := map[string]struct {
trackWithFinalizers bool
enableReadyPods bool
enableReadyPods bool
}{
"none": {},
"with finalizers": {
trackWithFinalizers: true,
},
"ready pods": {
enableReadyPods: true,
},
"all": {
trackWithFinalizers: true,
enableReadyPods: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
closeFn, restConfig, clientSet, ns := setup(t, "completions")
defer closeFn()
@@ -949,14 +911,14 @@ func TestParallelJobWithCompletions(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if got := hasJobTrackingAnnotation(jobObj); got != tc.trackWithFinalizers {
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, tc.trackWithFinalizers)
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver created job without tracking annotation")
}
want := podsByStatus{Active: 54}
if tc.enableReadyPods {
want.Ready = pointer.Int32Ptr(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Tracks ready pods, if enabled.
if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
@@ -965,7 +927,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(52)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Failed Pods are replaced.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@@ -978,7 +940,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(50)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// Pods are created until the number of succeeded Pods equals completions.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
@@ -991,7 +953,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
// No more Pods are created after the Job completes.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
@@ -1004,84 +966,76 @@ func TestParallelJobWithCompletions(t *testing.T) {
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
}
func TestIndexedJob(t *testing.T) {
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
resetMetrics()
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
resetMetrics()
mode := batchv1.IndexedCompletion
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(3),
Completions: pointer.Int32Ptr(4),
CompletionMode: &mode,
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Ready: pointer.Int32(0),
}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
// One Pod succeeds.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatal("Failed trying to succeed pod with index 1")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Succeeded: 1,
Ready: pointer.Int32(0),
}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
// One Pod fails, which should be recreated.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatal("Failed trying to succeed pod with index 2")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Failed: 1,
Succeeded: 1,
Ready: pointer.Int32(0),
}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
// Remaining Pods succeed.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatal("Failed trying to succeed remaining pods")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 0,
Failed: 1,
Succeeded: 4,
Ready: pointer.Int32(0),
}, false)
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
if wFinalizers {
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
}
})
mode := batchv1.IndexedCompletion
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(3),
Completions: pointer.Int32Ptr(4),
CompletionMode: &mode,
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver created job without tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
// One Pod succeeds.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatal("Failed trying to succeed pod with index 1")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Succeeded: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
// One Pod fails, which should be recreated.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatal("Failed trying to succeed pod with index 2")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Failed: 1,
Succeeded: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
// Remaining Pods succeed.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatal("Failed trying to succeed remaining pods")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 0,
Failed: 1,
Succeeded: 4,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
}
// BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
@@ -1163,87 +1117,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
}
}
// TestDisableJobTrackingWithFinalizers ensures that when the
// JobTrackingWithFinalizers feature is disabled, tracking finalizers are
// removed from all pods, but Job continues to be tracked.
// This test can be removed once the feature graduates to GA.
//
// The test runs in three steps, toggling the feature gate while the Job
// controller is stopped and restarting the controller after each toggle:
//  1. create a Job with the feature enabled,
//  2. disable the feature and fail one Pod,
//  3. re-enable the feature and succeed one Pod.
func TestDisableJobTrackingWithFinalizers(t *testing.T) {
	// Step 1: job created while feature is enabled.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
	// cancel is reassigned on every controller restart; the closure makes the
	// deferred call use whichever cancel function is current at exit.
	defer func() {
		cancel()
	}()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: pointer.Int32Ptr(2),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	if !hasJobTrackingAnnotation(jobObj) {
		t.Error("apiserver didn't add the tracking annotation")
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active: 2,
		Ready:  pointer.Int32(0),
	}, true)

	// Step 2: Disable tracking with finalizers.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
	cancel()

	// Fail a pod while Job controller is stopped. A background context is used
	// because ctx was just cancelled together with the controller.
	if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}

	// Restart controller.
	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)

	// Ensure Job continues to be tracked and finalizers are removed.
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active: 2,
		Failed: 1,
		Ready:  pointer.Int32(0),
	}, false)

	jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Obtaining updated Job object: %v", err)
	}
	if hasJobTrackingAnnotation(jobObj) {
		t.Error("controller didn't remove the tracking annotation")
	}

	// Step 3: Reenable tracking with finalizers.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
	cancel()

	// Succeed a pod while Job controller is stopped.
	// The failure message reports the phase that was actually requested
	// (PodSucceeded), so a failure here is not mistaken for the Step 2 update.
	if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}

	// Restart controller.
	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)

	// Ensure Job continues to be tracked and finalizers are removed.
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:    1,
		Failed:    1,
		Succeeded: 1,
		Ready:     pointer.Int32(0),
	}, false)
}
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
t.Run(string(policy), func(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple")
@@ -1276,7 +1150,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Ready: pointer.Int32(0),
}, true)
})
// Delete Job. The GC should delete the pods in cascade.
err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
@@ -1293,8 +1167,6 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
}
func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
@@ -1335,8 +1207,6 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
// TestJobFailedWithInterrupts tests that a job where one pod fails and the rest
// succeed is marked as Failed, even if the controller fails in the middle.
func TestJobFailedWithInterrupts(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
@@ -1361,7 +1231,7 @@ func TestJobFailedWithInterrupts(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 10,
Ready: pointer.Int32(0),
}, true)
})
t.Log("Finishing pods")
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Could not fail a pod: %v", err)
@@ -1406,10 +1276,8 @@ func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clien
}
}
func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
// Step 0: job created while feature is enabled.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
// Step 0: create job.
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
@@ -1431,38 +1299,19 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
}, true)
})
// Step 2: Disable tracking with finalizers.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
// Step 2: Delete the Job while the controller is stopped.
cancel()
// Delete the Job while controller is stopped.
err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("Failed to delete job: %v", err)
}
// Restart controller.
// Step 3: Restart controller.
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
sawPods := false
for _, pod := range pods.Items {
if metav1.IsControlledBy(&pod, jobObj) {
if hasJobTrackingFinalizer(&pod) {
return false, nil
}
sawPods = true
}
}
return sawPods, nil
}); err != nil {
t.Errorf("Waiting for finalizers to be removed: %v", err)
}
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
}
func TestSuspendJob(t *testing.T) {
@@ -1518,7 +1367,7 @@ func TestSuspendJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: active,
Ready: pointer.Int32(0),
}, feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers))
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job after %s: %v", s, err)
@@ -1561,7 +1410,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: 0,
Ready: pointer.Int32(0),
}, true)
})
}
func TestNodeSelectorUpdate(t *testing.T) {
@@ -1646,7 +1495,7 @@ type podsByStatus struct {
Succeeded int
}
func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, wFinalizer bool) {
func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
t.Helper()
var actualCounts podsByStatus
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
@@ -1686,8 +1535,8 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
}
}
for _, p := range active {
if got := hasJobTrackingFinalizer(p); got != wFinalizer {
t.Errorf("Pod %s has tracking finalizer %t, want %t", p.Name, got, wFinalizer)
if !hasJobTrackingFinalizer(p) {
t.Errorf("Active pod %s doesn't have tracking finalizer", p.Name)
}
}
}