Merge pull request #125175 from dejanzele/feat/count-terminating-for-failed-jobs

Count terminating pods when deleting active pods for failed jobs
Authored by Kubernetes Prow Robot on 2024-06-10 16:56:37 -07:00; committed by GitHub
3 changed files with 312 additions and 50 deletions
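In short: when the Job controller deletes active pods itself (a failed Job's remaining pods, a suspended Job's pods, or excess pods after a scale-down), those deletions now also bump the per-sync terminating counter, but only while the JobPodReplacementPolicy feature gate is enabled. A minimal sketch of that bookkeeping, using made-up names (syncContext, recordDeletions, gateEnabled) rather than the controller's real types:

package main

import "fmt"

// syncContext stands in for the controller's per-sync state. The terminating
// counter is a pointer so it can stay nil while the JobPodReplacementPolicy
// feature gate is off.
type syncContext struct {
	active      int32
	terminating *int32
}

// recordDeletions mirrors the pattern this PR adds: pods removed from the
// active set are also counted as terminating when the gate is enabled.
func recordDeletions(jobCtx *syncContext, deleted int32, gateEnabled bool) {
	jobCtx.active -= deleted
	if gateEnabled && jobCtx.terminating != nil {
		*jobCtx.terminating += deleted
	}
}

func main() {
	terminating := int32(0)
	jobCtx := &syncContext{active: 3, terminating: &terminating}
	recordDeletions(jobCtx, 2, true)                // delete two active pods with the gate on
	fmt.Println(jobCtx.active, *jobCtx.terminating) // prints: 1 2
}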


@@ -896,6 +896,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			jobCtx.finishedCondition = nil
 		}
 		active -= deleted
+		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+			*jobCtx.terminating += deleted
+		}
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
@@ -1501,6 +1504,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
 		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
+		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+			*jobCtx.terminating += removed
+		}
 		return active, metrics.JobSyncActionPodsDeleted, err
 	}
@@ -1550,6 +1556,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
 		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
+		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+			*jobCtx.terminating += removed
+		}
 		// While it is possible for a Job to require both pod creations and
 		// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
 		// restrict ourselves to either just pod deletion or pod creation in any
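The effect of the three hunks above is that status.terminating now also reflects, within the same sync, pods the controller itself just deleted on failure, suspension, or scale-down. status.terminating is a *int32 and is only populated while the JobPodReplacementPolicy feature gate is on. A hedged client-go sketch for reading it; the kubeconfig location, namespace ("default"), and Job name ("example") are placeholders:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the default kubeconfig; error handling is kept minimal.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	job, err := client.BatchV1().Jobs("default").Get(context.TODO(), "example", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// Nil means the JobPodReplacementPolicy feature gate (and thus the
	// terminating counter) is not active on this cluster.
	if job.Status.Terminating != nil {
		fmt.Println("terminating pods:", *job.Status.Terminating)
	}
}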


@@ -403,7 +403,7 @@ func TestControllerSyncJob(t *testing.T) {
 			terminatingPods: 4,
 			podReplacementPolicy: podReplacementPolicy(batch.Failed),
 			jobPodReplacementPolicy: true,
-			expectedTerminating: ptr.To[int32](4),
+			expectedTerminating: ptr.To[int32](5),
 			expectedReady: ptr.To[int32](0),
 			expectedActive: 1,
 			expectedDeletions: 1,
@@ -497,6 +497,18 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 1,
 			expectedReady: ptr.To[int32](0),
 		},
+		"too many active pods; PodReplacementPolicy enabled": {
+			parallelism: 2,
+			completions: 5,
+			backoffLimit: 6,
+			activePods: 3,
+			jobPodReplacementPolicy: true,
+			expectedDeletions: 1,
+			expectedActive: 2,
+			expectedPodPatches: 1,
+			expectedReady: ptr.To[int32](0),
+			expectedTerminating: ptr.To[int32](1),
+		},
 		"too many active pods, with controller error": {
 			parallelism: 2,
 			completions: 5,
@@ -745,6 +757,48 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 4,
 			expectedReady: ptr.To[int32](0),
 		},
+		"indexed job failed": {
+			parallelism: 2,
+			completions: 3,
+			backoffLimit: 0,
+			completionMode: batch.IndexedCompletion,
+			podsWithIndexes: []indexPhase{
+				{"0", v1.PodSucceeded},
+				{"1", v1.PodFailed},
+				{"2", v1.PodRunning},
+			},
+			expectedSucceeded: 1,
+			expectedFailed: 2,
+			expectedCompletedIdxs: "0",
+			expectedCondition: &jobConditionFailed,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "BackoffLimitExceeded",
+			expectedPodPatches: 3,
+			expectedReady: ptr.To[int32](0),
+			expectedDeletions: 1,
+		},
+		"count terminating pods when indexed job fails and PodReplacementPolicy enabled": {
+			parallelism: 2,
+			completions: 3,
+			backoffLimit: 0,
+			completionMode: batch.IndexedCompletion,
+			podsWithIndexes: []indexPhase{
+				{"0", v1.PodSucceeded},
+				{"1", v1.PodFailed},
+				{"2", v1.PodRunning},
+			},
+			jobPodReplacementPolicy: true,
+			expectedSucceeded: 1,
+			expectedFailed: 2,
+			expectedCompletedIdxs: "0",
+			expectedCondition: &jobConditionFailed,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "BackoffLimitExceeded",
+			expectedPodPatches: 3,
+			expectedReady: ptr.To[int32](0),
+			expectedDeletions: 1,
+			expectedTerminating: ptr.To[int32](1),
+		},
 		"indexed job repeated completed index": {
 			parallelism: 2,
 			completions: 3,
@@ -888,6 +942,25 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 2,
 			expectedReady: ptr.To[int32](0),
 		},
+		"suspending a job with satisfied expectations; PodReplacementPolicy enabled": {
+			// Suspended Job should delete active pods when expectations are
+			// satisfied.
+			suspend: true,
+			parallelism: 2,
+			activePods: 2, // parallelism == active, expectations satisfied
+			completions: 4,
+			backoffLimit: 6,
+			jobPodReplacementPolicy: true,
+			expectedCreations: 0,
+			expectedDeletions: 2,
+			expectedActive: 0,
+			expectedCondition: &jobConditionSuspended,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "JobSuspended",
+			expectedPodPatches: 2,
+			expectedReady: ptr.To[int32](0),
+			expectedTerminating: ptr.To[int32](2),
+		},
 		"suspending a job with unsatisfied expectations": {
 			// Unlike the previous test, we expect the controller to NOT suspend the
 			// Job in the syncJob call because the controller will wait for
@@ -904,6 +977,20 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedActive: 3,
 			expectedReady: ptr.To[int32](0),
 		},
+		"suspending a job with unsatisfied expectations; PodReplacementPolicy enabled": {
+			suspend: true,
+			parallelism: 2,
+			activePods: 3, // active > parallelism, expectations unsatisfied
+			fakeExpectationAtCreation: -1, // the controller is expecting a deletion
+			completions: 4,
+			backoffLimit: 6,
+			jobPodReplacementPolicy: true,
+			expectedCreations: 0,
+			expectedDeletions: 0,
+			expectedActive: 3,
+			expectedReady: ptr.To[int32](0),
+			expectedTerminating: ptr.To[int32](0),
+		},
 		"resuming a suspended job": {
 			wasSuspended: true,
 			suspend: false,
@@ -935,6 +1022,21 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 2,
 			expectedReady: ptr.To[int32](0),
 		},
+		"suspending a deleted job; PodReplacementPolicy enabled": {
+			suspend: true,
+			deleting: true,
+			parallelism: 2,
+			activePods: 2, // parallelism == active, expectations satisfied
+			completions: 4,
+			backoffLimit: 6,
+			jobPodReplacementPolicy: true,
+			expectedCreations: 0,
+			expectedDeletions: 0,
+			expectedActive: 2,
+			expectedPodPatches: 2,
+			expectedReady: ptr.To[int32](0),
+			expectedTerminating: ptr.To[int32](0),
+		},
 		"indexed job with podIndexLabel feature disabled": {
 			parallelism: 2,
 			completions: 5,
@@ -3604,15 +3706,17 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 	}
 	testCases := map[string]struct {
 		enableJobFailurePolicy bool
 		enableBackoffLimitPerIndex bool
 		enableJobSuccessPolicy bool
+		enableJobPodReplacementPolicy bool
 		job batch.Job
 		pods []v1.Pod
 		wantStatus batch.JobStatus
 	}{
-		"job with successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
+		"job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
 			enableJobSuccessPolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3640,7 +3744,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			wantStatus: batch.JobStatus{
 				Failed: 1,
 				Succeeded: 1,
-				Terminating: ptr.To[int32](0),
+				Terminating: ptr.To[int32](2),
 				CompletedIndexes: "1",
 				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
 				Conditions: []batch.JobCondition{
@@ -3659,9 +3763,58 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with podFailurePolicy and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
-			enableJobSuccessPolicy: true,
-			enableJobFailurePolicy: true,
+		"job with successPolicy; jobPodReplacementPolicy feature disabled; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
+			enableJobSuccessPolicy: true,
+			enableJobPodReplacementPolicy: false,
+			job: batch.Job{
+				TypeMeta: validTypeMeta,
+				ObjectMeta: validObjectMeta,
+				Spec: batch.JobSpec{
+					Selector: validSelector,
+					Template: validTemplate,
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Parallelism: ptr.To[int32](3),
+					Completions: ptr.To[int32](3),
+					BackoffLimit: ptr.To[int32](math.MaxInt32),
+					SuccessPolicy: &batch.SuccessPolicy{
+						Rules: []batch.SuccessPolicyRule{{
+							SucceededIndexes: ptr.To("0,1"),
+							SucceededCount: ptr.To[int32](1),
+						}},
+					},
+				},
+			},
+			pods: []v1.Pod{
+				*buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
+				*buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
+				*buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
+				*buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod,
+			},
+			wantStatus: batch.JobStatus{
+				Failed: 1,
+				Succeeded: 1,
+				CompletedIndexes: "1",
+				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
+				Conditions: []batch.JobCondition{
+					{
+						Type: batch.JobSuccessCriteriaMet,
+						Status: v1.ConditionTrue,
+						Reason: batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+					{
+						Type: batch.JobComplete,
+						Status: v1.ConditionTrue,
+						Reason: batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+				},
+			},
+		},
+		"job with podFailurePolicy and successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
+			enableJobSuccessPolicy: true,
+			enableJobFailurePolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3697,7 +3850,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			wantStatus: batch.JobStatus{
 				Failed: 1,
 				Succeeded: 1,
-				Terminating: ptr.To[int32](0),
+				Terminating: ptr.To[int32](1),
 				CompletedIndexes: "1",
 				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
 				Conditions: []batch.JobCondition{
@@ -3716,9 +3869,67 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with backoffLimitPerIndex and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": {
-			enableJobSuccessPolicy: true,
-			enableBackoffLimitPerIndex: true,
+		"job with podFailurePolicy and successPolicy; jobPodReplacementPolicy feature disabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
+			enableJobSuccessPolicy: true,
+			enableJobFailurePolicy: true,
+			enableJobPodReplacementPolicy: false,
+			job: batch.Job{
+				TypeMeta: validTypeMeta,
+				ObjectMeta: validObjectMeta,
+				Spec: batch.JobSpec{
+					Selector: validSelector,
+					Template: validTemplate,
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Parallelism: ptr.To[int32](2),
+					Completions: ptr.To[int32](2),
+					BackoffLimit: ptr.To[int32](math.MaxInt32),
+					SuccessPolicy: &batch.SuccessPolicy{
+						Rules: []batch.SuccessPolicyRule{{
+							SucceededIndexes: ptr.To("0,1"),
+							SucceededCount: ptr.To[int32](1),
+						}},
+					},
+					PodFailurePolicy: &batch.PodFailurePolicy{
+						Rules: []batch.PodFailurePolicyRule{{
+							Action: batch.PodFailurePolicyActionFailJob,
+							OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
+								Type: v1.DisruptionTarget,
+								Status: v1.ConditionTrue,
+							}},
+						}},
+					},
+				},
+			},
+			pods: []v1.Pod{
+				*buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
+				*buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
+				*buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
+			},
+			wantStatus: batch.JobStatus{
+				Failed: 1,
+				Succeeded: 1,
+				CompletedIndexes: "1",
+				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
+				Conditions: []batch.JobCondition{
+					{
+						Type: batch.JobSuccessCriteriaMet,
+						Status: v1.ConditionTrue,
+						Reason: batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+					{
+						Type: batch.JobComplete,
+						Status: v1.ConditionTrue,
+						Reason: batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+				},
+			},
+		},
+		"job with backoffLimitPerIndex and successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": {
+			enableJobSuccessPolicy: true,
+			enableBackoffLimitPerIndex: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3746,7 +3957,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			wantStatus: batch.JobStatus{
 				Failed: 1,
 				Succeeded: 1,
-				Terminating: ptr.To[int32](0),
+				Terminating: ptr.To[int32](1),
 				CompletedIndexes: "1",
 				FailedIndexes: ptr.To(""),
 				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
@@ -3766,8 +3977,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": {
+		"job with successPolicy; jobPodReplacementPolicy feature enabled; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": {
 			enableJobSuccessPolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3828,8 +4040,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 		// So, we need to revisit here before we graduate the JobSuccessPolicy to beta.
 		// TODO(#123775): A Job might finish with ready!=0
 		// REF: https://github.com/kubernetes/kubernetes/issues/123775
-		"job with successPolicy; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": {
+		"job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": {
 			enableJobSuccessPolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3868,7 +4081,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			wantStatus: batch.JobStatus{
 				Failed: 1,
 				Succeeded: 1,
-				Terminating: ptr.To[int32](0),
+				Terminating: ptr.To[int32](2),
 				CompletedIndexes: "1",
 				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
 				Conditions: []batch.JobCondition{
@@ -3887,9 +4100,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and podFailurePolicy; job has a failed condition when job meets to both successPolicy and podFailurePolicy": {
+		"job with successPolicy and podFailurePolicy; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and podFailurePolicy": {
 			enableJobSuccessPolicy: true,
 			enableJobFailurePolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3931,7 +4145,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			wantStatus: batch.JobStatus{
 				Failed: 2,
 				Succeeded: 1,
-				Terminating: ptr.To[int32](0),
+				Terminating: ptr.To[int32](1),
 				CompletedIndexes: "1",
 				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
 				Conditions: []batch.JobCondition{
@@ -3950,9 +4164,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimitPerIndex; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": {
+		"job with successPolicy and backoffLimitPerIndex; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": {
 			enableJobSuccessPolicy: true,
-			enableBackoffLimitPerIndex: true,
+			enableJobPodReplacementPolicy: true,
+			enableBackoffLimitPerIndex: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3993,8 +4208,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimit; job has a failed condition when job meets to both successPolicy and backoffLimit": {
+		"job with successPolicy and backoffLimit; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and backoffLimit": {
 			enableJobSuccessPolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4034,9 +4250,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and podFailurePolicy; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": {
+		"job with successPolicy and podFailurePolicy; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": {
 			enableJobSuccessPolicy: true,
 			enableJobFailurePolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4110,9 +4327,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimitPerIndex; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": {
+		"job with successPolicy and backoffLimitPerIndex; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": {
 			enableJobSuccessPolicy: true,
-			enableBackoffLimitPerIndex: true,
+			enableJobPodReplacementPolicy: true,
+			enableBackoffLimitPerIndex: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4173,8 +4391,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimit: job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": {
+		"job with successPolicy and backoffLimit; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": {
 			enableJobSuccessPolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4234,9 +4453,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and podFailureTarget; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": {
+		"job with successPolicy and podFailureTarget; jobPodReplacementPolicy feature enabled; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": {
 			enableJobSuccessPolicy: true,
 			enableJobFailurePolicy: true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta: validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4317,6 +4537,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobFailurePolicy)
 			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)
 			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)
+			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 			fakeClock := clocktesting.NewFakeClock(now)
@@ -4761,7 +4982,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
 			wantStatus: batch.JobStatus{
 				Failed: 3,
 				Succeeded: 1,
-				Terminating: ptr.To[int32](0),
+				Terminating: ptr.To[int32](1),
 				FailedIndexes: ptr.To("0,2"),
 				CompletedIndexes: "1",
 				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},


@@ -514,6 +514,7 @@ func TestSuccessPolicy(t *testing.T) {
 		wantActiveIndexes sets.Set[int]
 		wantCompletedIndexes string
 		wantFailedIndexes *string
+		wantTerminating *int32
 	}
 	podTemplateSpec := v1.PodTemplateSpec{
@@ -561,6 +562,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantFailed: 0,
 				wantSucceeded: 1,
 				wantCompletedIndexes: "0",
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
@@ -595,6 +597,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantFailed: 0,
 				wantSucceeded: 1,
 				wantCompletedIndexes: "0",
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantConditionTypes: []batchv1.JobConditionType{batchv1.JobComplete},
@@ -630,6 +633,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantActiveIndexes: sets.New(0, 1),
 				wantFailed: 0,
 				wantSucceeded: 0,
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -640,6 +644,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantFailed: 0,
 				wantSucceeded: 1,
 				wantCompletedIndexes: "1",
+				wantTerminating: ptr.To(int32(1)),
 			},
 		},
 		wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
@@ -675,6 +680,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantActiveIndexes: sets.New(0, 1),
 				wantFailed: 0,
 				wantSucceeded: 0,
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -685,6 +691,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantFailed: 0,
 				wantSucceeded: 1,
 				wantCompletedIndexes: "1",
+				wantTerminating: ptr.To(int32(1)),
 			},
 		},
 		wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
@@ -723,6 +730,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantFailed: 1,
 				wantFailedIndexes: ptr.To("0"),
 				wantSucceeded: 0,
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -734,6 +742,7 @@ func TestSuccessPolicy(t *testing.T) {
 				wantSucceeded: 1,
 				wantFailedIndexes: ptr.To("0"),
 				wantCompletedIndexes: "1",
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed},
@@ -774,7 +783,7 @@ func TestSuccessPolicy(t *testing.T) {
 			Succeeded: podTermination.wantSucceeded,
 			Failed: podTermination.wantFailed,
 			Ready: ptr.To[int32](0),
-			Terminating: ptr.To[int32](0),
+			Terminating: podTermination.wantTerminating,
 		})
 		validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
 	}
@@ -861,7 +870,7 @@ func TestSuccessPolicy_ReEnabling(t *testing.T) {
 		Active: 0,
 		Succeeded: 3,
 		Ready: ptr.To[int32](0),
-		Terminating: ptr.To[int32](0),
+		Terminating: ptr.To[int32](2),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0-2", nil)
@@ -1168,6 +1177,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 		wantCompletedIndexes string
 		wantFailedIndexes *string
 		wantReplacementPodFailureCount *int
+		wantTerminating *int32
 	}
 	podTemplateSpec := v1.PodTemplateSpec{
@@ -1208,6 +1218,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantActiveIndexes: sets.New(0, 1),
 				wantFailedIndexes: ptr.To(""),
 				wantReplacementPodFailureCount: ptr.To(1),
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantJobConditionType: batchv1.JobComplete,
@@ -1238,6 +1249,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantActiveIndexes: sets.New(0, 1),
 				wantFailedIndexes: ptr.To(""),
 				wantReplacementPodFailureCount: ptr.To(1),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				status: v1.PodStatus{
@@ -1248,6 +1260,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantActiveIndexes: sets.New(0, 1),
 				wantFailedIndexes: ptr.To(""),
 				wantReplacementPodFailureCount: ptr.To(2),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				status: v1.PodStatus{
@@ -1257,6 +1270,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantFailed: 3,
 				wantActiveIndexes: sets.New(1),
 				wantFailedIndexes: ptr.To("0"),
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantJobConditionType: batchv1.JobFailed,
@@ -1292,6 +1306,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantFailed: 1,
 				wantActiveIndexes: sets.New(0, 1, 2),
 				wantFailedIndexes: ptr.To(""),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -1302,6 +1317,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantFailed: 2,
 				wantActiveIndexes: sets.New(0, 1, 2),
 				wantFailedIndexes: ptr.To(""),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 2,
@@ -1310,6 +1326,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 				wantFailed: 5,
 				wantFailedIndexes: ptr.To(""),
+				wantTerminating: ptr.To(int32(2)),
 			},
 		},
 		wantJobConditionType: batchv1.JobFailed,
@@ -1344,6 +1361,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantFailed: 1,
 				wantActiveIndexes: sets.New(1),
 				wantFailedIndexes: ptr.To("0"),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -1354,6 +1372,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantSucceeded: 1,
 				wantFailedIndexes: ptr.To("0"),
 				wantCompletedIndexes: "1",
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantJobConditionType: batchv1.JobFailed,
@@ -1389,6 +1408,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantFailed: 1,
 				wantActiveIndexes: sets.New(1, 2),
 				wantFailedIndexes: ptr.To("0"),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -1398,6 +1418,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantActive: 0,
 				wantFailed: 3,
 				wantFailedIndexes: ptr.To("0,1"),
+				wantTerminating: ptr.To(int32(1)),
 			},
 		},
 		wantJobConditionType: batchv1.JobFailed,
@@ -1457,6 +1478,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				wantFailed: 1,
 				wantActiveIndexes: sets.New(1),
 				wantFailedIndexes: ptr.To("0"),
+				wantTerminating: ptr.To(int32(0)),
 			},
 			{
 				index: 1,
@@ -1471,6 +1493,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 				wantFailed: 2,
 				wantFailedIndexes: ptr.To("0,1"),
+				wantTerminating: ptr.To(int32(0)),
 			},
 		},
 		wantJobConditionType: batchv1.JobFailed,
@@ -1517,7 +1540,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 			Succeeded: podTermination.wantSucceeded,
 			Failed: podTermination.wantFailed,
 			Ready: ptr.To[int32](0),
-			Terminating: ptr.To[int32](0),
+			Terminating: podTermination.wantTerminating,
 		})
 		validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
 		if podTermination.wantReplacementPodFailureCount != nil {
@@ -2730,6 +2753,7 @@ func TestElasticIndexedJob(t *testing.T) {
 		wantFailed int
 		wantRemainingIndexes sets.Set[int]
 		wantActivePods int
+		wantTerminating *int32
 	}
 	cases := map[string]struct {
 		featureGate bool
@@ -2739,7 +2763,8 @@ func TestElasticIndexedJob(t *testing.T) {
 		"feature flag off, mutation not allowed": {
 			jobUpdates: []jobUpdate{
 				{
 					completions: ptr.To[int32](4),
+					wantTerminating: ptr.To[int32](0),
 				},
 			},
 			wantErr: apierrors.NewInvalid(
@@ -2756,6 +2781,7 @@ func TestElasticIndexedJob(t *testing.T) {
 					completions: ptr.To[int32](4),
 					succeedIndexes: []int{0, 1, 2, 3},
 					wantSucceededIndexes: "0-3",
+					wantTerminating: ptr.To[int32](0),
 				},
 			},
 		},
@@ -2770,6 +2796,7 @@ func TestElasticIndexedJob(t *testing.T) {
 					wantFailed: 1,
 					wantRemainingIndexes: sets.New(0, 2),
 					wantActivePods: 2,
+					wantTerminating: ptr.To[int32](0),
 				},
 				// Scale down completions 3->1, verify prev failure out of range still counts
 				// but succeeded out of range does not.
@@ -2778,6 +2805,7 @@ func TestElasticIndexedJob(t *testing.T) {
 					succeedIndexes: []int{0},
 					wantSucceededIndexes: "0",
 					wantFailed: 1,
+					wantTerminating: ptr.To[int32](0),
 				},
 			},
 		},
@@ -2790,18 +2818,21 @@ func TestElasticIndexedJob(t *testing.T) {
 					wantSucceededIndexes: "2",
 					wantRemainingIndexes: sets.New(0, 1),
 					wantActivePods: 2,
+					wantTerminating: ptr.To[int32](0),
 				},
 				// Scale completions down 3->2 to exclude previously succeeded index.
 				{
 					completions: ptr.To[int32](2),
 					wantRemainingIndexes: sets.New(0, 1),
 					wantActivePods: 2,
+					wantTerminating: ptr.To[int32](0),
 				},
 				// Scale completions back up to include previously succeeded index that was temporarily out of range.
 				{
 					completions: ptr.To[int32](3),
 					succeedIndexes: []int{0, 1, 2},
 					wantSucceededIndexes: "0-2",
+					wantTerminating: ptr.To[int32](0),
 				},
 			},
 		},
@@ -2809,7 +2840,8 @@ func TestElasticIndexedJob(t *testing.T) {
 			featureGate: true,
 			jobUpdates: []jobUpdate{
 				{
 					completions: ptr.To[int32](0),
+					wantTerminating: ptr.To[int32](3),
 				},
 			},
 		},
@@ -2887,7 +2919,7 @@ func TestElasticIndexedJob(t *testing.T) {
 			Succeeded: len(update.succeedIndexes),
 			Failed: update.wantFailed,
 			Ready: ptr.To[int32](0),
-			Terminating: ptr.To[int32](0),
+			Terminating: update.wantTerminating,
 		})
 		validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil)
} }