Merge pull request #115236 from danielvegamyhre/scalable-indexed-job
Support for elastic Indexed Jobs
This commit is contained in:
@@ -31,9 +31,12 @@ import (
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
eventsv1 "k8s.io/api/events/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/util/feature"
|
||||
@@ -1026,6 +1029,182 @@ func TestIndexedJob(t *testing.T) {
|
||||
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
|
||||
}
|
||||
|
||||
func TestElasticIndexedJob(t *testing.T) {
|
||||
const initialCompletions int32 = 3
|
||||
type jobUpdate struct {
|
||||
completions *int32
|
||||
succeedIndexes []int
|
||||
failIndexes []int
|
||||
wantSucceededIndexes string
|
||||
wantFailed int
|
||||
wantRemainingIndexes sets.Int
|
||||
wantActivePods int
|
||||
}
|
||||
cases := map[string]struct {
|
||||
featureGate bool
|
||||
jobUpdates []jobUpdate
|
||||
wantErr *apierrors.StatusError
|
||||
}{
|
||||
"feature flag off, mutation not allowed": {
|
||||
jobUpdates: []jobUpdate{
|
||||
{
|
||||
completions: pointer.Int32Ptr(4),
|
||||
},
|
||||
},
|
||||
wantErr: apierrors.NewInvalid(
|
||||
schema.GroupKind{Group: "batch", Kind: "Job"},
|
||||
"test-job",
|
||||
field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")},
|
||||
),
|
||||
},
|
||||
"scale up": {
|
||||
featureGate: true,
|
||||
jobUpdates: []jobUpdate{
|
||||
{
|
||||
// Scale up completions 3->4 then succeed indexes 0-3
|
||||
completions: pointer.Int32Ptr(4),
|
||||
succeedIndexes: []int{0, 1, 2, 3},
|
||||
wantSucceededIndexes: "0-3",
|
||||
},
|
||||
},
|
||||
},
|
||||
"scale down": {
|
||||
featureGate: true,
|
||||
jobUpdates: []jobUpdate{
|
||||
// First succeed index 1 and fail index 2 while completions is still original value (3).
|
||||
{
|
||||
succeedIndexes: []int{1},
|
||||
failIndexes: []int{2},
|
||||
wantSucceededIndexes: "1",
|
||||
wantFailed: 1,
|
||||
wantRemainingIndexes: sets.NewInt(0, 2),
|
||||
wantActivePods: 2,
|
||||
},
|
||||
// Scale down completions 3->1, verify prev failure out of range still counts
|
||||
// but succeeded out of range does not.
|
||||
{
|
||||
completions: pointer.Int32Ptr(1),
|
||||
succeedIndexes: []int{0},
|
||||
wantSucceededIndexes: "0",
|
||||
wantFailed: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
"index finishes successfully, scale down, scale up": {
|
||||
featureGate: true,
|
||||
jobUpdates: []jobUpdate{
|
||||
// First succeed index 2 while completions is still original value (3).
|
||||
{
|
||||
succeedIndexes: []int{2},
|
||||
wantSucceededIndexes: "2",
|
||||
wantRemainingIndexes: sets.NewInt(0, 1),
|
||||
wantActivePods: 2,
|
||||
},
|
||||
// Scale completions down 3->2 to exclude previously succeeded index.
|
||||
{
|
||||
completions: pointer.Int32Ptr(2),
|
||||
wantRemainingIndexes: sets.NewInt(0, 1),
|
||||
wantActivePods: 2,
|
||||
},
|
||||
// Scale completions back up to include previously succeeded index that was temporarily out of range.
|
||||
{
|
||||
completions: pointer.Int32Ptr(3),
|
||||
succeedIndexes: []int{0, 1, 2},
|
||||
wantSucceededIndexes: "0-2",
|
||||
},
|
||||
},
|
||||
},
|
||||
"scale down to 0, verify that the job succeeds": {
|
||||
featureGate: true,
|
||||
jobUpdates: []jobUpdate{
|
||||
{
|
||||
completions: pointer.Int32Ptr(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)()
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
|
||||
defer cancel()
|
||||
resetMetrics()
|
||||
|
||||
// Set up initial Job in Indexed completion mode.
|
||||
mode := batchv1.IndexedCompletion
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(initialCompletions),
|
||||
Completions: pointer.Int32Ptr(initialCompletions),
|
||||
CompletionMode: &mode,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
|
||||
|
||||
// Wait for pods to start up.
|
||||
err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||
job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if job.Status.Active == int32(initialCompletions) {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error waiting for Job pods to become active: %v", err)
|
||||
}
|
||||
|
||||
for _, update := range tc.jobUpdates {
|
||||
// Update Job spec if necessary.
|
||||
if update.completions != nil {
|
||||
if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) {
|
||||
j.Spec.Completions = update.completions
|
||||
j.Spec.Parallelism = update.completions
|
||||
}); err != nil {
|
||||
if diff := cmp.Diff(tc.wantErr, err); diff != "" {
|
||||
t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Succeed specified indexes.
|
||||
for _, idx := range update.succeedIndexes {
|
||||
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil {
|
||||
t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Fail specified indexes.
|
||||
for _, idx := range update.failIndexes {
|
||||
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil {
|
||||
t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err)
|
||||
}
|
||||
}
|
||||
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: update.wantActivePods,
|
||||
Succeeded: len(update.succeedIndexes),
|
||||
Failed: update.wantFailed,
|
||||
Ready: pointer.Int32(0),
|
||||
})
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes)
|
||||
}
|
||||
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
|
||||
// We expect that large jobs are more commonly used as Indexed. And they are
|
||||
// also faster to track, as they need less API calls.
|
||||
|
Reference in New Issue
Block a user