diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 609d9f3e1..d67352ad1 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -70,6 +70,8 @@ func (m *cgroupsMonitor) Monitor(c runtime.Task) error { func (m *cgroupsMonitor) Stop(c runtime.Task) error { info := c.Info() + t := c.(*linux.Task) + m.collector.collect(info.ID, info.Namespace, t.Cgroup(), m.collector.storedMetrics, false, nil) m.collector.Remove(info.ID, info.Namespace) return nil } diff --git a/metrics/cgroups/metric.go b/metrics/cgroups/metric.go index da532dd00..1abe84153 100644 --- a/metrics/cgroups/metric.go +++ b/metrics/cgroups/metric.go @@ -28,9 +28,18 @@ func (m *metric) desc(ns *metrics.Namespace) *prometheus.Desc { return ns.NewDesc(m.name, m.help, m.unit, append([]string{"container_id", "namespace"}, m.labels...)...) } -func (m *metric) collect(id, namespace string, stats *cgroups.Metrics, ns *metrics.Namespace, ch chan<- prometheus.Metric) { +func (m *metric) collect(id, namespace string, stats *cgroups.Metrics, ns *metrics.Namespace, ch chan<- prometheus.Metric, block bool) { values := m.getValues(stats) for _, v := range values { - ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id, namespace}, v.l...)...) + // block signals to block on the sending the metrics so none are missed + if block { + ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id, namespace}, v.l...)...) + continue + } + // non-blocking metrics can be dropped if the chan is full + select { + case ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id, namespace}, v.l...)...): + default: + } } } diff --git a/metrics/cgroups/metrics.go b/metrics/cgroups/metrics.go index a842ed0ad..f5a29b86a 100644 --- a/metrics/cgroups/metrics.go +++ b/metrics/cgroups/metrics.go @@ -40,6 +40,7 @@ func newCollector(ns *metrics.Namespace) *collector { c.metrics = append(c.metrics, memoryMetrics...) c.metrics = append(c.metrics, hugetlbMetrics...) c.metrics = append(c.metrics, blkioMetrics...) + c.storedMetrics = make(chan prometheus.Metric, 100*len(c.metrics)) ns.Add(c) return c } @@ -59,9 +60,10 @@ func taskID(id, namespace string) string { type collector struct { mu sync.RWMutex - cgroups map[string]*task - ns *metrics.Namespace - metrics []*metric + cgroups map[string]*task + ns *metrics.Namespace + metrics []*metric + storedMetrics chan prometheus.Metric } func (c *collector) Describe(ch chan<- *prometheus.Desc) { @@ -75,21 +77,33 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { wg := &sync.WaitGroup{} for _, t := range c.cgroups { wg.Add(1) - go c.collect(t.id, t.namespace, t.cgroup, ch, wg) + go c.collect(t.id, t.namespace, t.cgroup, ch, true, wg) + } +storedLoop: + for { + // read stored metrics until the channel is flushed + select { + case m := <-c.storedMetrics: + ch <- m + default: + break storedLoop + } } c.mu.RUnlock() wg.Wait() } -func (c *collector) collect(id, namespace string, cg cgroups.Cgroup, ch chan<- prometheus.Metric, wg *sync.WaitGroup) { - defer wg.Done() +func (c *collector) collect(id, namespace string, cg cgroups.Cgroup, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { + if wg != nil { + defer wg.Done() + } stats, err := cg.Stat(cgroups.IgnoreNotExist) if err != nil { logrus.WithError(err).Errorf("stat cgroup %s", id) return } for _, m := range c.metrics { - m.collect(id, namespace, stats, c.ns, ch) + m.collect(id, namespace, stats, c.ns, ch, block) } }