Merge pull request #87923 from ingvagabund/move-direct-prometheus-metrics-under-component-base-metrics

Collect some of scheduling metrics and scheduling throughput (vol. 2)
This commit is contained in:
Kubernetes Prow Robot
2020-02-13 14:13:11 -08:00
committed by GitHub
6 changed files with 616 additions and 4 deletions

View File

@@ -13,6 +13,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library", "//staging/src/k8s.io/component-base/metrics:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus/testutil:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus/testutil:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
"//vendor/github.com/prometheus/common/expfmt:go_default_library", "//vendor/github.com/prometheus/common/expfmt:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library", "//vendor/github.com/prometheus/common/model:go_default_library",
], ],
@@ -34,7 +35,14 @@ filegroup(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["testutil_test.go"], srcs = [
"metrics_test.go",
"testutil_test.go",
],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/component-base/metrics:go_default_library"], deps = [
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
) )

View File

@@ -19,11 +19,16 @@ package testutil
import ( import (
"fmt" "fmt"
"io" "io"
"math"
"reflect" "reflect"
"sort"
"strings" "strings"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"k8s.io/component-base/metrics"
) )
var ( var (
@@ -178,3 +183,140 @@ func ValidateMetrics(metrics Metrics, metricName string, expectedLabels ...strin
} }
return nil return nil
} }
// Histogram wraps prometheus histogram DTO (data transfer object)
type Histogram struct {
*dto.Histogram
}
// GetHistogramFromGatherer collects a metric from a gatherer implementing k8s.io/component-base/metrics.Gatherer interface.
// Used only for testing purposes where we need to gather metrics directly from a running binary (without metrics endpoint).
func GetHistogramFromGatherer(gatherer metrics.Gatherer, metricName string) (Histogram, error) {
var metricFamily *dto.MetricFamily
m, err := gatherer.Gather()
if err != nil {
return Histogram{}, err
}
for _, mFamily := range m {
if mFamily.Name != nil && *mFamily.Name == metricName {
metricFamily = mFamily
break
}
}
if metricFamily == nil {
return Histogram{}, fmt.Errorf("Metric %q not found", metricName)
}
if metricFamily.GetMetric() == nil {
return Histogram{}, fmt.Errorf("Metric %q is empty", metricName)
}
if len(metricFamily.GetMetric()) == 0 {
return Histogram{}, fmt.Errorf("Metric %q is empty", metricName)
}
return Histogram{
// Histograms are stored under the first index (based on observation).
// Given there's only one histogram registered per each metric name, accessing
// the first index is sufficient.
metricFamily.GetMetric()[0].GetHistogram(),
}, nil
}
func uint64Ptr(u uint64) *uint64 {
return &u
}
// Bucket of a histogram
type bucket struct {
upperBound float64
count float64
}
func bucketQuantile(q float64, buckets []bucket) float64 {
if q < 0 {
return math.Inf(-1)
}
if q > 1 {
return math.Inf(+1)
}
if len(buckets) < 2 {
return math.NaN()
}
rank := q * buckets[len(buckets)-1].count
b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
if b == 0 {
return buckets[0].upperBound * (rank / buckets[0].count)
}
// linear approximation of b-th bucket
brank := rank - buckets[b-1].count
bSize := buckets[b].upperBound - buckets[b-1].upperBound
bCount := buckets[b].count - buckets[b-1].count
return buckets[b-1].upperBound + bSize*(brank/bCount)
}
// Quantile computes q-th quantile of a cumulative histogram.
// It's expected the histogram is valid (by calling Validate)
func (hist *Histogram) Quantile(q float64) float64 {
buckets := []bucket{}
for _, bckt := range hist.Bucket {
buckets = append(buckets, bucket{
count: float64(*bckt.CumulativeCount),
upperBound: *bckt.UpperBound,
})
}
// bucketQuantile expects the upper bound of the last bucket to be +inf
// buckets[len(buckets)-1].upperBound = math.Inf(+1)
return bucketQuantile(q, buckets)
}
// Average computes histogram's average value
func (hist *Histogram) Average() float64 {
return *hist.SampleSum / float64(*hist.SampleCount)
}
// Clear clears all fields of the wrapped histogram
func (hist *Histogram) Clear() {
if hist.SampleCount != nil {
*hist.SampleCount = 0
}
if hist.SampleSum != nil {
*hist.SampleSum = 0
}
for _, b := range hist.Bucket {
if b.CumulativeCount != nil {
*b.CumulativeCount = 0
}
}
}
// Validate makes sure the wrapped histogram has all necessary fields set and with valid values.
func (hist *Histogram) Validate() error {
if hist.SampleCount == nil || *hist.SampleCount == 0 {
return fmt.Errorf("nil or empty histogram SampleCount")
}
if hist.SampleSum == nil || *hist.SampleSum == 0 {
return fmt.Errorf("nil or empty histogram SampleSum")
}
for _, bckt := range hist.Bucket {
if bckt == nil {
return fmt.Errorf("empty histogram bucket")
}
if bckt.UpperBound == nil || *bckt.UpperBound < 0 {
return fmt.Errorf("nil or negative histogram bucket UpperBound")
}
}
return nil
}

