Refactor tracking of terminating pods in Job controller
This commit is contained in:
		@@ -811,18 +811,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	var terminating *int32
 | 
			
		||||
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
			
		||||
		terminating = ptr.To(controller.CountTerminatingPods(pods))
 | 
			
		||||
	}
 | 
			
		||||
	jobCtx := &syncJobCtx{
 | 
			
		||||
		job:                  &job,
 | 
			
		||||
		pods:                 pods,
 | 
			
		||||
		activePods:           controller.FilterActivePods(logger, pods),
 | 
			
		||||
		terminating:          terminating,
 | 
			
		||||
		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
 | 
			
		||||
		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
 | 
			
		||||
	}
 | 
			
		||||
	if trackTerminatingPods(&job) {
 | 
			
		||||
		jobCtx.terminating = ptr.To(controller.CountTerminatingPods(pods))
 | 
			
		||||
	}
 | 
			
		||||
	active := int32(len(jobCtx.activePods))
 | 
			
		||||
	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
 | 
			
		||||
	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
 | 
			
		||||
@@ -896,7 +894,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 | 
			
		||||
			jobCtx.finishedCondition = nil
 | 
			
		||||
		}
 | 
			
		||||
		active -= deleted
 | 
			
		||||
		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
			
		||||
		if trackTerminatingPods(jobCtx.job) {
 | 
			
		||||
			*jobCtx.terminating += deleted
 | 
			
		||||
		}
 | 
			
		||||
		manageJobErr = err
 | 
			
		||||
@@ -956,11 +954,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var terminating *int32
 | 
			
		||||
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
			
		||||
		terminating = jobCtx.terminating
 | 
			
		||||
	}
 | 
			
		||||
	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
 | 
			
		||||
	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
 | 
			
		||||
	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
 | 
			
		||||
	job.Status.Active = active
 | 
			
		||||
	job.Status.Ready = ready
 | 
			
		||||
	job.Status.Terminating = jobCtx.terminating
 | 
			
		||||
	job.Status.Terminating = terminating
 | 
			
		||||
	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("tracking status: %w", err)
 | 
			
		||||
@@ -1504,23 +1506,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 | 
			
		||||
		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
 | 
			
		||||
		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 | 
			
		||||
		active -= removed
 | 
			
		||||
		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
			
		||||
		if trackTerminatingPods(job) {
 | 
			
		||||
			*jobCtx.terminating += removed
 | 
			
		||||
		}
 | 
			
		||||
		return active, metrics.JobSyncActionPodsDeleted, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var terminating int32 = 0
 | 
			
		||||
	if onlyReplaceFailedPods(jobCtx.job) {
 | 
			
		||||
		// For PodFailurePolicy specified but PodReplacementPolicy disabled
 | 
			
		||||
		// we still need to count terminating pods for replica counts
 | 
			
		||||
		// But we will not allow updates to status.
 | 
			
		||||
		if jobCtx.terminating == nil {
 | 
			
		||||
			terminating = controller.CountTerminatingPods(jobCtx.pods)
 | 
			
		||||
		} else {
 | 
			
		||||
			terminating = *jobCtx.terminating
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	wantActive := int32(0)
 | 
			
		||||
	if job.Spec.Completions == nil {
 | 
			
		||||
		// Job does not specify a number of completions.  Therefore, number active
 | 
			
		||||
@@ -1556,7 +1547,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 | 
			
		||||
		logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
 | 
			
		||||
		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 | 
			
		||||
		active -= removed
 | 
			
		||||
		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
			
		||||
		if trackTerminatingPods(job) {
 | 
			
		||||
			*jobCtx.terminating += removed
 | 
			
		||||
		}
 | 
			
		||||
		// While it is possible for a Job to require both pod creations and
 | 
			
		||||
@@ -1566,6 +1557,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 | 
			
		||||
		return active, metrics.JobSyncActionPodsDeleted, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var terminating int32 = 0
 | 
			
		||||
	if onlyReplaceFailedPods(jobCtx.job) {
 | 
			
		||||
		// When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
 | 
			
		||||
		// and so we can use the value.
 | 
			
		||||
		terminating = *jobCtx.terminating
 | 
			
		||||
	}
 | 
			
		||||
	if diff := wantActive - terminating - active; diff > 0 {
 | 
			
		||||
		var remainingTime time.Duration
 | 
			
		||||
		if !hasBackoffLimitPerIndex(job) {
 | 
			
		||||
@@ -1951,6 +1948,17 @@ func countReadyPods(pods []*v1.Pod) int32 {
 | 
			
		||||
	return cnt
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// trackTerminatingPods checks if the count of terminating pods is tracked.
 | 
			
		||||
// They are tracked when any the following is true:
 | 
			
		||||
// - JobPodReplacementPolicy is enabled to be returned in the status field,
 | 
			
		||||
// - only failed pods are replaced, because pod failure policy is used
 | 
			
		||||
func trackTerminatingPods(job *batch.Job) bool {
 | 
			
		||||
	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// This checks if we should apply PodReplacementPolicy.
 | 
			
		||||
// PodReplacementPolicy controls when we recreate pods if they are marked as terminating
 | 
			
		||||
// Failed means that we recreate only once the pod has terminated.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user