Introduce syncJobContext to limit the number of function parameters
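The refactor applies the parameter-object pattern: per-sync state that used to be passed as individual arguments is gathered into one struct that the helpers receive as a single pointer. A minimal, self-contained Go sketch of that pattern (the types and names below are simplified stand-ins for illustration, not the Job controller's real types):

package main

import "fmt"

// Before: a helper takes a long list of loosely related per-sync values,
// and every new value changes the signature of every call site.
func summarizeBefore(jobName string, activePods []string, succeeded, failed int32) string {
	return fmt.Sprintf("%s: active=%d succeeded=%d failed=%d", jobName, len(activePods), succeeded, failed)
}

// After: the per-sync state is collected once into a context struct and the
// helper takes a single pointer; adding a field is a local change.
type syncContext struct {
	jobName    string
	activePods []string
	succeeded  int32
	failed     int32
}

func summarizeAfter(sc *syncContext) string {
	return fmt.Sprintf("%s: active=%d succeeded=%d failed=%d", sc.jobName, len(sc.activePods), sc.succeeded, sc.failed)
}

func main() {
	sc := &syncContext{jobName: "example", activePods: []string{"pod-a"}, succeeded: 2}
	fmt.Println(summarizeBefore(sc.jobName, sc.activePods, sc.succeeded, sc.failed))
	fmt.Println(summarizeAfter(sc))
}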
@@ -131,6 +131,20 @@ type Controller struct {
 	podBackoffStore *backoffStore
 }
 
+type syncJobContext struct {
+	job                           *batch.Job
+	pods                          []*v1.Pod
+	finishedCondition             *batch.JobCondition
+	activePods                    []*v1.Pod
+	succeeded                     int32
+	prevSucceededIndexes          orderedIntervals
+	succeededIndexes              orderedIntervals
+	newBackoffRecord              backoffRecord
+	expectedRmFinalizers          sets.Set[string]
+	uncounted                     *uncountedTerminatedPods
+	podFailureCountByPolicyAction map[string]int
+}
+
 // NewController creates a new Job controller that keeps the relevant pods
 // in sync with their corresponding Job objects.
 func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
@@ -742,6 +756,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return nil
 	}
 
+	syncJobContext := &syncJobContext{}
+	syncJobContext.job = &job
+
 	completionMode := getCompletionMode(&job)
 	action := metrics.JobSyncActionReconciling
 
@@ -759,7 +776,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 	}
 	uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-	expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)
+	syncJobContext.uncounted = uncounted
+	syncJobContext.expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
 
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
@@ -771,10 +789,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return err
 	}
 
+	syncJobContext.pods = pods
 	activePods := controller.FilterActivePods(pods)
+	syncJobContext.activePods = activePods
 	active := int32(len(activePods))
-	newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
-	succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
+	newSucceededPods, newFailedPods := getNewFinishedPods(syncJobContext)
+	syncJobContext.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
 	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
 	var ready *int32
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
@@ -787,28 +807,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}
 
-	newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+	syncJobContext.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
 	var manageJobErr error
-	var finishedCondition *batch.JobCondition
 
 	exceedsBackoffLimit := failed > *job.Spec.BackoffLimit
 
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
-			finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
+			syncJobContext.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
 		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
 			// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-			finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 		}
 	}
-	if finishedCondition == nil {
+	if syncJobContext.finishedCondition == nil {
 		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 			// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 			// OR if the number of failed jobs increased since the last syncJob
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
 		} else if jm.pastActiveDeadline(&job) {
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
 		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
 			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
 			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
@@ -819,23 +838,25 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	var prevSucceededIndexes, succeededIndexes orderedIntervals
 	if isIndexedJob(&job) {
 		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
-		succeeded = int32(succeededIndexes.total())
+		syncJobContext.prevSucceededIndexes = prevSucceededIndexes
+		syncJobContext.succeededIndexes = succeededIndexes
+		syncJobContext.succeeded = int32(succeededIndexes.total())
 	}
 	suspendCondChanged := false
 	// Remove active pods if Job failed.
-	if finishedCondition != nil {
+	if syncJobContext.finishedCondition != nil {
 		deleted, err := jm.deleteActivePods(ctx, &job, activePods)
 		if deleted != active || !satisfiedExpectations {
 			// Can't declare the Job as finished yet, as there might be remaining
 			// pod finalizers or pods that are not in the informer's cache yet.
-			finishedCondition = nil
+			syncJobContext.finishedCondition = nil
 		}
 		active -= deleted
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
 		if satisfiedExpectations && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord)
+			active, action, manageJobErr = jm.manageJob(ctx, syncJobContext)
 			manageJobCalled = true
 		}
 		complete := false
@@ -846,16 +867,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			// not expected to fail, but if they do, the failure is ignored.  Once any
 			// pod succeeds, the controller waits for remaining pods to finish, and
 			// then the job is complete.
-			complete = succeeded > 0 && active == 0
+			complete = syncJobContext.succeeded > 0 && active == 0
 		} else {
 			// Job specifies a number of completions.  This type of job signals
 			// success by having that number of successes.  Since we do not
 			// start more pods than there are remaining completions, there should
 			// not be any remaining active pods once this count is reached.
-			complete = succeeded >= *job.Spec.Completions && active == 0
+			complete = syncJobContext.succeeded >= *job.Spec.Completions && active == 0
 		}
 		if complete {
-			finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
 		} else if manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
 			// this syncJob. Otherwise wait for the right syncJob call to make
@@ -891,7 +912,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord)
+	err = jm.trackJobStatusAndRemoveFinalizers(ctx, syncJobContext, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -970,8 +991,15 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
 //
 // It does this up to a limited number of Pods so that the size of .status
 // doesn't grow too much and this sync doesn't starve other Jobs.
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
+func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, needsFlush bool) error {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	pods := syncJobContext.pods
+	finishedCond := syncJobContext.finishedCondition
+	expectedRmFinalizers := syncJobContext.expectedRmFinalizers
+	succeededIndexes := syncJobContext.succeededIndexes
+	uncounted := syncJobContext.uncounted
+
 	isIndexed := isIndexedJob(job)
 	var podsToRemoveFinalizer []*v1.Pod
 	uncountedStatus := job.Status.UncountedTerminatedPods
@@ -1070,8 +1098,9 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 			finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now())
 		}
 	}
+	syncJobContext.podFailureCountByPolicyAction = podFailureCountByPolicyAction
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush, newBackoffRecord); err != nil {
+	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, syncJobContext, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
 	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(job, finishedCond)
@@ -1101,8 +1130,11 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 //
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	newBackoffRecord := syncJobContext.newBackoffRecord
+	podFailureCountByPolicyAction := syncJobContext.podFailureCountByPolicyAction
 	var err error
 	if needsFlush {
 		if job, err = jm.updateStatusHandler(ctx, job); err != nil {
@@ -1337,11 +1369,12 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
 
 // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
 // in the job status. The list of failed pods can be affected by the podFailurePolicy.
-func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.Set[string]) (succeededPods, failedPods []*v1.Pod) {
-	succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
+func getNewFinishedPods(syncJobContext *syncJobContext) (succeededPods, failedPods []*v1.Pod) {
+	job := syncJobContext.job
+	succeededPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Succeeded(), func(p *v1.Pod) bool {
 		return p.Status.Phase == v1.PodSucceeded
 	})
-	failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
+	failedPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Failed(), func(p *v1.Pod) bool {
 		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
 			if !isPodFailed(p, job) {
 				return false
@@ -1365,8 +1398,14 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobContext) (int32, string, error) {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	activePods := syncJobContext.activePods
+	succeeded := syncJobContext.succeeded
+	succeededIndexes := syncJobContext.succeededIndexes
+	newBackoffRecord := syncJobContext.newBackoffRecord
+
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -1561,7 +1600,10 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.
-func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Set[string], expectedRmFinalizers sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+func getValidPodsWithFilter(synJobContext *syncJobContext, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+	job := synJobContext.job
+	pods := synJobContext.pods
+	expectedRmFinalizers := synJobContext.expectedRmFinalizers
 	var result []*v1.Pod
 	for _, p := range pods {
 		uid := string(p.UID)
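Taken together, the hunks above populate a single syncJobContext near the top of syncJob and hand it to the helpers in place of ever-growing argument lists. Condensed from the added lines above (an orientation excerpt only, not standalone runnable code):

	syncJobContext := &syncJobContext{}
	syncJobContext.job = &job
	syncJobContext.uncounted = uncounted
	syncJobContext.expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
	syncJobContext.pods = pods
	syncJobContext.activePods = activePods
	newSucceededPods, newFailedPods := getNewFinishedPods(syncJobContext)
	syncJobContext.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
	active, action, manageJobErr = jm.manageJob(ctx, syncJobContext)
	err = jm.trackJobStatusAndRemoveFinalizers(ctx, syncJobContext, needsStatusUpdate)

The test changes below construct the same context directly.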
 
@@ -1079,7 +1079,8 @@ func TestGetNewFinshedPods(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-			succeededPods, failedPods := getNewFinishedPods(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
+			syncJobContext := &syncJobContext{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
+			succeededPods, failedPods := getNewFinishedPods(syncJobContext)
 			succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
 			failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
 			if succeeded != tc.wantSucceeded {
@@ -1654,7 +1655,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			if isIndexedJob(job) {
 				succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
 			}
-			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{})
+			syncJobContext := &syncJobContext{
+				job:                  job,
+				pods:                 tc.pods,
+				succeededIndexes:     succeededIndexes,
+				uncounted:            uncounted,
+				expectedRmFinalizers: tc.expectedRmFinalizers,
+				finishedCondition:    tc.finishedCond,
+				newBackoffRecord:     backoffRecord{},
+			}
+			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), syncJobContext, tc.needsFlush)
 			if !errors.Is(err, tc.wantErr) {
 				t.Errorf("Got error %v, want %v", err, tc.wantErr)
 			}
 