View File

@@ -0,0 +1,255 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testutil
import (
"fmt"
"testing"
"k8s.io/utils/pointer"
dto "github.com/prometheus/client_model/go"
)
func samples2Histogram(samples []float64, upperBounds []float64) Histogram {
histogram := dto.Histogram{
SampleCount: uint64Ptr(0),
SampleSum: pointer.Float64Ptr(0.0),
}
for _, ub := range upperBounds {
histogram.Bucket = append(histogram.Bucket, &dto.Bucket{
CumulativeCount: uint64Ptr(0),
UpperBound: pointer.Float64Ptr(ub),
})
}
for _, sample := range samples {
for i, bucket := range histogram.Bucket {
if sample < *bucket.UpperBound {
*histogram.Bucket[i].CumulativeCount++
}
}
*histogram.SampleCount++
*histogram.SampleSum += sample
}
return Histogram{
&histogram,
}
}
func TestHistogramQuantile(t *testing.T) {
tests := []struct {
samples []float64
bounds []float64
q50 float64
q90 float64
q99 float64
}{
{
// repeating numbers
samples: []float64{0.5, 0.5, 0.5, 0.5, 1.5, 1.5, 1.5, 1.5, 3, 3, 3, 3, 6, 6, 6, 6},
bounds: []float64{1, 2, 4, 8},
q50: 2,
q90: 6.4,
q99: 7.84,
},
{
// random numbers
samples: []float64{11, 67, 61, 21, 40, 36, 52, 63, 8, 3, 67, 35, 61, 1, 36, 58},
bounds: []float64{10, 20, 40, 80},
q50: 40,
q90: 72,
q99: 79.2,
},
{
// the last bucket is empty
samples: []float64{6, 34, 30, 10, 20, 18, 26, 31, 4, 2, 33, 17, 30, 1, 18, 29},
bounds: []float64{10, 20, 40, 80},
q50: 20,
q90: 36,
q99: 39.6,
},
}
for _, test := range tests {
h := samples2Histogram(test.samples, test.bounds)
q50 := h.Quantile(0.5)
q90 := h.Quantile(0.9)
q99 := h.Quantile(0.99)
q999999 := h.Quantile(0.999999)
if q50 != test.q50 {
t.Errorf("Expected q50 to be %v, got %v instead", test.q50, q50)
}
if q90 != test.q90 {
t.Errorf("Expected q90 to be %v, got %v instead", test.q90, q90)
}
if q99 != test.q99 {
t.Errorf("Expected q99 to be %v, got %v instead", test.q99, q99)
}
lastUpperBound := test.bounds[len(test.bounds)-1]
if !(q999999 < lastUpperBound) {
t.Errorf("Expected q999999 to be less than %v, got %v instead", lastUpperBound, q999999)
}
}
}
func TestHistogramClear(t *testing.T) {
samples := []float64{0.5, 0.5, 0.5, 0.5, 1.5, 1.5, 1.5, 1.5, 3, 3, 3, 3, 6, 6, 6, 6}
bounds := []float64{1, 2, 4, 8}
h := samples2Histogram(samples, bounds)
if *h.SampleCount == 0 {
t.Errorf("Expected histogram .SampleCount to be non-zero")
}
if *h.SampleSum == 0 {
t.Errorf("Expected histogram .SampleSum to be non-zero")
}
for _, b := range h.Bucket {
if b.CumulativeCount != nil {
if *b.CumulativeCount == 0 {
t.Errorf("Expected histogram bucket to have non-zero comulative count")
}
}
}
h.Clear()
if *h.SampleCount != 0 {
t.Errorf("Expected histogram .SampleCount to be zero, have %v instead", *h.SampleCount)
}
if *h.SampleSum != 0 {
t.Errorf("Expected histogram .SampleSum to be zero, have %v instead", *h.SampleSum)
}
for _, b := range h.Bucket {
if b.CumulativeCount != nil {
if *b.CumulativeCount != 0 {
t.Errorf("Expected histogram bucket to have zero comulative count, have %v instead", *b.CumulativeCount)
}
}
if b.UpperBound != nil {
*b.UpperBound = 0
}
}
}
func TestHistogramValidate(t *testing.T) {
tests := []struct {
name string
h Histogram
err error
}{
{
name: "nil SampleCount",
h: Histogram{
&dto.Histogram{},
},
err: fmt.Errorf("nil or empty histogram SampleCount"),
},
{
name: "empty SampleCount",
h: Histogram{
&dto.Histogram{
SampleCount: uint64Ptr(0),
},
},
err: fmt.Errorf("nil or empty histogram SampleCount"),
},
{
name: "nil SampleSum",
h: Histogram{
&dto.Histogram{
SampleCount: uint64Ptr(1),
},
},
err: fmt.Errorf("nil or empty histogram SampleSum"),
},
{
name: "empty SampleSum",
h: Histogram{
&dto.Histogram{
SampleCount: uint64Ptr(1),
SampleSum: pointer.Float64Ptr(0.0),
},
},
err: fmt.Errorf("nil or empty histogram SampleSum"),
},
{
name: "nil bucket",
h: Histogram{
&dto.Histogram{
SampleCount: uint64Ptr(1),
SampleSum: pointer.Float64Ptr(1.0),
Bucket: []*dto.Bucket{
nil,
},
},
},
err: fmt.Errorf("empty histogram bucket"),
},
{
name: "nil bucket UpperBound",
h: Histogram{
&dto.Histogram{
SampleCount: uint64Ptr(1),
SampleSum: pointer.Float64Ptr(1.0),
Bucket: []*dto.Bucket{
{},
},
},
},
err: fmt.Errorf("nil or negative histogram bucket UpperBound"),
},
{
name: "negative bucket UpperBound",
h: Histogram{
&dto.Histogram{
SampleCount: uint64Ptr(1),
SampleSum: pointer.Float64Ptr(1.0),
Bucket: []*dto.Bucket{
{UpperBound: pointer.Float64Ptr(-1.0)},
},
},
},
err: fmt.Errorf("nil or negative histogram bucket UpperBound"),
},
{
name: "valid histogram",
h: samples2Histogram(
[]float64{0.5, 0.5, 0.5, 0.5, 1.5, 1.5, 1.5, 1.5, 3, 3, 3, 3, 6, 6, 6, 6},
[]float64{1, 2, 4, 8},
),
},
}
for _, test := range tests {
err := test.h.Validate()
if test.err != nil {
if err == nil || err.Error() != test.err.Error() {
t.Errorf("Expected %q error, got %q instead", test.err, err)
}
} else {
if err != nil {
t.Errorf("Expected error to be nil, got %q instead", err)
}
}
}
}

