indexed job: add three metrics to job controller
This commit is contained in:
		| @@ -47,6 +47,7 @@ import ( | |||||||
| 	"k8s.io/component-base/metrics/prometheus/ratelimiter" | 	"k8s.io/component-base/metrics/prometheus/ratelimiter" | ||||||
| 	"k8s.io/klog/v2" | 	"k8s.io/klog/v2" | ||||||
| 	"k8s.io/kubernetes/pkg/controller" | 	"k8s.io/kubernetes/pkg/controller" | ||||||
|  | 	"k8s.io/kubernetes/pkg/controller/job/metrics" | ||||||
| 	"k8s.io/kubernetes/pkg/features" | 	"k8s.io/kubernetes/pkg/features" | ||||||
| 	"k8s.io/utils/integer" | 	"k8s.io/utils/integer" | ||||||
| ) | ) | ||||||
| @@ -139,6 +140,8 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor | |||||||
| 	jm.updateHandler = jm.updateJobStatus | 	jm.updateHandler = jm.updateJobStatus | ||||||
| 	jm.syncHandler = jm.syncJob | 	jm.syncHandler = jm.syncJob | ||||||
|  |  | ||||||
|  | 	metrics.Register() | ||||||
|  |  | ||||||
| 	return jm | 	return jm | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -440,7 +443,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) { | |||||||
| // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning | // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning | ||||||
| // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked | // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked | ||||||
| // concurrently with the same key. | // concurrently with the same key. | ||||||
| func (jm *Controller) syncJob(key string) (bool, error) { | func (jm *Controller) syncJob(key string) (forget bool, rErr error) { | ||||||
| 	startTime := time.Now() | 	startTime := time.Now() | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) | 		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) | ||||||
| @@ -480,6 +483,21 @@ func (jm *Controller) syncJob(key string) (bool, error) { | |||||||
| 		return false, nil | 		return false, nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	completionMode := string(batch.NonIndexedCompletion) | ||||||
|  | 	if isIndexedJob(&job) { | ||||||
|  | 		completionMode = string(batch.IndexedCompletion) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	defer func() { | ||||||
|  | 		result := "success" | ||||||
|  | 		if rErr != nil { | ||||||
|  | 			result = "error" | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result).Observe(time.Since(startTime).Seconds()) | ||||||
|  | 		metrics.JobSyncNum.WithLabelValues(completionMode, result).Inc() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in | 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in | ||||||
| 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters | 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters | ||||||
| 	// the store after we've checked the expectation, the job sync is just deferred till the next relist. | 	// the store after we've checked the expectation, the job sync is just deferred till the next relist. | ||||||
| @@ -546,6 +564,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { | |||||||
| 		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage)) | 		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage)) | ||||||
| 		jobConditionsChanged = true | 		jobConditionsChanged = true | ||||||
| 		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) | 		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) | ||||||
|  | 		metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc() | ||||||
| 	} else { | 	} else { | ||||||
| 		if jobNeedsSync && job.DeletionTimestamp == nil { | 		if jobNeedsSync && job.DeletionTimestamp == nil { | ||||||
| 			active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods) | 			active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods) | ||||||
| @@ -581,6 +600,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { | |||||||
| 			now := metav1.Now() | 			now := metav1.Now() | ||||||
| 			job.Status.CompletionTime = &now | 			job.Status.CompletionTime = &now | ||||||
| 			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed") | 			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed") | ||||||
|  | 			metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc() | ||||||
| 		} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled { | 		} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled { | ||||||
| 			// Update the conditions / emit events only if manageJob was called in | 			// Update the conditions / emit events only if manageJob was called in | ||||||
| 			// this syncJob. Otherwise wait for the right syncJob call to make | 			// this syncJob. Otherwise wait for the right syncJob call to make | ||||||
| @@ -613,7 +633,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	forget := false | 	forget = false | ||||||
| 	// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true | 	// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true | ||||||
| 	// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to | 	// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to | ||||||
| 	// improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed. | 	// improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed. | ||||||
|   | |||||||
							
								
								
									
										75
									
								
								pkg/controller/job/metrics/metrics.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								pkg/controller/job/metrics/metrics.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,75 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2021 The Kubernetes Authors. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package metrics | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	"k8s.io/component-base/metrics" | ||||||
|  | 	"k8s.io/component-base/metrics/legacyregistry" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // JobControllerSubsystem - subsystem name used for this controller. | ||||||
|  | const JobControllerSubsystem = "job_controller" | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	// JobSyncDurationSeconds tracks the latency of job syncs as | ||||||
|  | 	// completion_mode = Indexed / NonIndexed and result = success / error. | ||||||
|  | 	JobSyncDurationSeconds = metrics.NewHistogramVec( | ||||||
|  | 		&metrics.HistogramOpts{ | ||||||
|  | 			Subsystem:      JobControllerSubsystem, | ||||||
|  | 			Name:           "job_sync_duration_seconds", | ||||||
|  | 			Help:           "The time it took to sync a job", | ||||||
|  | 			StabilityLevel: metrics.ALPHA, | ||||||
|  | 			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15), | ||||||
|  | 		}, | ||||||
|  | 		[]string{"completion_mode", "result"}, | ||||||
|  | 	) | ||||||
|  | 	// JobSyncNum tracks the number of job syncs as | ||||||
|  | 	// completion_mode = Indexed / NonIndexed and result = success / error. | ||||||
|  | 	JobSyncNum = metrics.NewCounterVec( | ||||||
|  | 		&metrics.CounterOpts{ | ||||||
|  | 			Subsystem:      JobControllerSubsystem, | ||||||
|  | 			Name:           "job_sync_total", | ||||||
|  | 			Help:           "The number of job syncs", | ||||||
|  | 			StabilityLevel: metrics.ALPHA, | ||||||
|  | 		}, | ||||||
|  | 		[]string{"completion_mode", "result"}, | ||||||
|  | 	) | ||||||
|  | 	// JobFinishedNum tracks the number of jobs that finish as | ||||||
|  | 	// completion_mode = Indexed / NonIndexed and result = failed / succeeded. | ||||||
|  | 	JobFinishedNum = metrics.NewCounterVec( | ||||||
|  | 		&metrics.CounterOpts{ | ||||||
|  | 			Subsystem:      JobControllerSubsystem, | ||||||
|  | 			Name:           "job_finished_total", | ||||||
|  | 			Help:           "The number of finished job", | ||||||
|  | 			StabilityLevel: metrics.ALPHA, | ||||||
|  | 		}, | ||||||
|  | 		[]string{"completion_mode", "result"}, | ||||||
|  | 	) | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var registerMetrics sync.Once | ||||||
|  |  | ||||||
|  | // Register registers Job controller metrics. | ||||||
|  | func Register() { | ||||||
|  | 	registerMetrics.Do(func() { | ||||||
|  | 		legacyregistry.MustRegister(JobSyncDurationSeconds) | ||||||
|  | 		legacyregistry.MustRegister(JobSyncNum) | ||||||
|  | 		legacyregistry.MustRegister(JobFinishedNum) | ||||||
|  | 	}) | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 Mengxue Zhang
					Mengxue Zhang