Merge pull request #31476 from janetkuo/fix-sj-finished-job-warning
Automatic merge from submit-queue Fix the bug that SJ sees finished jobs as unexpected <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md 2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md 3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes --> **What this PR does / why we need it**: **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #31472 **Special notes for your reviewer**: **Release note**: <!-- Steps to write your release note: 1. Use the release-note-* labels to set the release note state (if you have access) 2. Enter your extended release note in the below block; leaving it blank means using the PR title as the release note. If no release note is required, just write `NONE`. --> ```release-note NONE ``` cc @soltysh @erictune
This commit is contained in:
@@ -127,11 +127,12 @@ func (jm *ScheduledJobController) SyncAll() {
|
|||||||
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
|
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
|
||||||
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
|
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
|
||||||
|
|
||||||
for _, j := range js {
|
for i := range js {
|
||||||
|
j := js[i]
|
||||||
found := inActiveList(sj, j.ObjectMeta.UID)
|
found := inActiveList(sj, j.ObjectMeta.UID)
|
||||||
if !found {
|
if !found && !job.IsJobFinished(&j) {
|
||||||
recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
|
recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
|
||||||
// We found a job object that has us as the parent, but it is not in our Active list.
|
// We found an unfinished job that has us as the parent, but it is not in our Active list.
|
||||||
// This could happen if we crashed right after creating the Job and before updating the status,
|
// This could happen if we crashed right after creating the Job and before updating the status,
|
||||||
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
|
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
|
||||||
// a job that they wanted us to adopt.
|
// a job that they wanted us to adopt.
|
||||||
@@ -141,14 +142,12 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
|
|||||||
// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
|
// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
|
||||||
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
|
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
|
||||||
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
|
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
|
||||||
} else {
|
} else if found && job.IsJobFinished(&j) {
|
||||||
if job.IsJobFinished(&j) {
|
|
||||||
deleteFromActiveList(&sj, j.ObjectMeta.UID)
|
deleteFromActiveList(&sj, j.ObjectMeta.UID)
|
||||||
// TODO: event to call out failure vs success.
|
// TODO: event to call out failure vs success.
|
||||||
recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
|
recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
updatedSJ, err := sjc.UpdateStatus(&sj)
|
updatedSJ, err := sjc.UpdateStatus(&sj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
|
glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
|
||||||
|
@@ -28,6 +28,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/apis/batch"
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/job"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
)
|
)
|
||||||
@@ -56,7 +57,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() {
|
|||||||
// multiple jobs running at once
|
// multiple jobs running at once
|
||||||
It("should schedule multiple jobs concurrently", func() {
|
It("should schedule multiple jobs concurrently", func() {
|
||||||
By("Creating a scheduledjob")
|
By("Creating a scheduledjob")
|
||||||
scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent)
|
scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, true)
|
||||||
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
@@ -77,7 +78,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() {
|
|||||||
// suspended should not schedule jobs
|
// suspended should not schedule jobs
|
||||||
It("should not schedule jobs when suspended", func() {
|
It("should not schedule jobs when suspended", func() {
|
||||||
By("Creating a suspended scheduledjob")
|
By("Creating a suspended scheduledjob")
|
||||||
scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent)
|
scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent, true)
|
||||||
scheduledJob.Spec.Suspend = newBool(true)
|
scheduledJob.Spec.Suspend = newBool(true)
|
||||||
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
@@ -99,7 +100,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() {
|
|||||||
// only single active job is allowed for ForbidConcurrent
|
// only single active job is allowed for ForbidConcurrent
|
||||||
It("should not schedule new jobs when ForbidConcurrent", func() {
|
It("should not schedule new jobs when ForbidConcurrent", func() {
|
||||||
By("Creating a ForbidConcurrent scheduledjob")
|
By("Creating a ForbidConcurrent scheduledjob")
|
||||||
scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent)
|
scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true)
|
||||||
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
@@ -129,7 +130,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() {
|
|||||||
// only single active job is allowed for ReplaceConcurrent
|
// only single active job is allowed for ReplaceConcurrent
|
||||||
It("should replace jobs when ReplaceConcurrent", func() {
|
It("should replace jobs when ReplaceConcurrent", func() {
|
||||||
By("Creating a ReplaceConcurrent scheduledjob")
|
By("Creating a ReplaceConcurrent scheduledjob")
|
||||||
scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent)
|
scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent, true)
|
||||||
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
@@ -155,13 +156,35 @@ var _ = framework.KubeDescribe("ScheduledJob", func() {
|
|||||||
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// shouldn't give us unexpected warnings
|
||||||
|
It("should not emit unexpected warnings", func() {
|
||||||
|
By("Creating a scheduledjob")
|
||||||
|
scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, false)
|
||||||
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring at least two jobs and at least one finished job exists by listing jobs explicitly")
|
||||||
|
err = waitForJobsAtLeast(f.Client, f.Namespace.Name, 2)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = waitForAnyFinishedJob(f.Client, f.Namespace.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring no unexpected event has happened")
|
||||||
|
err = checkNoUnexpectedEvents(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Removing scheduledjob")
|
||||||
|
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
// newTestScheduledJob returns a scheduledjob which does one of several testing behaviors.
|
// newTestScheduledJob returns a scheduledjob which does one of several testing behaviors.
|
||||||
func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy) *batch.ScheduledJob {
|
func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, sleep bool) *batch.ScheduledJob {
|
||||||
parallelism := int32(1)
|
parallelism := int32(1)
|
||||||
completions := int32(1)
|
completions := int32(1)
|
||||||
return &batch.ScheduledJob{
|
sj := &batch.ScheduledJob{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: name,
|
Name: name,
|
||||||
},
|
},
|
||||||
@@ -187,7 +210,6 @@ func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.Concurre
|
|||||||
{
|
{
|
||||||
Name: "c",
|
Name: "c",
|
||||||
Image: "gcr.io/google_containers/busybox:1.24",
|
Image: "gcr.io/google_containers/busybox:1.24",
|
||||||
Command: []string{"sleep", "300"},
|
|
||||||
VolumeMounts: []api.VolumeMount{
|
VolumeMounts: []api.VolumeMount{
|
||||||
{
|
{
|
||||||
MountPath: "/data",
|
MountPath: "/data",
|
||||||
@@ -202,6 +224,10 @@ func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.Concurre
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
if sleep {
|
||||||
|
sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "300"}
|
||||||
|
}
|
||||||
|
return sj
|
||||||
}
|
}
|
||||||
|
|
||||||
func createScheduledJob(c *client.Client, ns string, scheduledJob *batch.ScheduledJob) (*batch.ScheduledJob, error) {
|
func createScheduledJob(c *client.Client, ns string, scheduledJob *batch.ScheduledJob) (*batch.ScheduledJob, error) {
|
||||||
@@ -252,3 +278,49 @@ func waitForJobReplaced(c *client.Client, ns, previousJobName string) error {
|
|||||||
return jobs.Items[0].Name != previousJobName, nil
|
return jobs.Items[0].Name != previousJobName, nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForJobsAtLeast waits for at least a number of jobs to appear.
|
||||||
|
func waitForJobsAtLeast(c *client.Client, ns string, atLeast int) error {
|
||||||
|
return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) {
|
||||||
|
jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return len(jobs.Items) >= atLeast, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForAnyFinishedJob waits for any completed job to appear.
|
||||||
|
func waitForAnyFinishedJob(c *client.Client, ns string) error {
|
||||||
|
return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) {
|
||||||
|
jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
for i := range jobs.Items {
|
||||||
|
if job.IsJobFinished(&jobs.Items[i]) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkNoUnexpectedEvents checks unexpected events didn't happen.
|
||||||
|
// Currently only "UnexpectedJob" is checked.
|
||||||
|
func checkNoUnexpectedEvents(c *client.Client, ns, scheduledJobName string) error {
|
||||||
|
sj, err := c.Batch().ScheduledJobs(ns).Get(scheduledJobName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error in getting scheduledjob %s/%s: %v", ns, scheduledJobName, err)
|
||||||
|
}
|
||||||
|
events, err := c.Events(ns).Search(sj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error in listing events: %s", err)
|
||||||
|
}
|
||||||
|
for _, e := range events.Items {
|
||||||
|
if e.Reason == "UnexpectedJob" {
|
||||||
|
return fmt.Errorf("found unexpected event: %#v", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user