@@ -19,7 +19,6 @@ package job
 import (
 	"context"
 	"fmt"
-	"math"
 	"reflect"
 	"sort"
 	"sync"
@@ -57,9 +56,8 @@ import (
 	"k8s.io/utils/pointer"
 )
 
-// podUpdateBatchPeriod is the batch period to hold pod updates before syncing
-// a Job. It is used if the feature gate JobReadyPods is enabled.
-const podUpdateBatchPeriod = time.Second
+// syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
+const syncJobBatchPeriod = time.Second
 
 const (
 	// PodFailurePolicy reason indicates a job failure condition is added due to
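The one-second batch period is easy to gloss over, so here is a minimal sketch (not part of the patch; it assumes only client-go's workqueue package) of why it is enough to coalesce a burst of pod events into a single syncJob call: a delaying workqueue deduplicates identical keys, so every event for the same Job inside the window collapses into one queue entry.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	// Three pod events for the same Job arrive within the batch period.
	for i := 0; i < 3; i++ {
		q.AddAfter("default/my-job", time.Second)
	}
	time.Sleep(1100 * time.Millisecond)
	fmt.Println(q.Len()) // 1: the duplicate keys coalesced into one pending sync
}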
@@ -123,11 +121,13 @@ type Controller struct {
 	broadcaster record.EventBroadcaster
 	recorder    record.EventRecorder
 
-	podUpdateBatchPeriod time.Duration
+	syncJobBatchPeriod time.Duration
 
 	clock clock.WithTicker
 
-	backoffRecordStore *backoffStore
+	// Store with information to compute the exponential backoff delay for pod
+	// recreation in case of pod failures.
+	podBackoffStore *backoffStore
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
@@ -153,15 +153,13 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		broadcaster:        eventBroadcaster,
 		recorder:           eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		clock:              clock,
-		backoffRecordStore: newBackoffRecordStore(),
+		podBackoffStore:    newBackoffStore(),
 	}
-	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
-		jm.podUpdateBatchPeriod = podUpdateBatchPeriod
-	}
+	jm.syncJobBatchPeriod = syncJobBatchPeriod
 
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			jm.enqueueController(logger, obj, true)
+			jm.enqueueSyncJobImmediately(logger, obj)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			jm.updateJob(logger, oldObj, newObj)
@@ -286,7 +284,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
 			return
 		}
 		jm.expectations.CreationObserved(jobKey)
-		jm.enqueueControllerPodUpdate(logger, job, true)
+		jm.enqueueSyncJobBatched(logger, job)
 		return
 	}
 
@@ -300,7 +298,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
 	for _, job := range jm.getPodJobs(pod) {
-		jm.enqueueControllerPodUpdate(logger, job, true)
+		jm.enqueueSyncJobBatched(logger, job)
 	}
 }
 
@@ -325,11 +323,6 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 		return
 	}
 
-	// the only time we want the backoff to kick-in, is when the pod failed for the first time.
-	// we don't want to re-calculate backoff for an update event when the tracking finalizer
-	// for a failed pod is removed.
-	immediate := !(curPod.Status.Phase == v1.PodFailed && oldPod.Status.Phase != v1.PodFailed)
-
 	// Don't check if oldPod has the finalizer, as during ownership transfer
 	// finalizers might be re-added and removed again on behalf of the new owner.
 	// If all those Pod updates collapse into a single event, the finalizer
@@ -348,7 +341,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 					jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
 				}
 			}
-			jm.enqueueControllerPodUpdate(logger, job, immediate)
+			jm.enqueueSyncJobBatched(logger, job)
 		}
 	}
@@ -364,7 +357,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 				jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
 			}
 		}
-		jm.enqueueControllerPodUpdate(logger, job, immediate)
+		jm.enqueueSyncJobBatched(logger, job)
 		return
 	}
 
@@ -378,7 +371,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 	if labelChanged || controllerRefChanged {
 		for _, job := range jm.getPodJobs(curPod) {
-			jm.enqueueControllerPodUpdate(logger, job, immediate)
+			jm.enqueueSyncJobBatched(logger, job)
 		}
 	}
 }
@@ -438,7 +431,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
 		jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
 	}
 
-	jm.enqueueControllerPodUpdate(logger, job, true)
+	jm.enqueueSyncJobBatched(logger, job)
 }
 
 func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
@@ -454,10 +447,11 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 	if curJob.Generation == oldJob.Generation {
-		jm.enqueueControllerPodUpdate(logger, curJob, true)
+		// Delay the Job sync when no generation change to batch Job status updates,
+		// typically triggered by pod events.
+		jm.enqueueSyncJobBatched(logger, curJob)
 	} else {
-		jm.enqueueController(logger, curJob, true)
+		// Trigger immediate sync when spec is changed.
+		jm.enqueueSyncJobImmediately(logger, curJob)
 	}
 
 	// check if need to add a new resync for ActiveDeadlineSeconds
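The branch above leans on a Kubernetes API convention: the API server increments metadata.generation only when the spec of a resource with a status subresource changes, so equal generations identify a status-only update. A hypothetical restatement (the helper name is ours, not the controller's):

package job

import batch "k8s.io/api/batch/v1"

// isStatusOnlyUpdate reports whether an update left the Job spec untouched;
// such updates are safe to sync on the batched (delayed) path.
func isStatusOnlyUpdate(oldJob, curJob *batch.Job) bool {
	return curJob.Generation == oldJob.Generation
}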
@@ -480,7 +474,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 // deleteJob enqueues the job and all the pods associated with it that still
 // have a finalizer.
 func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
-	jm.enqueueController(logger, obj, true)
+	jm.enqueueSyncJobImmediately(logger, obj)
 	jobObj, ok := obj.(*batch.Job)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -508,31 +502,41 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
 	}
 }
 
-// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
-// immediate tells the controller to update the status right away, and should
-// happen ONLY when there was a successful pod run.
-func (jm *Controller) enqueueController(logger klog.Logger, obj interface{}, immediate bool) {
-	jm.enqueueControllerDelayed(logger, obj, immediate, 0)
+// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
+// immediately.
+// It is only used for Job events (creation, deletion, spec update).
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{}) {
+	jm.enqueueSyncJobInternal(logger, obj, 0)
 }
 
-func (jm *Controller) enqueueControllerPodUpdate(logger klog.Logger, obj interface{}, immediate bool) {
-	jm.enqueueControllerDelayed(logger, obj, immediate, jm.podUpdateBatchPeriod)
+// enqueueSyncJobBatched tells the controller to invoke syncJob with a
+// constant batching delay.
+// It is used for:
+// - Pod events (creation, deletion, update)
+// - Job status update
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
+	jm.enqueueSyncJobInternal(logger, obj, jm.syncJobBatchPeriod)
 }
 
-func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface{}, immediate bool, delay time.Duration) {
+// enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
+// custom delay, but not smaller than the batching delay.
+// It is used when pod recreations are delayed due to pod failures.
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
+	if delay < jm.syncJobBatchPeriod {
+		delay = jm.syncJobBatchPeriod
+	}
+	jm.enqueueSyncJobInternal(logger, obj, delay)
+}
+
+func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}, delay time.Duration) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
-
-	backoff := delay
-	if !immediate {
-		if calculatedBackoff := getBackoff(jm.queue, key); calculatedBackoff > 0 {
-			backoff = calculatedBackoff
-		}
-	}
 
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
 	// deterministically avoid syncing controllers that fight over pods. Currently, we only
 	// ensure that the same controller is synced for a given pod. When we periodically relist
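Taken together, the three wrappers form a small API over one internal enqueue path. An illustrative summary of how the rest of the patch uses them (the 30-second figure is an arbitrary example, not a controller constant):

// Job events (add, delete, spec change): sync immediately.
jm.enqueueSyncJobImmediately(logger, job) // delay = 0

// Pod events and Job status updates: batch for syncJobBatchPeriod.
jm.enqueueSyncJobBatched(logger, job) // delay = 1s

// Pod failure backoff: custom delay, clamped to at least the batch period.
jm.enqueueSyncJobWithDelay(logger, job, 30*time.Second)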
@@ -540,7 +544,7 @@ func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface
 	// by querying the store for all controllers that this rc overlaps, as well as all
 	// controllers that overlap this rc, and sorting them.
 	logger.Info("enqueueing job", "key", key)
-	jm.queue.AddAfter(key, backoff)
+	jm.queue.AddAfter(key, delay)
 }
 
 func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
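A zero delay needs no special case above because client-go's delaying queue treats a non-positive duration as an immediate Add. A tiny runnable check of that behavior (assumes only k8s.io/client-go/util/workqueue):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	q.AddAfter("default/my-job", 0) // non-positive delay: added synchronously
	fmt.Println(q.Len())            // 1, without waiting on the delaying loop
}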
@@ -711,7 +715,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		jm.expectations.DeleteExpectations(key)
 		jm.finalizerExpectations.deleteExpectations(logger, key)
 
-		err := jm.backoffRecordStore.removeBackoffRecord(key)
+		err := jm.podBackoffStore.removeBackoffRecord(key)
 		if err != nil {
 			// re-syncing here as the record has to be removed for finished/deleted jobs
 			return fmt.Errorf("error removing backoff record %w", err)
@@ -725,7 +729,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 
 	// if job was finished previously, we don't want to redo the termination
 	if IsJobFinished(&job) {
-		err := jm.backoffRecordStore.removeBackoffRecord(key)
+		err := jm.podBackoffStore.removeBackoffRecord(key)
 		if err != nil {
 			// re-syncing here as the record has to be removed for finished/deleted jobs
 			return fmt.Errorf("error removing backoff record %w", err)
@@ -783,7 +787,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}
 
-	newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+	newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
 	var manageJobErr error
 	var finishedCondition *batch.JobCondition
@@ -836,7 +840,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	} else {
 		manageJobCalled := false
 		if satisfiedExpectations && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffInfo)
+			active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord)
 			manageJobCalled = true
 		}
 		complete := false
@@ -892,14 +896,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffInfo)
+	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord)
 	if err != nil {
-		if apierrors.IsConflict(err) {
-			// we probably have a stale informer cache
-			// so don't return an error to avoid backoff
-			jm.enqueueController(logger, &job, false)
-			return nil
-		}
 		return fmt.Errorf("tracking status: %w", err)
 	}
 
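With the conflict special-case deleted, a stale-cache conflict now propagates like any other sync error, and retry policy lives in one place: the rate-limited workqueue. A generic sketch of that pattern (the controller's actual processNextWorkItem differs in details):

func (jm *Controller) processOne(ctx context.Context, key string) {
	if err := jm.syncJob(ctx, key); err != nil {
		// Requeue with the rate limiter's backoff instead of ad-hoc handling.
		jm.queue.AddRateLimited(key)
		return
	}
	// Success: reset the per-key requeue history.
	jm.queue.Forget(key)
}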
@@ -1120,7 +1118,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
 		return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
 	}
 
-	err = jm.backoffRecordStore.updateBackoffRecord(newBackoffRecord)
+	err = jm.podBackoffStore.updateBackoffRecord(newBackoffRecord)
 
 	if err != nil {
 		// this error might undercount the backoff.
@@ -1376,7 +1374,7 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
 	logger := klog.FromContext(ctx)
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
@@ -1438,9 +1436,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	}
 
 	if active < wantActive {
-		remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
+		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
 		if remainingTime > 0 {
-			jm.enqueueControllerDelayed(logger, job, true, remainingTime)
+			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
 		}
 		diff := wantActive - active
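getRemainingTime is defined in the backoff store, not in this diff. A minimal sketch of the computation that replaces the queue-based backoff (assumed semantics of backoffRecord; the field and parameter names here are illustrative, not verbatim): exponential per-Job pod backoff, capped, and measured from the most recent pod failure.

package job

import (
	"math"
	"time"

	"k8s.io/utils/clock"
)

// remainingBackoff returns how much longer pod recreation should wait.
func remainingBackoff(c clock.PassiveClock, failuresAfterLastSuccess int32, lastFailureTime time.Time, defaultBackoff, maxBackoff time.Duration) time.Duration {
	if failuresAfterLastSuccess == 0 {
		return 0
	}
	// defaultBackoff * 2^(failures-1), computed in float64 so it cannot overflow.
	backoff := float64(defaultBackoff.Nanoseconds()) * math.Pow(2, float64(failuresAfterLastSuccess-1))
	if backoff > float64(maxBackoff.Nanoseconds()) {
		backoff = float64(maxBackoff.Nanoseconds())
	}
	if waited := c.Since(lastFailureTime); waited < time.Duration(backoff) {
		return time.Duration(backoff) - waited
	}
	return 0
}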
@@ -1569,26 +1567,6 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 	return err
 }
 
-func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
-	exp := queue.NumRequeues(key)
-
-	if exp <= 0 {
-		return time.Duration(0)
-	}
-
-	// The backoff is capped such that 'calculated' value never overflows.
-	backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
-	if backoff > math.MaxInt64 {
-		return MaxJobBackOff
-	}
-
-	calculated := time.Duration(backoff)
-	if calculated > MaxJobBackOff {
-		return MaxJobBackOff
-	}
-	return calculated
-}
-
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or are in the uncounted set
 // and, for Indexed Jobs, a valid completion index.
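For reference, the deleted function implemented DefaultJobBackOff * 2^(requeues-1), capped at MaxJobBackOff. With this controller's constants (10s default, 360s max) that series was:

requeues:  1    2    3    4    5     6     7+
backoff:   10s  20s  40s  80s  160s  320s  360s (cap)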