HPA: expose the metrics "reconciliations_total" and "reconciliation_duration_seconds" from HPA controller (#116010)
This commit is contained in:
@@ -18,6 +18,7 @@ package podautoscaler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
@@ -27,7 +28,7 @@ import (
|
||||
autoscalingv2 "k8s.io/api/autoscaling/v2"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -50,6 +51,7 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
||||
"k8s.io/kubernetes/pkg/controller/podautoscaler/monitor"
|
||||
"k8s.io/kubernetes/pkg/controller/util/selectors"
|
||||
)
|
||||
|
||||
@@ -58,6 +60,13 @@ var (
|
||||
scaleUpLimitMinimum = 4.0
|
||||
)
|
||||
|
||||
var (
|
||||
// errSpec is used to determine if the error comes from the spec of HPA object in reconcileAutoscaler.
|
||||
// All such errors should have this error as a root error so that the upstream function can distinguish spec errors from internal errors.
|
||||
// e.g., fmt.Errorf("invalid spec%w", errSpec)
|
||||
errSpec error = errors.New("")
|
||||
)
|
||||
|
||||
type timestampedRecommendation struct {
|
||||
recommendation int32
|
||||
timestamp time.Time
|
||||
@@ -82,6 +91,8 @@ type HorizontalController struct {
|
||||
|
||||
downscaleStabilisationWindow time.Duration
|
||||
|
||||
monitor monitor.Monitor
|
||||
|
||||
// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
|
||||
// NewHorizontalController.
|
||||
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
|
||||
@@ -139,6 +150,7 @@ func NewHorizontalController(
|
||||
scaleNamespacer: scaleNamespacer,
|
||||
hpaNamespacer: hpaNamespacer,
|
||||
downscaleStabilisationWindow: downscaleStabilisationWindow,
|
||||
monitor: monitor.New(),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
|
||||
mapper: mapper,
|
||||
recommendations: map[string][]timestampedRecommendation{},
|
||||
@@ -174,6 +186,8 @@ func NewHorizontalController(
|
||||
)
|
||||
hpaController.replicaCalc = replicaCalc
|
||||
|
||||
monitor.Register()
|
||||
|
||||
return hpaController
|
||||
}
|
||||
|
||||
@@ -276,14 +290,17 @@ func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
|
||||
}
|
||||
|
||||
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
|
||||
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
|
||||
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
|
||||
// all metrics computed.
|
||||
// It may return both valid metricDesiredReplicas and an error,
|
||||
// when some metrics still work and HPA should perform scaling based on them.
|
||||
// If HPA cannot do anything due to error, it returns -1 in metricDesiredReplicas as a failure signal.
|
||||
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
|
||||
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
|
||||
|
||||
selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
|
||||
if err != nil {
|
||||
return 0, "", nil, time.Time{}, err
|
||||
return -1, "", nil, time.Time{}, err
|
||||
}
|
||||
|
||||
specReplicas := scale.Spec.Replicas
|
||||
@@ -303,23 +320,29 @@ func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hp
|
||||
invalidMetricError = err
|
||||
}
|
||||
invalidMetricsCount++
|
||||
continue
|
||||
}
|
||||
if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
|
||||
if replicas == 0 || replicaCountProposal > replicas {
|
||||
timestamp = timestampProposal
|
||||
replicas = replicaCountProposal
|
||||
metric = metricNameProposal
|
||||
}
|
||||
}
|
||||
|
||||
if invalidMetricError != nil {
|
||||
invalidMetricError = fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
|
||||
}
|
||||
|
||||
// If all metrics are invalid or some are invalid and we would scale down,
|
||||
// return an error and set the condition of the hpa based on the first invalid metric.
|
||||
// Otherwise set the condition as scaling active as we're going to scale
|
||||
if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
|
||||
setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
|
||||
return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
|
||||
return -1, "", statuses, time.Time{}, invalidMetricError
|
||||
}
|
||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
|
||||
return replicas, metric, statuses, timestamp, nil
|
||||
|
||||
return replicas, metric, statuses, timestamp, invalidMetricError
|
||||
}
|
||||
|
||||
// hpasControllingPodsUnderSelector returns a list of keys of all HPAs that control a given list of pods.
|
||||
@@ -445,8 +468,8 @@ func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa
|
||||
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
|
||||
}
|
||||
default:
|
||||
errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
|
||||
err = fmt.Errorf(errMsg)
|
||||
// It shouldn't reach here as invalid metric source type is filtered out in the api-server's validation.
|
||||
err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
|
||||
condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
|
||||
return 0, "", time.Time{}, condition, err
|
||||
}
|
||||
@@ -462,7 +485,7 @@ func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (de
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
|
||||
if errors.IsNotFound(err) {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
logger.Info("Horizontal Pod Autoscaler has been deleted", "HPA", klog.KRef(namespace, name))
|
||||
|
||||
a.recommendationsLock.Lock()
|
||||
@@ -691,7 +714,23 @@ func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32
|
||||
}
|
||||
}
|
||||
|
||||
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
|
||||
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) (retErr error) {
|
||||
// actionLabel is used to report which actions this reconciliation has taken.
|
||||
actionLabel := monitor.ActionLabelNone
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
errorLabel := monitor.ErrorLabelNone
|
||||
if retErr != nil {
|
||||
// In case of error, set "internal" as default.
|
||||
errorLabel = monitor.ErrorLabelInternal
|
||||
}
|
||||
if errors.Is(retErr, errSpec) {
|
||||
errorLabel = monitor.ErrorLabelSpec
|
||||
}
|
||||
|
||||
a.monitor.ObserveReconciliationResult(actionLabel, errorLabel, time.Since(start))
|
||||
}()
|
||||
|
||||
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
|
||||
hpa := hpaShared.DeepCopy()
|
||||
hpaStatusOriginal := hpa.Status.DeepCopy()
|
||||
@@ -705,7 +744,7 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
|
||||
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
return fmt.Errorf("invalid API version in scale target reference: %v", err)
|
||||
return fmt.Errorf("invalid API version in scale target reference: %v%w", err, errSpec)
|
||||
}
|
||||
|
||||
targetGK := schema.GroupKind{
|
||||
@@ -771,7 +810,9 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
|
||||
} else {
|
||||
var metricTimestamp time.Time
|
||||
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
|
||||
if err != nil {
|
||||
// computeReplicasForMetrics may return both non-zero metricDesiredReplicas and an error.
|
||||
// That means some metrics still work and HPA should perform scaling based on them.
|
||||
if err != nil && metricDesiredReplicas == -1 {
|
||||
a.setCurrentReplicasInStatus(hpa, currentReplicas)
|
||||
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
@@ -779,6 +820,10 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
|
||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
|
||||
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
|
||||
}
|
||||
if err != nil {
|
||||
// We proceed to scaling, but return this error from reconcileAutoscaler() finally.
|
||||
retErr = err
|
||||
}
|
||||
|
||||
logger.V(4).Info("Proposing desired replicas",
|
||||
"desiredReplicas", metricDesiredReplicas,
|
||||
@@ -825,6 +870,12 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
|
||||
"currentReplicas", currentReplicas,
|
||||
"desiredReplicas", desiredReplicas,
|
||||
"reason", rescaleReason)
|
||||
|
||||
if desiredReplicas > currentReplicas {
|
||||
actionLabel = monitor.ActionLabelScaleUp
|
||||
} else {
|
||||
actionLabel = monitor.ActionLabelScaleDown
|
||||
}
|
||||
} else {
|
||||
logger.V(4).Info("Decided not to scale",
|
||||
"scaleTarget", reference,
|
||||
@@ -834,7 +885,14 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
|
||||
}
|
||||
|
||||
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
|
||||
return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
|
||||
|
||||
err = a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
|
||||
if err != nil {
|
||||
// we can overwrite retErr in this case because it's an internal error.
|
||||
return err
|
||||
}
|
||||
|
||||
return retErr
|
||||
}
|
||||
|
||||
// stabilizeRecommendation:
|
||||
|
Reference in New Issue
Block a user