 e79d666cdd
			
		
	
	e79d666cdd
	
	
	
		
			
			full diff:92cb4ed978..61b7af7564This adds new dependency github.com/fsnotify/fsnotify since4ce334aa49Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
		
			
				
	
	
		
			262 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			262 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2016 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package workqueue
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/util/clock"
 | |
| )
 | |
| 
 | |
| // This file provides abstractions for setting the provider (e.g., prometheus)
 | |
| // of metrics.
 | |
| 
 | |
| type queueMetrics interface {
 | |
| 	add(item t)
 | |
| 	get(item t)
 | |
| 	done(item t)
 | |
| 	updateUnfinishedWork()
 | |
| }
 | |
| 
 | |
| // GaugeMetric represents a single numerical value that can arbitrarily go up
 | |
| // and down.
 | |
| type GaugeMetric interface {
 | |
| 	Inc()
 | |
| 	Dec()
 | |
| }
 | |
| 
 | |
| // SettableGaugeMetric represents a single numerical value that can arbitrarily go up
 | |
| // and down. (Separate from GaugeMetric to preserve backwards compatibility.)
 | |
| type SettableGaugeMetric interface {
 | |
| 	Set(float64)
 | |
| }
 | |
| 
 | |
| // CounterMetric represents a single numerical value that only ever
 | |
| // goes up.
 | |
| type CounterMetric interface {
 | |
| 	Inc()
 | |
| }
 | |
| 
 | |
| // SummaryMetric captures individual observations.
 | |
| type SummaryMetric interface {
 | |
| 	Observe(float64)
 | |
| }
 | |
| 
 | |
| // HistogramMetric counts individual observations.
 | |
| type HistogramMetric interface {
 | |
| 	Observe(float64)
 | |
| }
 | |
| 
 | |
| type noopMetric struct{}
 | |
| 
 | |
| func (noopMetric) Inc()            {}
 | |
| func (noopMetric) Dec()            {}
 | |
| func (noopMetric) Set(float64)     {}
 | |
| func (noopMetric) Observe(float64) {}
 | |
| 
 | |
| // defaultQueueMetrics expects the caller to lock before setting any metrics.
 | |
| type defaultQueueMetrics struct {
 | |
| 	clock clock.Clock
 | |
| 
 | |
| 	// current depth of a workqueue
 | |
| 	depth GaugeMetric
 | |
| 	// total number of adds handled by a workqueue
 | |
| 	adds CounterMetric
 | |
| 	// how long an item stays in a workqueue
 | |
| 	latency HistogramMetric
 | |
| 	// how long processing an item from a workqueue takes
 | |
| 	workDuration         HistogramMetric
 | |
| 	addTimes             map[t]time.Time
 | |
| 	processingStartTimes map[t]time.Time
 | |
| 
 | |
| 	// how long have current threads been working?
 | |
| 	unfinishedWorkSeconds   SettableGaugeMetric
 | |
| 	longestRunningProcessor SettableGaugeMetric
 | |
| }
 | |
| 
 | |
