Support for the Job managedBy field (alpha) (#123273)

* support for the managed-by label in Job

* Use managedBy field instead of managed-by label

* Additional review remarks

* Review remarks 2

* review remarks 3

* Skip cleanup of finalizers for job with custom managedBy

* Drop the performance optimization

* imrpove logs
This commit is contained in:
Michał Woźniak
2024-03-05 18:25:15 +01:00
committed by GitHub
parent a81411594a
commit e568a77a93
32 changed files with 2445 additions and 177 deletions

View File

@@ -19,6 +19,7 @@ package validation
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
@@ -36,6 +37,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)
// maxParallelismForIndexJob is the maximum parallelism that an Indexed Job
@@ -61,6 +63,9 @@ const (
// maximum number of patterns for a OnPodConditions requirement in pod failure policy
maxPodFailurePolicyOnPodConditionsPatterns = 20
// maximum length of the value of the managedBy field
maxManagedByLength = 63
)
var (
@@ -206,6 +211,12 @@ func validateJobSpec(spec *batch.JobSpec, fldPath *field.Path, opts apivalidatio
allErrs = append(allErrs, field.Required(fldPath.Child("backoffLimitPerIndex"), fmt.Sprintf("when maxFailedIndexes is specified")))
}
}
if spec.ManagedBy != nil {
allErrs = append(allErrs, apimachineryvalidation.IsDomainPrefixedPath(fldPath.Child("managedBy"), *spec.ManagedBy)...)
if len(*spec.ManagedBy) > maxManagedByLength {
allErrs = append(allErrs, field.TooLongMaxLength(fldPath.Child("managedBy"), *spec.ManagedBy, maxManagedByLength))
}
}
if spec.CompletionMode != nil {
if *spec.CompletionMode != batch.NonIndexedCompletion && *spec.CompletionMode != batch.IndexedCompletion {
allErrs = append(allErrs, field.NotSupported(fldPath.Child("completionMode"), spec.CompletionMode, []batch.CompletionMode{batch.NonIndexedCompletion, batch.IndexedCompletion}))
@@ -390,8 +401,9 @@ func validatePodFailurePolicyRuleOnExitCodes(onExitCode *batch.PodFailurePolicyO
}
// validateJobStatus validates a JobStatus and returns an ErrorList with any errors.
func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.ErrorList {
func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
status := job.Status
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Active), fldPath.Child("active"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Succeeded), fldPath.Child("succeeded"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Failed), fldPath.Child("failed"))...)
@@ -425,6 +437,91 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error
}
}
}
if opts.RejectCompleteJobWithFailedCondition {
if IsJobComplete(job) && IsJobFailed(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Complete=True and Failed=true conditions"))
}
}
if opts.RejectCompleteJobWithFailureTargetCondition {
if IsJobComplete(job) && IsConditionTrue(status.Conditions, batch.JobFailureTarget) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Complete=True and FailureTarget=true conditions"))
}
}
if opts.RejectNotCompleteJobWithCompletionTime {
if status.CompletionTime != nil && !IsJobComplete(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "cannot set completionTime when there is no Complete=True condition"))
}
}
if opts.RejectCompleteJobWithoutCompletionTime {
if status.CompletionTime == nil && IsJobComplete(job) {
allErrs = append(allErrs, field.Required(fldPath.Child("completionTime"), "completionTime is required for Complete jobs"))
}
}
if opts.RejectCompletionTimeBeforeStartTime {
if status.StartTime != nil && status.CompletionTime != nil && status.CompletionTime.Before(status.StartTime) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "completionTime cannot be set before startTime"))
}
}
isJobFinished := IsJobFinished(job)
if opts.RejectFinishedJobWithActivePods {
if status.Active > 0 && isJobFinished {
allErrs = append(allErrs, field.Invalid(fldPath.Child("active"), status.Active, "active>0 is invalid for finished job"))
}
}
if opts.RejectFinishedJobWithTerminatingPods {
if status.Terminating != nil && *status.Terminating > 0 && isJobFinished {
allErrs = append(allErrs, field.Invalid(fldPath.Child("terminating"), status.Terminating, "terminating>0 is invalid for finished job"))
}
}
if opts.RejectFinishedJobWithoutStartTime {
if status.StartTime == nil && isJobFinished {
allErrs = append(allErrs, field.Required(fldPath.Child("startTime"), "startTime is required for finished job"))
}
}
if opts.RejectFinishedJobWithUncountedTerminatedPods {
if isJobFinished && status.UncountedTerminatedPods != nil && len(status.UncountedTerminatedPods.Failed)+len(status.UncountedTerminatedPods.Succeeded) > 0 {
allErrs = append(allErrs, field.Invalid(fldPath.Child("uncountedTerminatedPods"), status.UncountedTerminatedPods, "uncountedTerminatedPods needs to be empty for finished job"))
}
}
if opts.RejectInvalidCompletedIndexes {
if job.Spec.Completions != nil {
if err := validateIndexesFormat(status.CompletedIndexes, int32(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, fmt.Sprintf("error parsing completedIndexes: %s", err.Error())))
}
}
}
if opts.RejectInvalidFailedIndexes {
if job.Spec.Completions != nil && job.Spec.BackoffLimitPerIndex != nil && status.FailedIndexes != nil {
if err := validateIndexesFormat(*status.FailedIndexes, int32(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), status.FailedIndexes, fmt.Sprintf("error parsing failedIndexes: %s", err.Error())))
}
}
}
isIndexed := ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) == batch.IndexedCompletion
if opts.RejectCompletedIndexesForNonIndexedJob {
if len(status.CompletedIndexes) != 0 && !isIndexed {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, "cannot set non-empty completedIndexes when non-indexed completion mode"))
}
}
if opts.RejectFailedIndexesForNoBackoffLimitPerIndex {
// Note that this check also verifies that FailedIndexes are not used for
// regular (non-indexed) jobs, because regular jobs have backoffLimitPerIndex = nil.
if job.Spec.BackoffLimitPerIndex == nil && status.FailedIndexes != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), *status.FailedIndexes, "cannot set non-null failedIndexes when backoffLimitPerIndex is null"))
}
}
if opts.RejectMoreReadyThanActivePods {
if status.Ready != nil && *status.Ready > status.Active {
allErrs = append(allErrs, field.Invalid(fldPath.Child("ready"), *status.Ready, "cannot set more ready pods than active"))
}
}
if opts.RejectFailedIndexesOverlappingCompleted {
if job.Spec.Completions != nil && status.FailedIndexes != nil {
if err := validateFailedIndexesNotOverlapCompleted(status.CompletedIndexes, *status.FailedIndexes, int32(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), *status.FailedIndexes, err.Error()))
}
}
}
return allErrs
}
@@ -436,9 +533,9 @@ func ValidateJobUpdate(job, oldJob *batch.Job, opts JobValidationOptions) field.
}
// ValidateJobUpdateStatus validates an update to the status of a Job and returns an ErrorList with any errors.
func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList {
func ValidateJobUpdateStatus(job, oldJob *batch.Job, opts JobStatusValidationOptions) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobStatusUpdate(job.Status, oldJob.Status)...)
allErrs = append(allErrs, ValidateJobStatusUpdate(job, oldJob, opts)...)
return allErrs
}
@@ -452,6 +549,7 @@ func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opt
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.PodFailurePolicy, oldSpec.PodFailurePolicy, fldPath.Child("podFailurePolicy"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.BackoffLimitPerIndex, oldSpec.BackoffLimitPerIndex, fldPath.Child("backoffLimitPerIndex"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.ManagedBy, oldSpec.ManagedBy, fldPath.Child("managedBy"))...)
return allErrs
}
@@ -486,9 +584,43 @@ func validatePodTemplateUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path,
}
// ValidateJobStatusUpdate validates an update to a JobStatus and returns an ErrorList with any errors.
func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList {
func ValidateJobStatusUpdate(job, oldJob *batch.Job, opts JobStatusValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, validateJobStatus(&status, field.NewPath("status"))...)
statusFld := field.NewPath("status")
allErrs = append(allErrs, validateJobStatus(job, statusFld, opts)...)
if opts.RejectDisablingTerminalCondition {
for _, cType := range []batch.JobConditionType{batch.JobFailed, batch.JobComplete, batch.JobFailureTarget} {
if IsConditionTrue(oldJob.Status.Conditions, cType) && !IsConditionTrue(job.Status.Conditions, cType) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("conditions"), field.OmitValueType{}, fmt.Sprintf("cannot disable the terminal %s=True condition", string(cType))))
}
}
}
if opts.RejectDecreasingFailedCounter {
if job.Status.Failed < oldJob.Status.Failed {
allErrs = append(allErrs, field.Invalid(statusFld.Child("failed"), job.Status.Failed, "cannot decrease the failed counter"))
}
}
if opts.RejectDecreasingSucceededCounter {
if job.Status.Succeeded < oldJob.Status.Succeeded {
allErrs = append(allErrs, field.Invalid(statusFld.Child("succeeded"), job.Status.Succeeded, "cannot decrease the succeeded counter"))
}
}
if opts.RejectMutatingCompletionTime {
// Note that we check the condition only when `job.Status.CompletionTime != nil`, this is because
// we don't want to block transitions to completionTime = nil when the job is not finished yet.
// Setting completionTime = nil for finished jobs is prevented in RejectCompleteJobWithoutCompletionTime.
if job.Status.CompletionTime != nil && oldJob.Status.CompletionTime != nil && !ptr.Equal(job.Status.CompletionTime, oldJob.Status.CompletionTime) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("completionTime"), job.Status.CompletionTime, "completionTime cannot be mutated"))
}
}
if opts.RejectStartTimeUpdateForUnsuspendedJob {
// Note that we check `oldJob.Status.StartTime != nil` to allow transitioning from
// startTime = nil to startTime != nil for unsuspended jobs, which is a desired transition.
if oldJob.Status.StartTime != nil && !ptr.Equal(oldJob.Status.StartTime, job.Status.StartTime) && !ptr.Deref(job.Spec.Suspend, false) {
allErrs = append(allErrs, field.Required(statusFld.Child("startTime"), "startTime cannot be removed for unsuspended job"))
}
}
return allErrs
}
@@ -666,6 +798,124 @@ func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts
return allErrs
}
func IsJobFinished(job *batch.Job) bool {
for _, c := range job.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == api.ConditionTrue {
return true
}
}
return false
}
func IsJobComplete(job *batch.Job) bool {
return IsConditionTrue(job.Status.Conditions, batch.JobComplete)
}
func IsJobFailed(job *batch.Job) bool {
return IsConditionTrue(job.Status.Conditions, batch.JobFailed)
}
func IsConditionTrue(list []batch.JobCondition, cType batch.JobConditionType) bool {
for _, c := range list {
if c.Type == cType && c.Status == api.ConditionTrue {
return true
}
}
return false
}
func validateFailedIndexesNotOverlapCompleted(completedIndexesStr string, failedIndexesStr string, completions int32) error {
if len(completedIndexesStr) == 0 || len(failedIndexesStr) == 0 {
return nil
}
completedIndexesIntervals := strings.Split(completedIndexesStr, ",")
failedIndexesIntervals := strings.Split(failedIndexesStr, ",")
var completedPos, failedPos int
cX, cY, cErr := parseIndexInterval(completedIndexesIntervals[completedPos], completions)
fX, fY, fErr := parseIndexInterval(failedIndexesIntervals[failedPos], completions)
for completedPos < len(completedIndexesIntervals) && failedPos < len(failedIndexesIntervals) {
if cErr != nil {
// Failure to parse "completed" interval. We go to the next interval,
// the error will be reported to the user when validating the format.
completedPos++
if completedPos < len(completedIndexesIntervals) {
cX, cY, cErr = parseIndexInterval(completedIndexesIntervals[completedPos], completions)
}
} else if fErr != nil {
// Failure to parse "failed" interval. We go to the next interval,
// the error will be reported to the user when validating the format.
failedPos++
if failedPos < len(failedIndexesIntervals) {
fX, fY, fErr = parseIndexInterval(failedIndexesIntervals[failedPos], completions)
}
} else {
// We have one failed and one completed interval parsed.
if cX <= fY && fX <= cY {
return fmt.Errorf("failedIndexes and completedIndexes overlap at index: %d", max(cX, fX))
}
// No overlap, let's move to the next one.
if cX <= fX {
completedPos++
if completedPos < len(completedIndexesIntervals) {
cX, cY, cErr = parseIndexInterval(completedIndexesIntervals[completedPos], completions)
}
} else {
failedPos++
if failedPos < len(failedIndexesIntervals) {
fX, fY, fErr = parseIndexInterval(failedIndexesIntervals[failedPos], completions)
}
}
}
}
return nil
}
func validateIndexesFormat(indexesStr string, completions int32) error {
if len(indexesStr) == 0 {
return nil
}
var lastIndex *int32
for _, intervalStr := range strings.Split(indexesStr, ",") {
x, y, err := parseIndexInterval(intervalStr, completions)
if err != nil {
return err
}
if lastIndex != nil && *lastIndex >= x {
return fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, x)
}
lastIndex = &y
}
return nil
}
func parseIndexInterval(intervalStr string, completions int32) (int32, int32, error) {
limitsStr := strings.Split(intervalStr, "-")
if len(limitsStr) > 2 {
return 0, 0, fmt.Errorf("the fragment %q violates the requirement that an index interval can have at most two parts separated by '-'", intervalStr)
}
x, err := strconv.Atoi(limitsStr[0])
if err != nil {
return 0, 0, fmt.Errorf("cannot convert string to integer for index: %q", limitsStr[0])
}
if x >= int(completions) {
return 0, 0, fmt.Errorf("too large index: %q", limitsStr[0])
}
if len(limitsStr) > 1 {
y, err := strconv.Atoi(limitsStr[1])
if err != nil {
return 0, 0, fmt.Errorf("cannot convert string to integer for index: %q", limitsStr[1])
}
if y >= int(completions) {
return 0, 0, fmt.Errorf("too large index: %q", limitsStr[1])
}
if x >= y {
return 0, 0, fmt.Errorf("non-increasing order, previous: %d, current: %d", x, y)
}
return int32(x), int32(y), nil
}
return int32(x), int32(x), nil
}
type JobValidationOptions struct {
apivalidation.PodValidationOptions
// Allow mutable node affinity, selector and tolerations of the template
@@ -675,3 +925,26 @@ type JobValidationOptions struct {
// Require Job to have the label on batch.kubernetes.io/job-name and batch.kubernetes.io/controller-uid
RequirePrefixedLabels bool
}
type JobStatusValidationOptions struct {
RejectDecreasingSucceededCounter bool
RejectDecreasingFailedCounter bool
RejectDisablingTerminalCondition bool
RejectInvalidCompletedIndexes bool
RejectInvalidFailedIndexes bool
RejectFailedIndexesOverlappingCompleted bool
RejectCompletedIndexesForNonIndexedJob bool
RejectFailedIndexesForNoBackoffLimitPerIndex bool
RejectMoreReadyThanActivePods bool
RejectFinishedJobWithActivePods bool
RejectFinishedJobWithTerminatingPods bool
RejectFinishedJobWithoutStartTime bool
RejectFinishedJobWithUncountedTerminatedPods bool
RejectStartTimeUpdateForUnsuspendedJob bool
RejectCompletionTimeBeforeStartTime bool
RejectMutatingCompletionTime bool
RejectCompleteJobWithoutCompletionTime bool
RejectNotCompleteJobWithCompletionTime bool
RejectCompleteJobWithFailedCondition bool
RejectCompleteJobWithFailureTargetCondition bool
}