Merge pull request #119944 from Sharpz7/jm/backup-finalizers
Adding backup code for removing Pod finalizers in more Job end states.
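For context, the new branch added to updateJob in the first hunk keys off IsJobFinished. A minimal sketch of that helper (illustrative, not part of this diff), assuming it simply scans the Job's status conditions for a terminal condition, with the usual import aliases batch "k8s.io/api/batch/v1" and v1 "k8s.io/api/core/v1":

func IsJobFinished(j *batch.Job) bool {
	// A Job counts as finished once a Complete or Failed condition is set to True.
	for _, c := range j.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}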
@@ -467,7 +467,12 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 	} else {
 		// Trigger immediate sync when spec is changed.
 		jm.enqueueSyncJobImmediately(logger, curJob)
+	}
+
+	// The job shouldn't be marked as finished until all pod finalizers are removed.
+	// This is a backup operation in this case.
+	if IsJobFinished(curJob) {
+		jm.cleanupPodFinalizers(curJob)
 	}
 
 	// check if need to add a new rsync for ActiveDeadlineSeconds
@@ -504,18 +509,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
 			return
 		}
 	}
-	// Listing pods shouldn't really fail, as we are just querying the informer cache.
-	selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
-		return
-	}
-	pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector)
-	for _, pod := range pods {
-		if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) {
-			jm.enqueueOrphanPod(pod)
-		}
-	}
+	jm.cleanupPodFinalizers(jobObj)
 }
 
 // enqueueSyncJobImmediately tells the Job controller to invoke syncJob
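With this change, deleteJob no longer lists the Job's Pods inline; it delegates to the shared cleanupPodFinalizers helper added in the next hunk. That helper filters Pods with hasJobTrackingFinalizer. A minimal sketch of that check (illustrative, not part of this diff), assuming the standard batch.kubernetes.io/job-tracking finalizer exposed as batch.JobTrackingFinalizer, again with batch "k8s.io/api/batch/v1" and v1 "k8s.io/api/core/v1":

func hasJobTrackingFinalizer(pod *v1.Pod) bool {
	// The Job controller keeps this finalizer on a Pod until the Pod has been
	// accounted for in the Job status, so its presence means cleanup is still pending.
	for _, fin := range pod.Finalizers {
		if fin == batch.JobTrackingFinalizer {
			return true
		}
	}
	return false
}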
@@ -1879,3 +1873,18 @@ func onlyReplaceFailedPods(job *batch.Job) bool {
 	}
 	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
 }
+
+func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
+	// Listing pods shouldn't really fail, as we are just querying the informer cache.
+	selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
+		return
+	}
+	pods, _ := jm.podStore.Pods(job.Namespace).List(selector)
+	for _, pod := range pods {
+		if metav1.IsControlledBy(pod, job) && hasJobTrackingFinalizer(pod) {
+			jm.enqueueOrphanPod(pod)
+		}
+	}
+}
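cleanupPodFinalizers itself only enqueues the affected Pods via enqueueOrphanPod; a separate worker then strips the tracking finalizer from each of them. As a rough illustration of the kind of strategic merge patch such a removal can use (the function name and exact payload below are assumptions for this sketch, not taken from the controller), with "encoding/json" and batch "k8s.io/api/batch/v1" imported:

// buildRemoveFinalizerPatch builds a strategic merge patch that deletes the
// job-tracking finalizer from a Pod's metadata.finalizers list.
func buildRemoveFinalizerPatch() ([]byte, error) {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"$deleteFromPrimitiveList/finalizers": []string{batch.JobTrackingFinalizer},
		},
	}
	return json.Marshal(patch)
}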
@@ -5172,6 +5172,64 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
 	}
 }
 
+func TestFinalizerCleanup(t *testing.T) {
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	clientset := fake.NewSimpleClientset()
+	sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
+	manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	manager.podStoreSynced = alwaysReady
+	manager.jobStoreSynced = alwaysReady
+
+	// Initialize the controller with 0 workers to make sure the
+	// pod finalizers are not removed by the "syncJob" function.
+	go manager.Run(ctx, 0)
+
+	// Start the Pod and Job informers.
+	sharedInformers.Start(ctx.Done())
+	sharedInformers.WaitForCacheSync(ctx.Done())
+
+	// Create a simple Job
+	job := newJob(1, 1, 1, batch.NonIndexedCompletion)
+	job, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Creating job: %v", err)
+	}
+
+	// Create a Pod with the job tracking finalizer
+	pod := newPod("test-pod", job)
+	pod.Finalizers = append(pod.Finalizers, batch.JobTrackingFinalizer)
+	pod, err = clientset.CoreV1().Pods(pod.GetNamespace()).Create(ctx, pod, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Creating pod: %v", err)
+	}
+
+	// Mark Job as complete.
+	job.Status.Conditions = append(job.Status.Conditions, batch.JobCondition{
+		Type:   batch.JobComplete,
+		Status: v1.ConditionTrue,
+	})
+	_, err = clientset.BatchV1().Jobs(job.GetNamespace()).UpdateStatus(ctx, job, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("Updating job status: %v", err)
+	}
+
+	// Verify the pod finalizer is removed for a finished Job,
+	// even if the jobs pods are not tracked by the main reconciliation loop.
+	if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+		p, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		return !hasJobTrackingFinalizer(p), nil
+	}); err != nil {
+		t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
+	}
+
+}
+
 func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) {
 	t.Helper()
 	labels := p.GetLabels()
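Note that the test starts the controller with zero sync workers (go manager.Run(ctx, 0)), so syncJob never runs and the finalizer can only disappear through the backup path: the Job update handler sees the finished Job, calls cleanupPodFinalizers, and the enqueued Pod then has its finalizer removed. Assuming the controller and this test live in the usual pkg/controller/job package, the new test can be run on its own with: go test ./pkg/controller/job/ -run TestFinalizerCleanup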