Add new endpoint for resource metrics.

This commit is contained in:
RainbowMango 2019-12-15 17:06:13 +08:00
parent 5ead4974e0
commit 0db7074e1a
4 changed files with 358 additions and 8 deletions

View File

@ -0,0 +1,153 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"time"
"k8s.io/component-base/metrics"
"k8s.io/klog"
summary "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)
var (
nodeCPUUsageDesc = metrics.NewDesc("node_cpu_usage_seconds",
"Cumulative cpu time consumed by the node in core-seconds",
nil,
nil,
metrics.ALPHA,
"")
nodeMemoryUsageDesc = metrics.NewDesc("node_memory_working_set_bytes",
"Current working set of the node in bytes",
nil,
nil,
metrics.ALPHA,
"")
containerCPUUsageDesc = metrics.NewDesc("container_cpu_usage_seconds",
"Cumulative cpu time consumed by the container in core-seconds",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
containerMemoryUsageDesc = metrics.NewDesc("container_memory_working_set_bytes",
"Current working set of the container in bytes",
[]string{"container", "pod", "namespace"},
nil,
metrics.ALPHA,
"")
resouceScrapeResultDesc = metrics.NewDesc("scrape_error",
"1 if there was an error while getting container metrics, 0 otherwise",
nil,
nil,
metrics.ALPHA,
"")
)
// NewResourceMetricsCollector returns a metrics.StableCollector which exports resource metrics
func NewResourceMetricsCollector(provider stats.SummaryProvider) metrics.StableCollector {
return &resourceMetricsCollector{
provider: provider,
}
}
type resourceMetricsCollector struct {
metrics.BaseStableCollector
provider stats.SummaryProvider
}
// Check if resourceMetricsCollector implements necessary interface
var _ metrics.StableCollector = &resourceMetricsCollector{}
// DescribeWithStability implements metrics.StableCollector
func (rc *resourceMetricsCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- nodeCPUUsageDesc
ch <- nodeMemoryUsageDesc
ch <- containerCPUUsageDesc
ch <- containerMemoryUsageDesc
ch <- resouceScrapeResultDesc
}
// CollectWithStability implements metrics.StableCollector
// Since new containers are frequently created and removed, using the Gauge would
// leak metric collectors for containers or pods that no longer exist. Instead, implement
// custom collector in a way that only collects metrics for active containers.
func (rc *resourceMetricsCollector) CollectWithStability(ch chan<- metrics.Metric) {
var errorCount float64
defer func() {
ch <- metrics.NewLazyConstMetric(resouceScrapeResultDesc, metrics.GaugeValue, errorCount)
}()
statsSummary, err := rc.provider.GetCPUAndMemoryStats()
if err != nil {
errorCount = 1
klog.Warningf("Error getting summary for resourceMetric prometheus endpoint: %v", err)
return
}
rc.collectNodeCPUMetrics(ch, statsSummary.Node)
rc.collectNodeMemoryMetrics(ch, statsSummary.Node)
for _, pod := range statsSummary.Pods {
for _, container := range pod.Containers {
rc.collectContainerCPUMetrics(ch, pod, container)
rc.collectContainerMemoryMetrics(ch, pod, container)
}
}
}
func (rc *resourceMetricsCollector) collectNodeCPUMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
if s.CPU == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
metrics.NewLazyConstMetric(nodeCPUUsageDesc, metrics.GaugeValue, float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second)))
}
func (rc *resourceMetricsCollector) collectNodeMemoryMetrics(ch chan<- metrics.Metric, s summary.NodeStats) {
if s.Memory == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
metrics.NewLazyConstMetric(nodeMemoryUsageDesc, metrics.GaugeValue, float64(*s.Memory.WorkingSetBytes)))
}
func (rc *resourceMetricsCollector) collectContainerCPUMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
if s.CPU == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.CPU.Time.Time,
metrics.NewLazyConstMetric(containerCPUUsageDesc, metrics.GaugeValue,
float64(*s.CPU.UsageCoreNanoSeconds)/float64(time.Second), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}
func (rc *resourceMetricsCollector) collectContainerMemoryMetrics(ch chan<- metrics.Metric, pod summary.PodStats, s summary.ContainerStats) {
if s.Memory == nil {
return
}
ch <- metrics.NewLazyMetricWithTimestamp(s.Memory.Time.Time,
metrics.NewLazyConstMetric(containerMemoryUsageDesc, metrics.GaugeValue,
float64(*s.Memory.WorkingSetBytes), s.Name, pod.PodRef.Name, pod.PodRef.Namespace))
}

View File

