Merge pull request #40932 from peay/cronjob-max-finished-jobs

Automatic merge from submit-queue (batch tested with PRs 40932, 41896, 41815, 41309, 41628)

Modify CronJob API to add job history limits, cleanup jobs in controller

**What this PR does / why we need it**:
As discussed in #34710: this adds two limits to `CronJobSpec`, to limit the number of finished jobs created by a CronJob to keep.

**Which issue this PR fixes**: fixes #34710

**Special notes for your reviewer**:

cc @soltysh, please have a look and let me know what you think -- I'll then add end to end testing and update the doc in a separate commit. What is the timeline to get this into 1.6?

The plan:

- [x] API changes
  - [x] Changing versioned APIs
    - [x] `types.go`
    - [x] `defaults.go` (nothing to do)
    - [x] `conversion.go` (nothing to do?)
    - [x] `conversion_test.go` (nothing to do?)
  - [x] Changing the internal structure
    - [x] `types.go`
    - [x] `validation.go`
    - [x] `validation_test.go`
  - [x] Edit version conversions
    - [x] Edit (nothing to do?)
    - [x] Run `hack/update-codegen.sh`
  - [x] Generate protobuf objects
    - [x] Run `hack/update-generated-protobuf.sh`
  - [x] Generate json (un)marshaling code
    - [x] Run `hack/update-codecgen.sh`
  - [x] Update fuzzer
- [x] Actual logic
- [x] Unit tests
- [x] End to end tests
- [x] Documentation changes and API specs update in separate commit


**Release note**:

```release-note
Add configurable limits to CronJob resource to specify how many successful and failed jobs are preserved.
```
This commit is contained in:
Kubernetes Submit Queue
2017-02-26 08:09:54 -08:00
committed by GitHub
18 changed files with 965 additions and 208 deletions

View File

@@ -52,6 +52,11 @@ var (
var _ = framework.KubeDescribe("CronJob", func() {
f := framework.NewDefaultGroupVersionFramework("cronjob", BatchV2Alpha1GroupVersion)
sleepCommand := []string{"sleep", "300"}
// Pod will complete instantly
successCommand := []string{"/bin/true"}
BeforeEach(func() {
framework.SkipIfMissingResource(f.ClientPool, CronJobGroupVersionResource, f.Namespace.Name)
})
@@ -59,7 +64,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// multiple jobs running at once
It("should schedule multiple jobs concurrently", func() {
By("Creating a cronjob")
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, true)
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@@ -70,7 +76,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
By("Ensuring at least two running jobs exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs := filterActiveJobs(jobs)
activeJobs, _ := filterActiveJobs(jobs)
Expect(len(activeJobs) >= 2).To(BeTrue())
By("Removing cronjob")
@@ -81,7 +87,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// suspended should not schedule jobs
It("should not schedule jobs when suspended [Slow]", func() {
By("Creating a suspended cronjob")
cronJob := newTestCronJob("suspended", "*/1 * * * ?", batch.AllowConcurrent, true)
cronJob := newTestCronJob("suspended", "*/1 * * * ?", batch.AllowConcurrent,
sleepCommand, nil)
cronJob.Spec.Suspend = newBool(true)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@@ -103,7 +110,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// only single active job is allowed for ForbidConcurrent
It("should not schedule new jobs when ForbidConcurrent [Slow]", func() {
By("Creating a ForbidConcurrent cronjob")
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true)
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@@ -119,7 +127,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
By("Ensuring exaclty one running job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs := filterActiveJobs(jobs)
activeJobs, _ := filterActiveJobs(jobs)
Expect(activeJobs).To(HaveLen(1))
By("Ensuring no more jobs are scheduled")
@@ -134,7 +142,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// only single active job is allowed for ReplaceConcurrent
It("should replace jobs when ReplaceConcurrent", func() {
By("Creating a ReplaceConcurrent cronjob")
cronJob := newTestCronJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent, true)
cronJob := newTestCronJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@@ -150,7 +159,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
By("Ensuring exaclty one running job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs := filterActiveJobs(jobs)
activeJobs, _ := filterActiveJobs(jobs)
Expect(activeJobs).To(HaveLen(1))
By("Ensuring the job is replaced with a new one")
@@ -165,7 +174,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// shouldn't give us unexpected warnings
It("should not emit unexpected warnings", func() {
By("Creating a cronjob")
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, false)
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent,
nil, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@@ -187,7 +197,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// deleted jobs should be removed from the active list
It("should remove from active list jobs that have been deleted", func() {
By("Creating a ForbidConcurrent cronjob")
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true)
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@@ -225,10 +236,49 @@ var _ = framework.KubeDescribe("CronJob", func() {
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
// cleanup of successful finished jobs, with limit of one successful job
It("should delete successful finished jobs with limit of one successful job", func() {
By("Creating a AllowConcurrent cronjob with custom history limits")
successLimit := int32(1)
cronJob := newTestCronJob("concurrent-limit", "*/1 * * * ?", batch.AllowConcurrent,
successCommand, &successLimit)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
// Job is going to complete instantly: do not check for an active job
// as we are most likely to miss it
By("Ensuring a finished job exists")
err = waitForAnyFinishedJob(f.ClientSet, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Ensuring a finished job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
_, finishedJobs := filterActiveJobs(jobs)
Expect(len(finishedJobs) == 1).To(BeTrue())
// Job should get deleted when the next job finishes the next minute
By("Ensuring this job does not exist anymore")
err = waitForJobNotExist(f.ClientSet, f.Namespace.Name, finishedJobs[0])
Expect(err).NotTo(HaveOccurred())
By("Ensuring there is 1 finished job by listing jobs explicitly")
jobs, err = f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
_, finishedJobs = filterActiveJobs(jobs)
Expect(len(finishedJobs) == 1).To(BeTrue())
By("Removing cronjob")
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
})
// newTestCronJob returns a cronjob which does one of several testing behaviors.
func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, sleep bool) *batch.CronJob {
func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, command []string,
successfulJobsHistoryLimit *int32) *batch.CronJob {
parallelism := int32(1)
completions := int32(1)
sj := &batch.CronJob{
@@ -271,8 +321,9 @@ func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPo
},
},
}
if sleep {
sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "300"}
sj.Spec.SuccessfulJobsHistoryLimit = successfulJobsHistoryLimit
if command != nil {
sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = command
}
return sj
}
@@ -319,6 +370,23 @@ func waitForNoJobs(c clientset.Interface, ns, jobName string, failIfNonEmpty boo
})
}
// Wait for a job to not exist by listing jobs explicitly.
func waitForJobNotExist(c clientset.Interface, ns string, targetJob *batchv1.Job) error {
return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
jobs, err := c.Batch().Jobs(ns).List(metav1.ListOptions{})
if err != nil {
return false, err
}
_, finishedJobs := filterActiveJobs(jobs)
for _, job := range finishedJobs {
if targetJob.Namespace == job.Namespace && targetJob.Name == job.Name {
return false, nil
}
}
return true, nil
})
}
// Wait for a job to be replaced with a new one.
func waitForJobReplaced(c clientset.Interface, ns, previousJobName string) error {
return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
@@ -383,11 +451,13 @@ func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reaso
return nil
}
func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job) {
func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
for i := range jobs.Items {
j := jobs.Items[i]
if !job.IsJobFinished(&j) {
active = append(active, &j)
} else {
finished = append(finished, &j)
}
}
return