Job: Support for the JobSuccessPolicy (alpha)

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
This commit is contained in:
Yuki Iwai
2024-02-21 15:49:35 +09:00
parent 05cb0a55c8
commit e216742672
35 changed files with 3874 additions and 149 deletions

View File

@@ -66,6 +66,11 @@ const (
// maximum length of the value of the managedBy field
maxManagedByLength = 63
// maximum length of succeededIndexes in JobSuccessPolicy.
maxJobSuccessPolicySucceededIndexesLimit = 64 * 1024
// maximum number of rules in successPolicy.
maxSuccessPolicyRule = 20
)
var (
@@ -259,6 +264,13 @@ func validateJobSpec(spec *batch.JobSpec, fldPath *field.Path, opts apivalidatio
if spec.PodFailurePolicy != nil {
allErrs = append(allErrs, validatePodFailurePolicy(spec, fldPath.Child("podFailurePolicy"))...)
}
if spec.SuccessPolicy != nil {
if ptr.Deref(spec.CompletionMode, batch.NonIndexedCompletion) != batch.IndexedCompletion {
allErrs = append(allErrs, field.Invalid(fldPath.Child("successPolicy"), *spec.SuccessPolicy, "requires indexed completion mode"))
} else {
allErrs = append(allErrs, validateSuccessPolicy(spec, fldPath.Child("successPolicy"))...)
}
}
allErrs = append(allErrs, validatePodReplacementPolicy(spec, fldPath.Child("podReplacementPolicy"))...)
@@ -400,6 +412,50 @@ func validatePodFailurePolicyRuleOnExitCodes(onExitCode *batch.PodFailurePolicyO
return allErrs
}
// validateSuccessPolicy validates spec.SuccessPolicy and returns every error
// found, with field paths rooted at fldPath.
//
// The policy must contain at least one rule and no more than
// maxSuccessPolicyRule rules; each rule is additionally checked by
// validateSuccessPolicyRule. spec.SuccessPolicy is dereferenced
// unconditionally, so the caller must only invoke this when it is non-nil.
func validateSuccessPolicy(spec *batch.JobSpec, fldPath *field.Path) field.ErrorList {
	var errs field.ErrorList
	rules := spec.SuccessPolicy.Rules
	rulesFld := fldPath.Child("rules")
	ruleCount := len(rules)

	if ruleCount == 0 {
		errs = append(errs, field.Required(rulesFld, "at least one rules must be specified when the successPolicy is specified"))
	}
	if ruleCount > maxSuccessPolicyRule {
		errs = append(errs, field.TooMany(rulesFld, ruleCount, maxSuccessPolicyRule))
	}

	// Every rule is validated, even when the rule count itself is out of
	// bounds, so that all problems are reported together.
	for idx := range rules {
		ruleErrs := validateSuccessPolicyRule(spec, &rules[idx], rulesFld.Index(idx))
		errs = append(errs, ruleErrs...)
	}
	return errs
}
// validateSuccessPolicyRule validates a single successPolicy rule and returns
// every error found, with field paths rooted at rulePath.
//
// The rule must set at least one of succeededCount or succeededIndexes.
// succeededIndexes may be at most maxJobSuccessPolicySucceededIndexesLimit
// bytes long and must pass validateIndexesFormat against spec.Completions.
// succeededCount must be non-negative, must not exceed spec.Completions and,
// when succeededIndexes is also set, must not exceed the number of indexes
// that field lists.
//
// Checks that need spec.Completions are skipped when it is nil, so a spec
// without completions produces validation errors instead of a nil-pointer
// panic.
// NOTE(review): presumably the caller already reports a missing completions
// value for Indexed Jobs — confirm.
func validateSuccessPolicyRule(spec *batch.JobSpec, rule *batch.SuccessPolicyRule, rulePath *field.Path) field.ErrorList {
	var allErrs field.ErrorList
	if rule.SucceededCount == nil && rule.SucceededIndexes == nil {
		allErrs = append(allErrs, field.Required(rulePath, "at least one of succeededCount or succeededIndexes must be specified"))
	}

	completions := spec.Completions

	// totalIndexes is the number of indexes listed in succeededIndexes; it
	// stays 0 when the field is unset, fails to parse, or cannot be checked
	// because completions is nil.
	var totalIndexes int32
	if rule.SucceededIndexes != nil {
		succeededIndexes := rulePath.Child("succeededIndexes")
		if len(*rule.SucceededIndexes) > maxJobSuccessPolicySucceededIndexesLimit {
			allErrs = append(allErrs, field.TooLong(succeededIndexes, *rule.SucceededIndexes, maxJobSuccessPolicySucceededIndexesLimit))
		}
		if completions != nil {
			var err error
			totalIndexes, err = validateIndexesFormat(*rule.SucceededIndexes, *completions)
			if err != nil {
				allErrs = append(allErrs, field.Invalid(succeededIndexes, *rule.SucceededIndexes, fmt.Sprintf("error parsing succeededIndexes: %s", err.Error())))
			}
		}
	}

	if rule.SucceededCount != nil {
		succeededCountPath := rulePath.Child("succeededCount")
		allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*rule.SucceededCount), succeededCountPath)...)
		if completions != nil {
			if *rule.SucceededCount > *completions {
				allErrs = append(allErrs, field.Invalid(succeededCountPath, *rule.SucceededCount, fmt.Sprintf("must be less than or equal to %d (the number of specified completions)", *completions)))
			}
			// Only meaningful when succeededIndexes was evaluated above,
			// which requires completions to be set.
			if rule.SucceededIndexes != nil && *rule.SucceededCount > totalIndexes {
				allErrs = append(allErrs, field.Invalid(succeededCountPath, *rule.SucceededCount, fmt.Sprintf("must be less than or equal to %d (the number of indexes in the specified succeededIndexes field)", totalIndexes)))
			}
		}
	}
	return allErrs
}
// validateJobStatus validates a JobStatus and returns an ErrorList with any errors.
func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
@@ -485,14 +541,14 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValida
}
if opts.RejectInvalidCompletedIndexes {
if job.Spec.Completions != nil {
if err := validateIndexesFormat(status.CompletedIndexes, int32(*job.Spec.Completions)); err != nil {
if _, err := validateIndexesFormat(status.CompletedIndexes, int32(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, fmt.Sprintf("error parsing completedIndexes: %s", err.Error())))
}
}
}
if opts.RejectInvalidFailedIndexes {
if job.Spec.Completions != nil && job.Spec.BackoffLimitPerIndex != nil && status.FailedIndexes != nil {
if err := validateIndexesFormat(*status.FailedIndexes, int32(*job.Spec.Completions)); err != nil {
if _, err := validateIndexesFormat(*status.FailedIndexes, int32(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), status.FailedIndexes, fmt.Sprintf("error parsing failedIndexes: %s", err.Error())))
}
}
@@ -522,6 +578,21 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValida
}
}
}
if ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) != batch.IndexedCompletion && isJobSuccessCriteriaMet(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet to NonIndexed Job"))
}
if isJobSuccessCriteriaMet(job) && IsJobFailed(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True and Failed=true conditions"))
}
if isJobSuccessCriteriaMet(job) && isJobFailureTarget(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True and FailureTarget=true conditions"))
}
if job.Spec.SuccessPolicy == nil && isJobSuccessCriteriaMet(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True for Job without SuccessPolicy"))
}
if job.Spec.SuccessPolicy != nil && !isJobSuccessCriteriaMet(job) && IsJobComplete(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Complete=True for Job with SuccessPolicy unless SuccessCriteriaMet=True"))
}
return allErrs
}
@@ -550,6 +621,7 @@ func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opt
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.PodFailurePolicy, oldSpec.PodFailurePolicy, fldPath.Child("podFailurePolicy"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.BackoffLimitPerIndex, oldSpec.BackoffLimitPerIndex, fldPath.Child("backoffLimitPerIndex"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.ManagedBy, oldSpec.ManagedBy, fldPath.Child("managedBy"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.SuccessPolicy, oldSpec.SuccessPolicy, fldPath.Child("successPolicy"))...)
return allErrs
}
@@ -621,6 +693,12 @@ func ValidateJobStatusUpdate(job, oldJob *batch.Job, opts JobStatusValidationOpt
allErrs = append(allErrs, field.Required(statusFld.Child("startTime"), "startTime cannot be removed for unsuspended job"))
}
}
if isJobSuccessCriteriaMet(oldJob) && !isJobSuccessCriteriaMet(job) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("conditions"), field.OmitValueType{}, "cannot disable the SuccessCriteriaMet=True condition"))
}
if IsJobComplete(oldJob) && !isJobSuccessCriteriaMet(oldJob) && isJobSuccessCriteriaMet(job) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True for Job already has Complete=true conditions"))
}
return allErrs
}
@@ -815,6 +893,14 @@ func IsJobFailed(job *batch.Job) bool {
return IsConditionTrue(job.Status.Conditions, batch.JobFailed)
}
// isJobSuccessCriteriaMet reports whether IsConditionTrue finds the
// batch.JobSuccessCriteriaMet condition among the job's status conditions.
func isJobSuccessCriteriaMet(job *batch.Job) bool {
	conditions := job.Status.Conditions
	return IsConditionTrue(conditions, batch.JobSuccessCriteriaMet)
}
// isJobFailureTarget reports whether IsConditionTrue finds the
// batch.JobFailureTarget condition among the job's status conditions.
func isJobFailureTarget(job *batch.Job) bool {
	conditions := job.Status.Conditions
	return IsConditionTrue(conditions, batch.JobFailureTarget)
}
func IsConditionTrue(list []batch.JobCondition, cType batch.JobConditionType) bool {
for _, c := range list {
if c.Type == cType && c.Status == api.ConditionTrue {
@@ -870,22 +956,24 @@ func validateFailedIndexesNotOverlapCompleted(completedIndexesStr string, failed
return nil
}
// validateIndexesFormat splits indexesStr on "," and parses each element with
// parseIndexInterval, which yields a pair (x, y) per element. Each element's x
// must be greater than the previous element's y; otherwise a "non-increasing
// order" error is returned. An empty indexesStr is accepted.
//
// NOTE(review): this span interleaves two versions of the function: one that
// returns only an error, and one that also returns an int32 total accumulated
// as y - x + 1 per element (0 on any error or for an empty string).
// Presumably (x, y) are inclusive interval bounds, making the total the number
// of listed indexes — confirm against parseIndexInterval.
func validateIndexesFormat(indexesStr string, completions int32) error {
func validateIndexesFormat(indexesStr string, completions int32) (int32, error) {
// An empty string lists nothing and is valid.
if len(indexesStr) == 0 {
return nil
return 0, nil
}
// lastIndex points at the y value of the previously parsed element; it is nil
// until the first element has been processed.
var lastIndex *int32
// total accumulates y - x + 1 across all elements.
var total int32
for _, intervalStr := range strings.Split(indexesStr, ",") {
x, y, err := parseIndexInterval(intervalStr, completions)
if err != nil {
return err
return 0, err
}
// Reject an element whose x does not lie strictly after the previous y.
if lastIndex != nil && *lastIndex >= x {
return fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, x)
return 0, fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, x)
}
total += y - x + 1
lastIndex = &y
}
return nil
return total, nil
}
func parseIndexInterval(intervalStr string, completions int32) (int32, int32, error) {