diff --git a/metrics/cgroups/v1/cgroups.go b/metrics/cgroups/v1/cgroups.go index 23e4893da..80cf20ac0 100644 --- a/metrics/cgroups/v1/cgroups.go +++ b/metrics/cgroups/v1/cgroups.go @@ -35,7 +35,7 @@ import ( // NewTaskMonitor returns a new cgroups monitor func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics.Namespace) (runtime.TaskMonitor, error) { - collector := newCollector(ns) + collector := NewCollector(ns) oom, err := newOOMCollector(ns) if err != nil { return nil, err @@ -49,7 +49,7 @@ func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics } type cgroupsMonitor struct { - collector *collector + collector *Collector oom *oomCollector context context.Context publisher events.Publisher diff --git a/metrics/cgroups/v1/metrics.go b/metrics/cgroups/v1/metrics.go index e4a41edd6..eac3e26db 100644 --- a/metrics/cgroups/v1/metrics.go +++ b/metrics/cgroups/v1/metrics.go @@ -27,26 +27,33 @@ import ( "github.com/containerd/containerd/log" v1 "github.com/containerd/containerd/metrics/types/v1" "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/runtime" "github.com/containerd/typeurl" metrics "github.com/docker/go-metrics" + "github.com/gogo/protobuf/types" "github.com/prometheus/client_golang/prometheus" ) +// Statable type that returns cgroup metrics +type Statable interface { + ID() string + Namespace() string + Stats(context.Context) (*types.Any, error) +} + // Trigger will be called when an event happens and provides the cgroup // where the event originated from type Trigger func(string, string, cgroups.Cgroup) -// newCollector registers the collector with the provided namespace and returns it so +// NewCollector registers the collector with the provided namespace and returns it so // that cgroups can be added for collection -func newCollector(ns *metrics.Namespace) *collector { +func NewCollector(ns *metrics.Namespace) *Collector { if ns == nil { - return &collector{} + return &Collector{} } // add machine cpus and memory info - c := &collector{ + c := &Collector{ ns: ns, - tasks: make(map[string]runtime.Task), + tasks: make(map[string]Statable), } c.metrics = append(c.metrics, pidMetrics...) c.metrics = append(c.metrics, cpuMetrics...) @@ -62,24 +69,26 @@ func taskID(id, namespace string) string { return fmt.Sprintf("%s-%s", id, namespace) } -// collector provides the ability to collect container stats and export +// Collector provides the ability to collect container stats and export // them in the prometheus format -type collector struct { +type Collector struct { mu sync.RWMutex - tasks map[string]runtime.Task + tasks map[string]Statable ns *metrics.Namespace metrics []*metric storedMetrics chan prometheus.Metric } -func (c *collector) Describe(ch chan<- *prometheus.Desc) { +// Describe prometheus metrics +func (c *Collector) Describe(ch chan<- *prometheus.Desc) { for _, m := range c.metrics { ch <- m.desc(c.ns) } } -func (c *collector) Collect(ch chan<- prometheus.Metric) { +// Collect prometheus metrics +func (c *Collector) Collect(ch chan<- prometheus.Metric) { c.mu.RLock() wg := &sync.WaitGroup{} for _, t := range c.tasks { @@ -100,7 +109,7 @@ storedLoop: wg.Wait() } -func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { +func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -126,7 +135,7 @@ func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block b } // Add adds the provided cgroup and id so that metrics are collected and exported -func (c *collector) Add(t runtime.Task) error { +func (c *Collector) Add(t Statable) error { if c.ns == nil { return nil } @@ -141,11 +150,21 @@ func (c *collector) Add(t runtime.Task) error { } // Remove removes the provided cgroup by id from the collector -func (c *collector) Remove(t runtime.Task) { +func (c *Collector) Remove(t Statable) { if c.ns == nil { return } c.mu.Lock() - defer c.mu.Unlock() delete(c.tasks, taskID(t.ID(), t.Namespace())) + c.mu.Unlock() +} + +// RemoveAll statable items from the collector +func (c *Collector) RemoveAll() { + if c.ns == nil { + return + } + c.mu.Lock() + c.tasks = make(map[string]Statable) + c.mu.Unlock() } diff --git a/metrics/cgroups/v2/cgroups.go b/metrics/cgroups/v2/cgroups.go index 1f54049f1..c0bce5996 100644 --- a/metrics/cgroups/v2/cgroups.go +++ b/metrics/cgroups/v2/cgroups.go @@ -30,7 +30,7 @@ import ( // NewTaskMonitor returns a new cgroups monitor func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics.Namespace) (runtime.TaskMonitor, error) { - collector := newCollector(ns) + collector := NewCollector(ns) return &cgroupsMonitor{ collector: collector, context: ctx, @@ -39,7 +39,7 @@ func NewTaskMonitor(ctx context.Context, publisher events.Publisher, ns *metrics } type cgroupsMonitor struct { - collector *collector + collector *Collector context context.Context publisher events.Publisher } diff --git a/metrics/cgroups/v2/metrics.go b/metrics/cgroups/v2/metrics.go index e352cd1fc..e8bade1a6 100644 --- a/metrics/cgroups/v2/metrics.go +++ b/metrics/cgroups/v2/metrics.go @@ -26,21 +26,28 @@ import ( "github.com/containerd/containerd/log" v2 "github.com/containerd/containerd/metrics/types/v2" "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/runtime" "github.com/containerd/typeurl" metrics "github.com/docker/go-metrics" + "github.com/gogo/protobuf/types" "github.com/prometheus/client_golang/prometheus" ) -// newCollector registers the collector with the provided namespace and returns it so +// Statable type that returns cgroup metrics +type Statable interface { + ID() string + Namespace() string + Stats(context.Context) (*types.Any, error) +} + +// NewCollector registers the collector with the provided namespace and returns it so // that cgroups can be added for collection -func newCollector(ns *metrics.Namespace) *collector { +func NewCollector(ns *metrics.Namespace) *Collector { if ns == nil { - return &collector{} + return &Collector{} } - c := &collector{ + c := &Collector{ ns: ns, - tasks: make(map[string]runtime.Task), + tasks: make(map[string]Statable), } c.metrics = append(c.metrics, pidMetrics...) c.metrics = append(c.metrics, cpuMetrics...) @@ -55,24 +62,26 @@ func taskID(id, namespace string) string { return fmt.Sprintf("%s-%s", id, namespace) } -// collector provides the ability to collect container stats and export +// Collector provides the ability to collect container stats and export // them in the prometheus format -type collector struct { +type Collector struct { mu sync.RWMutex - tasks map[string]runtime.Task + tasks map[string]Statable ns *metrics.Namespace metrics []*metric storedMetrics chan prometheus.Metric } -func (c *collector) Describe(ch chan<- *prometheus.Desc) { +// Describe prometheus metrics +func (c *Collector) Describe(ch chan<- *prometheus.Desc) { for _, m := range c.metrics { ch <- m.desc(c.ns) } } -func (c *collector) Collect(ch chan<- prometheus.Metric) { +// Collect prometheus metrics +func (c *Collector) Collect(ch chan<- prometheus.Metric) { c.mu.RLock() wg := &sync.WaitGroup{} for _, t := range c.tasks { @@ -93,7 +102,7 @@ storedLoop: wg.Wait() } -func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { +func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -119,7 +128,7 @@ func (c *collector) collect(t runtime.Task, ch chan<- prometheus.Metric, block b } // Add adds the provided cgroup and id so that metrics are collected and exported -func (c *collector) Add(t runtime.Task) error { +func (c *Collector) Add(t Statable) error { if c.ns == nil { return nil } @@ -134,7 +143,7 @@ func (c *collector) Add(t runtime.Task) error { } // Remove removes the provided cgroup by id from the collector -func (c *collector) Remove(t runtime.Task) { +func (c *Collector) Remove(t Statable) { if c.ns == nil { return } @@ -142,3 +151,13 @@ func (c *collector) Remove(t runtime.Task) { defer c.mu.Unlock() delete(c.tasks, taskID(t.ID(), t.Namespace())) } + +// RemoveAll statable items from the collector +func (c *Collector) RemoveAll() { + if c.ns == nil { + return + } + c.mu.Lock() + c.tasks = make(map[string]Statable) + c.mu.Unlock() +}