Merge pull request #125929 from sanposhiho/requeueing-metrics
add: implement event_handling_duration_seconds metric
This commit is contained in:
		| @@ -41,11 +41,14 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/internal/queue" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/metrics" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/profile" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/util/assumecache" | ||||
| ) | ||||
|  | ||||
| func (sched *Scheduler) onStorageClassAdd(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.StorageClassAdd.Label).Observe(metrics.SinceInSeconds(start)) | ||||
| 	logger := sched.logger | ||||
| 	sc, ok := obj.(*storagev1.StorageClass) | ||||
| 	if !ok { | ||||
| @@ -65,6 +68,8 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) addNodeToCache(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.NodeAdd.Label).Observe(metrics.SinceInSeconds(start)) | ||||
| 	logger := sched.logger | ||||
| 	node, ok := obj.(*v1.Node) | ||||
| 	if !ok { | ||||
| @@ -78,6 +83,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	logger := sched.logger | ||||
| 	oldNode, ok := oldObj.(*v1.Node) | ||||
| 	if !ok { | ||||
| @@ -92,13 +98,25 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { | ||||
|  | ||||
| 	logger.V(4).Info("Update event for node", "node", klog.KObj(newNode)) | ||||
| 	nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode) | ||||
| 	events := queue.NodeSchedulingPropertiesChange(newNode, oldNode) | ||||
|  | ||||
| 	// Save the time it takes to update the node in the cache. | ||||
| 	updatingDuration := metrics.SinceInSeconds(start) | ||||
|  | ||||
| 	// Only requeue unschedulable pods if the node became more schedulable. | ||||
| 	for _, evt := range queue.NodeSchedulingPropertiesChange(newNode, oldNode) { | ||||
| 	for _, evt := range events { | ||||
| 		startMoving := time.Now() | ||||
| 		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, oldNode, newNode, preCheckForNode(nodeInfo)) | ||||
| 		movingDuration := metrics.SinceInSeconds(startMoving) | ||||
|  | ||||
| 		metrics.EventHandlingLatency.WithLabelValues(evt.Label).Observe(updatingDuration + movingDuration) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.NodeDelete.Label).Observe(metrics.SinceInSeconds(start)) | ||||
|  | ||||
| 	logger := sched.logger | ||||
| 	var node *v1.Node | ||||
| 	switch t := obj.(type) { | ||||
| @@ -123,6 +141,9 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodAdd.Label).Observe(metrics.SinceInSeconds(start)) | ||||
|  | ||||
| 	logger := sched.logger | ||||
| 	pod := obj.(*v1.Pod) | ||||
| 	logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod)) | ||||
| @@ -132,6 +153,8 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start)) | ||||
| 	logger := sched.logger | ||||
| 	oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod) | ||||
| 	// Bypass update event that carries identical objects; otherwise, a duplicated | ||||
| @@ -155,6 +178,9 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.UnscheduledPodDelete.Label).Observe(metrics.SinceInSeconds(start)) | ||||
|  | ||||
| 	logger := sched.logger | ||||
| 	var pod *v1.Pod | ||||
| 	switch t := obj.(type) { | ||||
| @@ -192,6 +218,9 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) addPodToCache(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodAdd.Label).Observe(metrics.SinceInSeconds(start)) | ||||
|  | ||||
| 	logger := sched.logger | ||||
| 	pod, ok := obj.(*v1.Pod) | ||||
| 	if !ok { | ||||
| @@ -221,6 +250,9 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodUpdate.Label).Observe(metrics.SinceInSeconds(start)) | ||||
|  | ||||
| 	logger := sched.logger | ||||
| 	oldPod, ok := oldObj.(*v1.Pod) | ||||
| 	if !ok { | ||||
| @@ -255,6 +287,9 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) deletePodFromCache(obj interface{}) { | ||||
| 	start := time.Now() | ||||
| 	defer metrics.EventHandlingLatency.WithLabelValues(queue.AssignedPodDelete.Label).Observe(metrics.SinceInSeconds(start)) | ||||
|  | ||||
| 	logger := sched.logger | ||||
| 	var pod *v1.Pod | ||||
| 	switch t := obj.(type) { | ||||
| @@ -399,21 +434,30 @@ func addAllEventHandlers( | ||||
| 	buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs { | ||||
| 		funcs := cache.ResourceEventHandlerFuncs{} | ||||
| 		if at&framework.Add != 0 { | ||||
| 			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)} | ||||
| 			label := fmt.Sprintf("%vAdd", shortGVK) | ||||
| 			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: label} | ||||
| 			funcs.AddFunc = func(obj interface{}) { | ||||
| 				start := time.Now() | ||||
| 				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil, obj, nil) | ||||
| 				metrics.EventHandlingLatency.WithLabelValues(label).Observe(metrics.SinceInSeconds(start)) | ||||
| 			} | ||||
| 		} | ||||
| 		if at&framework.Update != 0 { | ||||
| 			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)} | ||||
| 			label := fmt.Sprintf("%vUpdate", shortGVK) | ||||
| 			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: label} | ||||
| 			funcs.UpdateFunc = func(old, obj interface{}) { | ||||
| 				start := time.Now() | ||||
| 				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, old, obj, nil) | ||||
| 				metrics.EventHandlingLatency.WithLabelValues(label).Observe(metrics.SinceInSeconds(start)) | ||||
| 			} | ||||
| 		} | ||||
| 		if at&framework.Delete != 0 { | ||||
| 			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)} | ||||
| 			label := fmt.Sprintf("%vDelete", shortGVK) | ||||
| 			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: label} | ||||
| 			funcs.DeleteFunc = func(obj interface{}) { | ||||
| 				start := time.Now() | ||||
| 				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, obj, nil, nil) | ||||
| 				metrics.EventHandlingLatency.WithLabelValues(label).Observe(metrics.SinceInSeconds(start)) | ||||
| 			} | ||||
| 		} | ||||
| 		return funcs | ||||
| @@ -530,6 +574,8 @@ func addAllEventHandlers( | ||||
| 				if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( | ||||
| 					cache.ResourceEventHandlerFuncs{ | ||||
| 						UpdateFunc: func(old, obj interface{}) { | ||||
| 							start := time.Now() | ||||
| 							defer metrics.EventHandlingLatency.WithLabelValues(queue.StorageClassUpdate.Label).Observe(metrics.SinceInSeconds(start)) | ||||
| 							sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil) | ||||
| 						}, | ||||
| 					}, | ||||
|   | ||||
| @@ -41,10 +41,16 @@ var ( | ||||
| 	AssignedPodAdd = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add, Label: "AssignedPodAdd"} | ||||
| 	// NodeAdd is the event when a new node is added to the cluster. | ||||
| 	NodeAdd = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add, Label: "NodeAdd"} | ||||
| 	// NodeDelete is the event when a node is deleted from the cluster. | ||||
| 	NodeDelete = framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Delete, Label: "NodeDelete"} | ||||
| 	// AssignedPodUpdate is the event when an assigned pod is updated. | ||||
| 	AssignedPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "AssignedPodUpdate"} | ||||
| 	// UnscheduledPodAdd is the event when an unscheduled pod is added. | ||||
| 	UnscheduledPodAdd = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodAdd"} | ||||
| 	// UnscheduledPodUpdate is the event when an unscheduled pod is updated. | ||||
| 	UnscheduledPodUpdate = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodUpdate"} | ||||
| 	// UnscheduledPodDelete is the event when an unscheduled pod is deleted. | ||||
| 	UnscheduledPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update, Label: "UnschedulablePodDelete"} | ||||
| 	// AssignedPodDelete is the event when an assigned pod is deleted. | ||||
| 	AssignedPodDelete = framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete, Label: "AssignedPodDelete"} | ||||
| 	// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed. | ||||
|   | ||||
| @@ -83,6 +83,16 @@ var ( | ||||
| 			StabilityLevel: metrics.STABLE, | ||||
| 		}, []string{"result", "profile"}) | ||||
|  | ||||
| 	EventHandlingLatency = metrics.NewHistogramVec( | ||||
| 		&metrics.HistogramOpts{ | ||||
| 			Subsystem: SchedulerSubsystem, | ||||
| 			Name:      "event_handling_duration_seconds", | ||||
| 			Help:      "Event handling latency in seconds.", | ||||
| 			// Start with 0.1ms; the last bucket is [~200ms, Inf). | ||||
| 			Buckets:        metrics.ExponentialBuckets(0.0001, 2, 12), | ||||
| 			StabilityLevel: metrics.ALPHA, | ||||
| 		}, []string{"event"}) | ||||
|  | ||||
| 	schedulingLatency = metrics.NewHistogramVec( | ||||
| 		&metrics.HistogramOpts{ | ||||
| 			Subsystem:      SchedulerSubsystem, | ||||
| @@ -234,6 +244,7 @@ var ( | ||||
| 		scheduleAttempts, | ||||
| 		schedulingLatency, | ||||
| 		SchedulingAlgorithmLatency, | ||||
| 		EventHandlingLatency, | ||||
| 		PreemptionVictims, | ||||
| 		PreemptionAttempts, | ||||
| 		pendingPods, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot