Merge pull request #68068 from krzysztof-jastrzebski/hpas2

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Change CPU sample sanitization in HPA.

**What this PR does / why we need it**:
Change CPU sample sanitization in HPA.
    Ignore samples if (see the sketch below for the resulting logic):
    - Pod is being initialized (within 5 minutes from start, as defined by flag):
        - pod is unready
        - pod is ready but a full window of the metric hasn't been collected since
          the readiness transition
    - Pod is initialized (more than 5 minutes from start, as defined by flag):
        - Pod has never been ready after the initial readiness period.
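
A condensed sketch of the resulting decision in groupPods (pkg/controller/podautoscaler/replica_calculator.go). The standalone helper, its name, and the local podMetric mirror of the new PodMetric type are illustrative only, not the shape of the actual change:

```go
package podautoscaler

import (
	"time"

	v1 "k8s.io/api/core/v1"
)

// podMetric mirrors the PodMetric type this PR introduces: a milli-value
// sample together with its timestamp and measurement window.
type podMetric struct {
	Timestamp time.Time
	Window    time.Duration
	Value     int64
}

// shouldIgnoreCPUSample condenses the new groupPods decision into one helper.
func shouldIgnoreCPUSample(pod *v1.Pod, condition *v1.PodCondition, metric podMetric,
	cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) bool {
	if condition == nil || pod.Status.StartTime == nil {
		// No readiness condition or no start time: the sample can't be trusted.
		return true
	}
	if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) {
		// Pod may still be initializing: ignore the sample if the pod is unready,
		// or if a full metric window hasn't been collected since the last
		// readiness transition.
		return condition.Status == v1.ConditionFalse ||
			metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window))
	}
	// Past the initialization period: ignore the sample only if the pod is
	// unready and has never been ready after the initial readiness delay.
	return condition.Status == v1.ConditionFalse &&
		pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time)
}
```

As the updated TestGroupPods cases below show, these checks apply to CPU samples only; other resource metrics are counted as before.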

**Release notes:**
```release-note
Improve CPU sample sanitization in HPA by taking metric's freshness into account.
```
Authored by Kubernetes Submit Queue on 2018-08-31 10:17:44 -07:00, committed by GitHub.
23 changed files with 382 additions and 119 deletions.

View File

@@ -292,8 +292,9 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
Phase: podPhase,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: podReadiness,
Type: v1.PodReady,
Status: podReadiness,
LastTransitionTime: podStartTime,
},
},
StartTime: &podStartTime,
@@ -474,6 +475,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
Labels: labelSet,
},
Timestamp: metav1.Time{Time: time.Now()},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsapi.ContainerMetrics{
{
Name: "container",
@@ -657,7 +659,7 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform
return true, obj, nil
})
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuTaintAfterStart, defaultTestingDelayOfInitialReadinessStatus)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
defaultDownscaleForbiddenWindow := 5 * time.Minute

View File

@@ -485,7 +485,7 @@ func (tc *legacyTestCase) runTest(t *testing.T) {
return true, obj, nil
})
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuTaintAfterStart, defaultTestingDelayOfInitialReadinessStatus)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
defaultDownscaleForbiddenWindow := 5 * time.Minute

View File

@@ -186,7 +186,7 @@ func (tc *legacyReplicaCalcTestCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuTaintAfterStart, defaultTestingDelayOfInitialReadinessStatus)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{"name": podNamePrefix},

View File

@@ -24,9 +24,15 @@ import (
"k8s.io/apimachinery/pkg/labels"
)
// PodMetricsInfo contains pod metric values as a map from pod names to
// metric values (the metric values are expected to be the metric as a milli-value)
type PodMetricsInfo map[string]int64
// PodMetric contains pod metric value (the metric values are expected to be the metric as a milli-value)
type PodMetric struct {
Timestamp time.Time
Window time.Duration
Value int64
}
// PodMetricsInfo contains pod metrics as a map from pod names to PodMetricsInfo
type PodMetricsInfo map[string]PodMetric
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
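
Because PodMetricsInfo values are now PodMetric structs instead of raw int64 milli-values, consumers read the Value field rather than the map value itself, as in the replica-calculator and utilization hunks further down. A minimal sketch assuming the types above; the helper name is hypothetical:

```go
// sumMetricValues totals the milli-values in a PodMetricsInfo map. It is an
// illustrative helper, not part of this PR; it shows the new m.Value access
// pattern that replaces summing the map values directly.
func sumMetricValues(metrics PodMetricsInfo) int64 {
	total := int64(0)
	for _, m := range metrics {
		total += m.Value
	}
	return total
}
```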

View File

@@ -35,10 +35,11 @@ import (
)
const (
DefaultHeapsterNamespace = "kube-system"
DefaultHeapsterScheme = "http"
DefaultHeapsterService = "heapster"
DefaultHeapsterPort = "" // use the first exposed port on the service
DefaultHeapsterNamespace = "kube-system"
DefaultHeapsterScheme = "http"
DefaultHeapsterService = "heapster"
DefaultHeapsterPort = "" // use the first exposed port on the service
heapsterDefaultMetricWindow = time.Minute
)
var heapsterQueryStart = -5 * time.Minute
@@ -100,7 +101,11 @@ func (h *HeapsterMetricsClient) GetResourceMetric(resource v1.ResourceName, name
}
if !missing {
res[m.Name] = int64(podSum)
res[m.Name] = PodMetric{
Timestamp: m.Timestamp.Time,
Window: m.Window.Duration,
Value: int64(podSum),
}
}
}
@@ -159,7 +164,12 @@ func (h *HeapsterMetricsClient) GetRawMetric(metricName string, namespace string
for i, podMetrics := range metrics.Items {
val, podTimestamp, hadMetrics := collapseTimeSamples(podMetrics, time.Minute)
if hadMetrics {
res[podNames[i]] = val
res[podNames[i]] = PodMetric{
Timestamp: podTimestamp,
Window: heapsterDefaultMetricWindow,
Value: int64(val),
}
if timestamp == nil || podTimestamp.Before(*timestamp) {
timestamp = &podTimestamp
}

View File

@@ -68,6 +68,7 @@ type testCase struct {
replicas int
targetTimestamp int
window time.Duration
reportedMetricsPoints [][]metricPoint
reportedPodMetrics [][]int64
@@ -109,7 +110,8 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: namespace,
},
Timestamp: metav1.Time{Time: fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)},
Timestamp: metav1.Time{Time: offsetTimestampBy(tc.targetTimestamp)},
Window: metav1.Duration{Duration: tc.window},
Containers: []metricsapi.ContainerMetrics{},
}
for j, cpu := range containers {
@@ -138,7 +140,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
for _, reportedMetricPoints := range tc.reportedMetricsPoints {
var heapsterMetricPoints []heapster.MetricPoint
for _, reportedMetricPoint := range reportedMetricPoints {
timestamp := fixedTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute)
timestamp := offsetTimestampBy(reportedMetricPoint.timestamp)
if latestTimestamp.Before(timestamp) {
latestTimestamp = timestamp
}
@@ -197,10 +199,20 @@ func (tc *testCase) verifyResults(t *testing.T, metrics PodMetricsInfo, timestam
}
assert.NoError(t, err, "there should be no error retrieving the metrics")
assert.NotNil(t, metrics, "there should be metrics returned")
if len(metrics) != len(tc.desiredMetricValues) {
t.Errorf("Not equal:\nexpected: %v\nactual: %v", tc.desiredMetricValues, metrics)
} else {
for k, m := range metrics {
if !m.Timestamp.Equal(tc.desiredMetricValues[k].Timestamp) ||
m.Window != tc.desiredMetricValues[k].Window ||
m.Value != tc.desiredMetricValues[k].Value {
t.Errorf("Not equal:\nexpected: %v\nactual: %v", tc.desiredMetricValues, metrics)
break
}
}
}
assert.Equal(t, tc.desiredMetricValues, metrics, "the metrics values should be as expected")
targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
targetTimestamp := offsetTimestampBy(tc.targetTimestamp)
assert.True(t, targetTimestamp.Equal(timestamp), fmt.Sprintf("the timestamp should be as expected (%s) but was %s", targetTimestamp, timestamp))
}
@@ -218,65 +230,86 @@ func (tc *testCase) runTest(t *testing.T) {
}
func TestCPU(t *testing.T) {
targetTimestamp := 1
window := 30 * time.Second
tc := testCase{
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
"test-pod-0": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-1": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-2": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
},
resourceName: v1.ResourceCPU,
targetTimestamp: 1,
targetTimestamp: targetTimestamp,
window: window,
reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
}
tc.runTest(t)
}
func TestQPS(t *testing.T) {
targetTimestamp := 1
tc := testCase{
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 10000, "test-pod-1": 20000, "test-pod-2": 10000,
"test-pod-0": PodMetric{Value: 10000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
"test-pod-1": PodMetric{Value: 20000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
"test-pod-2": PodMetric{Value: 10000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
},
metricName: "qps",
targetTimestamp: 1,
targetTimestamp: targetTimestamp,
reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}},
}
tc.runTest(t)
}
func TestQpsSumEqualZero(t *testing.T) {
targetTimestamp := 0
tc := testCase{
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 0, "test-pod-1": 0, "test-pod-2": 0,
"test-pod-0": PodMetric{Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
"test-pod-1": PodMetric{Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
"test-pod-2": PodMetric{Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
},
metricName: "qps",
targetTimestamp: 0,
targetTimestamp: targetTimestamp,
reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
}
tc.runTest(t)
}
func TestCPUMoreMetrics(t *testing.T) {
targetTimestamp := 10
window := 30 * time.Second
tc := testCase{
replicas: 5,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
"test-pod-3": 5000, "test-pod-4": 5000,
"test-pod-0": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-1": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-2": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-3": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-4": PodMetric{Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
},
resourceName: v1.ResourceCPU,
targetTimestamp: 10,
targetTimestamp: targetTimestamp,
window: window,
reportedPodMetrics: [][]int64{{1000, 2000, 2000}, {5000}, {1000, 1000, 1000, 2000}, {4000, 1000}, {5000}},
}
tc.runTest(t)
}
func TestCPUMissingMetrics(t *testing.T) {
targetTimestamp := 0
window := 30 * time.Second
tc := testCase{
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 4000,
"test-pod-0": PodMetric{Value: 4000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
},
resourceName: v1.ResourceCPU,
targetTimestamp: targetTimestamp,
window: window,
reportedPodMetrics: [][]int64{{4000}},
}
tc.runTest(t)
@@ -315,13 +348,15 @@ func TestCPUEmptyMetrics(t *testing.T) {
}
func TestQpsEmptyEntries(t *testing.T) {
targetTimestamp := 4
tc := testCase{
replicas: 3,
metricName: "qps",
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 4000000, "test-pod-2": 2000000,
"test-pod-0": PodMetric{Value: 4000000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
"test-pod-2": PodMetric{Value: 2000000, Timestamp: offsetTimestampBy(targetTimestamp), Window: heapsterDefaultMetricWindow},
},
targetTimestamp: 4,
targetTimestamp: targetTimestamp,
reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {}, {{2000, 4}}},
}
tc.runTest(t)
@@ -338,12 +373,17 @@ func TestCPUZeroReplicas(t *testing.T) {
}
func TestCPUEmptyMetricsForOnePod(t *testing.T) {
targetTimestamp := 0
window := 30 * time.Second
tc := testCase{
replicas: 3,
resourceName: v1.ResourceCPU,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 100, "test-pod-1": 700,
"test-pod-0": PodMetric{Value: 100, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-1": PodMetric{Value: 700, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
},
targetTimestamp: targetTimestamp,
window: window,
reportedPodMetrics: [][]int64{{100}, {300, 400}, {}},
}
tc.runTest(t)
@@ -364,3 +404,7 @@ func testCollapseTimeSamples(t *testing.T) {
assert.InEpsilon(t, float64(75), val, 0.1, "collapsed sample value should be as expected")
assert.True(t, timestamp.Equal(now), "timestamp should be the current time (the newest)")
}
func offsetTimestampBy(t int) time.Time {
return fixedTimestamp.Add(time.Duration(t) * time.Minute)
}

View File

@@ -33,6 +33,10 @@ import (
externalclient "k8s.io/metrics/pkg/client/external_metrics"
)
const (
metricServerDefaultMetricWindow = time.Minute
)
func NewRESTMetricsClient(resourceClient resourceclient.PodMetricsesGetter, customClient customclient.CustomMetricsClient, externalClient externalclient.ExternalMetricsClient) MetricsClient {
return &restMetricsClient{
&resourceMetricsClient{resourceClient},
@@ -84,7 +88,11 @@ func (c *resourceMetricsClient) GetResourceMetric(resource v1.ResourceName, name
}
if !missing {
res[m.Name] = int64(podSum)
res[m.Name] = PodMetric{
Timestamp: m.Timestamp.Time,
Window: m.Window.Duration,
Value: int64(podSum),
}
}
}
@@ -113,7 +121,17 @@ func (c *customMetricsClient) GetRawMetric(metricName string, namespace string,
res := make(PodMetricsInfo, len(metrics.Items))
for _, m := range metrics.Items {
res[m.DescribedObject.Name] = m.Value.MilliValue()
window := metricServerDefaultMetricWindow
if m.WindowSeconds != nil {
window = time.Duration(*m.WindowSeconds) * time.Second
}
res[m.DescribedObject.Name] = PodMetric{
Timestamp: m.Timestamp.Time,
Window: window,
Value: int64(m.Value.MilliValue()),
}
m.Value.MilliValue()
}
timestamp := metrics.Items[0].Timestamp.Time

View File

@@ -48,6 +48,7 @@ type restClientTestCase struct {
// "timestamps" here are actually the offset in minutes from a base timestamp
targetTimestamp int
window time.Duration
reportedMetricPoints []metricPoint
reportedPodMetrics [][]int64
singleObject *autoscalingapi.CrossVersionObjectReference
@@ -86,7 +87,8 @@ func (tc *restClientTestCase) prepareTestClient(t *testing.T) (*metricsfake.Clie
Namespace: namespace,
Labels: podLabels,
},
Timestamp: metav1.Time{Time: fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)},
Timestamp: metav1.Time{Time: offsetTimestampBy(tc.targetTimestamp)},
Window: metav1.Duration{Duration: tc.window},
Containers: []metricsapi.ContainerMetrics{},
}
for j, cpu := range containers {
@@ -115,7 +117,7 @@ func (tc *restClientTestCase) prepareTestClient(t *testing.T) (*metricsfake.Clie
metrics := emapi.ExternalMetricValueList{}
for _, metricPoint := range tc.reportedMetricPoints {
timestamp := fixedTimestamp.Add(time.Duration(metricPoint.timestamp) * time.Minute)
timestamp := offsetTimestampBy(metricPoint.timestamp)
metric := emapi.ExternalMetricValue{
Value: *resource.NewMilliQuantity(int64(metricPoint.level), resource.DecimalSI),
Timestamp: metav1.Time{Time: timestamp},
@@ -136,7 +138,7 @@ func (tc *restClientTestCase) prepareTestClient(t *testing.T) (*metricsfake.Clie
assert.Equal(t, "pods", getForAction.GetResource().Resource, "type of object that we requested multiple metrics for should have been pods")
for i, metricPoint := range tc.reportedMetricPoints {
timestamp := fixedTimestamp.Add(time.Duration(metricPoint.timestamp) * time.Minute)
timestamp := offsetTimestampBy(metricPoint.timestamp)
metric := cmapi.MetricValue{
DescribedObject: v1.ObjectReference{
Kind: "Pod",
@@ -168,7 +170,7 @@ func (tc *restClientTestCase) prepareTestClient(t *testing.T) (*metricsfake.Clie
assert.Equal(t, groupResource.String(), getForAction.GetResource().Resource, "should have requested metrics for the resource matching the GroupKind passed in")
assert.Equal(t, tc.singleObject.Name, name, "should have requested metrics for the object matching the name passed in")
metricPoint := tc.reportedMetricPoints[0]
timestamp := fixedTimestamp.Add(time.Duration(metricPoint.timestamp) * time.Minute)
timestamp := offsetTimestampBy(metricPoint.timestamp)
metrics := &cmapi.MetricValueList{
Items: []cmapi.MetricValue{
@@ -204,9 +206,20 @@ func (tc *restClientTestCase) verifyResults(t *testing.T, metrics PodMetricsInfo
assert.NoError(t, err, "there should be no error retrieving the metrics")
assert.NotNil(t, metrics, "there should be metrics returned")
assert.Equal(t, tc.desiredMetricValues, metrics, "the metrics values should be as expected")
if len(metrics) != len(tc.desiredMetricValues) {
t.Errorf("Not equal:\nexpected: %v\nactual: %v", tc.desiredMetricValues, metrics)
} else {
for k, m := range metrics {
if !m.Timestamp.Equal(tc.desiredMetricValues[k].Timestamp) ||
m.Window != tc.desiredMetricValues[k].Window ||
m.Value != tc.desiredMetricValues[k].Value {
t.Errorf("Not equal:\nexpected: %v\nactual: %v", tc.desiredMetricValues, metrics)
break
}
}
}
targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
targetTimestamp := offsetTimestampBy(tc.targetTimestamp)
assert.True(t, targetTimestamp.Equal(timestamp), fmt.Sprintf("the timestamp should be as expected (%s) but was %s", targetTimestamp, timestamp))
}
@@ -227,7 +240,7 @@ func (tc *restClientTestCase) runTest(t *testing.T) {
val, timestamp, err := metricsClient.GetExternalMetric(tc.metricName, tc.namespace, tc.metricLabelSelector)
info := make(PodMetricsInfo, len(val))
for i, metricVal := range val {
info[fmt.Sprintf("%v-val-%v", tc.metricName, i)] = metricVal
info[fmt.Sprintf("%v-val-%v", tc.metricName, i)] = PodMetric{Value: metricVal}
}
tc.verifyResults(t, info, timestamp, err)
} else if tc.singleObject == nil {
@@ -235,18 +248,23 @@ func (tc *restClientTestCase) runTest(t *testing.T) {
tc.verifyResults(t, info, timestamp, err)
} else {
val, timestamp, err := metricsClient.GetObjectMetric(tc.metricName, tc.namespace, tc.singleObject, tc.metricLabelSelector)
info := PodMetricsInfo{tc.singleObject.Name: val}
info := PodMetricsInfo{tc.singleObject.Name: {Value: val}}
tc.verifyResults(t, info, timestamp, err)
}
}
func TestRESTClientCPU(t *testing.T) {
targetTimestamp := 1
window := 30 * time.Second
tc := restClientTestCase{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
"test-pod-0": {Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-1": {Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-2": {Value: 5000, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
},
resourceName: v1.ResourceCPU,
targetTimestamp: 1,
targetTimestamp: targetTimestamp,
window: window,
reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
}
tc.runTest(t)
@@ -255,7 +273,7 @@ func TestRESTClientCPU(t *testing.T) {
func TestRESTClientExternal(t *testing.T) {
tc := restClientTestCase{
desiredMetricValues: PodMetricsInfo{
"external-val-0": 10000, "external-val-1": 20000, "external-val-2": 10000,
"external-val-0": {Value: 10000}, "external-val-1": {Value: 20000}, "external-val-2": {Value: 10000},
},
metricSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
metricName: "external",
@@ -266,12 +284,15 @@ func TestRESTClientExternal(t *testing.T) {
}
func TestRESTClientQPS(t *testing.T) {
targetTimestamp := 1
tc := restClientTestCase{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 10000, "test-pod-1": 20000, "test-pod-2": 10000,
"test-pod-0": {Value: 10000, Timestamp: offsetTimestampBy(targetTimestamp), Window: metricServerDefaultMetricWindow},
"test-pod-1": {Value: 20000, Timestamp: offsetTimestampBy(targetTimestamp), Window: metricServerDefaultMetricWindow},
"test-pod-2": {Value: 10000, Timestamp: offsetTimestampBy(targetTimestamp), Window: metricServerDefaultMetricWindow},
},
metricName: "qps",
targetTimestamp: 1,
targetTimestamp: targetTimestamp,
reportedMetricPoints: []metricPoint{{10000, 1}, {20000, 1}, {10000, 1}},
}
tc.runTest(t)
@@ -279,7 +300,7 @@ func TestRESTClientQPS(t *testing.T) {
func TestRESTClientSingleObject(t *testing.T) {
tc := restClientTestCase{
desiredMetricValues: PodMetricsInfo{"some-dep": 10},
desiredMetricValues: PodMetricsInfo{"some-dep": {Value: 10}},
metricName: "queue-length",
targetTimestamp: 1,
reportedMetricPoints: []metricPoint{{10, 1}},
@@ -293,12 +314,15 @@ func TestRESTClientSingleObject(t *testing.T) {
}
func TestRESTClientQpsSumEqualZero(t *testing.T) {
targetTimestamp := 0
tc := restClientTestCase{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 0, "test-pod-1": 0, "test-pod-2": 0,
"test-pod-0": {Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: metricServerDefaultMetricWindow},
"test-pod-1": {Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: metricServerDefaultMetricWindow},
"test-pod-2": {Value: 0, Timestamp: offsetTimestampBy(targetTimestamp), Window: metricServerDefaultMetricWindow},
},
metricName: "qps",
targetTimestamp: 0,
targetTimestamp: targetTimestamp,
reportedMetricPoints: []metricPoint{{0, 0}, {0, 0}, {0, 0}},
}
tc.runTest(t)
@@ -307,7 +331,7 @@ func TestRESTClientQpsSumEqualZero(t *testing.T) {
func TestRESTClientExternalSumEqualZero(t *testing.T) {
tc := restClientTestCase{
desiredMetricValues: PodMetricsInfo{
"external-val-0": 0, "external-val-1": 0, "external-val-2": 0,
"external-val-0": {Value: 0}, "external-val-1": {Value: 0}, "external-val-2": {Value: 0},
},
metricSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
metricName: "external",
@@ -349,11 +373,16 @@ func TestRESTClientCPUEmptyMetrics(t *testing.T) {
}
func TestRESTClientCPUEmptyMetricsForOnePod(t *testing.T) {
targetTimestamp := 1
window := 30 * time.Second
tc := restClientTestCase{
resourceName: v1.ResourceCPU,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 100, "test-pod-1": 700,
"test-pod-0": {Value: 100, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
"test-pod-1": {Value: 700, Timestamp: offsetTimestampBy(targetTimestamp), Window: window},
},
targetTimestamp: targetTimestamp,
window: window,
reportedPodMetrics: [][]int64{{100}, {300, 400}, {}},
}
tc.runTest(t)

View File

@@ -28,14 +28,14 @@ func GetResourceUtilizationRatio(metrics PodMetricsInfo, requests map[string]int
requestsTotal := int64(0)
numEntries := 0
for podName, metricValue := range metrics {
for podName, metric := range metrics {
request, hasRequest := requests[podName]
if !hasRequest {
// we check for missing requests elsewhere, so assuming missing requests == extraneous metrics
continue
}
metricsTotal += metricValue
metricsTotal += metric.Value
requestsTotal += request
numEntries++
}
@@ -56,8 +56,8 @@ func GetResourceUtilizationRatio(metrics PodMetricsInfo, requests map[string]int
// (returning that and the actual utilization)
func GetMetricUtilizationRatio(metrics PodMetricsInfo, targetUtilization int64) (utilizationRatio float64, currentUtilization int64) {
metricsTotal := int64(0)
for _, metricValue := range metrics {
metricsTotal += metricValue
for _, metric := range metrics {
metricsTotal += metric.Value
}
currentUtilization = metricsTotal / int64(len(metrics))

View File

@@ -67,7 +67,7 @@ func (tc *metricUtilizationRatioTestCase) runTest(t *testing.T) {
func TestGetResourceUtilizationRatioBaseCase(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": 50, "test-pod-1": 76,
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76},
},
requests: map[string]int64{
"test-pod-0": 100, "test-pod-1": 100,
@@ -85,7 +85,7 @@ func TestGetResourceUtilizationRatioBaseCase(t *testing.T) {
func TestGetResourceUtilizationRatioIgnorePodsWithNoRequest(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": 50, "test-pod-1": 76, "test-pod-no-request": 100,
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76}, "test-pod-no-request": {Value: 100},
},
requests: map[string]int64{
"test-pod-0": 100, "test-pod-1": 100,
@@ -103,7 +103,7 @@ func TestGetResourceUtilizationRatioIgnorePodsWithNoRequest(t *testing.T) {
func TestGetResourceUtilizationRatioExtraRequest(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": 50, "test-pod-1": 76,
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76},
},
requests: map[string]int64{
"test-pod-0": 100, "test-pod-1": 100, "test-pod-extra-request": 500,
@@ -121,7 +121,7 @@ func TestGetResourceUtilizationRatioExtraRequest(t *testing.T) {
func TestGetResourceUtilizationRatioNoRequests(t *testing.T) {
tc := resourceUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": 50, "test-pod-1": 76,
"test-pod-0": {Value: 50}, "test-pod-1": {Value: 76},
},
requests: map[string]int64{},
targetUtilization: 50,
@@ -138,7 +138,7 @@ func TestGetResourceUtilizationRatioNoRequests(t *testing.T) {
func TestGetMetricUtilizationRatioBaseCase(t *testing.T) {
tc := metricUtilizationRatioTestCase{
metrics: PodMetricsInfo{
"test-pod-0": 5000, "test-pod-1": 10000,
"test-pod-0": {Value: 5000}, "test-pod-1": {Value: 10000},
},
targetUtilization: 10000,
expectedUtilizationRatio: .75,

View File

@@ -32,12 +32,10 @@ import (
)
const (
// TODO(jbartosik): use actual value.
cpuSampleWindow = time.Minute
// defaultTestingTolerance is default value for calculating when to
// scale up/scale down.
defaultTestingTolerance = 0.1
defaultTestingCpuTaintAfterStart = 2 * time.Minute
defaultTestingCpuInitializationPeriod = 2 * time.Minute
defaultTestingDelayOfInitialReadinessStatus = 10 * time.Second
)
@@ -45,16 +43,16 @@ type ReplicaCalculator struct {
metricsClient metricsclient.MetricsClient
podsGetter v1coreclient.PodsGetter
tolerance float64
cpuTaintAfterStart time.Duration
cpuInitializationPeriod time.Duration
delayOfInitialReadinessStatus time.Duration
}
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter v1coreclient.PodsGetter, tolerance float64, cpuTaintAfterStart, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator {
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter v1coreclient.PodsGetter, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator {
return &ReplicaCalculator{
metricsClient: metricsClient,
podsGetter: podsGetter,
tolerance: tolerance,
cpuTaintAfterStart: cpuTaintAfterStart,
cpuInitializationPeriod: cpuInitializationPeriod,
delayOfInitialReadinessStatus: delayOfInitialReadinessStatus,
}
}
@@ -77,7 +75,7 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
}
readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuTaintAfterStart, c.delayOfInitialReadinessStatus)
readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
removeMetricsForPods(metrics, ignoredPods)
requests, err := calculatePodRequests(podList.Items, resource)
if err != nil {
@@ -108,12 +106,12 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
if usageRatio < 1.0 {
// on a scale-down, treat missing pods as using 100% of the resource request
for podName := range missingPods {
metrics[podName] = requests[podName]
metrics[podName] = metricsclient.PodMetric{Value: requests[podName]}
}
} else if usageRatio > 1.0 {
// on a scale-up, treat missing pods as using 0% of the resource request
for podName := range missingPods {
metrics[podName] = 0
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
}
@@ -121,7 +119,7 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
if rebalanceIgnored {
// on a scale-up, treat unready pods as using 0% of the resource request
for podName := range ignoredPods {
metrics[podName] = 0
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
@@ -178,7 +176,7 @@ func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMet
return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
}
readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuTaintAfterStart, c.delayOfInitialReadinessStatus)
readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
removeMetricsForPods(metrics, ignoredPods)
if len(metrics) == 0 {
@@ -203,12 +201,12 @@ func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMet
if usageRatio < 1.0 {
// on a scale-down, treat missing pods as using 100% of the resource request
for podName := range missingPods {
metrics[podName] = targetUtilization
metrics[podName] = metricsclient.PodMetric{Value: targetUtilization}
}
} else {
// on a scale-up, treat missing pods as using 0% of the resource request
for podName := range missingPods {
metrics[podName] = 0
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
}
@@ -216,7 +214,7 @@ func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMet
if rebalanceIgnored {
// on a scale-up, treat unready pods as using 0% of the resource request
for podName := range ignoredPods {
metrics[podName] = 0
metrics[podName] = metricsclient.PodMetric{Value: 0}
}
}
@@ -342,14 +340,15 @@ func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(currentReplicas int3
return replicaCount, utilization, timestamp, nil
}
func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuTaintAfterStart, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) {
func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) {
missingPods = sets.NewString()
ignoredPods = sets.NewString()
for _, pod := range pods {
if pod.DeletionTimestamp != nil || pod.Status.Phase == v1.PodFailed {
continue
}
if _, found := metrics[pod.Name]; !found {
metric, found := metrics[pod.Name]
if !found {
missingPods.Insert(pod.Name)
continue
}
@@ -359,10 +358,13 @@ func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.
if condition == nil || pod.Status.StartTime == nil {
ignorePod = true
} else {
if condition.Status == v1.ConditionTrue {
ignorePod = pod.Status.StartTime.Add(cpuTaintAfterStart + cpuSampleWindow).After(time.Now())
// Pod still within possible initialisation period.
if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) {
// Ignore sample if pod is unready or one window of metric wasn't collected since last state transition.
ignorePod = condition.Status == v1.ConditionFalse || metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window))
} else {
ignorePod = pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time)
// Ignore metric if pod is unready and it has never been ready.
ignorePod = condition.Status == v1.ConditionFalse && pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time)
}
}
if ignorePod {
@@ -370,7 +372,6 @@ func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.
continue
}
}
readyPodCount++
}
return

View File

@@ -194,6 +194,7 @@ func (tc *replicaCalcTestCase) prepareTestMetricsClient() *metricsfake.Clientset
Labels: map[string]string{"name": podNamePrefix},
},
Timestamp: metav1.Time{Time: tc.timestamp},
Window: metav1.Duration{Duration: time.Minute},
Containers: make([]metricsapi.ContainerMetrics, numContainersPerPod),
}
@@ -337,7 +338,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
testClient, testMetricsClient, testCMClient, testEMClient := tc.prepareTestClient(t)
metricsClient := metricsclient.NewRESTMetricsClient(testMetricsClient.MetricsV1beta1(), testCMClient, testEMClient)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuTaintAfterStart, defaultTestingDelayOfInitialReadinessStatus)
replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{"name": podNamePrefix},
@@ -1227,20 +1228,20 @@ func TestGroupPods(t *testing.T) {
metrics metricsclient.PodMetricsInfo
resource v1.ResourceName
expectReadyPodCount int
expectUnreadyPods sets.String
expectIgnoredPods sets.String
expectMissingPods sets.String
}{
{
"void",
[]v1.Pod{},
metricsclient.PodMetricsInfo{},
v1.ResourceName(""),
v1.ResourceCPU,
0,
sets.NewString(),
sets.NewString(),
},
{
"a ready pod",
"count in a ready pod - memory",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@@ -1252,15 +1253,15 @@ func TestGroupPods(t *testing.T) {
},
},
metricsclient.PodMetricsInfo{
"bentham": 1,
"bentham": metricsclient.PodMetric{Value: 1, Timestamp: time.Now(), Window: time.Minute},
},
v1.ResourceName("hedons"),
v1.ResourceMemory,
1,
sets.NewString(),
sets.NewString(),
},
{
"an unready pod",
"ignore a pod without ready condition - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@@ -1275,7 +1276,7 @@ func TestGroupPods(t *testing.T) {
},
},
metricsclient.PodMetricsInfo{
"lucretius": 1,
"lucretius": metricsclient.PodMetric{Value: 1},
},
v1.ResourceCPU,
0,
@@ -1283,11 +1284,101 @@ func TestGroupPods(t *testing.T) {
sets.NewString(),
},
{
"a ready cpu pod",
"count in a ready pod with fresh metrics during initialization period - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "niccolo",
Name: "bentham",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
StartTime: &metav1.Time{
Time: time.Now().Add(-1 * time.Minute),
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-30 * time.Second)},
Status: v1.ConditionTrue,
},
},
},
},
},
metricsclient.PodMetricsInfo{
"bentham": metricsclient.PodMetric{Value: 1, Timestamp: time.Now(), Window: 30 * time.Second},
},
v1.ResourceCPU,
1,
sets.NewString(),
sets.NewString(),
},
{
"ignore a ready pod without fresh metrics during initialization period - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "bentham",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
StartTime: &metav1.Time{
Time: time.Now().Add(-1 * time.Minute),
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-30 * time.Second)},
Status: v1.ConditionTrue,
},
},
},
},
},
metricsclient.PodMetricsInfo{
"bentham": metricsclient.PodMetric{Value: 1, Timestamp: time.Now(), Window: 60 * time.Second},
},
v1.ResourceCPU,
0,
sets.NewString("bentham"),
sets.NewString(),
},
{
"ignore an unready pod during initialization period - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "lucretius",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
StartTime: &metav1.Time{
Time: time.Now().Add(-10 * time.Minute),
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-9*time.Minute - 54*time.Second)},
Status: v1.ConditionFalse,
},
},
},
},
},
metricsclient.PodMetricsInfo{
"lucretius": metricsclient.PodMetric{Value: 1},
},
v1.ResourceCPU,
0,
sets.NewString("lucretius"),
sets.NewString(),
},
{
"count in a ready pod without fresh metrics after initialization period - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "bentham",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
@@ -1305,7 +1396,68 @@ func TestGroupPods(t *testing.T) {
},
},
metricsclient.PodMetricsInfo{
"niccolo": 1,
"bentham": metricsclient.PodMetric{Value: 1, Timestamp: time.Now().Add(-2 * time.Minute), Window: time.Minute},
},
v1.ResourceCPU,
1,
sets.NewString(),
sets.NewString(),
},
{
"count in an unready pod that was ready after initialization period - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "lucretius",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
StartTime: &metav1.Time{
Time: time.Now().Add(-10 * time.Minute),
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-9 * time.Minute)},
Status: v1.ConditionFalse,
},
},
},
},
},
metricsclient.PodMetricsInfo{
"lucretius": metricsclient.PodMetric{Value: 1},
},
v1.ResourceCPU,
1,
sets.NewString(),
sets.NewString(),
},
{
"ignore pod that has never been ready after initialization period - CPU",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "lucretius",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
StartTime: &metav1.Time{
Time: time.Now().Add(-10 * time.Minute),
},
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-9*time.Minute - 50*time.Second)},
Status: v1.ConditionFalse,
},
},
},
},
},
metricsclient.PodMetricsInfo{
"lucretius": metricsclient.PodMetric{Value: 1},
},
v1.ResourceCPU,
1,
@@ -1334,7 +1486,7 @@ func TestGroupPods(t *testing.T) {
sets.NewString("epicurus"),
},
{
"all together",
"several pods",
[]v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@@ -1378,8 +1530,8 @@ func TestGroupPods(t *testing.T) {
},
},
metricsclient.PodMetricsInfo{
"lucretius": 1,
"niccolo": 1,
"lucretius": metricsclient.PodMetric{Value: 1},
"niccolo": metricsclient.PodMetric{Value: 1},
},
v1.ResourceCPU,
1,
@@ -1388,12 +1540,12 @@ func TestGroupPods(t *testing.T) {
},
}
for _, tc := range tests {
readyPodCount, unreadyPods, missingPods := groupPods(tc.pods, tc.metrics, tc.resource, defaultTestingCpuTaintAfterStart, defaultTestingDelayOfInitialReadinessStatus)
readyPodCount, ignoredPods, missingPods := groupPods(tc.pods, tc.metrics, tc.resource, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus)
if readyPodCount != tc.expectReadyPodCount {
t.Errorf("%s got readyPodCount %d, expected %d", tc.name, readyPodCount, tc.expectReadyPodCount)
}
if !unreadyPods.Equal(tc.expectUnreadyPods) {
t.Errorf("%s got unreadyPods %v, expected %v", tc.name, unreadyPods, tc.expectUnreadyPods)
if !ignoredPods.Equal(tc.expectIgnoredPods) {
t.Errorf("%s got unreadyPods %v, expected %v", tc.name, ignoredPods, tc.expectIgnoredPods)
}
if !missingPods.Equal(tc.expectMissingPods) {
t.Errorf("%s got missingPods %v, expected %v", tc.name, missingPods, tc.expectMissingPods)