Merge pull request #5744 from mxpv/metrics
Add runtime label to metrics
This commit is contained in:
		| @@ -29,7 +29,7 @@ import ( | ||||
| 	"github.com/containerd/containerd/namespaces" | ||||
| 	"github.com/containerd/containerd/runtime" | ||||
| 	"github.com/containerd/containerd/runtime/v1/linux" | ||||
| 	metrics "github.com/docker/go-metrics" | ||||
| 	"github.com/docker/go-metrics" | ||||
| 	"github.com/sirupsen/logrus" | ||||
| ) | ||||
|  | ||||
| @@ -55,8 +55,8 @@ type cgroupsMonitor struct { | ||||
| 	publisher events.Publisher | ||||
| } | ||||
|  | ||||
| func (m *cgroupsMonitor) Monitor(c runtime.Task) error { | ||||
| 	if err := m.collector.Add(c); err != nil { | ||||
| func (m *cgroupsMonitor) Monitor(c runtime.Task, labels map[string]string) error { | ||||
| 	if err := m.collector.Add(c, labels); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	t, ok := c.(*linux.Task) | ||||
|   | ||||
| @@ -28,7 +28,7 @@ import ( | ||||
| 	v1 "github.com/containerd/containerd/metrics/types/v1" | ||||
| 	"github.com/containerd/containerd/namespaces" | ||||
| 	"github.com/containerd/typeurl" | ||||
| 	metrics "github.com/docker/go-metrics" | ||||
| 	"github.com/docker/go-metrics" | ||||
| 	"github.com/gogo/protobuf/types" | ||||
| 	"github.com/prometheus/client_golang/prometheus" | ||||
| ) | ||||
| @@ -53,7 +53,7 @@ func NewCollector(ns *metrics.Namespace) *Collector { | ||||
| 	// add machine cpus and memory info | ||||
| 	c := &Collector{ | ||||
| 		ns:    ns, | ||||
| 		tasks: make(map[string]Statable), | ||||
| 		tasks: make(map[string]entry), | ||||
| 	} | ||||
| 	c.metrics = append(c.metrics, pidMetrics...) | ||||
| 	c.metrics = append(c.metrics, cpuMetrics...) | ||||
| @@ -69,12 +69,19 @@ func taskID(id, namespace string) string { | ||||
| 	return fmt.Sprintf("%s-%s", id, namespace) | ||||
| } | ||||
|  | ||||
| type entry struct { | ||||
| 	task Statable | ||||
| 	// ns is an optional child namespace that contains additional to parent labels. | ||||
| 	// This can be used to append task specific labels to be able to differentiate the different containerd metrics. | ||||
| 	ns *metrics.Namespace | ||||
| } | ||||
|  | ||||
| // Collector provides the ability to collect container stats and export | ||||
| // them in the prometheus format | ||||
| type Collector struct { | ||||
| 	mu sync.RWMutex | ||||
|  | ||||
| 	tasks         map[string]Statable | ||||
| 	tasks         map[string]entry | ||||
| 	ns            *metrics.Namespace | ||||
| 	metrics       []*metric | ||||
| 	storedMetrics chan prometheus.Metric | ||||
| @@ -109,10 +116,11 @@ storedLoop: | ||||
| 	wg.Wait() | ||||
| } | ||||
|  | ||||
| func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { | ||||
| func (c *Collector) collect(entry entry, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { | ||||
| 	if wg != nil { | ||||
| 		defer wg.Done() | ||||
| 	} | ||||
| 	t := entry.task | ||||
| 	ctx := namespaces.WithNamespace(context.Background(), t.Namespace()) | ||||
| 	stats, err := t.Stats(ctx) | ||||
| 	if err != nil { | ||||
| @@ -129,13 +137,17 @@ func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, | ||||
| 		log.L.WithError(err).Errorf("invalid metric type for %s", t.ID()) | ||||
| 		return | ||||
| 	} | ||||
| 	ns := entry.ns | ||||
| 	if ns == nil { | ||||
| 		ns = c.ns | ||||
| 	} | ||||
| 	for _, m := range c.metrics { | ||||
| 		m.collect(t.ID(), t.Namespace(), s, c.ns, ch, block) | ||||
| 		m.collect(t.ID(), t.Namespace(), s, ns, ch, block) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Add adds the provided cgroup and id so that metrics are collected and exported | ||||
| func (c *Collector) Add(t Statable) error { | ||||
| func (c *Collector) Add(t Statable, labels map[string]string) error { | ||||
| 	if c.ns == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| @@ -145,7 +157,11 @@ func (c *Collector) Add(t Statable) error { | ||||
| 	if _, ok := c.tasks[id]; ok { | ||||
| 		return nil // requests to collect metrics should be idempotent | ||||
| 	} | ||||
| 	c.tasks[id] = t | ||||
| 	entry := entry{task: t} | ||||
| 	if labels != nil { | ||||
| 		entry.ns = c.ns.WithConstLabels(labels) | ||||
| 	} | ||||
| 	c.tasks[id] = entry | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -165,6 +181,6 @@ func (c *Collector) RemoveAll() { | ||||
| 		return | ||||
| 	} | ||||
| 	c.mu.Lock() | ||||
| 	c.tasks = make(map[string]Statable) | ||||
| 	c.tasks = make(map[string]entry) | ||||
| 	c.mu.Unlock() | ||||
| } | ||||
|   | ||||
| @@ -23,7 +23,7 @@ import ( | ||||
|  | ||||
| 	"github.com/containerd/containerd/events" | ||||
| 	"github.com/containerd/containerd/runtime" | ||||
| 	metrics "github.com/docker/go-metrics" | ||||
| 	"github.com/docker/go-metrics" | ||||
| ) | ||||
|  | ||||
| // NewTaskMonitor returns a new cgroups monitor | ||||
| @@ -42,8 +42,8 @@ type cgroupsMonitor struct { | ||||
| 	publisher events.Publisher | ||||
| } | ||||
|  | ||||
| func (m *cgroupsMonitor) Monitor(c runtime.Task) error { | ||||
| 	if err := m.collector.Add(c); err != nil { | ||||
| func (m *cgroupsMonitor) Monitor(c runtime.Task, labels map[string]string) error { | ||||
| 	if err := m.collector.Add(c, labels); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
|   | ||||
| @@ -27,7 +27,7 @@ import ( | ||||
| 	v2 "github.com/containerd/containerd/metrics/types/v2" | ||||
| 	"github.com/containerd/containerd/namespaces" | ||||
| 	"github.com/containerd/typeurl" | ||||
| 	metrics "github.com/docker/go-metrics" | ||||
| 	"github.com/docker/go-metrics" | ||||
| 	"github.com/gogo/protobuf/types" | ||||
| 	"github.com/prometheus/client_golang/prometheus" | ||||
| ) | ||||
| @@ -47,7 +47,7 @@ func NewCollector(ns *metrics.Namespace) *Collector { | ||||
| 	} | ||||
| 	c := &Collector{ | ||||
| 		ns:    ns, | ||||
| 		tasks: make(map[string]Statable), | ||||
| 		tasks: make(map[string]entry), | ||||
| 	} | ||||
| 	c.metrics = append(c.metrics, pidMetrics...) | ||||
| 	c.metrics = append(c.metrics, cpuMetrics...) | ||||
| @@ -62,12 +62,19 @@ func taskID(id, namespace string) string { | ||||
| 	return fmt.Sprintf("%s-%s", id, namespace) | ||||
| } | ||||
|  | ||||
| type entry struct { | ||||
| 	task Statable | ||||
| 	// ns is an optional child namespace that contains additional to parent labels. | ||||
| 	// This can be used to append task specific labels to be able to differentiate the different containerd metrics. | ||||
| 	ns *metrics.Namespace | ||||
| } | ||||
|  | ||||
| // Collector provides the ability to collect container stats and export | ||||
| // them in the prometheus format | ||||
| type Collector struct { | ||||
| 	mu sync.RWMutex | ||||
|  | ||||
| 	tasks         map[string]Statable | ||||
| 	tasks         map[string]entry | ||||
| 	ns            *metrics.Namespace | ||||
| 	metrics       []*metric | ||||
| 	storedMetrics chan prometheus.Metric | ||||
| @@ -102,10 +109,11 @@ storedLoop: | ||||
| 	wg.Wait() | ||||
| } | ||||
|  | ||||
| func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { | ||||
| func (c *Collector) collect(entry entry, ch chan<- prometheus.Metric, block bool, wg *sync.WaitGroup) { | ||||
| 	if wg != nil { | ||||
| 		defer wg.Done() | ||||
| 	} | ||||
| 	t := entry.task | ||||
| 	ctx := namespaces.WithNamespace(context.Background(), t.Namespace()) | ||||
| 	stats, err := t.Stats(ctx) | ||||
| 	if err != nil { | ||||
| @@ -122,13 +130,17 @@ func (c *Collector) collect(t Statable, ch chan<- prometheus.Metric, block bool, | ||||
| 		log.L.WithError(err).Errorf("invalid metric type for %s", t.ID()) | ||||
| 		return | ||||
| 	} | ||||
| 	ns := entry.ns | ||||
| 	if ns == nil { | ||||
| 		ns = c.ns | ||||
| 	} | ||||
| 	for _, m := range c.metrics { | ||||
| 		m.collect(t.ID(), t.Namespace(), s, c.ns, ch, block) | ||||
| 		m.collect(t.ID(), t.Namespace(), s, ns, ch, block) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Add adds the provided cgroup and id so that metrics are collected and exported | ||||
| func (c *Collector) Add(t Statable) error { | ||||
| func (c *Collector) Add(t Statable, labels map[string]string) error { | ||||
| 	if c.ns == nil { | ||||
| 		return nil | ||||
| 	} | ||||
| @@ -138,7 +150,11 @@ func (c *Collector) Add(t Statable) error { | ||||
| 	if _, ok := c.tasks[id]; ok { | ||||
| 		return nil // requests to collect metrics should be idempotent | ||||
| 	} | ||||
| 	c.tasks[id] = t | ||||
| 	entry := entry{task: t} | ||||
| 	if labels != nil { | ||||
| 		entry.ns = c.ns.WithConstLabels(labels) | ||||
| 	} | ||||
| 	c.tasks[id] = entry | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -158,6 +174,6 @@ func (c *Collector) RemoveAll() { | ||||
| 		return | ||||
| 	} | ||||
| 	c.mu.Lock() | ||||
| 	c.tasks = make(map[string]Statable) | ||||
| 	c.tasks = make(map[string]entry) | ||||
| 	c.mu.Unlock() | ||||
| } | ||||
|   | ||||
| @@ -18,10 +18,11 @@ package runtime | ||||
|  | ||||
| // TaskMonitor provides an interface for monitoring of containers within containerd | ||||
| type TaskMonitor interface { | ||||
| 	// Monitor adds the provided container to the monitor | ||||
| 	Monitor(Task) error | ||||
| 	// Monitor adds the provided container to the monitor. | ||||
| 	// Labels are optional (can be nil) key value pairs to be added to the metrics namespace. | ||||
| 	Monitor(task Task, labels map[string]string) error | ||||
| 	// Stop stops and removes the provided container from the monitor | ||||
| 	Stop(Task) error | ||||
| 	Stop(task Task) error | ||||
| } | ||||
|  | ||||
| // NewMultiTaskMonitor returns a new TaskMonitor broadcasting to the provided monitors | ||||
| @@ -39,7 +40,7 @@ func NewNoopMonitor() TaskMonitor { | ||||
| type noopTaskMonitor struct { | ||||
| } | ||||
|  | ||||
| func (mm *noopTaskMonitor) Monitor(c Task) error { | ||||
| func (mm *noopTaskMonitor) Monitor(c Task, labels map[string]string) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -51,9 +52,9 @@ type multiTaskMonitor struct { | ||||
| 	monitors []TaskMonitor | ||||
| } | ||||
|  | ||||
| func (mm *multiTaskMonitor) Monitor(c Task) error { | ||||
| func (mm *multiTaskMonitor) Monitor(task Task, labels map[string]string) error { | ||||
| 	for _, m := range mm.monitors { | ||||
| 		if err := m.Monitor(c); err != nil { | ||||
| 		if err := m.Monitor(task, labels); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|   | ||||
| @@ -115,7 +115,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		for _, t := range tasks { | ||||
| 			l.monitor.Monitor(t) | ||||
| 			l.monitor.Monitor(t, nil) | ||||
| 		} | ||||
| 	} | ||||
| 	v2Tasks, err := l.v2Runtime.Tasks(ic.Context, true) | ||||
| @@ -123,7 +123,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	for _, t := range v2Tasks { | ||||
| 		l.monitor.Monitor(t) | ||||
| 		l.monitor.Monitor(t, nil) | ||||
| 	} | ||||
| 	return l, nil | ||||
| } | ||||
| @@ -211,7 +211,8 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. | ||||
| 	if err != nil { | ||||
| 		return nil, errdefs.ToGRPC(err) | ||||
| 	} | ||||
| 	if err := l.monitor.Monitor(c); err != nil { | ||||
| 	labels := map[string]string{"runtime": container.Runtime.Name} | ||||
| 	if err := l.monitor.Monitor(c, labels); err != nil { | ||||
| 		return nil, errors.Wrap(err, "monitor task") | ||||
| 	} | ||||
| 	return &api.CreateTaskResponse{ | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Maksym Pavlenko
					Maksym Pavlenko