| func (m *defaultQueueMetrics) add(item t) {
 | |
| 	if m == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	m.adds.Inc()
 | |
| 	m.depth.Inc()
 | |
| 	if _, exists := m.addTimes[item]; !exists {
 | |
| 		m.addTimes[item] = m.clock.Now()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *defaultQueueMetrics) get(item t) {
 | |
| 	if m == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	m.depth.Dec()
 | |
| 	m.processingStartTimes[item] = m.clock.Now()
 | |
| 	if startTime, exists := m.addTimes[item]; exists {
 | |
| 		m.latency.Observe(m.sinceInSeconds(startTime))
 | |
| 		delete(m.addTimes, item)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *defaultQueueMetrics) done(item t) {
 | |
| 	if m == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if startTime, exists := m.processingStartTimes[item]; exists {
 | |
| 		m.workDuration.Observe(m.sinceInSeconds(startTime))
 | |
| 		delete(m.processingStartTimes, item)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *defaultQueueMetrics) updateUnfinishedWork() {
 | |
| 	// Note that a summary metric would be better for this, but prometheus
 | |
| 	// doesn't seem to have non-hacky ways to reset the summary metrics.
 | |
| 	var total float64
 | |
| 	var oldest float64
 | |
| 	for _, t := range m.processingStartTimes {
 | |
| 		age := m.sinceInSeconds(t)
 | |
| 		total += age
 | |
| 		if age > oldest {
 | |
| 			oldest = age
 | |
| 		}
 | |
| 	}
 | |
| 	m.unfinishedWorkSeconds.Set(total)
 | |
| 	m.longestRunningProcessor.Set(oldest)
 | |
| }
 | |
| 
 | |
| type noMetrics struct{}
 | |
| 
 | |
| func (noMetrics) add(item t)            {}
 | |
| func (noMetrics) get(item t)            {}
 | |
| func (noMetrics) done(item t)           {}
 | |
| func (noMetrics) updateUnfinishedWork() {}
 | |
| 
 | |
| // Gets the time since the specified start in seconds.
 | |
| func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
 | |
| 	return m.clock.Since(start).Seconds()
 | |
| }
 | |
| 
 | |
| type retryMetrics interface {
 | |
| 	retry()
 | |
| }
 | |
| 
 | |
| type defaultRetryMetrics struct {
 | |
| 	retries CounterMetric
 | |
| }
 | |
| 
 | |
| func (m *defaultRetryMetrics) retry() {
 | |
| 	if m == nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	m.retries.Inc()
 | |
| }
 | |
| 
 | |
| // MetricsProvider generates various metrics used by the queue.
 | |
| type MetricsProvider interface {
 | |
| 	NewDepthMetric(name string) GaugeMetric
 | |
| 	NewAddsMetric(name string) CounterMetric
 | |
| 	NewLatencyMetric(name string) HistogramMetric
 | |
| 	NewWorkDurationMetric(name string) HistogramMetric
 | |
| 	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
 | |
| 	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
 | |
| 	NewRetriesMetric(name string) CounterMetric
 | |
| }
 | |
| 
 | |
| type noopMetricsProvider struct{}
 | |
| 
 | |
| func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
 | |
| 	return noopMetric{}
 | |
| }
 | |
| 
 | |
| var globalMetricsFactory = queueMetricsFactory{
 | |
| 	metricsProvider: noopMetricsProvider{},
 | |
| }
 | |
| 
 | |
| type queueMetricsFactory struct {
 | |
| 	metricsProvider MetricsProvider
 | |
| 
 | |
| 	onlyOnce sync.Once
 | |
| }
 | |
| 
 | |
| func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
 | |
| 	f.onlyOnce.Do(func() {
 | |
| 		f.metricsProvider = mp
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
 | |
| 	mp := f.metricsProvider
 | |
| 	if len(name) == 0 || mp == (noopMetricsProvider{}) {
 | |
| 		return noMetrics{}
 | |
| 	}
 | |
| 	return &defaultQueueMetrics{
 | |
| 		clock:                   clock,
 | |
| 		depth:                   mp.NewDepthMetric(name),
 | |
| 		adds:                    mp.NewAddsMetric(name),
 | |
| 		latency:                 mp.NewLatencyMetric(name),
 | |
| 		workDuration:            mp.NewWorkDurationMetric(name),
 | |
| 		unfinishedWorkSeconds:   mp.NewUnfinishedWorkSecondsMetric(name),
 | |
| 		longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
 | |
| 		addTimes:                map[t]time.Time{},
 | |
| 		processingStartTimes:    map[t]time.Time{},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newRetryMetrics(name string) retryMetrics {
 | |
| 	var ret *defaultRetryMetrics
 | |
| 	if len(name) == 0 {
 | |
| 		return ret
 | |
| 	}
 | |
| 	return &defaultRetryMetrics{
 | |
| 		retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetProvider sets the metrics provider for all subsequently created work
 | |
| // queues. Only the first call has an effect.
 | |
| func SetProvider(metricsProvider MetricsProvider) {
 | |
| 	globalMetricsFactory.setProvider(metricsProvider)
 | |
| }
 |