From d92f6eea1f65cffa23a4d1248c581b5a2bf5bdd5 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 6 Oct 2017 12:04:05 -0400 Subject: [PATCH] Allow blocking and non-blocking metrics collection Signed-off-by: Michael Crosby --- metrics/cgroups/cgroups.go | 2 +- metrics/cgroups/metric.go | 9 +++++++-- metrics/cgroups/metrics.go | 15 ++++++++------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 36091e2e5..d67352ad1 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -71,7 +71,7 @@ func (m *cgroupsMonitor) Monitor(c runtime.Task) error { func (m *cgroupsMonitor) Stop(c runtime.Task) error { info := c.Info() t := c.(*linux.Task) - m.collector.collect(info.ID, info.Namespace, t.Cgroup(), m.collector.storedMetrics, nil) + m.collector.collect(info.ID, info.Namespace, t.Cgroup(), m.collector.storedMetrics, false, nil) m.collector.Remove(info.ID, info.Namespace) return nil } diff --git a/metrics/cgroups/metric.go b/metrics/cgroups/metric.go index 6be16bc58..1abe84153 100644 --- a/metrics/cgroups/metric.go +++ b/metrics/cgroups/metric.go @@ -28,13 +28,18 @@ func (m *metric) desc(ns *metrics.Namespace) *prometheus.Desc { return ns.NewDesc(m.name, m.help, m.unit, append([]string{"container_id", "namespace"}, m.labels...)...) } -func (m *metric) collect(id, namespace string, stats *cgroups.Metrics, ns *metrics.Namespace, ch chan<- prometheus.Metric) { +func (m *metric) collect(id, namespace string, stats *cgroups.Metrics, ns *metrics.Namespace, ch chan<- prometheus.Metric, block bool) { values := m.getValues(stats) for _, v := range values { + // block signals to block on the sending the metrics so none are missed + if block { + ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id, namespace}, v.l...)...) + continue + } + // non-blocking metrics can be dropped if the chan is full select { case ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id, namespace}, v.l...)...): default: - break } } } diff --git a/metrics/cgroups/metrics.go b/metrics/cgroups/metrics.go index 760d81445..f5a29b86a 100644 --- a/metrics/cgroups/metrics.go +++ b/metrics/cgroups/metrics.go @@ -77,22 +77,23 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { wg := &sync.WaitGroup{} for _, t := range c.cgroups { wg.Add(1) - go c.collect(t.id, t.namespace, t.cgroup, ch, wg) + go c.collect(t.id, t.namespace, t.cgroup, ch, true, wg) } -SelectLoop: +storedLoop: for { + // read stored metrics until the channel is flushed select { - case value := <-c.storedMetrics: - ch <- value + case m := <-c.storedMetrics: + ch <- m default: - break SelectLoop + break storedLoop } } c.mu.RUnlock() wg.Wait() } -func (c *collector) collect(id, namespace string, cg cgroups.Cgroup, ch chan<- prometheus.Metric, wg *sync.WaitGroup) { +func (c *collector) collect(id, namespace string, cg cgroups.Cgroup, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -102,7 +103,7 @@ func (c *collector) collect(id, namespace string, cg cgroups.Cgroup, ch chan<- p return } for _, m := range c.metrics { - m.collect(id, namespace, stats, c.ns, ch) + m.collect(id, namespace, stats, c.ns, ch, block) } }