diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 7e985b8d88e..d236b11181c 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -896,6 +896,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { jobCtx.finishedCondition = nil } active -= deleted + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + *jobCtx.terminating += deleted + } manageJobErr = err } else { manageJobCalled := false @@ -1501,6 +1504,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete)) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + *jobCtx.terminating += removed + } return active, metrics.JobSyncActionPodsDeleted, err } @@ -1550,6 +1556,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + *jobCtx.terminating += removed + } // While it is possible for a Job to require both pod creations and // deletions at the same time (e.g. indexed Jobs with repeated indexes), we // restrict ourselves to either just pod deletion or pod creation in any diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index f8be81228e5..9f5ceac0185 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -403,7 +403,7 @@ func TestControllerSyncJob(t *testing.T) { terminatingPods: 4, podReplacementPolicy: podReplacementPolicy(batch.Failed), jobPodReplacementPolicy: true, - expectedTerminating: ptr.To[int32](4), + expectedTerminating: ptr.To[int32](5), expectedReady: ptr.To[int32](0), expectedActive: 1, expectedDeletions: 1, @@ -497,6 +497,18 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches: 1, expectedReady: ptr.To[int32](0), }, + "too many active pods; PodReplacementPolicy enabled": { + parallelism: 2, + completions: 5, + backoffLimit: 6, + activePods: 3, + jobPodReplacementPolicy: true, + expectedDeletions: 1, + expectedActive: 2, + expectedPodPatches: 1, + expectedReady: ptr.To[int32](0), + expectedTerminating: ptr.To[int32](1), + }, "too many active pods, with controller error": { parallelism: 2, completions: 5, @@ -745,6 +757,48 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches: 4, expectedReady: ptr.To[int32](0), }, + "indexed job failed": { + parallelism: 2, + completions: 3, + backoffLimit: 0, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"2", v1.PodRunning}, + }, + expectedSucceeded: 1, + expectedFailed: 2, + expectedCompletedIdxs: "0", + expectedCondition: &jobConditionFailed, + expectedConditionStatus: v1.ConditionTrue, + expectedConditionReason: "BackoffLimitExceeded", + expectedPodPatches: 3, + expectedReady: ptr.To[int32](0), + expectedDeletions: 1, + }, + "count terminating pods when indexed job fails and PodReplacementPolicy enabled": { + parallelism: 2, + completions: 3, + backoffLimit: 0, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ + {"0", 
v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"2", v1.PodRunning}, + }, + jobPodReplacementPolicy: true, + expectedSucceeded: 1, + expectedFailed: 2, + expectedCompletedIdxs: "0", + expectedCondition: &jobConditionFailed, + expectedConditionStatus: v1.ConditionTrue, + expectedConditionReason: "BackoffLimitExceeded", + expectedPodPatches: 3, + expectedReady: ptr.To[int32](0), + expectedDeletions: 1, + expectedTerminating: ptr.To[int32](1), + }, "indexed job repeated completed index": { parallelism: 2, completions: 3, @@ -888,6 +942,25 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, + "suspending a job with satisfied expectations; PodReplacementPolicy enabled": { + // Suspended Job should delete active pods when expectations are + // satisfied. + suspend: true, + parallelism: 2, + activePods: 2, // parallelism == active, expectations satisfied + completions: 4, + backoffLimit: 6, + jobPodReplacementPolicy: true, + expectedCreations: 0, + expectedDeletions: 2, + expectedActive: 0, + expectedCondition: &jobConditionSuspended, + expectedConditionStatus: v1.ConditionTrue, + expectedConditionReason: "JobSuspended", + expectedPodPatches: 2, + expectedReady: ptr.To[int32](0), + expectedTerminating: ptr.To[int32](2), + }, "suspending a job with unsatisfied expectations": { // Unlike the previous test, we expect the controller to NOT suspend the // Job in the syncJob call because the controller will wait for @@ -904,6 +977,20 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 3, expectedReady: ptr.To[int32](0), }, + "suspending a job with unsatisfied expectations; PodReplacementPolicy enabled": { + suspend: true, + parallelism: 2, + activePods: 3, // active > parallelism, expectations unsatisfied + fakeExpectationAtCreation: -1, // the controller is expecting a deletion + completions: 4, + backoffLimit: 6, + jobPodReplacementPolicy: true, + expectedCreations: 0, + expectedDeletions: 0, + expectedActive: 3, + expectedReady: ptr.To[int32](0), + expectedTerminating: ptr.To[int32](0), + }, "resuming a suspended job": { wasSuspended: true, suspend: false, @@ -935,6 +1022,21 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches: 2, expectedReady: ptr.To[int32](0), }, + "suspending a deleted job; PodReplacementPolicy enabled": { + suspend: true, + deleting: true, + parallelism: 2, + activePods: 2, // parallelism == active, expectations satisfied + completions: 4, + backoffLimit: 6, + jobPodReplacementPolicy: true, + expectedCreations: 0, + expectedDeletions: 0, + expectedActive: 2, + expectedPodPatches: 2, + expectedReady: ptr.To[int32](0), + expectedTerminating: ptr.To[int32](0), + }, "indexed job with podIndexLabel feature disabled": { parallelism: 2, completions: 5, @@ -3604,15 +3706,17 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { } testCases := map[string]struct { - enableJobFailurePolicy bool - enableBackoffLimitPerIndex bool - enableJobSuccessPolicy bool - job batch.Job - pods []v1.Pod - wantStatus batch.JobStatus + enableJobFailurePolicy bool + enableBackoffLimitPerIndex bool + enableJobSuccessPolicy bool + enableJobPodReplacementPolicy bool + job batch.Job + pods []v1.Pod + wantStatus batch.JobStatus }{ - "job with successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": { - enableJobSuccessPolicy: true, + "job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": { + 
enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3640,7 +3744,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](2), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3659,9 +3763,58 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with podFailurePolicy and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": { - enableJobSuccessPolicy: true, - enableJobFailurePolicy: true, + "job with successPolicy; jobPodReplacementPolicy feature disabled; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: false, + job: batch.Job{ + TypeMeta: validTypeMeta, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + CompletionMode: completionModePtr(batch.IndexedCompletion), + Parallelism: ptr.To[int32](3), + Completions: ptr.To[int32](3), + BackoffLimit: ptr.To[int32](math.MaxInt32), + SuccessPolicy: &batch.SuccessPolicy{ + Rules: []batch.SuccessPolicyRule{{ + SucceededIndexes: ptr.To("0,1"), + SucceededCount: ptr.To[int32](1), + }}, + }, + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, + *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, + *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 1, + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonSuccessPolicy, + Message: "Matched rules at index 0", + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonSuccessPolicy, + Message: "Matched rules at index 0", + }, + }, + }, + }, + "job with podFailurePolicy and successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": { + enableJobSuccessPolicy: true, + enableJobFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3697,7 +3850,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3716,9 +3869,67 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with backoffLimitPerIndex and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": { - enableJobSuccessPolicy: true, - enableBackoffLimitPerIndex: true, + "job with podFailurePolicy and successPolicy; jobPodReplacementPolicy feature disabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": { + enableJobSuccessPolicy: true, + 
enableJobFailurePolicy: true, + enableJobPodReplacementPolicy: false, + job: batch.Job{ + TypeMeta: validTypeMeta, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + CompletionMode: completionModePtr(batch.IndexedCompletion), + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + BackoffLimit: ptr.To[int32](math.MaxInt32), + SuccessPolicy: &batch.SuccessPolicy{ + Rules: []batch.SuccessPolicyRule{{ + SucceededIndexes: ptr.To("0,1"), + SucceededCount: ptr.To[int32](1), + }}, + }, + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{{ + Action: batch.PodFailurePolicyActionFailJob, + OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }}, + }}, + }, + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, + *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 1, + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonSuccessPolicy, + Message: "Matched rules at index 0", + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonSuccessPolicy, + Message: "Matched rules at index 0", + }, + }, + }, + }, + "job with backoffLimitPerIndex and successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": { + enableJobSuccessPolicy: true, + enableBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3746,7 +3957,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), CompletedIndexes: "1", FailedIndexes: ptr.To(""), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, @@ -3766,8 +3977,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": { - enableJobSuccessPolicy: true, + "job with successPolicy; jobPodReplacementPolicy feature enabled; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3828,8 +4040,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { // So, we need to revisit here before we graduate the JobSuccessPolicy to beta. 
// TODO(#123775): A Job might finish with ready!=0 // REF: https://github.com/kubernetes/kubernetes/issues/123775 - "job with successPolicy; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": { - enableJobSuccessPolicy: true, + "job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3868,7 +4081,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](2), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3887,9 +4100,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and podFailurePolicy; job has a failed condition when job meets to both successPolicy and podFailurePolicy": { - enableJobSuccessPolicy: true, - enableJobFailurePolicy: true, + "job with successPolicy and podFailurePolicy; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and podFailurePolicy": { + enableJobSuccessPolicy: true, + enableJobFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3931,7 +4145,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 2, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3950,9 +4164,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and backoffLimitPerIndex; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": { - enableJobSuccessPolicy: true, - enableBackoffLimitPerIndex: true, + "job with successPolicy and backoffLimitPerIndex; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, + enableBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -3993,8 +4208,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and backoffLimit; job has a failed condition when job meets to both successPolicy and backoffLimit": { - enableJobSuccessPolicy: true, + "job with successPolicy and backoffLimit; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and backoffLimit": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -4034,9 +4250,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and podFailurePolicy; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": { - enableJobSuccessPolicy: true, - enableJobFailurePolicy: true, + "job with successPolicy and podFailurePolicy; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been 
transitioned to FailureTarget and Failed even if job meets podFailurePolicy": { + enableJobSuccessPolicy: true, + enableJobFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -4110,9 +4327,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and backoffLimitPerIndex; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": { - enableJobSuccessPolicy: true, - enableBackoffLimitPerIndex: true, + "job with successPolicy and backoffLimitPerIndex; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, + enableBackoffLimitPerIndex: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -4173,8 +4391,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and backoffLimit: job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": { - enableJobSuccessPolicy: true, + "job with successPolicy and backoffLimit; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -4234,9 +4453,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - "job with successPolicy and podFailureTarget; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": { - enableJobSuccessPolicy: true, - enableJobFailurePolicy: true, + "job with successPolicy and podFailureTarget; jobPodReplacementPolicy feature enabled; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": { + enableJobSuccessPolicy: true, + enableJobFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: validTypeMeta, ObjectMeta: validObjectMeta, @@ -4317,6 +4537,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobFailurePolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(now) @@ -4761,7 +4982,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 3, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), FailedIndexes: ptr.To("0,2"), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 44bb565a7ae..427e62b466c 100644 
--- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -514,6 +514,7 @@ func TestSuccessPolicy(t *testing.T) { wantActiveIndexes sets.Set[int] wantCompletedIndexes string wantFailedIndexes *string + wantTerminating *int32 } podTemplateSpec := v1.PodTemplateSpec{ @@ -561,6 +562,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "0", + wantTerminating: ptr.To(int32(0)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, @@ -595,6 +597,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "0", + wantTerminating: ptr.To(int32(0)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobComplete}, @@ -630,6 +633,7 @@ func TestSuccessPolicy(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailed: 0, wantSucceeded: 0, + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -640,6 +644,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(1)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, @@ -675,6 +680,7 @@ func TestSuccessPolicy(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailed: 0, wantSucceeded: 0, + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -685,6 +691,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(1)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, @@ -723,6 +730,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 1, wantFailedIndexes: ptr.To("0"), wantSucceeded: 0, + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -734,6 +742,7 @@ func TestSuccessPolicy(t *testing.T) { wantSucceeded: 1, wantFailedIndexes: ptr.To("0"), wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(0)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed}, @@ -774,7 +783,7 @@ func TestSuccessPolicy(t *testing.T) { Succeeded: podTermination.wantSucceeded, Failed: podTermination.wantFailed, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: podTermination.wantTerminating, }) validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes) } @@ -861,7 +870,7 @@ func TestSuccessPolicy_ReEnabling(t *testing.T) { Active: 0, Succeeded: 3, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](2), }) validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0-2", nil) @@ -1168,6 +1177,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantCompletedIndexes string wantFailedIndexes *string wantReplacementPodFailureCount *int + wantTerminating *int32 } podTemplateSpec := v1.PodTemplateSpec{ @@ -1208,6 +1218,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailedIndexes: ptr.To(""), wantReplacementPodFailureCount: ptr.To(1), + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobComplete, @@ -1238,6 +1249,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailedIndexes: ptr.To(""), wantReplacementPodFailureCount: ptr.To(1), + wantTerminating: ptr.To(int32(0)), }, { status: v1.PodStatus{ @@ -1248,6 +1260,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { 
wantActiveIndexes: sets.New(0, 1), wantFailedIndexes: ptr.To(""), wantReplacementPodFailureCount: ptr.To(2), + wantTerminating: ptr.To(int32(0)), }, { status: v1.PodStatus{ @@ -1257,6 +1270,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 3, wantActiveIndexes: sets.New(1), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1292,6 +1306,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(0, 1, 2), wantFailedIndexes: ptr.To(""), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1302,6 +1317,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 2, wantActiveIndexes: sets.New(0, 1, 2), wantFailedIndexes: ptr.To(""), + wantTerminating: ptr.To(int32(0)), }, { index: 2, @@ -1310,6 +1326,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { }, wantFailed: 5, wantFailedIndexes: ptr.To(""), + wantTerminating: ptr.To(int32(2)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1344,6 +1361,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(1), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1354,6 +1372,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantSucceeded: 1, wantFailedIndexes: ptr.To("0"), wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1389,6 +1408,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(1, 2), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1398,6 +1418,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActive: 0, wantFailed: 3, wantFailedIndexes: ptr.To("0,1"), + wantTerminating: ptr.To(int32(1)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1457,6 +1478,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(1), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1471,6 +1493,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { }, wantFailed: 2, wantFailedIndexes: ptr.To("0,1"), + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1517,7 +1540,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { Succeeded: podTermination.wantSucceeded, Failed: podTermination.wantFailed, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: podTermination.wantTerminating, }) validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes) if podTermination.wantReplacementPodFailureCount != nil { @@ -2730,6 +2753,7 @@ func TestElasticIndexedJob(t *testing.T) { wantFailed int wantRemainingIndexes sets.Set[int] wantActivePods int + wantTerminating *int32 } cases := map[string]struct { featureGate bool @@ -2739,7 +2763,8 @@ func TestElasticIndexedJob(t *testing.T) { "feature flag off, mutation not allowed": { jobUpdates: []jobUpdate{ { - completions: ptr.To[int32](4), + completions: ptr.To[int32](4), + wantTerminating: ptr.To[int32](0), }, }, wantErr: apierrors.NewInvalid( @@ -2756,6 +2781,7 @@ func TestElasticIndexedJob(t *testing.T) { completions: ptr.To[int32](4), succeedIndexes: []int{0, 1, 2, 3}, wantSucceededIndexes: "0-3", + wantTerminating: ptr.To[int32](0), }, }, }, @@ -2770,6 +2796,7 @@ func TestElasticIndexedJob(t *testing.T) { wantFailed: 1, wantRemainingIndexes: sets.New(0, 2), wantActivePods: 2, + 
wantTerminating: ptr.To[int32](0), }, // Scale down completions 3->1, verify prev failure out of range still counts // but succeeded out of range does not. @@ -2778,6 +2805,7 @@ func TestElasticIndexedJob(t *testing.T) { succeedIndexes: []int{0}, wantSucceededIndexes: "0", wantFailed: 1, + wantTerminating: ptr.To[int32](0), }, }, }, @@ -2790,18 +2818,21 @@ func TestElasticIndexedJob(t *testing.T) { wantSucceededIndexes: "2", wantRemainingIndexes: sets.New(0, 1), wantActivePods: 2, + wantTerminating: ptr.To[int32](0), }, // Scale completions down 3->2 to exclude previously succeeded index. { completions: ptr.To[int32](2), wantRemainingIndexes: sets.New(0, 1), wantActivePods: 2, + wantTerminating: ptr.To[int32](0), }, // Scale completions back up to include previously succeeded index that was temporarily out of range. { completions: ptr.To[int32](3), succeedIndexes: []int{0, 1, 2}, wantSucceededIndexes: "0-2", + wantTerminating: ptr.To[int32](0), }, }, }, @@ -2809,7 +2840,8 @@ func TestElasticIndexedJob(t *testing.T) { featureGate: true, jobUpdates: []jobUpdate{ { - completions: ptr.To[int32](0), + completions: ptr.To[int32](0), + wantTerminating: ptr.To[int32](3), }, }, }, @@ -2887,7 +2919,7 @@ func TestElasticIndexedJob(t *testing.T) { Succeeded: len(update.succeedIndexes), Failed: update.wantFailed, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: update.wantTerminating, }) validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil) }