From fbe734ee4731f28b71f6b73e6a4b0d0581b03ff9 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Thu, 25 Aug 2016 15:18:19 -0700 Subject: [PATCH 1/2] SJ e2e test for unexpected events --- test/e2e/scheduledjob.go | 90 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 9 deletions(-) diff --git a/test/e2e/scheduledjob.go b/test/e2e/scheduledjob.go index cc5ded90a23..e642f6d7cee 100644 --- a/test/e2e/scheduledjob.go +++ b/test/e2e/scheduledjob.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" ) @@ -56,7 +57,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() { // multiple jobs running at once It("should schedule multiple jobs concurrently", func() { By("Creating a scheduledjob") - scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent) + scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, true) scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) Expect(err).NotTo(HaveOccurred()) @@ -77,7 +78,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() { // suspended should not schedule jobs It("should not schedule jobs when suspended", func() { By("Creating a suspended scheduledjob") - scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent) + scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent, true) scheduledJob.Spec.Suspend = newBool(true) scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) Expect(err).NotTo(HaveOccurred()) @@ -99,7 +100,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() { // only single active job is allowed for ForbidConcurrent It("should not schedule new jobs when ForbidConcurrent", func() { By("Creating a ForbidConcurrent scheduledjob") - scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent) + scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true) scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) Expect(err).NotTo(HaveOccurred()) @@ -129,7 +130,7 @@ var _ = framework.KubeDescribe("ScheduledJob", func() { // only single active job is allowed for ReplaceConcurrent It("should replace jobs when ReplaceConcurrent", func() { By("Creating a ReplaceConcurrent scheduledjob") - scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent) + scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent, true) scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) Expect(err).NotTo(HaveOccurred()) @@ -155,13 +156,35 @@ var _ = framework.KubeDescribe("ScheduledJob", func() { err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) Expect(err).NotTo(HaveOccurred()) }) + + // shouldn't give us unexpected warnings + It("should not emit unexpected warnings", func() { + By("Creating a scheduledjob") + scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, false) + scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring at least two jobs and at least one finished job exists by listing jobs explicitly") + err = waitForJobsAtLeast(f.Client, f.Namespace.Name, 2) + Expect(err).NotTo(HaveOccurred()) + err = waitForAnyFinishedJob(f.Client, f.Namespace.Name) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring no unexpected event has happened") + err = checkNoUnexpectedEvents(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + + By("Removing scheduledjob") + err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) }) // newTestScheduledJob returns a scheduledjob which does one of several testing behaviors. -func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy) *batch.ScheduledJob { +func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, sleep bool) *batch.ScheduledJob { parallelism := int32(1) completions := int32(1) - return &batch.ScheduledJob{ + sj := &batch.ScheduledJob{ ObjectMeta: api.ObjectMeta{ Name: name, }, @@ -185,9 +208,8 @@ func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.Concurre }, Containers: []api.Container{ { - Name: "c", - Image: "gcr.io/google_containers/busybox:1.24", - Command: []string{"sleep", "300"}, + Name: "c", + Image: "gcr.io/google_containers/busybox:1.24", VolumeMounts: []api.VolumeMount{ { MountPath: "/data", @@ -202,6 +224,10 @@ func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.Concurre }, }, } + if sleep { + sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "300"} + } + return sj } func createScheduledJob(c *client.Client, ns string, scheduledJob *batch.ScheduledJob) (*batch.ScheduledJob, error) { @@ -252,3 +278,49 @@ func waitForJobReplaced(c *client.Client, ns, previousJobName string) error { return jobs.Items[0].Name != previousJobName, nil }) } + +// waitForJobsAtLeast waits for at least a number of jobs to appear. +func waitForJobsAtLeast(c *client.Client, ns string, atLeast int) error { + return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { + jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{}) + if err != nil { + return false, err + } + return len(jobs.Items) >= atLeast, nil + }) +} + +// waitForAnyFinishedJob waits for any completed job to appear. +func waitForAnyFinishedJob(c *client.Client, ns string) error { + return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { + jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{}) + if err != nil { + return false, err + } + for i := range jobs.Items { + if job.IsJobFinished(&jobs.Items[i]) { + return true, nil + } + } + return false, nil + }) +} + +// checkNoUnexpectedEvents checks unexpected events didn't happen. +// Currently only "UnexpectedJob" is checked. +func checkNoUnexpectedEvents(c *client.Client, ns, scheduledJobName string) error { + sj, err := c.Batch().ScheduledJobs(ns).Get(scheduledJobName) + if err != nil { + return fmt.Errorf("error in getting scheduledjob %s/%s: %v", ns, scheduledJobName, err) + } + events, err := c.Events(ns).Search(sj) + if err != nil { + return fmt.Errorf("error in listing events: %s", err) + } + for _, e := range events.Items { + if e.Reason == "UnexpectedJob" { + return fmt.Errorf("found unexpected event: %#v", e) + } + } + return nil +} From 6004be15b88a950f3a78449e6d00f526fcf24ec7 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Thu, 25 Aug 2016 16:49:55 -0700 Subject: [PATCH 2/2] Fix the bug that SJ sees finished jobs as unexpected --- pkg/controller/scheduledjob/controller.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/controller/scheduledjob/controller.go b/pkg/controller/scheduledjob/controller.go index ba837cb693c..c06fbafa7aa 100644 --- a/pkg/controller/scheduledjob/controller.go +++ b/pkg/controller/scheduledjob/controller.go @@ -127,11 +127,12 @@ func (jm *ScheduledJobController) SyncAll() { func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) - for _, j := range js { + for i := range js { + j := js[i] found := inActiveList(sj, j.ObjectMeta.UID) - if !found { + if !found && !job.IsJobFinished(&j) { recorder.Eventf(&sj, api.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name) - // We found a job object that has us as the parent, but it is not in our Active list. + // We found an unfinished job that has us as the parent, but it is not in our Active list. // This could happen if we crashed right after creating the Job and before updating the status, // or if our jobs list is newer than our sj status after a relist, or if someone intentionally created // a job that they wanted us to adopt. @@ -141,12 +142,10 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl // user has permission to create a job within a namespace, then they have permission to make any scheduledJob // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way. // TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about? - } else { - if job.IsJobFinished(&j) { - deleteFromActiveList(&sj, j.ObjectMeta.UID) - // TODO: event to call out failure vs success. - recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name) - } + } else if found && job.IsJobFinished(&j) { + deleteFromActiveList(&sj, j.ObjectMeta.UID) + // TODO: event to call out failure vs success. + recorder.Eventf(&sj, api.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name) } } updatedSJ, err := sjc.UpdateStatus(&sj)