Merge pull request #125510 from mimowo/extend-job-conditions

Delay setting terminal Job conditions until all pods are terminal
Authored by Kubernetes Prow Robot on 2024-07-12 08:12:46 -07:00, committed by GitHub.
9 changed files with 1424 additions and 82 deletions
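With this change the Job controller no longer adds the terminal conditions (Complete, Failed) as soon as the Job's fate is decided; it first adds the interim condition (SuccessCriteriaMet or FailureTarget) and only appends the terminal condition once every Pod has reached a terminal phase. The client-go sketch below illustrates the ordering that the updated e2e and integration tests assert. It is not part of the diff; the helper names, poll interval, and timeout are illustrative assumptions.

```go
// Illustrative sketch (not from this PR): observing the condition ordering
// enforced by the change, using plain client-go. Assumes cs is an already
// configured kubernetes.Interface and the Job exists in namespace ns.
package jobconditions

import (
	"context"
	"fmt"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForJobConditionTrue polls the Job until the given condition type has status True.
func waitForJobConditionTrue(ctx context.Context, cs kubernetes.Interface, ns, name string, cType batchv1.JobConditionType) error {
	return wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
		job, err := cs.BatchV1().Jobs(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, c := range job.Status.Conditions {
			if c.Type == cType && c.Status == v1.ConditionTrue {
				return true, nil
			}
		}
		return false, nil
	})
}

// ObserveFailureOrdering waits for the interim condition first, then the terminal one.
func ObserveFailureOrdering(ctx context.Context, cs kubernetes.Interface, ns, name string) error {
	// FailureTarget is added as soon as the Job's fate is decided
	// (e.g. the backoff limit is exceeded), while some Pods may still be terminating.
	if err := waitForJobConditionTrue(ctx, cs, ns, name, batchv1.JobFailureTarget); err != nil {
		return fmt.Errorf("waiting for interim FailureTarget condition: %w", err)
	}
	// Failed is only added once all Pods are terminal, so at this point
	// status.active, status.ready, and status.terminating have dropped to zero.
	if err := waitForJobConditionTrue(ctx, cs, ns, name, batchv1.JobFailed); err != nil {
		return fmt.Errorf("waiting for terminal Failed condition: %w", err)
	}
	return nil
}
```

The success path is analogous: SuccessCriteriaMet appears first and Complete only after all Pods are terminal, which is what the updated e2e tests and the new WaitForJobCondition framework helper in the diff below verify.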


@@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"strconv"
"time"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
@@ -35,9 +34,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
@@ -705,9 +702,20 @@ done`}
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Verifying the Job status fields to ensure correct final state")
job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to retrieve latest job object")
gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0)))
gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0)))
gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0)))
})
ginkgo.It("should fail when exceeds active deadline", func(ctx context.Context) {
@@ -720,9 +728,21 @@ done`}
job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonDeadlineExceeded)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job past active deadline")
err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded")
err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonDeadlineExceeded)
framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name)
ginkgo.By("Verifying the Job status fields to ensure correct final state")
job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to retrieve latest job object")
gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0)))
gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0)))
gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0)))
})
/*
@@ -823,9 +843,13 @@ done`}
job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring job exceed backofflimit")
err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded")
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonBackoffLimitExceeded)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job exceed backofflimit")
err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonBackoffLimitExceeded)
framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name)
ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
@@ -835,6 +859,13 @@ done`}
for _, pod := range pods.Items {
gomega.Expect(pod.Status.Phase).To(gomega.Equal(v1.PodFailed))
}
ginkgo.By("Verifying the Job status fields to ensure correct final state")
job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to retrieve latest job object")
gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0)))
gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0)))
gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0)))
})
f.It("should run a job to completion with CPU requests", f.WithSerial(), func(ctx context.Context) {
@@ -1186,24 +1217,6 @@ func waitForJobEvent(ctx context.Context, config watchEventConfig) {
}
}
// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
func waitForJobFailure(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
return wait.Poll(framework.Poll, timeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, c := range curr.Status.Conditions {
if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue {
if reason == "" || reason == c.Reason {
return true, nil
}
}
}
return false, nil
})
}
func findConditionByType(list []batchv1.JobCondition, cType batchv1.JobConditionType) *batchv1.JobCondition {
for i := range list {
if list[i].Type == cType {


@@ -69,13 +69,16 @@ func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobNa
// WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns.
func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName string, completions int32) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
return curr.Status.Succeeded == completions, nil
})
}); err != nil {
return nil
}
return WaitForJobCondition(ctx, c, ns, jobName, batchv1.JobComplete, "")
}
// WaitForJobReady waits for particular value of the Job .status.ready field
@@ -112,6 +115,28 @@ func WaitForJobFailed(c clientset.Interface, ns, jobName string) error {
})
}
// WaitForJobCondition waits for the specified Job to have the expected condition with the specific reason.
func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName string, cType batchv1.JobConditionType, reason string) error {
err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, c := range curr.Status.Conditions {
if c.Type == cType && c.Status == v1.ConditionTrue {
if reason == c.Reason {
return true, nil
}
}
}
return false, nil
})
if err != nil {
return fmt.Errorf("waiting for Job %q to have the condition %q with reason: %q: %w", jobName, cType, reason, err)
}
return nil
}
func isJobFailed(j *batchv1.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {


@@ -29,6 +29,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
@@ -1160,6 +1161,301 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T)
}
}
// TestDelayTerminalPhaseCondition tests the fix for Job controller to delay
// setting the terminal phase conditions (Failed and Complete) until all Pods
// are terminal. The fate of the Job is indicated by the interim Job conditions:
// FailureTarget, or SuccessCriteriaMet.
func TestDelayTerminalPhaseCondition(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
podTemplateSpec := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main-container",
Image: "foo",
ImagePullPolicy: v1.PullIfNotPresent,
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
},
},
},
}
failOnePod := func(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodFailed, err)
}
}
succeedOnePodAndScaleDown := func(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) {
// mark one pod as succeeded
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
if _, err := updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) {
j.Spec.Parallelism = ptr.To[int32](1)
j.Spec.Completions = ptr.To[int32](1)
}); err != nil {
t.Fatalf("Unexpected error when scaling down the job: %v", err)
}
}
testCases := map[string]struct {
enableJobManagedBy bool
enableJobPodReplacementPolicy bool
job batchv1.Job
action func(context.Context, clientset.Interface, *batchv1.Job)
wantInterimStatus *batchv1.JobStatus
wantTerminalStatus batchv1.JobStatus
}{
"job backoff limit exceeded; JobPodReplacementPolicy and JobManagedBy disabled": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
Template: podTemplateSpec,
BackoffLimit: ptr.To[int32](0),
},
},
action: failOnePod,
wantTerminalStatus: batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
},
"job backoff limit exceeded; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
Template: podTemplateSpec,
BackoffLimit: ptr.To[int32](0),
},
},
action: failOnePod,
wantInterimStatus: &batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
},
"job backoff limit exceeded; JobManagedBy enabled": {
enableJobManagedBy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
Template: podTemplateSpec,
BackoffLimit: ptr.To[int32](0),
},
},
action: failOnePod,
wantInterimStatus: &batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
},
"job scale down to meet completions; JobPodReplacementPolicy and JobManagedBy disabled": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Template: podTemplateSpec,
},
},
action: succeedOnePodAndScaleDown,
wantTerminalStatus: batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
"job scale down to meet completions; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Template: podTemplateSpec,
},
},
action: succeedOnePodAndScaleDown,
wantInterimStatus: &batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
"job scale down to meet completions; JobManagedBy enabled": {
enableJobManagedBy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Template: podTemplateSpec,
},
},
action: succeedOnePodAndScaleDown,
wantInterimStatus: &batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
}
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, true)
closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
}
t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
waitForPodsToBeActive(ctx, t, jobClient, *jobObj.Spec.Parallelism, jobObj)
test.action(ctx, clientSet, jobObj)
if test.wantInterimStatus != nil {
validateJobStatus(ctx, t, clientSet, jobObj, *test.wantInterimStatus)
// Set terminal phase to all the remaining pods to simulate
// Kubelet (or other components like PodGC).
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(s v1.PodStatus) bool {
return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning)
})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, len(jobPods)); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
}
validateJobStatus(ctx, t, clientSet, jobObj, test.wantTerminalStatus)
})
}
}
// TestBackoffLimitPerIndex tests handling of job and its pods when
// backoff limit per index is used.
func TestBackoffLimitPerIndex(t *testing.T) {
@@ -1966,16 +2262,14 @@ func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
}
// Mark the job as finished so that the built-in controller receives the
// Trigger termination for the Job so that the built-in controller receives the
// UpdateJob event in reaction to which it would remove the pod's finalizer,
// if not for the custom managedBy field.
jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
Type: batchv1.JobComplete,
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
})
jobObj.Status.StartTime = ptr.To(metav1.Now())
jobObj.Status.CompletionTime = ptr.To(metav1.Now())
if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
}
@@ -2821,7 +3115,7 @@ func TestElasticIndexedJob(t *testing.T) {
jobUpdates: []jobUpdate{
{
completions: ptr.To[int32](0),
wantTerminating: ptr.To[int32](3),
wantTerminating: ptr.To[int32](0),
},
},
},
@@ -3595,6 +3889,25 @@ func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, cl
}
}
func validateJobStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, wantStatus batchv1.JobStatus) {
t.Helper()
diff := ""
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
gotJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get updated Job: %v, last status diff (-want,+got):\n%s", err, diff)
}
diff = cmp.Diff(wantStatus, gotJob.Status,
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(batchv1.JobStatus{}, "StartTime", "UncountedTerminatedPods", "CompletionTime"),
cmpopts.IgnoreFields(batchv1.JobCondition{}, "LastProbeTime", "LastTransitionTime", "Message"),
)
return diff == "", nil
}); err != nil {
t.Fatalf("Waiting for Job Status: %v\n, Status diff (-want,+got):\n%s", err, diff)
}
}
func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
t.Helper()
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)