@ -0,0 +1,189 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)
type mockSummaryProvider struct {
mock.Mock
}
func (m *mockSummaryProvider) Get(updateStats bool) (*statsapi.Summary, error) {
args := m.Called(updateStats)
return args.Get(0).(*statsapi.Summary), args.Error(1)
}
func (m *mockSummaryProvider) GetCPUAndMemoryStats() (*statsapi.Summary, error) {
args := m.Called()
return args.Get(0).(*statsapi.Summary), args.Error(1)
}
func TestCollectResourceMetrics(t *testing.T) {
testTime := metav1.NewTime(time.Unix(2, 0)) // a static timestamp: 2000
interestedMetrics := []string{
"scrape_error",
"node_cpu_usage_seconds",
"node_memory_working_set_bytes",
"container_cpu_usage_seconds",
"container_memory_working_set_bytes",
}
tests := []struct {
name string
summary *statsapi.Summary
summaryErr error
expectedMetrics string
}{
{
name: "error getting summary",
summary: nil,
summaryErr: fmt.Errorf("failed to get summary"),
expectedMetrics: `
# HELP scrape_error [ALPHA] 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 1
`,
},
{
name: "arbitrary node metrics",
summary: &statsapi.Summary{
Node: statsapi.NodeStats{
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
summaryErr: nil,
expectedMetrics: `
# HELP node_cpu_usage_seconds [ALPHA] Cumulative cpu time consumed by the node in core-seconds
# TYPE node_cpu_usage_seconds gauge
node_cpu_usage_seconds 10 2000
# HELP node_memory_working_set_bytes [ALPHA] Current working set of the node in bytes
# TYPE node_memory_working_set_bytes gauge
node_memory_working_set_bytes 1000 2000
# HELP scrape_error [ALPHA] 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 0
`,
},
{
name: "arbitrary container metrics for different container, pods and namespaces",
summary: &statsapi.Summary{
Pods: []statsapi.PodStats{
{
PodRef: statsapi.PodReference{
Name: "pod_a",
Namespace: "namespace_a",
},
Containers: []statsapi.ContainerStats{
{
Name: "container_a",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
{
Name: "container_b",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
},
{
PodRef: statsapi.PodReference{
Name: "pod_b",
Namespace: "namespace_b",
},
Containers: []statsapi.ContainerStats{
{
Name: "container_a",
CPU: &statsapi.CPUStats{
Time: testTime,
UsageCoreNanoSeconds: uint64Ptr(10000000000),
},
Memory: &statsapi.MemoryStats{
Time: testTime,
WorkingSetBytes: uint64Ptr(1000),
},
},
},
},
},
},
summaryErr: nil,
expectedMetrics: `
# HELP scrape_error [ALPHA] 1 if there was an error while getting container metrics, 0 otherwise
# TYPE scrape_error gauge
scrape_error 0
# HELP container_cpu_usage_seconds [ALPHA] Cumulative cpu time consumed by the container in core-seconds
# TYPE container_cpu_usage_seconds gauge
container_cpu_usage_seconds{container="container_a",namespace="namespace_a",pod="pod_a"} 10 2000
container_cpu_usage_seconds{container="container_a",namespace="namespace_b",pod="pod_b"} 10 2000
container_cpu_usage_seconds{container="container_b",namespace="namespace_a",pod="pod_a"} 10 2000
# HELP container_memory_working_set_bytes [ALPHA] Current working set of the container in bytes
# TYPE container_memory_working_set_bytes gauge
container_memory_working_set_bytes{container="container_a",namespace="namespace_a",pod="pod_a"} 1000 2000
container_memory_working_set_bytes{container="container_a",namespace="namespace_b",pod="pod_b"} 1000 2000
container_memory_working_set_bytes{container="container_b",namespace="namespace_a",pod="pod_a"} 1000 2000
`,
},
}
for _, test := range tests {
tc := test
t.Run(tc.name, func(t *testing.T) {
provider := &mockSummaryProvider{}
provider.On("GetCPUAndMemoryStats").Return(tc.summary, tc.summaryErr)
collector := NewResourceMetricsCollector(provider)
if err := testutil.CustomCollectAndCompare(collector, strings.NewReader(tc.expectedMetrics), interestedMetrics...); err != nil {
t.Fatal(err)
}
})
}
}
func uint64Ptr(u uint64) *uint64 {
return &u
}

View File

@ -128,6 +128,7 @@ func AuthzTestCases() []AuthzTestCase {
"/metrics/cadvisor": "metrics",
"/metrics/probes": "metrics",
"/metrics/resource/v1alpha1": "metrics",
"/metrics/resource": "metrics",
"/pods/": "proxy",
"/portForward/{podNamespace}/{podID}": "proxy",
"/portForward/{podNamespace}/{podID}/{uid}": "proxy",

View File

@ -38,6 +38,7 @@ import (
"github.com/google/cadvisor/metrics"
"google.golang.org/grpc"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -74,13 +75,13 @@ import (
)
const (
metricsPath = "/metrics"
cadvisorMetricsPath = "/metrics/cadvisor"
resourceMetricsPathPrefix = "/metrics/resource"
proberMetricsPath = "/metrics/probes"
specPath = "/spec/"
statsPath = "/stats/"
logsPath = "/logs/"
metricsPath = "/metrics"
cadvisorMetricsPath = "/metrics/cadvisor"
resourceMetricsPath = "/metrics/resource"
proberMetricsPath = "/metrics/probes"
specPath = "/spec/"
statsPath = "/stats/"
logsPath = "/logs/"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
@ -321,10 +322,16 @@ func (s *Server) InstallDefaultHandlers(enableCAdvisorJSONEndpoints bool) {
v1alpha1ResourceRegistry := compbasemetrics.NewKubeRegistry()
v1alpha1ResourceRegistry.CustomMustRegister(stats.NewPrometheusResourceMetricCollector(s.resourceAnalyzer, v1alpha1.Config()))
s.restfulCont.Handle(path.Join(resourceMetricsPathPrefix, v1alpha1.Version),
s.restfulCont.Handle(path.Join(resourceMetricsPath, v1alpha1.Version),
compbasemetrics.HandlerFor(v1alpha1ResourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
resourceRegistry := compbasemetrics.NewKubeRegistry()
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
s.restfulCont.Handle(resourceMetricsPath,
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
// prober metrics are exposed under a different endpoint
p := compbasemetrics.NewKubeRegistry()