Merge pull request #37077 from soltysh/issue34585

Automatic merge from submit-queue

Retry job update after failure to prevent modification conflict

This fixes #34585 flake.

@janetkuo || @kubernetes/sig-apps  ptal

I've been getting too many emails recently wrt to that issue, so I wanted to "clean" my inbox a bit 😉
This commit is contained in:
Kubernetes Submit Queue
2016-11-19 12:43:14 -08:00
committed by GitHub
4 changed files with 32 additions and 4 deletions

View File

@@ -204,8 +204,9 @@ var _ = framework.KubeDescribe("V1Job", func() {
// the job stabilized and won't be synced until modification or full
// resync happens, we don't want to wait for the latter so we force
// sync modifying it
job.Spec.Parallelism = &completions
job, err = updateV1Job(f.ClientSet, f.Namespace.Name, job)
_, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) {
update.Spec.Parallelism = &completions
})
Expect(err).NotTo(HaveOccurred())
err = waitForV1JobFail(f.ClientSet, f.Namespace.Name, job.Name, v1JobTimeout)
}

View File

@@ -39,6 +39,7 @@ go_library(
"//pkg/api/validation:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/apps:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",

View File

@@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
@@ -3397,6 +3398,30 @@ func UpdateStatefulSetWithRetries(c clientset.Interface, namespace, name string,
return statefulSet, pollErr
}
type updateJobFunc func(*batch.Job)
func UpdateJobWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateJobFunc) (job *batch.Job, err error) {
jobs := c.Batch().Jobs(namespace)
var updateErr error
pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
if job, err = jobs.Get(name); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(job)
if job, err = jobs.Update(job); err == nil {
Logf("Updating job %s", name)
return true, nil
}
updateErr = err
return false, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to job %q: %v", name, updateErr)
}
return job, pollErr
}
// NodeAddresses returns the first address of the given type of each node.
func NodeAddresses(nodelist *api.NodeList, addrType api.NodeAddressType) []string {
hosts := []string{}

View File

@@ -200,8 +200,9 @@ var _ = framework.KubeDescribe("Job", func() {
// the job stabilized and won't be synced until modification or full
// resync happens, we don't want to wait for the latter so we force
// sync modifying it
job.Spec.Parallelism = &completions
job, err = updateJob(f.ClientSet, f.Namespace.Name, job)
_, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) {
update.Spec.Parallelism = &completions
})
Expect(err).NotTo(HaveOccurred())
err = waitForJobFail(f.ClientSet, f.Namespace.Name, job.Name, jobTimeout)
}