Merge pull request #71300 from danielqsj/71165

Use prometheus conventions for workqueue metrics
This commit is contained in:
Kubernetes Prow Robot
2018-12-31 21:18:45 -08:00
committed by GitHub
5 changed files with 312 additions and 47 deletions

View File

@@ -25,6 +25,18 @@ import (
// Package prometheus sets the workqueue DefaultMetricsFactory to produce // Package prometheus sets the workqueue DefaultMetricsFactory to produce
// prometheus metrics. To use this package, you just have to import it. // prometheus metrics. To use this package, you just have to import it.
// Metrics subsystem and keys used by the workqueue.
const (
WorkQueueSubsystem = "workqueue"
DepthKey = "depth"
AddsKey = "adds_total"
QueueLatencyKey = "queue_latency_seconds"
WorkDurationKey = "work_duration_seconds"
UnfinishedWorkKey = "unfinished_work_seconds"
LongestRunningProcessorKey = "longest_running_processor_seconds"
RetriesKey = "retries_total"
)
func init() { func init() {
workqueue.SetProvider(prometheusMetricsProvider{}) workqueue.SetProvider(prometheusMetricsProvider{})
} }
@@ -32,6 +44,90 @@ func init() {
type prometheusMetricsProvider struct{} type prometheusMetricsProvider struct{}
func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
depth := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: DepthKey,
Help: "Current depth of workqueue",
ConstLabels: prometheus.Labels{"name": name},
})
prometheus.Register(depth)
return depth
}
func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
adds := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: AddsKey,
Help: "Total number of adds handled by workqueue",
ConstLabels: prometheus.Labels{"name": name},
})
prometheus.Register(adds)
return adds
}
func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
latency := prometheus.NewHistogram(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested.",
ConstLabels: prometheus.Labels{"name": name},
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
})
prometheus.Register(latency)
return latency
}
func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
ConstLabels: prometheus.Labels{"name": name},
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
})
prometheus.Register(workDuration)
return workDuration
}
func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: UnfinishedWorkKey,
Help: "How many seconds of work has done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
ConstLabels: prometheus.Labels{"name": name},
})
prometheus.Register(unfinished)
return unfinished
}
func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " +
"processor for workqueue been running.",
ConstLabels: prometheus.Labels{"name": name},
})
prometheus.Register(unfinished)
return unfinished
}
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: RetriesKey,
Help: "Total number of retries handled by workqueue",
ConstLabels: prometheus.Labels{"name": name},
})
prometheus.Register(retries)
return retries
}
// TODO(danielqsj): Remove the following metrics, they are deprecated
func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric {
depth := prometheus.NewGauge(prometheus.GaugeOpts{ depth := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name, Subsystem: name,
Name: "depth", Name: "depth",
@@ -41,7 +137,7 @@ func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetr
return depth return depth
} }
func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {
adds := prometheus.NewCounter(prometheus.CounterOpts{ adds := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name, Subsystem: name,
Name: "adds", Name: "adds",
@@ -51,7 +147,7 @@ func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMet
return adds return adds
} }
func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.SummaryMetric { func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {
latency := prometheus.NewSummary(prometheus.SummaryOpts{ latency := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name, Subsystem: name,
Name: "queue_latency", Name: "queue_latency",
@@ -61,7 +157,7 @@ func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.Summary
return latency return latency
} }
func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric { func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {
workDuration := prometheus.NewSummary(prometheus.SummaryOpts{ workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name, Subsystem: name,
Name: "work_duration", Name: "work_duration",
@@ -71,7 +167,7 @@ func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.Su
return workDuration return workDuration
} }
func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name, Subsystem: name,
Name: "unfinished_work_seconds", Name: "unfinished_work_seconds",
@@ -84,7 +180,7 @@ func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) wor
return unfinished return unfinished
} }
func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{ unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name, Subsystem: name,
Name: "longest_running_processor_microseconds", Name: "longest_running_processor_microseconds",
@@ -95,7 +191,7 @@ func (prometheusMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(na
return unfinished return unfinished
} }
func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{ retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name, Subsystem: name,
Name: "retries", Name: "retries",

View File

@@ -43,12 +43,13 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{ ret := &delayingType{
Interface: NewNamed(name), Interface: NewNamed(name),
clock: clock, clock: clock,
heartbeat: clock.NewTicker(maxWait), heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name), metrics: newRetryMetrics(name),
deprecatedMetrics: newDeprecatedRetryMetrics(name),
} }
go ret.waitingLoop() go ret.waitingLoop()
@@ -73,7 +74,8 @@ type delayingType struct {
waitingForAddCh chan *waitFor waitingForAddCh chan *waitFor
// metrics counts the number of retries // metrics counts the number of retries
metrics retryMetrics metrics retryMetrics
deprecatedMetrics retryMetrics
} }
// waitFor holds the data to add and the time it should be added // waitFor holds the data to add and the time it should be added
@@ -146,6 +148,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
} }
q.metrics.retry() q.metrics.retry()
q.deprecatedMetrics.retry()
// immediately add things with no delay // immediately add things with no delay
if duration <= 0 { if duration <= 0 {

View File

@@ -57,6 +57,11 @@ type SummaryMetric interface {
Observe(float64) Observe(float64)
} }
// HistogramMetric counts individual observations.
type HistogramMetric interface {
Observe(float64)
}
type noopMetric struct{} type noopMetric struct{}
func (noopMetric) Inc() {} func (noopMetric) Inc() {}
@@ -73,15 +78,23 @@ type defaultQueueMetrics struct {
// total number of adds handled by a workqueue // total number of adds handled by a workqueue
adds CounterMetric adds CounterMetric
// how long an item stays in a workqueue // how long an item stays in a workqueue
latency SummaryMetric latency HistogramMetric
// how long processing an item from a workqueue takes // how long processing an item from a workqueue takes
workDuration SummaryMetric workDuration HistogramMetric
addTimes map[t]time.Time addTimes map[t]time.Time
processingStartTimes map[t]time.Time processingStartTimes map[t]time.Time
// how long have current threads been working? // how long have current threads been working?
unfinishedWorkSeconds SettableGaugeMetric unfinishedWorkSeconds SettableGaugeMetric
longestRunningProcessor SettableGaugeMetric longestRunningProcessor SettableGaugeMetric
// TODO(danielqsj): Remove the following metrics, they are deprecated
deprecatedDepth GaugeMetric
deprecatedAdds CounterMetric
deprecatedLatency SummaryMetric
deprecatedWorkDuration SummaryMetric
deprecatedUnfinishedWorkSeconds SettableGaugeMetric
deprecatedLongestRunningProcessor SettableGaugeMetric
} }
func (m *defaultQueueMetrics) add(item t) { func (m *defaultQueueMetrics) add(item t) {
@@ -90,7 +103,9 @@ func (m *defaultQueueMetrics) add(item t) {
} }
m.adds.Inc() m.adds.Inc()
m.deprecatedAdds.Inc()
m.depth.Inc() m.depth.Inc()
m.deprecatedDepth.Inc()
if _, exists := m.addTimes[item]; !exists { if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now() m.addTimes[item] = m.clock.Now()
} }
@@ -102,9 +117,11 @@ func (m *defaultQueueMetrics) get(item t) {
} }
m.depth.Dec() m.depth.Dec()
m.deprecatedDepth.Dec()
m.processingStartTimes[item] = m.clock.Now() m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists { if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInMicroseconds(startTime)) m.latency.Observe(m.sinceInSeconds(startTime))
m.deprecatedLatency.Observe(m.sinceInMicroseconds(startTime))
delete(m.addTimes, item) delete(m.addTimes, item)
} }
} }
@@ -115,7 +132,8 @@ func (m *defaultQueueMetrics) done(item t) {
} }
if startTime, exists := m.processingStartTimes[item]; exists { if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(m.sinceInMicroseconds(startTime)) m.workDuration.Observe(m.sinceInSeconds(startTime))
m.deprecatedWorkDuration.Observe(m.sinceInMicroseconds(startTime))
delete(m.processingStartTimes, item) delete(m.processingStartTimes, item)
} }
} }
@@ -135,7 +153,9 @@ func (m *defaultQueueMetrics) updateUnfinishedWork() {
// Convert to seconds; microseconds is unhelpfully granular for this. // Convert to seconds; microseconds is unhelpfully granular for this.
total /= 1000000 total /= 1000000
m.unfinishedWorkSeconds.Set(total) m.unfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest) // in microseconds. m.deprecatedUnfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest / 1000000)
m.deprecatedLongestRunningProcessor.Set(oldest) // in microseconds.
} }
type noMetrics struct{} type noMetrics struct{}
@@ -150,6 +170,11 @@ func (m *defaultQueueMetrics) sinceInMicroseconds(start time.Time) float64 {
return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) return float64(m.clock.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
} }
// Gets the time since the specified start in seconds.
func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds()
}
type retryMetrics interface { type retryMetrics interface {
retry() retry()
} }
@@ -170,11 +195,18 @@ func (m *defaultRetryMetrics) retry() {
type MetricsProvider interface { type MetricsProvider interface {
NewDepthMetric(name string) GaugeMetric NewDepthMetric(name string) GaugeMetric
NewAddsMetric(name string) CounterMetric NewAddsMetric(name string) CounterMetric
NewLatencyMetric(name string) SummaryMetric NewLatencyMetric(name string) HistogramMetric
NewWorkDurationMetric(name string) SummaryMetric NewWorkDurationMetric(name string) HistogramMetric
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
NewRetriesMetric(name string) CounterMetric NewRetriesMetric(name string) CounterMetric
NewDeprecatedDepthMetric(name string) GaugeMetric
NewDeprecatedAddsMetric(name string) CounterMetric
NewDeprecatedLatencyMetric(name string) SummaryMetric
NewDeprecatedWorkDurationMetric(name string) SummaryMetric
NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric
NewDeprecatedRetriesMetric(name string) CounterMetric
} }
type noopMetricsProvider struct{} type noopMetricsProvider struct{}
@@ -187,11 +219,11 @@ func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric { func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
return noopMetric{} return noopMetric{}
} }
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
return noopMetric{} return noopMetric{}
} }
@@ -199,7 +231,7 @@ func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settabl
return noopMetric{} return noopMetric{}
} }
func (_ noopMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{} return noopMetric{}
} }
@@ -207,6 +239,34 @@ func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return noopMetric{} return noopMetric{}
} }
func (_ noopMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return noopMetric{}
}
func (_ noopMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return noopMetric{}
}
var globalMetricsFactory = queueMetricsFactory{ var globalMetricsFactory = queueMetricsFactory{
metricsProvider: noopMetricsProvider{}, metricsProvider: noopMetricsProvider{},
} }
@@ -229,15 +289,21 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
return noMetrics{} return noMetrics{}
} }
return &defaultQueueMetrics{ return &defaultQueueMetrics{
clock: clock, clock: clock,
depth: mp.NewDepthMetric(name), depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name), adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name), latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name), workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorMicrosecondsMetric(name), longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
addTimes: map[t]time.Time{}, deprecatedDepth: mp.NewDeprecatedDepthMetric(name),
processingStartTimes: map[t]time.Time{}, deprecatedAdds: mp.NewDeprecatedAddsMetric(name),
deprecatedLatency: mp.NewDeprecatedLatencyMetric(name),
deprecatedWorkDuration: mp.NewDeprecatedWorkDurationMetric(name),
deprecatedUnfinishedWorkSeconds: mp.NewDeprecatedUnfinishedWorkSecondsMetric(name),
deprecatedLongestRunningProcessor: mp.NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name),
addTimes: map[t]time.Time{},
processingStartTimes: map[t]time.Time{},
} }
} }
@@ -251,6 +317,16 @@ func newRetryMetrics(name string) retryMetrics {
} }
} }
func newDeprecatedRetryMetrics(name string) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewDeprecatedRetriesMetric(name),
}
}
// SetProvider sets the metrics provider for all subsequently created work // SetProvider sets the metrics provider for all subsequently created work
// queues. Only the first call has an effect. // queues. Only the first call has an effect.
func SetProvider(metricsProvider MetricsProvider) { func SetProvider(metricsProvider MetricsProvider) {

View File

@@ -137,6 +137,14 @@ type testMetricsProvider struct {
unfinished testMetric unfinished testMetric
longest testMetric longest testMetric
retries testMetric retries testMetric
// deprecated metrics
deprecatedDepth testMetric
deprecatedAdds testMetric
deprecatedLatency testMetric
deprecatedDuration testMetric
deprecatedUnfinished testMetric
deprecatedLongest testMetric
deprecatedRetries testMetric
} }
func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric { func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
@@ -147,11 +155,11 @@ func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
return &m.adds return &m.adds
} }
func (m *testMetricsProvider) NewLatencyMetric(name string) SummaryMetric { func (m *testMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
return &m.latency return &m.latency
} }
func (m *testMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric { func (m *testMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
return &m.duration return &m.duration
} }
@@ -159,7 +167,7 @@ func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) Settab
return &m.unfinished return &m.unfinished
} }
func (m *testMetricsProvider) NewLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric { func (m *testMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
return &m.longest return &m.longest
} }
@@ -167,6 +175,34 @@ func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
return &m.retries return &m.retries
} }
func (m *testMetricsProvider) NewDeprecatedDepthMetric(name string) GaugeMetric {
return &m.deprecatedDepth
}
func (m *testMetricsProvider) NewDeprecatedAddsMetric(name string) CounterMetric {
return &m.deprecatedAdds
}
func (m *testMetricsProvider) NewDeprecatedLatencyMetric(name string) SummaryMetric {
return &m.deprecatedLatency
}
func (m *testMetricsProvider) NewDeprecatedWorkDurationMetric(name string) SummaryMetric {
return &m.deprecatedDuration
}
func (m *testMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
return &m.deprecatedUnfinished
}
func (m *testMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) SettableGaugeMetric {
return &m.deprecatedLongest
}
func (m *testMetricsProvider) NewDeprecatedRetriesMetric(name string) CounterMetric {
return &m.deprecatedRetries
}
func TestSinceInMicroseconds(t *testing.T) { func TestSinceInMicroseconds(t *testing.T) {
mp := testMetricsProvider{} mp := testMetricsProvider{}
c := clock.NewFakeClock(time.Now()) c := clock.NewFakeClock(time.Now())
@@ -201,10 +237,18 @@ func TestMetrics(t *testing.T) {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedAdds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1.0, mp.depth.gaugeValue(); e != a { if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(50 * time.Microsecond) c.Step(50 * time.Microsecond)
// Start processing // Start processing
@@ -213,15 +257,24 @@ func TestMetrics(t *testing.T) {
t.Errorf("Expected %v, got %v", "foo", i) t.Errorf("Expected %v, got %v", "foo", i)
} }
if e, a := 50.0, mp.latency.observationValue(); e != a { if e, a := 5e-05, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1, mp.latency.observationCount(); e != a { if e, a := 1, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 50.0, mp.deprecatedLatency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.deprecatedLatency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 0.0, mp.depth.gaugeValue(); e != a { if e, a := 0.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 0.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// Add it back while processing; multiple adds of the same item are // Add it back while processing; multiple adds of the same item are
// de-duped. // de-duped.
@@ -233,27 +286,42 @@ func TestMetrics(t *testing.T) {
if e, a := 2.0, mp.adds.gaugeValue(); e != a { if e, a := 2.0, mp.adds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 2.0, mp.deprecatedAdds.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue // One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a { if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
c.Step(25 * time.Microsecond) c.Step(25 * time.Microsecond)
// Finish it up // Finish it up
q.Done(i) q.Done(i)
if e, a := 25.0, mp.duration.observationValue(); e != a { if e, a := 2.5e-05, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1, mp.duration.observationCount(); e != a { if e, a := 1, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 25.0, mp.deprecatedDuration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1, mp.deprecatedDuration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// One thing remains in the queue // One thing remains in the queue
if e, a := 1.0, mp.depth.gaugeValue(); e != a { if e, a := 1.0, mp.depth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1.0, mp.deprecatedDepth.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// It should be back on the queue // It should be back on the queue
i, _ = q.Get() i, _ = q.Get()
@@ -261,33 +329,54 @@ func TestMetrics(t *testing.T) {
t.Errorf("Expected %v, got %v", "foo", i) t.Errorf("Expected %v, got %v", "foo", i)
} }
if e, a := 25.0, mp.latency.observationValue(); e != a { if e, a := 2.5e-05, mp.latency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 2, mp.latency.observationCount(); e != a { if e, a := 2, mp.latency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 25.0, mp.deprecatedLatency.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.deprecatedLatency.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// use a channel to ensure we don't look at the metric before it's // use a channel to ensure we don't look at the metric before it's
// been set. // been set.
ch := make(chan struct{}, 1) ch := make(chan struct{}, 1)
mp.unfinished.notifyCh = ch mp.unfinished.notifyCh = ch
mp.deprecatedUnfinished.notifyCh = ch
c.Step(time.Millisecond) c.Step(time.Millisecond)
<-ch <-ch
<-ch
mp.unfinished.notifyCh = nil mp.unfinished.notifyCh = nil
mp.deprecatedUnfinished.notifyCh = nil
if e, a := .001, mp.unfinished.gaugeValue(); e != a { if e, a := .001, mp.unfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1000.0, mp.longest.gaugeValue(); e != a { if e, a := .001, mp.deprecatedUnfinished.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := .001, mp.longest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 1000.0, mp.deprecatedLongest.gaugeValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
// Finish that one up // Finish that one up
q.Done(i) q.Done(i)
if e, a := 1000.0, mp.duration.observationValue(); e != a { if e, a := .001, mp.duration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 2, mp.duration.observationCount(); e != a { if e, a := 2, mp.duration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
if e, a := 1000.0, mp.deprecatedDuration.observationValue(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
if e, a := 2, mp.deprecatedDuration.observationCount(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
} }

View File

@@ -28,12 +28,13 @@ func TestRateLimitingQueue(t *testing.T) {
queue := NewRateLimitingQueue(limiter).(*rateLimitingType) queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := clock.NewFakeClock(time.Now()) fakeClock := clock.NewFakeClock(time.Now())
delayingQueue := &delayingType{ delayingQueue := &delayingType{
Interface: New(), Interface: New(),
clock: fakeClock, clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait), heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000), waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""), metrics: newRetryMetrics(""),
deprecatedMetrics: newDeprecatedRetryMetrics(""),
} }
queue.DelayingInterface = delayingQueue queue.DelayingInterface = delayingQueue