update validation logic so completions is mutable iff it is modified in tandem with parallelism, so that completions == parallelism

Daniel Vega-Myhre
2023-01-20 23:55:54 +00:00
parent b86f94f438
commit d41302312e
5 changed files with 354 additions and 4 deletions
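
For context, the rule in the commit title applies to Indexed Jobs behind the ElasticIndexedJob feature gate; the validation change itself lives in one of the other changed files, and only the integration test is excerpted below. The following is a minimal sketch of that rule only — the function name, the explicit feature-gate argument, and the assumption that Completions/Parallelism are already defaulted to non-nil values are illustrative, not the actual validation code in this commit:

package sketch

import (
	batchv1 "k8s.io/api/batch/v1"
)

// completionsUpdateAllowed sketches the rule from the commit title: for an
// Indexed Job, spec.completions may only change together with spec.parallelism
// such that completions == parallelism after the update.
func completionsUpdateAllowed(oldSpec, newSpec *batchv1.JobSpec, elasticIndexedJobEnabled bool) bool {
	// Completions untouched: nothing to validate here.
	if *newSpec.Completions == *oldSpec.Completions {
		return true
	}
	// Feature gate off: completions stays immutable ("field is immutable").
	if !elasticIndexedJobEnabled {
		return false
	}
	// Mutation allowed only when parallelism was updated in tandem so the two match.
	return newSpec.Parallelism != nil && *newSpec.Completions == *newSpec.Parallelism
}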


@@ -31,9 +31,12 @@ import (
	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	eventsv1 "k8s.io/api/events/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/validation/field"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/util/feature"
@@ -1026,6 +1029,196 @@ func TestIndexedJob(t *testing.T) {
	validateTerminatedPodsTrackingFinalizerMetric(t, 5)
}

func TestElasticIndexedJob(t *testing.T) {
	const (
		noCompletionsUpdate = -1
		initialCompletions = 3
	)
	type jobUpdate struct {
		completions int
		succeededIndexes []int
		failedIndexes []int
		wantSucceededIndexes string
		wantFailed int
	}
	cases := map[string]struct {
		featureGate bool
		jobUpdates []jobUpdate
		wantErr *apierrors.StatusError
	}{
"feature flag off, mutation not allowed": {
jobUpdates: []jobUpdate{
{
completions: 4,
},
},
wantErr: apierrors.NewInvalid(
schema.GroupKind{Group: "batch", Kind: "Job"},
"test-job",
field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")},
),
},
"scale up, verify that the range expands, and the job finishes successfully when indexes including the ones in the new range exit successfully": {
featureGate: true,
jobUpdates: []jobUpdate{
{
// Scale up completions 3->4 then succeed indexes 0-3
completions: 4,
succeededIndexes: []int{0, 1, 2, 3},
wantSucceededIndexes: "0-3",
},
},
},
"scale down, verify that the range shrinks, and the job finishes successfully when indexes only in the smaller range finishes successfully, and verify that failures that happened for indexes that are now outside the range still count": {
featureGate: true,
jobUpdates: []jobUpdate{
// First succeed index 1 and fail index 2 while completions is still original value (3).
{
completions: noCompletionsUpdate,
succeededIndexes: []int{1},
failedIndexes: []int{2},
wantSucceededIndexes: "1",
wantFailed: 1,
},
// Scale down completions 3->1, verify prev failure out of range still counts
// but succeeded out of range does not.
{
completions: 1,
succeededIndexes: []int{0},
wantSucceededIndexes: "0",
wantFailed: 1,
},
},
},
"index finishes successfully, scale down to exclude the index, then scale up to include it back. verify that the index was restarted and job finishes successfully after all indexes complete": {
featureGate: true,
jobUpdates: []jobUpdate{
// First succeed index 2 while completions is still original value (3).
{
completions: noCompletionsUpdate,
succeededIndexes: []int{2},
wantSucceededIndexes: "2",
},
// Scale completions down 3->2 to exclude previously succeeded index.
{
completions: 2,
},
// Scale completions back up to include previously succeeded index that was temporarily out of range.
{
completions: 3,
succeededIndexes: []int{0, 1, 2},
wantSucceededIndexes: "0-2",
},
},
},
"scale down to 0, verify that the job succeeds": {
featureGate: true,
jobUpdates: []jobUpdate{
{},
},
},
}

	for name, tc := range cases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)()
			closeFn, restConfig, clientSet, ns := setup(t, "indexed")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
			defer cancel()
			resetMetrics()

			// Set up initial Job in Indexed completion mode.
			mode := batchv1.IndexedCompletion
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism: pointer.Int32Ptr(int32(initialCompletions)),
					Completions: pointer.Int32Ptr(int32(initialCompletions)),
					CompletionMode: &mode,
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

			// Wait for pods to start up.
			err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
				job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
				if err != nil {
					return false, err
				}
				if job.Status.Active == int32(initialCompletions) {
					return true, nil
				}
				return false, nil
			})
			if err != nil {
				t.Fatalf("Error waiting for Job pods to become active: %v", err)
			}

			currentCompletions := initialCompletions
			for _, update := range tc.jobUpdates {
				// Update Job spec if necessary.
				if update.completions != noCompletionsUpdate {
					if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) {
						j.Spec.Completions = pointer.Int32Ptr(int32(update.completions))
						j.Spec.Parallelism = pointer.Int32Ptr(int32(update.completions))
					}); err != nil {
						if tc.wantErr == nil {
							t.Fatalf("Failed to update Job: %v", err)
						}
						statusErr := err.(*apierrors.StatusError)
						if diff := cmp.Diff(tc.wantErr, statusErr); diff != "" {
							t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff)
						}
						return
					}
					currentCompletions = update.completions
				}

				// Succeed specified indexes.
				succeededSet := sets.NewInt()
				if update.succeededIndexes != nil {
					for _, idx := range update.succeededIndexes {
						if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil {
							t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err)
						}
						succeededSet.Insert(idx)
					}
				}

				// Fail specified indexes.
				if update.failedIndexes != nil {
					for _, idx := range update.failedIndexes {
						if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil {
							t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err)
						}
					}
				}

				remainingIndexes := remainingIndexSet(currentCompletions, succeededSet)
				newActive := len(remainingIndexes)
				// initialCompletions == initial parallelism, and active must be <= parallelism.
				if newActive > currentCompletions && currentCompletions != noCompletionsUpdate {
					newActive = currentCompletions
				}
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active: newActive,
					Succeeded: len(succeededSet),
					Failed: update.wantFailed,
					Ready: pointer.Int32(0),
				})
				validateIndexedJobPods(ctx, t, clientSet, jobObj, remainingIndexes, update.wantSucceededIndexes)
			}

			validateJobSucceeded(ctx, t, clientSet, jobObj)
		})
	}
}

// BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
// We expect that large jobs are more commonly used as Indexed. And they are
// also faster to track, as they need less API calls.
@@ -1884,3 +2077,13 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri
	})
	return job, err
}

func remainingIndexSet(completions int, exclude sets.Int) sets.Int {
	s := sets.NewInt()
	for i := 0; i < completions; i++ {
		if !exclude.Has(i) {
			s.Insert(i)
		}
	}
	return s
}
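
For illustration only (not part of the diff): a hypothetical call to the helper above, assuming fmt and the sets package are imported. With completions=5 and indexes 1 and 3 already succeeded, the indexes still expected to be active are 0, 2 and 4:

	remaining := remainingIndexSet(5, sets.NewInt(1, 3))
	fmt.Println(remaining.List()) // [0 2 4]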