From 8cdda36b1ea0b541ad278e7c067637b4e5e77817 Mon Sep 17 00:00:00 2001
From: Jerzy Szczepkowski
Date: Wed, 2 Dec 2015 09:24:17 +0100
Subject: [PATCH] Fixed forbidden window enforcement in horizontal pod
 autoscaler.

Fixed forbidden window enforcement in horizontal pod autoscaler: the
forbidden window is now measured against the timestamp of the oldest
metrics report rather than against the current time. Fixes #17992.
---
 pkg/controller/podautoscaler/horizontal.go    | 21 +++--
 .../podautoscaler/metrics/metrics_client.go   | 78 ++++++++++---------
 .../metrics/metrics_client_test.go            | 52 ++++++++-----
 3 files changed, 86 insertions(+), 65 deletions(-)
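In short: reconcileAutoscaler used to compare LastScaleTime plus the forbidden window against time.Now(), so a rescale could be admitted on the strength of metrics that were themselves gathered inside the window. After this patch the comparison uses the timestamp of the oldest metrics report. A minimal sketch of the corrected check, not the controller code itself: the helper shouldScale and the concrete window lengths are invented for illustration, while LastScaleTime, the window constant names, and the metrics timestamp follow the patch.

package main

import (
    "fmt"
    "time"
)

// Window lengths here are illustrative; the real constants live in
// horizontal.go.
const (
    upscaleForbiddenWindow   = 3 * time.Minute
    downscaleForbiddenWindow = 5 * time.Minute
)

// shouldScale is a hypothetical distillation of the check in
// reconcileAutoscaler. Before this patch the comparison used time.Now();
// now it uses the timestamp of the oldest metrics report, so a decision
// based on samples gathered inside the forbidden window is rejected even
// if wall-clock time has already left the window.
func shouldScale(lastScale *time.Time, metricsTimestamp time.Time, scalingUp bool) bool {
    if lastScale == nil {
        return true
    }
    window := downscaleForbiddenWindow
    if scalingUp {
        window = upscaleForbiddenWindow
    }
    return lastScale.Add(window).Before(metricsTimestamp)
}

func main() {
    last := time.Now().Add(-4 * time.Minute)         // rescaled 4 minutes ago
    oldestReport := time.Now().Add(-2 * time.Minute) // oldest sample is 2 minutes old
    // The 3-minute upscale window has elapsed on the wall clock, but had
    // not yet elapsed at the moment the oldest sample was taken, so the
    // rescale is still forbidden.
    fmt.Println(shouldScale(&last, oldestReport, true)) // false
}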
diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go
index ecffd8b7ca7..28e24c54aa3 100644
--- a/pkg/controller/podautoscaler/horizontal.go
+++ b/pkg/controller/podautoscaler/horizontal.go
@@ -68,27 +68,27 @@ func (a *HorizontalController) Run(syncPeriod time.Duration) {
 	}, syncPeriod, util.NeverStop)
 }
 
-func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, error) {
+func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, time.Time, error) {
 	if hpa.Spec.CPUUtilization == nil {
 		// If CPUTarget is not specified than we should return some default values.
 		// Since we always take maximum number of replicas from all policies it is safe
 		// to just return 0.
-		return 0, nil, nil
+		return 0, nil, time.Time{}, nil
 	}
 
 	currentReplicas := scale.Status.Replicas
-	currentUtilization, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, scale.Status.Selector)
+	currentUtilization, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, scale.Status.Selector)
 
 	// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
 	if err != nil {
 		a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedGetMetrics", err.Error())
-		return 0, nil, fmt.Errorf("failed to get cpu utilization: %v", err)
+		return 0, nil, time.Time{}, fmt.Errorf("failed to get cpu utilization: %v", err)
 	}
 
 	usageRatio := float64(*currentUtilization) / float64(hpa.Spec.CPUUtilization.TargetPercentage)
 	if math.Abs(1.0-usageRatio) > tolerance {
-		return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, nil
+		return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, timestamp, nil
 	} else {
-		return currentReplicas, currentUtilization, nil
+		return currentReplicas, currentUtilization, timestamp, nil
 	}
 }
 
@@ -102,7 +102,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 	}
 	currentReplicas := scale.Status.Replicas
 
-	desiredReplicas, currentUtilization, err := a.computeReplicasForCPUUtilization(hpa, scale)
+	desiredReplicas, currentUtilization, timestamp, err := a.computeReplicasForCPUUtilization(hpa, scale)
 	if err != nil {
 		a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
 		return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
@@ -120,7 +120,6 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 	if desiredReplicas > hpa.Spec.MaxReplicas {
 		desiredReplicas = hpa.Spec.MaxReplicas
 	}
-	now := time.Now()
 	rescale := false
 
 	if desiredReplicas != currentReplicas {
@@ -128,7 +127,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 		// and there was no rescaling in the last downscaleForbiddenWindow.
 		if desiredReplicas < currentReplicas &&
 			(hpa.Status.LastScaleTime == nil ||
-				hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(now)) {
+				hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(timestamp)) {
 			rescale = true
 		}
 
@@ -136,7 +135,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 		// and there was no rescaling in the last upscaleForbiddenWindow.
 		if desiredReplicas > currentReplicas &&
 			(hpa.Status.LastScaleTime == nil ||
-				hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(now)) {
+				hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(timestamp)) {
 			rescale = true
 		}
 	}
 
@@ -162,7 +161,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 		LastScaleTime: hpa.Status.LastScaleTime,
 	}
 	if rescale {
-		now := unversioned.NewTime(now)
+		now := unversioned.NewTime(time.Now())
 		hpa.Status.LastScaleTime = &now
 	}
 
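Before the metrics-client changes, a worked example of the replica arithmetic that the new timestamp travels alongside. The numbers are invented: with 4 replicas, a measured average utilization of 90% and a target of 70%, usageRatio = 90/70 ≈ 1.286, which is well outside the tolerance band, so the controller asks for ceil(1.286 * 4) = 6 replicas. A runnable sketch:

package main

import (
    "fmt"
    "math"
)

func main() {
    // Illustrative values, not taken from the patch.
    currentUtilization := 90 // percent of requested CPU, averaged over pods
    targetPercentage := 70   // hpa.Spec.CPUUtilization.TargetPercentage
    currentReplicas := 4

    // Same formula as computeReplicasForCPUUtilization above.
    usageRatio := float64(currentUtilization) / float64(targetPercentage)
    desired := int(math.Ceil(usageRatio * float64(currentReplicas)))
    fmt.Println(desired) // 6
}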
diff --git a/pkg/controller/podautoscaler/metrics/metrics_client.go b/pkg/controller/podautoscaler/metrics/metrics_client.go
index 4cb9dbb54d0..24cd77c4566 100644
--- a/pkg/controller/podautoscaler/metrics/metrics_client.go
+++ b/pkg/controller/podautoscaler/metrics/metrics_client.go
@@ -43,10 +43,10 @@ var heapsterQueryStart = -5 * time.Minute
 
 // MetricsClient is an interface for getting metrics for pods.
 type MetricsClient interface {
-	// GetCPUUtilization returns average utilization over all pods
-	// represented as a percent of requested CPU, e.g. 70 means that
-	// an average pod uses 70% of the requested CPU.
-	GetCPUUtilization(namespace string, selector map[string]string) (*int, error)
+	// GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
+	// (e.g. 70 means that an average pod uses 70% of the requested CPU)
+	// and the timestamp of the oldest utilization report among those pods.
+	GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error)
 }
 
 // ResourceConsumption specifies consumption of a particular resource.
@@ -55,9 +55,8 @@ type ResourceConsumption struct {
 	Quantity resource.Quantity
 }
 
-// Aggregates results into ResourceConsumption. Also returns number of
-// pods included in the aggregation.
-type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int)
+// Aggregates results into ResourceConsumption. Also returns the number of pods included in the aggregation.
+type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int, time.Time)
 
 type metricDefinition struct {
 	name string
@@ -76,23 +75,23 @@ type HeapsterMetricsClient struct {
 
 var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
 	api.ResourceCPU: {"cpu-usage",
-		func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
-			sum, count := calculateSumFromLatestSample(metrics)
+		func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
+			sum, count, timestamp := calculateSumFromLatestSample(metrics)
 			value := "0"
 			if count > 0 {
 				// assumes that cpu usage is in millis
 				value = fmt.Sprintf("%dm", sum/uint64(count))
 			}
-			return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
+			return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count, timestamp
 		}},
 	api.ResourceMemory: {"memory-usage",
-		func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
-			sum, count := calculateSumFromLatestSample(metrics)
+		func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
+			sum, count, timestamp := calculateSumFromLatestSample(metrics)
 			value := int64(0)
 			if count > 0 {
 				value = int64(sum) / int64(count)
 			}
-			return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count
+			return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count, timestamp
 		}},
 }
 
@@ -108,22 +107,22 @@ func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, servic
 	}
 }
 
-func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, error) {
-	consumption, request, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
+func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error) {
+	consumption, request, timestamp, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get CPU consumption and request: %v", err)
+		return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
 	}
 	utilization := new(int)
 	*utilization = int(float64(consumption.Quantity.MilliValue()) / float64(request.MilliValue()) * 100)
-	return utilization, nil
+	return utilization, timestamp, nil
 }
 
-func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, err error) {
+func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, timestamp time.Time, err error) {
 	podList, err := h.client.Pods(namespace).
 		List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything())
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to get pod list: %v", err)
+		return nil, nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
 	}
 	podNames := []string{}
 	sum := resource.MustParse("0")
@@ -140,22 +139,22 @@ func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName ap
 		}
 	}
 	if missing || sum.Cmp(resource.MustParse("0")) == 0 {
-		return nil, nil, fmt.Errorf("some pods do not have request for %s", resourceName)
+		return nil, nil, time.Time{}, fmt.Errorf("some pods do not have request for %s", resourceName)
 	}
 	glog.Infof("Sum of %s requested: %v", resourceName, sum)
 	avg := resource.MustParse(fmt.Sprintf("%dm", sum.MilliValue()/int64(len(podList.Items))))
 	request = &avg
-	consumption, err = h.getForPods(resourceName, namespace, podNames)
+	consumption, timestamp, err = h.getForPods(resourceName, namespace, podNames)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, time.Time{}, err
 	}
-	return consumption, request, nil
+	return consumption, request, timestamp, nil
 }
 
-func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, error) {
+func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, time.Time, error) {
 	metricSpec, metricDefined := h.resourceDefinitions[resourceName]
 	if !metricDefined {
-		return nil, fmt.Errorf("heapster metric not defined for %v", resourceName)
+		return nil, time.Time{}, fmt.Errorf("heapster metric not defined for %v", resourceName)
 	}
 
 	now := time.Now()
@@ -170,30 +169,33 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp
 		DoRaw()
 
 	if err != nil {
-		return nil, fmt.Errorf("failed to get pods metrics: %v", err)
+		return nil, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
 	}
 
 	var metrics heapster.MetricResultList
 	err = json.Unmarshal(resultRaw, &metrics)
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
+		return nil, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
 	}
 
 	glog.Infof("Metrics available: %s", string(resultRaw))
 
-	currentConsumption, count := metricSpec.aggregator(metrics)
+	currentConsumption, count, timestamp := metricSpec.aggregator(metrics)
 	if count != len(podNames) {
-		return nil, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
+		return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
 	}
-	return &currentConsumption, nil
+	return &currentConsumption, timestamp, nil
 }
 
-func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
-	sum := uint64(0)
-	count := 0
+func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64, count int, timestamp time.Time) {
+	sum = uint64(0)
+	count = 0
+	timestamp = time.Time{}
+	var oldest *time.Time // timestamp of the oldest of the used samples across pods
+	oldest = nil
 	for _, metrics := range metrics.Items {
-		var newest *heapster.MetricPoint
+		var newest *heapster.MetricPoint // the newest sample for the pod
 		newest = nil
 		for i, metricPoint := range metrics.Metrics {
 			if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
@@ -201,9 +203,15 @@ func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, in
 			}
 		}
 		if newest != nil {
+			if oldest == nil || newest.Timestamp.Before(*oldest) {
+				oldest = &newest.Timestamp
+			}
 			sum += newest.Value
 			count++
 		}
 	}
-	return sum, count
+	if oldest != nil {
+		timestamp = *oldest
+	}
+	return sum, count, timestamp
 }
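The aggregation rule above is worth spelling out: per pod, the newest sample wins; across pods, the reported timestamp is the oldest of those per-pod newest samples, i.e. the most recent instant up to which every counted pod's data is known. A self-contained sketch under simplified types, where the sample struct stands in for heapster.MetricPoint and a plain slice-of-slices replaces heapster.MetricResultList:

package main

import (
    "fmt"
    "time"
)

// sample is a simplified stand-in for heapster.MetricPoint.
type sample struct {
    ts    time.Time
    value uint64
}

// sumLatest mirrors calculateSumFromLatestSample's rule: take the newest
// sample of each pod, sum their values, and report the oldest timestamp
// among those per-pod newest samples.
func sumLatest(pods [][]sample) (sum uint64, count int, oldest time.Time) {
    for _, podSamples := range pods {
        var newest *sample
        for i := range podSamples {
            if newest == nil || newest.ts.Before(podSamples[i].ts) {
                newest = &podSamples[i]
            }
        }
        if newest != nil {
            if count == 0 || newest.ts.Before(oldest) {
                oldest = newest.ts
            }
            sum += newest.value
            count++
        }
    }
    return sum, count, oldest
}

func main() {
    base := time.Date(2015, time.November, 10, 12, 30, 0, 0, time.UTC)
    at := func(min int) time.Time { return base.Add(time.Duration(min) * time.Minute) }
    // Mirrors the fixture of TestCPUSamplesWithRandomTimestamps below.
    pods := [][]sample{
        {{at(1), 1}, {at(5), 3000}, {at(2), 2}}, // newest at minute 5
        {{at(2), 2}, {at(1), 1}, {at(3), 3000}}, // newest at minute 3
        {{at(4), 3000}, {at(1), 1}, {at(2), 2}}, // newest at minute 4
    }
    sum, count, oldest := sumLatest(pods)
    fmt.Println(sum, count, oldest) // 9000 3 2015-11-10 12:33:00 +0000 UTC
}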
diff --git a/pkg/controller/podautoscaler/metrics/metrics_client_test.go b/pkg/controller/podautoscaler/metrics/metrics_client_test.go
index 3925ada02a7..4f065289f35 100644
--- a/pkg/controller/podautoscaler/metrics/metrics_client_test.go
+++ b/pkg/controller/podautoscaler/metrics/metrics_client_test.go
@@ -35,6 +35,8 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+var fixedTimestamp = time.Date(2015, time.November, 10, 12, 30, 0, 0, time.UTC)
+
 func (w fakeResponseWrapper) DoRaw() ([]byte, error) {
 	return w.raw, nil
 }
@@ -62,6 +64,7 @@ type testCase struct {
 	desiredValue          int64
 	desiredError          error
 	targetResource        api.ResourceName
+	targetTimestamp       int
 	reportedMetricsPoints [][]metricPoint
 	namespace             string
 	selector              map[string]string
@@ -108,12 +111,11 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 	fakeClient.AddProxyReactor("services", func(action testclient.Action) (handled bool, ret client.ResponseWrapper, err error) {
 		metrics := heapster.MetricResultList{}
-		firstTimestamp := time.Now()
 		var latestTimestamp time.Time
 		for _, reportedMetricPoints := range tc.reportedMetricsPoints {
 			var heapsterMetricPoints []heapster.MetricPoint
 			for _, reportedMetricPoint := range reportedMetricPoints {
-				timestamp := firstTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute)
+				timestamp := fixedTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute)
 				if latestTimestamp.Before(timestamp) {
 					latestTimestamp = timestamp
 				}
@@ -133,7 +135,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 	return fakeClient
 }
 
-func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err error) {
+func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, timestamp time.Time, err error) {
 	assert.Equal(t, tc.desiredError, err)
 	if tc.desiredError != nil {
 		return
@@ -144,13 +146,15 @@ func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err er
 	if tc.targetResource == api.ResourceMemory {
 		assert.Equal(t, tc.desiredValue, val.Quantity.Value())
 	}
+	targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
+	assert.Equal(t, targetTimestamp, timestamp)
 }
 
 func (tc *testCase) runTest(t *testing.T) {
 	testClient := tc.prepareTestClient(t)
 	metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
-	val, _, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
-	tc.verifyResults(t, val, err)
+	val, _, timestamp, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
+	tc.verifyResults(t, val, timestamp, err)
 }
 
 func TestCPU(t *testing.T) {
@@ -158,6 +162,7 @@ func TestCPU(t *testing.T) {
 		replicas:              3,
 		desiredValue:          5000,
 		targetResource:        api.ResourceCPU,
+		targetTimestamp:       1,
 		reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 1}}, {{5000, 1}}},
 	}
 	tc.runTest(t)
 }
@@ -168,6 +173,7 @@ func TestMemory(t *testing.T) {
 		replicas:              3,
 		desiredValue:          5000,
 		targetResource:        api.ResourceMemory,
+		targetTimestamp:       1,
 		reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 2}}, {{5000, 4}}},
 	}
 	tc.runTest(t)
 }
@@ -178,6 +184,7 @@ func TestCPUSumEqualZero(t *testing.T) {
 		replicas:              3,
 		desiredValue:          0,
 		targetResource:        api.ResourceCPU,
+		targetTimestamp:       0,
 		reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
 	}
 	tc.runTest(t)
@@ -188,6 +195,7 @@ func TestMemorySumEqualZero(t *testing.T) {
 		replicas:              3,
 		desiredValue:          0,
 		targetResource:        api.ResourceMemory,
+		targetTimestamp:       0,
 		reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
 	}
 	tc.runTest(t)
 }
@@ -195,9 +203,10 @@ func TestMemorySumEqualZero(t *testing.T) {
 
 func TestCPUMoreMetrics(t *testing.T) {
 	tc := testCase{
-		replicas:       5,
-		desiredValue:   5000,
-		targetResource: api.ResourceCPU,
+		replicas:        5,
+		desiredValue:    5000,
+		targetResource:  api.ResourceCPU,
+		targetTimestamp: 10,
 		reportedMetricsPoints: [][]metricPoint{
 			{{0, 3}, {0, 6}, {5, 4}, {9000, 10}},
 			{{5000, 2}, {10, 5}, {66, 1}, {0, 10}},
@@ -210,9 +219,10 @@ func TestCPUMoreMetrics(t *testing.T) {
 
 func TestMemoryMoreMetrics(t *testing.T) {
 	tc := testCase{
-		replicas:       5,
-		desiredValue:   5000,
-		targetResource: api.ResourceMemory,
+		replicas:        5,
+		desiredValue:    5000,
+		targetResource:  api.ResourceMemory,
+		targetTimestamp: 10,
 		reportedMetricsPoints: [][]metricPoint{
 			{{0, 3}, {0, 6}, {5, 4}, {9000, 10}},
 			{{5000, 2}, {10, 5}, {66, 1}, {0, 10}},
@@ -228,6 +238,7 @@ func TestCPUResultIsFloat(t *testing.T) {
 		replicas:              6,
 		desiredValue:          4783,
 		targetResource:        api.ResourceCPU,
+		targetTimestamp:       4,
 		reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}},
 	}
 	tc.runTest(t)
@@ -238,6 +249,7 @@ func TestMemoryResultIsFloat(t *testing.T) {
 		replicas:              6,
 		desiredValue:          4783,
 		targetResource:        api.ResourceMemory,
+		targetTimestamp:       4,
 		reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}},
 	}
 	tc.runTest(t)
@@ -245,22 +257,24 @@ func TestMemoryResultIsFloat(t *testing.T) {
 
 func TestCPUSamplesWithRandomTimestamps(t *testing.T) {
 	tc := testCase{
-		replicas:       3,
-		desiredValue:   3000,
-		targetResource: api.ResourceCPU,
+		replicas:        3,
+		desiredValue:    3000,
+		targetResource:  api.ResourceCPU,
+		targetTimestamp: 3,
 		reportedMetricsPoints: [][]metricPoint{
-			{{1, 1}, {3000, 3}, {2, 2}},
+			{{1, 1}, {3000, 5}, {2, 2}},
 			{{2, 2}, {1, 1}, {3000, 3}},
-			{{3000, 3}, {1, 1}, {2, 2}}},
+			{{3000, 4}, {1, 1}, {2, 2}}},
 	}
 	tc.runTest(t)
 }
 
 func TestMemorySamplesWithRandomTimestamps(t *testing.T) {
 	tc := testCase{
-		replicas:       3,
-		desiredValue:   3000,
-		targetResource: api.ResourceMemory,
+		replicas:        3,
+		desiredValue:    3000,
+		targetResource:  api.ResourceMemory,
+		targetTimestamp: 3,
 		reportedMetricsPoints: [][]metricPoint{
 			{{1, 1}, {3000, 3}, {2, 2}},
 			{{2, 2}, {1, 1}, {3000, 3}},
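For reading the fixtures above: a metricPoint's second field is an offset in minutes from fixedTimestamp, and targetTimestamp is the expected offset of the timestamp the client returns. In TestCPUSamplesWithRandomTimestamps the per-pod newest samples now sit at minutes 5, 3 and 4, so the oldest of them, minute 3, is what verifyResults asserts, hence targetTimestamp: 3. A tiny runnable sketch of that arithmetic, with the offsets copied from the CPU test:

package main

import (
    "fmt"
    "time"
)

func main() {
    // fixedTimestamp as declared in metrics_client_test.go above.
    fixedTimestamp := time.Date(2015, time.November, 10, 12, 30, 0, 0, time.UTC)
    // Minute offsets of the newest sample in each pod of
    // TestCPUSamplesWithRandomTimestamps: {3000, 5}, {3000, 3}, {3000, 4}.
    newestPerPod := []int{5, 3, 4}
    oldest := newestPerPod[0]
    for _, m := range newestPerPod[1:] {
        if m < oldest {
            oldest = m
        }
    }
    expected := fixedTimestamp.Add(time.Duration(oldest) * time.Minute)
    fmt.Println(oldest, expected) // 3 2015-11-10 12:33:00 +0000 UTC
}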