Merge pull request #123242 from mimowo/fast-backoff-for-replacment-policy-tests
Improve accuracy of the PodsCreationTotal metric and use fast pod failure backoff for ReplacementPolicy integration tests
This commit is contained in:
		@@ -130,6 +130,7 @@ type syncJobCtx struct {
 | 
				
			|||||||
	finishedCondition               *batch.JobCondition
 | 
						finishedCondition               *batch.JobCondition
 | 
				
			||||||
	activePods                      []*v1.Pod
 | 
						activePods                      []*v1.Pod
 | 
				
			||||||
	succeeded                       int32
 | 
						succeeded                       int32
 | 
				
			||||||
 | 
						failed                          int32
 | 
				
			||||||
	prevSucceededIndexes            orderedIntervals
 | 
						prevSucceededIndexes            orderedIntervals
 | 
				
			||||||
	succeededIndexes                orderedIntervals
 | 
						succeededIndexes                orderedIntervals
 | 
				
			||||||
	failedIndexes                   *orderedIntervals
 | 
						failedIndexes                   *orderedIntervals
 | 
				
			||||||
@@ -790,7 +791,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 | 
				
			|||||||
	active := int32(len(jobCtx.activePods))
 | 
						active := int32(len(jobCtx.activePods))
 | 
				
			||||||
	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
 | 
						newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
 | 
				
			||||||
	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
 | 
						jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
 | 
				
			||||||
	failed := job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
 | 
						jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
 | 
				
			||||||
	var ready *int32
 | 
						var ready *int32
 | 
				
			||||||
	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
 | 
						if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
 | 
				
			||||||
		ready = ptr.To(countReadyPods(jobCtx.activePods))
 | 
							ready = ptr.To(countReadyPods(jobCtx.activePods))
 | 
				
			||||||
@@ -806,7 +807,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	var manageJobErr error
 | 
						var manageJobErr error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	exceedsBackoffLimit := failed > *job.Spec.BackoffLimit
 | 
						exceedsBackoffLimit := jobCtx.failed > *job.Spec.BackoffLimit
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 | 
						if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 | 
				
			||||||
		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
 | 
							if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
 | 
				
			||||||
@@ -1618,7 +1619,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			diff -= batchSize
 | 
								diff -= batchSize
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		recordJobPodsCreationTotal(job, creationsSucceeded, creationsFailed)
 | 
							recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
 | 
				
			||||||
		return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
 | 
							return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1917,16 +1918,13 @@ func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) {
 | 
					func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {
 | 
				
			||||||
	reason := metrics.PodCreateNew
 | 
						reason := metrics.PodCreateNew
 | 
				
			||||||
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
						if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
				
			||||||
		podsTerminating := job.Status.Terminating != nil && *job.Status.Terminating > 0
 | 
							if ptr.Deref(job.Spec.PodReplacementPolicy, batch.TerminatingOrFailed) == batch.Failed && jobCtx.failed > 0 {
 | 
				
			||||||
		isRecreateAction := podsTerminating || job.Status.Failed > 0
 | 
					 | 
				
			||||||
		if isRecreateAction {
 | 
					 | 
				
			||||||
			reason = metrics.PodRecreateTerminatingOrFailed
 | 
					 | 
				
			||||||
			if *job.Spec.PodReplacementPolicy == batch.Failed {
 | 
					 | 
				
			||||||
			reason = metrics.PodRecreateFailed
 | 
								reason = metrics.PodRecreateFailed
 | 
				
			||||||
			}
 | 
							} else if jobCtx.failed > 0 || ptr.Deref(jobCtx.terminating, 0) > 0 {
 | 
				
			||||||
 | 
								reason = metrics.PodRecreateTerminatingOrFailed
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if succeeded > 0 {
 | 
						if succeeded > 0 {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1587,6 +1587,7 @@ func TestIndexedJob(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestJobPodReplacementPolicy(t *testing.T) {
 | 
					func TestJobPodReplacementPolicy(t *testing.T) {
 | 
				
			||||||
 | 
						t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
 | 
				
			||||||
	indexedCompletion := batchv1.IndexedCompletion
 | 
						indexedCompletion := batchv1.IndexedCompletion
 | 
				
			||||||
	nonIndexedCompletion := batchv1.NonIndexedCompletion
 | 
						nonIndexedCompletion := batchv1.NonIndexedCompletion
 | 
				
			||||||
	var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
 | 
						var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
 | 
				
			||||||
@@ -1632,7 +1633,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 | 
				
			|||||||
				new: 4,
 | 
									new: 4,
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
 | 
							"feature flag true with IndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
 | 
				
			||||||
			podReplacementPolicyEnabled: true,
 | 
								podReplacementPolicyEnabled: true,
 | 
				
			||||||
			jobSpec: &batchv1.JobSpec{
 | 
								jobSpec: &batchv1.JobSpec{
 | 
				
			||||||
				Parallelism:          ptr.To[int32](2),
 | 
									Parallelism:          ptr.To[int32](2),
 | 
				
			||||||
@@ -1665,7 +1666,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 | 
				
			|||||||
			jobSpec: &batchv1.JobSpec{
 | 
								jobSpec: &batchv1.JobSpec{
 | 
				
			||||||
				Parallelism:          ptr.To[int32](2),
 | 
									Parallelism:          ptr.To[int32](2),
 | 
				
			||||||
				Completions:          ptr.To[int32](2),
 | 
									Completions:          ptr.To[int32](2),
 | 
				
			||||||
				CompletionMode:       &indexedCompletion,
 | 
									CompletionMode:       &nonIndexedCompletion,
 | 
				
			||||||
				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
 | 
									PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
 | 
				
			||||||
				Template: v1.PodTemplateSpec{
 | 
									Template: v1.PodTemplateSpec{
 | 
				
			||||||
					ObjectMeta: metav1.ObjectMeta{
 | 
										ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
@@ -1846,6 +1847,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 | 
				
			|||||||
// Disable reverts to previous behavior.
 | 
					// Disable reverts to previous behavior.
 | 
				
			||||||
// Enabling will then match the original failed case.
 | 
					// Enabling will then match the original failed case.
 | 
				
			||||||
func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) {
 | 
					func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) {
 | 
				
			||||||
 | 
						t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
 | 
				
			||||||
	const podCount int32 = 2
 | 
						const podCount int32 = 2
 | 
				
			||||||
	jobSpec := batchv1.JobSpec{
 | 
						jobSpec := batchv1.JobSpec{
 | 
				
			||||||
		Parallelism:          ptr.To(podCount),
 | 
							Parallelism:          ptr.To(podCount),
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user