export cgroups collectors

This makes it easier to extend the collectors to be used by external code and
task managers

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2020-03-05 14:12:10 -05:00
parent 35a8de8996
commit 1239f54035
4 changed files with 71 additions and 33 deletions

View File

@ -35,7 +35,7 @@ import (
// NewTaskMonitor returns a new cgroups monitor // NewTaskMonitor returns a new cgroups monitor
func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics.Namespace) (runtime.TaskMonitor, error) { func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics.Namespace) (runtime.TaskMonitor, error) {
collector := newCollector(ns) collector := NewCollector(ns)
oom, err := newOOMCollector(ns) oom, err := newOOMCollector(ns)
if err != nil { if err != nil {
return nil, err return nil, err
@ -49,7 +49,7 @@ func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics
} }
type cgroupsMonitor struct { type cgroupsMonitor struct {
collector *collector collector *Collector
oom *oomCollector oom *oomCollector
context context.Context context context.Context
publisher events.Publisher publisher events.Publisher

View File

@ -27,26 +27,33 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
v1 "github.com/containerd/containerd/metrics/types/v1" v1 "github.com/containerd/containerd/metrics/types/v1"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
metrics "github.com/docker/go-metrics" metrics "github.com/docker/go-metrics"
"github.com/gogo/protobuf/types"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
// Statable type that returns cgroup metrics
type Statable interface {
ID() string
Namespace() string
Stats(context.Context) (*types.Any, error)
}
// Trigger will be called when an event happens and provides the cgroup // Trigger will be called when an event happens and provides the cgroup
// where the event originated from // where the event originated from
type Trigger func(string, string, cgroups.Cgroup) type Trigger func(string, string, cgroups.Cgroup)
// newCollector registers the collector with the provided namespace and returns it so // NewCollector registers the collector with the provided namespace and returns it so
// that cgroups can be added for collection // that cgroups can be added for collection
func newCollector(ns *metrics.Namespace) *collector { func NewCollector(ns *metrics.Namespace) *Collector {
if ns == nil { if ns == nil {
return &collector{} return &Collector{}
} }
// add machine cpus and memory info // add machine cpus and memory info
c := &collector{ c := &Collector{
ns: ns, ns: ns,
tasks: make(map[string]runtime.Task), tasks: make(map[string]Statable),
} }
c.metrics = append(c.metrics, pidMetrics...) c.metrics = append(c.metrics, pidMetrics...)
c.metrics = append(c.metrics, cpuMetrics...) c.metrics = append(c.metrics, cpuMetrics...)
@ -62,24 +69,26 @@ func taskID(id, namespace string) string {
return fmt.Sprintf("%s-%s", id, namespace) return fmt.Sprintf("%s-%s", id, namespace)
} }
// collector provides the ability to collect container stats and export // Collector provides the ability to collect container stats and export
// them in the prometheus format // them in the prometheus format
type collector struct { type Collector struct {
mu sync.RWMutex mu sync.RWMutex
tasks map[string]runtime.Task tasks map[string]Statable
ns *metrics.Namespace ns *metrics.Namespace
metrics []*metric metrics []*metric
storedMetrics chan prometheus.Metric storedMetrics chan prometheus.Metric
} }
func (c *collector) Describe(ch chan<- *prometheus.Desc) { // Describe prometheus metrics
func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
for _, m := range c.metrics { for _, m := range c.metrics {
ch <- m.desc(c.ns) ch <- m.desc(c.ns)
} }
} }
func (c *collector) Collect(ch chan<- prometheus.Metric) { // Collect prometheus metrics
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
c.mu.RLock() c.mu.RLock()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, t := range c.tasks { for _, t := range c.tasks {
@ -100,7 +109,7 @@ storedLoop:
wg.Wait() wg.Wait()
} }
func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) {
if wg != nil { if wg != nil {
defer wg.Done() defer wg.Done()
} }
@ -126,7 +135,7 @@ func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block b
} }
// Add adds the provided cgroup and id so that metrics are collected and exported // Add adds the provided cgroup and id so that metrics are collected and exported
func (c *collector) Add(t runtime.Task) error { func (c *Collector) Add(t Statable) error {
if c.ns == nil { if c.ns == nil {
return nil return nil
} }
@ -141,11 +150,21 @@ func (c *collector) Add(t runtime.Task) error {
} }
// Remove removes the provided cgroup by id from the collector // Remove removes the provided cgroup by id from the collector
func (c *collector) Remove(t runtime.Task) { func (c *Collector) Remove(t Statable) {
if c.ns == nil { if c.ns == nil {
return return
} }
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
delete(c.tasks, taskID(t.ID(), t.Namespace())) delete(c.tasks, taskID(t.ID(), t.Namespace()))
c.mu.Unlock()
}
// RemoveAll statable items from the collector
func (c *Collector) RemoveAll() {
if c.ns == nil {
return
}
c.mu.Lock()
c.tasks = make(map[string]Statable)
c.mu.Unlock()
} }

View File

@ -30,7 +30,7 @@ import (
// NewTaskMonitor returns a new cgroups monitor // NewTaskMonitor returns a new cgroups monitor
func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics.Namespace) (runtime.TaskMonitor, error) { func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics.Namespace) (runtime.TaskMonitor, error) {
collector := newCollector(ns) collector := NewCollector(ns)
return &cgroupsMonitor{ return &cgroupsMonitor{
collector: collector, collector: collector,
context: ctx, context: ctx,
@ -39,7 +39,7 @@ func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics
} }
type cgroupsMonitor struct { type cgroupsMonitor struct {
collector *collector collector *Collector
context context.Context context context.Context
publisher events.Publisher publisher events.Publisher
} }

View File

@ -26,21 +26,28 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
v2 "github.com/containerd/containerd/metrics/types/v2" v2 "github.com/containerd/containerd/metrics/types/v2"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
metrics "github.com/docker/go-metrics" metrics "github.com/docker/go-metrics"
"github.com/gogo/protobuf/types"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
// newCollector registers the collector with the provided namespace and returns it so // Statable type that returns cgroup metrics
type Statable interface {
ID() string
Namespace() string
Stats(context.Context) (*types.Any, error)
}
// NewCollector registers the collector with the provided namespace and returns it so
// that cgroups can be added for collection // that cgroups can be added for collection
func newCollector(ns *metrics.Namespace) *collector { func NewCollector(ns *metrics.Namespace) *Collector {
if ns == nil { if ns == nil {
return &collector{} return &Collector{}
} }
c := &collector{ c := &Collector{
ns: ns, ns: ns,
tasks: make(map[string]runtime.Task), tasks: make(map[string]Statable),
} }
c.metrics = append(c.metrics, pidMetrics...) c.metrics = append(c.metrics, pidMetrics...)
c.metrics = append(c.metrics, cpuMetrics...) c.metrics = append(c.metrics, cpuMetrics...)
@ -55,24 +62,26 @@ func taskID(id, namespace string) string {
return fmt.Sprintf("%s-%s", id, namespace) return fmt.Sprintf("%s-%s", id, namespace)
} }
// collector provides the ability to collect container stats and export // Collector provides the ability to collect container stats and export
// them in the prometheus format // them in the prometheus format
type collector struct { type Collector struct {
mu sync.RWMutex mu sync.RWMutex
tasks map[string]runtime.Task tasks map[string]Statable
ns *metrics.Namespace ns *metrics.Namespace
metrics []*metric metrics []*metric
storedMetrics chan prometheus.Metric storedMetrics chan prometheus.Metric
} }
func (c *collector) Describe(ch chan<- *prometheus.Desc) { // Describe prometheus metrics
func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
for _, m := range c.metrics { for _, m := range c.metrics {
ch <- m.desc(c.ns) ch <- m.desc(c.ns)
} }
} }
func (c *collector) Collect(ch chan<- prometheus.Metric) { // Collect prometheus metrics
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
c.mu.RLock() c.mu.RLock()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for _, t := range c.tasks { for _, t := range c.tasks {
@ -93,7 +102,7 @@ storedLoop:
wg.Wait() wg.Wait()
} }
func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) {
if wg != nil { if wg != nil {
defer wg.Done() defer wg.Done()
} }
@ -119,7 +128,7 @@ func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block b
} }
// Add adds the provided cgroup and id so that metrics are collected and exported // Add adds the provided cgroup and id so that metrics are collected and exported
func (c *collector) Add(t runtime.Task) error { func (c *Collector) Add(t Statable) error {
if c.ns == nil { if c.ns == nil {
return nil return nil
} }
@ -134,7 +143,7 @@ func (c *collector) Add(t runtime.Task) error {
} }
// Remove removes the provided cgroup by id from the collector // Remove removes the provided cgroup by id from the collector
func (c *collector) Remove(t runtime.Task) { func (c *Collector) Remove(t Statable) {
if c.ns == nil { if c.ns == nil {
return return
} }
@ -142,3 +151,13 @@ func (c *collector) Remove(t runtime.Task) {
defer c.mu.Unlock() defer c.mu.Unlock()
delete(c.tasks, taskID(t.ID(), t.Namespace())) delete(c.tasks, taskID(t.ID(), t.Namespace()))
} }
// RemoveAll statable items from the collector
func (c *Collector) RemoveAll() {
if c.ns == nil {
return
}
c.mu.Lock()
c.tasks = make(map[string]Statable)
c.mu.Unlock()
}