Extend the Job API for BackoffLimitPerIndex

This commit is contained in:
Michal Wozniak
2023-07-13 09:36:07 +02:00
parent b2a9c06b2e
commit fcbfdc1710
31 changed files with 1359 additions and 133 deletions

View File

@@ -42,7 +42,16 @@ import (
// .status.completedIndexes.
const maxParallelismForIndexedJob = 100000
// maxFailedIndexesForIndexedJob is the maximum number of failed indexes that
// an Indexed Job is allowed to have. This threshold allows to cap the length of
// .status.completedIndexes and .status.failedIndexes.
const maxFailedIndexesForIndexedJob = 100_000
const (
completionsSoftLimit = 100_000
parallelismLimitForHighCompletions = 10_000
maxFailedIndexesLimitForHighCompletions = 10_000
// maximum number of rules in pod failure policy
maxPodFailurePolicyRules = 20
@@ -56,6 +65,7 @@ const (
var (
supportedPodFailurePolicyActions = sets.New(
string(batch.PodFailurePolicyActionCount),
string(batch.PodFailurePolicyActionFailIndex),
string(batch.PodFailurePolicyActionFailJob),
string(batch.PodFailurePolicyActionIgnore))
@@ -182,6 +192,15 @@ func validateJobSpec(spec *batch.JobSpec, fldPath *field.Path, opts apivalidatio
if spec.TTLSecondsAfterFinished != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.TTLSecondsAfterFinished), fldPath.Child("ttlSecondsAfterFinished"))...)
}
if spec.BackoffLimitPerIndex != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.BackoffLimitPerIndex), fldPath.Child("backoffLimitPerIndex"))...)
}
if spec.MaxFailedIndexes != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.MaxFailedIndexes), fldPath.Child("maxFailedIndexes"))...)
if spec.BackoffLimitPerIndex == nil {
allErrs = append(allErrs, field.Required(fldPath.Child("backoffLimitPerIndex"), fmt.Sprintf("when maxFailedIndexes is specified")))
}
}
if spec.CompletionMode != nil {
if *spec.CompletionMode != batch.NonIndexedCompletion && *spec.CompletionMode != batch.IndexedCompletion {
allErrs = append(allErrs, field.NotSupported(fldPath.Child("completionMode"), spec.CompletionMode, []string{string(batch.NonIndexedCompletion), string(batch.IndexedCompletion)}))
@@ -193,6 +212,31 @@ func validateJobSpec(spec *batch.JobSpec, fldPath *field.Path, opts apivalidatio
if spec.Parallelism != nil && *spec.Parallelism > maxParallelismForIndexedJob {
allErrs = append(allErrs, field.Invalid(fldPath.Child("parallelism"), *spec.Parallelism, fmt.Sprintf("must be less than or equal to %d when completion mode is %s", maxParallelismForIndexedJob, batch.IndexedCompletion)))
}
if spec.Completions != nil && spec.MaxFailedIndexes != nil && *spec.MaxFailedIndexes > *spec.Completions {
allErrs = append(allErrs, field.Invalid(fldPath.Child("maxFailedIndexes"), *spec.MaxFailedIndexes, "must be less than or equal to completions"))
}
if spec.MaxFailedIndexes != nil && *spec.MaxFailedIndexes > maxFailedIndexesForIndexedJob {
allErrs = append(allErrs, field.Invalid(fldPath.Child("maxFailedIndexes"), *spec.MaxFailedIndexes, fmt.Sprintf("must be less than or equal to %d", maxFailedIndexesForIndexedJob)))
}
if spec.Completions != nil && *spec.Completions > completionsSoftLimit && spec.BackoffLimitPerIndex != nil {
if spec.MaxFailedIndexes == nil {
allErrs = append(allErrs, field.Required(fldPath.Child("maxFailedIndexes"), fmt.Sprintf("must be specified when completions is above %d", completionsSoftLimit)))
}
if spec.Parallelism != nil && *spec.Parallelism > parallelismLimitForHighCompletions {
allErrs = append(allErrs, field.Invalid(fldPath.Child("parallelism"), *spec.Parallelism, fmt.Sprintf("must be less than or equal to %d when completions are above %d and used with backoff limit per index", parallelismLimitForHighCompletions, completionsSoftLimit)))
}
if spec.MaxFailedIndexes != nil && *spec.MaxFailedIndexes > maxFailedIndexesLimitForHighCompletions {
allErrs = append(allErrs, field.Invalid(fldPath.Child("maxFailedIndexes"), *spec.MaxFailedIndexes, fmt.Sprintf("must be less than or equal to %d when completions are above %d and used with backoff limit per index", maxFailedIndexesLimitForHighCompletions, completionsSoftLimit)))
}
}
}
}
if spec.CompletionMode == nil || *spec.CompletionMode == batch.NonIndexedCompletion {
if spec.BackoffLimitPerIndex != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("backoffLimitPerIndex"), *spec.BackoffLimitPerIndex, "requires indexed completion mode"))
}
if spec.MaxFailedIndexes != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("maxFailedIndexes"), *spec.MaxFailedIndexes, "requires indexed completion mode"))
}
}
@@ -232,16 +276,20 @@ func validatePodFailurePolicy(spec *batch.JobSpec, fldPath *field.Path) field.Er
containerNames.Insert(containerSpec.Name)
}
for i, rule := range spec.PodFailurePolicy.Rules {
allErrs = append(allErrs, validatePodFailurePolicyRule(&rule, rulesPath.Index(i), containerNames)...)
allErrs = append(allErrs, validatePodFailurePolicyRule(spec, &rule, rulesPath.Index(i), containerNames)...)
}
return allErrs
}
func validatePodFailurePolicyRule(rule *batch.PodFailurePolicyRule, rulePath *field.Path, containerNames sets.String) field.ErrorList {
func validatePodFailurePolicyRule(spec *batch.JobSpec, rule *batch.PodFailurePolicyRule, rulePath *field.Path, containerNames sets.String) field.ErrorList {
var allErrs field.ErrorList
actionPath := rulePath.Child("action")
if rule.Action == "" {
allErrs = append(allErrs, field.Required(actionPath, fmt.Sprintf("valid values: %q", sets.List(supportedPodFailurePolicyActions))))
} else if rule.Action == batch.PodFailurePolicyActionFailIndex {
if spec.BackoffLimitPerIndex == nil {
allErrs = append(allErrs, field.Invalid(actionPath, rule.Action, "requires the backoffLimitPerIndex to be set"))
}
} else if !supportedPodFailurePolicyActions.Has(string(rule.Action)) {
allErrs = append(allErrs, field.NotSupported(actionPath, rule.Action, sets.List(supportedPodFailurePolicyActions)))
}
@@ -377,6 +425,7 @@ func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opt
allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.PodFailurePolicy, oldSpec.PodFailurePolicy, fldPath.Child("podFailurePolicy"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.BackoffLimitPerIndex, oldSpec.BackoffLimitPerIndex, fldPath.Child("backoffLimitPerIndex"))...)
return allErrs
}