View File

@@ -20,7 +20,10 @@ go_library(
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//test/integration/util:go_default_library", "//test/integration/util:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
], ],
) )

View File

@@ -38,6 +38,15 @@ const (
configFile = "config/performance-config.yaml" configFile = "config/performance-config.yaml"
) )
var (
defaultMetrics = []string{
"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
"scheduler_scheduling_algorithm_priority_evaluation_seconds",
"scheduler_binding_duration_seconds",
"scheduler_e2e_scheduling_duration_seconds",
}
)
// testCase configures a test case to run the scheduler performance test. Users should be able to // testCase configures a test case to run the scheduler performance test. Users should be able to
// provide this via a YAML file. // provide this via a YAML file.
// //
@@ -92,6 +101,7 @@ type testParams struct {
} }
func BenchmarkPerfScheduling(b *testing.B) { func BenchmarkPerfScheduling(b *testing.B) {
dataItems := DataItems{Version: "v1"}
tests := getSimpleTestCases(configFile) tests := getSimpleTestCases(configFile)
for _, test := range tests { for _, test := range tests {
@@ -100,12 +110,15 @@ func BenchmarkPerfScheduling(b *testing.B) {
for feature, flag := range test.FeatureGates { for feature, flag := range test.FeatureGates {
defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
} }
perfScheduling(test, b) dataItems.DataItems = append(dataItems.DataItems, perfScheduling(test, b)...)
}) })
} }
if err := dataItems2JSONFile(dataItems, b.Name()); err != nil {
klog.Fatalf("%v: unable to write measured data: %v", b.Name(), err)
}
} }
func perfScheduling(test testCase, b *testing.B) { func perfScheduling(test testCase, b *testing.B) []DataItem {
var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{} var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
if test.Nodes.NodeAllocatableStrategy != nil { if test.Nodes.NodeAllocatableStrategy != nil {
nodeStrategy = test.Nodes.NodeAllocatableStrategy nodeStrategy = test.Nodes.NodeAllocatableStrategy
@@ -180,15 +193,45 @@ func perfScheduling(test testCase, b *testing.B) {
// start benchmark // start benchmark
b.ResetTimer() b.ResetTimer()
// Start measuring throughput
stopCh := make(chan struct{})
throughputCollector := newThroughputCollector(podInformer)
go throughputCollector.run(stopCh)
// Scheduling the main workload
config = testutils.NewTestPodCreatorConfig() config = testutils.NewTestPodCreatorConfig()
config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy) config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
podCreator = testutils.NewTestPodCreator(clientset, config) podCreator = testutils.NewTestPodCreator(clientset, config)
podCreator.CreatePods() podCreator.CreatePods()
<-completedCh <-completedCh
close(stopCh)
// Note: without this line we're taking the overhead of defer() into account. // Note: without this line we're taking the overhead of defer() into account.
b.StopTimer() b.StopTimer()
setNameLabel := func(dataItem *DataItem) DataItem {
if dataItem.Labels == nil {
dataItem.Labels = map[string]string{}
}
dataItem.Labels["Name"] = b.Name()
return *dataItem
}
dataItems := []DataItem{
setNameLabel(throughputCollector.collect()),
}
for _, metric := range defaultMetrics {
dataItem := newMetricsCollector(metric).collect()
if dataItem == nil {
continue
}
dataItems = append(dataItems, setNameLabel(dataItem))
}
return dataItems
} }
func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy { func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {

View File

@@ -17,15 +17,34 @@ limitations under the License.
package benchmark package benchmark
import ( import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"path"
"sort"
"time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog"
"k8s.io/kubernetes/test/integration/util" "k8s.io/kubernetes/test/integration/util"
) )
const (
dateFormat = "2006-01-02T15:04:05Z"
throughputSampleFrequency = time.Second
)
var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
// mustSetupScheduler starts the following components: // mustSetupScheduler starts the following components:
// - k8s api server (a.k.a. master) // - k8s api server (a.k.a. master)
// - scheduler // - scheduler
@@ -66,3 +85,145 @@ func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error)
} }
return scheduled, nil return scheduled, nil
} }
// DataItem is the data point.
type DataItem struct {
// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
// that all data items with the same label combination should have the same buckets.
Data map[string]float64 `json:"data"`
// Unit is the data unit. Notice that all data items with the same label combination
// should have the same unit.
Unit string `json:"unit"`
// Labels is the labels of the data item.
Labels map[string]string `json:"labels,omitempty"`
}
// DataItems is the data point set. It is the struct that perf dashboard expects.
type DataItems struct {
Version string `json:"version"`
DataItems []DataItem `json:"dataItems"`
}
func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
b, err := json.Marshal(dataItems)
if err != nil {
return err
}
destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
if *dataItemsDir != "" {
destFile = path.Join(*dataItemsDir, destFile)
}
return ioutil.WriteFile(destFile, b, 0644)
}
// metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
// Currently only Histrogram metrics are supported.
type metricsCollector struct {
metric string
}
func newMetricsCollector(metric string) *metricsCollector {
return &metricsCollector{
metric: metric,
}
}
func (pc *metricsCollector) collect() *DataItem {
hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, pc.metric)
if err != nil {
klog.Error(err)
return nil
}
if err := hist.Validate(); err != nil {
klog.Error(err)
return nil
}
q50 := hist.Quantile(0.50)
q90 := hist.Quantile(0.90)
q99 := hist.Quantile(0.95)
avg := hist.Average()
// clear the metrics so that next test always starts with empty prometheus
// metrics (since the metrics are shared among all tests run inside the same binary)
hist.Clear()
msFactor := float64(time.Second) / float64(time.Millisecond)
return &DataItem{
Labels: map[string]string{
"Metric": pc.metric,
},
Data: map[string]float64{
"Perc50": q50 * msFactor,
"Perc90": q90 * msFactor,
"Perc99": q99 * msFactor,
"Average": avg * msFactor,
},
Unit: "ms",
}
}
type throughputCollector struct {
podInformer coreinformers.PodInformer
schedulingThroughputs []float64
}
func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
return &throughputCollector{
podInformer: podInformer,
}
}
func (tc *throughputCollector) run(stopCh chan struct{}) {
podsScheduled, err := getScheduledPods(tc.podInformer)
if err != nil {
klog.Fatalf("%v", err)
}
lastScheduledCount := len(podsScheduled)
for {
select {
case <-stopCh:
return
case <-time.After(throughputSampleFrequency):
podsScheduled, err := getScheduledPods(tc.podInformer)
if err != nil {
klog.Fatalf("%v", err)
}
scheduled := len(podsScheduled)
samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
lastScheduledCount = scheduled
klog.Infof("%d pods scheduled", lastScheduledCount)
}
}
}
func (tc *throughputCollector) collect() *DataItem {
throughputSummary := &DataItem{}
if length := len(tc.schedulingThroughputs); length > 0 {
sort.Float64s(tc.schedulingThroughputs)
sum := 0.0
for i := range tc.schedulingThroughputs {
sum += tc.schedulingThroughputs[i]
}
throughputSummary.Labels = map[string]string{
"Metric": "SchedulingThroughput",
}
throughputSummary.Data = map[string]float64{
"Average": sum / float64(length),
"Perc50": tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
"Perc90": tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
"Perc99": tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
}
throughputSummary.Unit = "pods/s"
}
return throughputSummary
}