From 6ec84ef83c69b5dea436d8e7b15e8fabe07ff85e Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 27 Jun 2017 10:26:02 -0700 Subject: [PATCH] Add namespace to container metrics Signed-off-by: Michael Crosby --- container_test.go | 2 +- linux/runtime.go | 18 +++--------------- linux/shim/init.go | 2 +- metrics/cgroups/cgroups.go | 14 +++++--------- metrics/cgroups/metric.go | 7 ++++--- metrics/cgroups/metrics.go | 38 ++++++++++++++++++++++++-------------- metrics/cgroups/oom.go | 22 ++++++++++++---------- 7 files changed, 50 insertions(+), 53 deletions(-) diff --git a/container_test.go b/container_test.go index ccf815099..56888dcbc 100644 --- a/container_test.go +++ b/container_test.go @@ -276,7 +276,7 @@ func TestContainerExec(t *testing.T) { "exit 6", } - process, err := task.Exec(ctx, &processSpec, empty()) + process, err := task.Exec(ctx, processSpec, empty()) if err != nil { t.Error(err) return diff --git a/linux/runtime.go b/linux/runtime.go index 5a2145dd3..f903c7665 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -400,21 +400,9 @@ func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) if err != nil { return err } - if err := rt.Kill(ctx, id, int(unix.SIGKILL), &runc.KillOpts{All: true}); err != nil { - log.G(ctx).WithError(err).Warnf("kill all processes for %s", id) - } - // it can take a while for the container to be killed so poll for the container's status - // until it is in a stopped state - status := "running" - for status != "stopped" { - c, err := rt.State(ctx, id) - if err != nil { - break - } - status = c.Status - time.Sleep(50 * time.Millisecond) - } - if err := rt.Delete(ctx, id); err != nil { + if err := rt.Delete(ctx, id, &runc.DeleteOpts{ + Force: true, + }); err != nil { log.G(ctx).WithError(err).Warnf("delete runtime state %s", id) } if err := unix.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil { diff --git a/linux/shim/init.go b/linux/shim/init.go index b1897e8b9..81aebcb28 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -205,7 +205,7 @@ func (p *initProcess) Delete(context context.Context) error { } p.killAll(context) p.Wait() - err = p.runc.Delete(context, p.id) + err = p.runc.Delete(context, p.id, nil) if p.io != nil { for _, c := range p.closers { c.Close() diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 2eff4dd28..10f2a80ac 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -3,7 +3,6 @@ package cgroups import ( - "fmt" "time" "github.com/containerd/cgroups" @@ -44,12 +43,8 @@ type cgroupsMonitor struct { events chan<- *plugin.Event } -func getID(t plugin.Task) string { - return fmt.Sprintf("%s-%s", t.Info().Namespace, t.Info().ID) -} - func (m *cgroupsMonitor) Monitor(c plugin.Task) error { - id := getID(c) + info := c.Info() state, err := c.State(m.context) if err != nil { return err @@ -58,14 +53,15 @@ func (m *cgroupsMonitor) Monitor(c plugin.Task) error { if err != nil { return err } - if err := m.collector.Add(id, cg); err != nil { + if err := m.collector.Add(info.ID, info.Namespace, cg); err != nil { return err } - return m.oom.Add(id, cg, m.trigger) + return m.oom.Add(info.ID, info.Namespace, cg, m.trigger) } func (m *cgroupsMonitor) Stop(c plugin.Task) error { - m.collector.Remove(getID(c)) + info := c.Info() + m.collector.Remove(info.ID, info.Namespace) return nil } diff --git a/metrics/cgroups/metric.go b/metrics/cgroups/metric.go index 1fc71e7cb..bc9d170b5 100644 --- a/metrics/cgroups/metric.go +++ b/metrics/cgroups/metric.go @@ -22,12 +22,13 @@ type metric struct { } func (m *metric) desc(ns *metrics.Namespace) *prometheus.Desc { - return ns.NewDesc(m.name, m.help, m.unit, append([]string{"id"}, m.labels...)...) + // the namespace label is for containerd namespaces + return ns.NewDesc(m.name, m.help, m.unit, append([]string{"id", "namespace"}, m.labels...)...) } -func (m *metric) collect(id string, stats *cgroups.Stats, ns *metrics.Namespace, ch chan<- prometheus.Metric) { +func (m *metric) collect(id, namespace string, stats *cgroups.Stats, ns *metrics.Namespace, ch chan<- prometheus.Metric) { values := m.getValues(stats) for _, v := range values { - ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id}, v.l...)...) + ch <- prometheus.MustNewConstMetric(m.desc(ns), m.vt, v.v, append([]string{id, namespace}, v.l...)...) } } diff --git a/metrics/cgroups/metrics.go b/metrics/cgroups/metrics.go index f32980635..ed170b0b5 100644 --- a/metrics/cgroups/metrics.go +++ b/metrics/cgroups/metrics.go @@ -26,7 +26,7 @@ func NewCollector(ns *metrics.Namespace) *Collector { // add machine cpus and memory info c := &Collector{ ns: ns, - cgroups: make(map[string]cgroups.Cgroup), + cgroups: make(map[string]*task), } c.metrics = append(c.metrics, pidMetrics...) c.metrics = append(c.metrics, cpuMetrics...) @@ -37,12 +37,18 @@ func NewCollector(ns *metrics.Namespace) *Collector { return c } +type task struct { + id string + namespace string + cgroup cgroups.Cgroup +} + // Collector provides the ability to collect container stats and export // them in the prometheus format type Collector struct { mu sync.RWMutex - cgroups map[string]cgroups.Cgroup + cgroups map[string]*task ns *metrics.Namespace metrics []*metric } @@ -56,15 +62,15 @@ func (c *Collector) Describe(ch chan<- *prometheus.Desc) { func (c *Collector) Collect(ch chan<- prometheus.Metric) { c.mu.RLock() wg := &sync.WaitGroup{} - for id, cg := range c.cgroups { + for _, t := range c.cgroups { wg.Add(1) - go c.collect(id, cg, ch, wg) + go c.collect(t.id, t.namespace, t.cgroup, ch, wg) } c.mu.RUnlock() wg.Wait() } -func (c *Collector) collect(id string, cg cgroups.Cgroup, ch chan<- prometheus.Metric, wg *sync.WaitGroup) { +func (c *Collector) collect(id, namespace string, cg cgroups.Cgroup, ch chan<- prometheus.Metric, wg *sync.WaitGroup) { defer wg.Done() stats, err := cg.Stat(cgroups.IgnoreNotExist) if err != nil { @@ -72,38 +78,42 @@ func (c *Collector) collect(id string, cg cgroups.Cgroup, ch chan<- prometheus.M return } for _, m := range c.metrics { - m.collect(id, stats, c.ns, ch) + m.collect(id, namespace, stats, c.ns, ch) } } // Add adds the provided cgroup and id so that metrics are collected and exported -func (c *Collector) Add(id string, cg cgroups.Cgroup) error { +func (c *Collector) Add(id, namespace string, cg cgroups.Cgroup) error { c.mu.Lock() defer c.mu.Unlock() - if _, ok := c.cgroups[id]; ok { + if _, ok := c.cgroups[id+namespace]; ok { return ErrAlreadyCollected } - c.cgroups[id] = cg + c.cgroups[id+namespace] = &task{ + id: id, + namespace: namespace, + cgroup: cg, + } return nil } // Get returns the cgroup that is being collected under the provided id // returns ErrCgroupNotExists if the id is not being collected -func (c *Collector) Get(id string) (cgroups.Cgroup, error) { +func (c *Collector) Get(id, namespace string) (cgroups.Cgroup, error) { c.mu.Lock() defer c.mu.Unlock() - cg, ok := c.cgroups[id] + t, ok := c.cgroups[id+namespace] if !ok { return nil, ErrCgroupNotExists } - return cg, nil + return t.cgroup, nil } // Remove removes the provided cgroup by id from the collector -func (c *Collector) Remove(id string) { +func (c *Collector) Remove(id, namespace string) { c.mu.Lock() defer c.mu.Unlock() - delete(c.cgroups, id) + delete(c.cgroups, id+namespace) } func blkioValues(l []cgroups.BlkioEntry) []value { diff --git a/metrics/cgroups/oom.go b/metrics/cgroups/oom.go index 239723e5d..493dd0be2 100644 --- a/metrics/cgroups/oom.go +++ b/metrics/cgroups/oom.go @@ -17,7 +17,7 @@ func NewOOMCollector(ns *metrics.Namespace) (*OOMCollector, error) { } c := &OOMCollector{ fd: fd, - memoryOOM: ns.NewLabeledGauge("memory_oom", "The number of times a container received an oom event", metrics.Total, "id"), + memoryOOM: ns.NewLabeledGauge("memory_oom", "The number of times a container received an oom event", metrics.Total, "id", "namespace"), set: make(map[uintptr]*oom), } go c.start() @@ -33,12 +33,13 @@ type OOMCollector struct { } type oom struct { - id string - c cgroups.Cgroup - triggers []Trigger + id string + namespace string + c cgroups.Cgroup + triggers []Trigger } -func (o *OOMCollector) Add(id string, cg cgroups.Cgroup, triggers ...Trigger) error { +func (o *OOMCollector) Add(id, namespace string, cg cgroups.Cgroup, triggers ...Trigger) error { o.mu.Lock() defer o.mu.Unlock() fd, err := cg.OOMEventFD() @@ -46,12 +47,13 @@ func (o *OOMCollector) Add(id string, cg cgroups.Cgroup, triggers ...Trigger) er return err } o.set[fd] = &oom{ - id: id, - c: cg, - triggers: triggers, + id: id, + c: cg, + triggers: triggers, + namespace: namespace, } // set the gauge's default value - o.memoryOOM.WithValues(id).Set(0) + o.memoryOOM.WithValues(id, namespace).Set(0) event := unix.EpollEvent{ Fd: int32(fd), Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR, @@ -103,7 +105,7 @@ func (o *OOMCollector) process(fd uintptr, event uint32) { unix.Close(int(fd)) return } - o.memoryOOM.WithValues(info.id).Inc(1) + o.memoryOOM.WithValues(info.id, info.namespace).Inc(1) for _, t := range info.triggers { t(info.id, info.c) }