apiserver: define metrics for API Priority and Fairness borrowing
This commit is contained in:
		@@ -216,6 +216,9 @@ type priorityLevelState struct {
 | 
				
			|||||||
	// Integrator of seat demand, reset every CurrentCL adjustment period
 | 
						// Integrator of seat demand, reset every CurrentCL adjustment period
 | 
				
			||||||
	seatDemandIntegrator fq.Integrator
 | 
						seatDemandIntegrator fq.Integrator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Gauge of seat demand / nominalCL
 | 
				
			||||||
 | 
						seatDemandRatioedGauge metrics.RatioedGauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// seatDemandStats is derived from periodically examining the seatDemandIntegrator.
 | 
						// seatDemandStats is derived from periodically examining the seatDemandIntegrator.
 | 
				
			||||||
	// The average, standard deviation, and high watermark come directly from the integrator.
 | 
						// The average, standard deviation, and high watermark come directly from the integrator.
 | 
				
			||||||
	// envelope = avg + stdDev.
 | 
						// envelope = avg + stdDev.
 | 
				
			||||||
@@ -371,7 +374,7 @@ func (cfgCtlr *configController) updateBorrowing() {
 | 
				
			|||||||
	for _, plState := range cfgCtlr.priorityLevelStates {
 | 
						for _, plState := range cfgCtlr.priorityLevelStates {
 | 
				
			||||||
		obs := plState.seatDemandIntegrator.Reset()
 | 
							obs := plState.seatDemandIntegrator.Reset()
 | 
				
			||||||
		plState.seatDemandStats.update(obs)
 | 
							plState.seatDemandStats.update(obs)
 | 
				
			||||||
		// TODO: set borrowing metrics introduced in https://github.com/kubernetes/enhancements/pull/3391
 | 
							metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed /* TODO: add the designed rest for borrowing */)
 | 
				
			||||||
		// TODO: updathe CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
 | 
							// TODO: updathe CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -602,9 +605,10 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 | 
				
			|||||||
				reqsGaugePair:          metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues),
 | 
									reqsGaugePair:          metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues),
 | 
				
			||||||
				execSeatsObs:           meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues),
 | 
									execSeatsObs:           meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues),
 | 
				
			||||||
				seatDemandIntegrator:   fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name),
 | 
									seatDemandIntegrator:   fq.NewNamedIntegrator(meal.cfgCtlr.clock, pl.Name),
 | 
				
			||||||
 | 
									seatDemandRatioedGauge: metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{pl.Name}),
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, state.seatDemandIntegrator)
 | 
							qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
 | 
								klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
@@ -708,7 +712,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		var err error
 | 
							var err error
 | 
				
			||||||
		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, plState.seatDemandIntegrator)
 | 
							plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			// This can not happen because queueSetCompleterForPL already approved this config
 | 
								// This can not happen because queueSetCompleterForPL already approved this config
 | 
				
			||||||
			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
 | 
								panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
 | 
				
			||||||
@@ -743,6 +747,8 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 | 
				
			|||||||
		// difference will be negligible.
 | 
							// difference will be negligible.
 | 
				
			||||||
		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.NominalConcurrencyShares) / meal.shareSum))
 | 
							concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.NominalConcurrencyShares) / meal.shareSum))
 | 
				
			||||||
		metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
 | 
							metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
 | 
				
			||||||
 | 
							metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit /* TODO: pass min and max once new API is available */, concurrencyLimit, concurrencyLimit)
 | 
				
			||||||
 | 
							plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
 | 
				
			||||||
		meal.maxExecutingRequests += concurrencyLimit
 | 
							meal.maxExecutingRequests += concurrencyLimit
 | 
				
			||||||
		var waitLimit int
 | 
							var waitLimit int
 | 
				
			||||||
		if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
 | 
							if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
 | 
				
			||||||
@@ -765,7 +771,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 | 
				
			|||||||
// given priority level configuration.  Returns nil if that config
 | 
					// given priority level configuration.  Returns nil if that config
 | 
				
			||||||
// does not call for limiting.  Returns nil and an error if the given
 | 
					// does not call for limiting.  Returns nil and an error if the given
 | 
				
			||||||
// object is malformed in a way that is a problem for this package.
 | 
					// object is malformed in a way that is a problem for this package.
 | 
				
			||||||
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandIntgrator fq.Integrator) (fq.QueueSetCompleter, error) {
 | 
					func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
 | 
				
			||||||
	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
 | 
						if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
 | 
				
			||||||
		return nil, errors.New("broken union structure at the top")
 | 
							return nil, errors.New("broken union structure at the top")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -794,7 +800,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
 | 
				
			|||||||
	if queues != nil {
 | 
						if queues != nil {
 | 
				
			||||||
		qsc, err = queues.BeginConfigChange(qcQS)
 | 
							qsc, err = queues.BeginConfigChange(qcQS)
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandIntgrator)
 | 
							qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
 | 
							err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
 | 
				
			||||||
@@ -843,7 +849,8 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
 | 
				
			|||||||
	reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
 | 
						reqsGaugePair := metrics.RatioedGaugeVecPhasedElementPair(meal.cfgCtlr.reqsGaugeVec, 1, 1, labelValues)
 | 
				
			||||||
	execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
 | 
						execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
 | 
				
			||||||
	seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
 | 
						seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
 | 
				
			||||||
	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, seatDemandIntegrator)
 | 
						seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name})
 | 
				
			||||||
 | 
						qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		// This can not happen because proto is one of the mandatory
 | 
							// This can not happen because proto is one of the mandatory
 | 
				
			||||||
		// objects and these are not erroneous
 | 
							// objects and these are not erroneous
 | 
				
			||||||
@@ -855,6 +862,7 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
 | 
				
			|||||||
		reqsGaugePair:          reqsGaugePair,
 | 
							reqsGaugePair:          reqsGaugePair,
 | 
				
			||||||
		execSeatsObs:           execSeatsObs,
 | 
							execSeatsObs:           execSeatsObs,
 | 
				
			||||||
		seatDemandIntegrator:   seatDemandIntegrator,
 | 
							seatDemandIntegrator:   seatDemandIntegrator,
 | 
				
			||||||
 | 
							seatDemandRatioedGauge: seatDemandRatioedGauge,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if proto.Spec.Limited != nil {
 | 
						if proto.Spec.Limited != nil {
 | 
				
			||||||
		meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)
 | 
							meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -105,7 +105,7 @@ type ctlrTestRequest struct {
 | 
				
			|||||||
	descr1, descr2 interface{}
 | 
						descr1, descr2 interface{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi fq.Integrator) (fq.QueueSetCompleter, error) {
 | 
					func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedGaugePair, eso metrics.RatioedGauge, sdi metrics.Gauge) (fq.QueueSetCompleter, error) {
 | 
				
			||||||
	return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
 | 
						return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,6 +21,8 @@ import (
 | 
				
			|||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/utils/clock"
 | 
						"k8s.io/utils/clock"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -29,8 +31,7 @@ import (
 | 
				
			|||||||
// Integrator is created, and ends at the latest operation on the
 | 
					// Integrator is created, and ends at the latest operation on the
 | 
				
			||||||
// Integrator.
 | 
					// Integrator.
 | 
				
			||||||
type Integrator interface {
 | 
					type Integrator interface {
 | 
				
			||||||
	Set(float64)
 | 
						fcmetrics.Gauge
 | 
				
			||||||
	Add(float64)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	GetResults() IntegratorResults
 | 
						GetResults() IntegratorResults
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -77,6 +78,24 @@ func (igr *integrator) Set(x float64) {
 | 
				
			|||||||
	igr.Unlock()
 | 
						igr.Unlock()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (igr *integrator) Add(deltaX float64) {
 | 
				
			||||||
 | 
						igr.Lock()
 | 
				
			||||||
 | 
						igr.setLocked(igr.x + deltaX)
 | 
				
			||||||
 | 
						igr.Unlock()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (igr *integrator) Inc() {
 | 
				
			||||||
 | 
						igr.Add(1)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (igr *integrator) Dec() {
 | 
				
			||||||
 | 
						igr.Add(-1)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (igr *integrator) SetToCurrentTime() {
 | 
				
			||||||
 | 
						igr.Set(float64(time.Now().UnixNano()))
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (igr *integrator) setLocked(x float64) {
 | 
					func (igr *integrator) setLocked(x float64) {
 | 
				
			||||||
	igr.updateLocked()
 | 
						igr.updateLocked()
 | 
				
			||||||
	igr.x = x
 | 
						igr.x = x
 | 
				
			||||||
@@ -88,12 +107,6 @@ func (igr *integrator) setLocked(x float64) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (igr *integrator) Add(deltaX float64) {
 | 
					 | 
				
			||||||
	igr.Lock()
 | 
					 | 
				
			||||||
	igr.setLocked(igr.x + deltaX)
 | 
					 | 
				
			||||||
	igr.Unlock()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (igr *integrator) updateLocked() {
 | 
					func (igr *integrator) updateLocked() {
 | 
				
			||||||
	now := igr.clock.Now()
 | 
						now := igr.clock.Now()
 | 
				
			||||||
	dt := now.Sub(igr.lastTime).Seconds()
 | 
						dt := now.Sub(igr.lastTime).Seconds()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -35,9 +35,8 @@ type QueueSetFactory interface {
 | 
				
			|||||||
	// The RatioedGaugePair observes number of requests,
 | 
						// The RatioedGaugePair observes number of requests,
 | 
				
			||||||
	// execution covering just the regular phase.
 | 
						// execution covering just the regular phase.
 | 
				
			||||||
	// The RatioedGauge observes number of seats occupied through all phases of execution.
 | 
						// The RatioedGauge observes number of seats occupied through all phases of execution.
 | 
				
			||||||
	// The Integrator observes the seat demand (executing + queued seats), and
 | 
						// The Gauge observes the seat demand (executing + queued seats).
 | 
				
			||||||
	// the queueset does not read or reset the Integrator.
 | 
						BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (QueueSetCompleter, error)
 | 
				
			||||||
	BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, Integrator) (QueueSetCompleter, error)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// QueueSetCompleter finishes the two-step process of creating or
 | 
					// QueueSetCompleter finishes the two-step process of creating or
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -63,7 +63,7 @@ type queueSetCompleter struct {
 | 
				
			|||||||
	factory              *queueSetFactory
 | 
						factory              *queueSetFactory
 | 
				
			||||||
	reqsGaugePair        metrics.RatioedGaugePair
 | 
						reqsGaugePair        metrics.RatioedGaugePair
 | 
				
			||||||
	execSeatsGauge       metrics.RatioedGauge
 | 
						execSeatsGauge       metrics.RatioedGauge
 | 
				
			||||||
	seatDemandIntegrator fq.Integrator
 | 
						seatDemandIntegrator metrics.Gauge
 | 
				
			||||||
	theSet               *queueSet
 | 
						theSet               *queueSet
 | 
				
			||||||
	qCfg                 fq.QueuingConfig
 | 
						qCfg                 fq.QueuingConfig
 | 
				
			||||||
	dealer               *shufflesharding.Dealer
 | 
						dealer               *shufflesharding.Dealer
 | 
				
			||||||
@@ -94,7 +94,7 @@ type queueSet struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	execSeatsGauge metrics.RatioedGauge // for all phases of execution
 | 
						execSeatsGauge metrics.RatioedGauge // for all phases of execution
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	seatDemandIntegrator fq.Integrator
 | 
						seatDemandIntegrator metrics.Gauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	promiseFactory promiseFactory
 | 
						promiseFactory promiseFactory
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -163,7 +163,7 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator fq.Integrator) (fq.QueueSetCompleter, error) {
 | 
					func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator metrics.Gauge) (fq.QueueSetCompleter, error) {
 | 
				
			||||||
	dealer, err := checkConfig(qCfg)
 | 
						dealer, err := checkConfig(qCfg)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,7 +40,7 @@ type noRestraint struct{}
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type noRestraintRequest struct{}
 | 
					type noRestraintRequest struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, fq.Integrator) (fq.QueueSetCompleter, error) {
 | 
					func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (fq.QueueSetCompleter, error) {
 | 
				
			||||||
	return noRestraintCompleter{}, nil
 | 
						return noRestraintCompleter{}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -65,12 +65,13 @@ type resettable interface {
 | 
				
			|||||||
	Reset()
 | 
						Reset()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Reset all metrics to zero
 | 
					// Reset all resettable metrics to zero
 | 
				
			||||||
func Reset() {
 | 
					func Reset() {
 | 
				
			||||||
	for _, metric := range metrics {
 | 
						for _, metric := range metrics {
 | 
				
			||||||
		rm := metric.(resettable)
 | 
							if rm, ok := metric.(resettable); ok {
 | 
				
			||||||
			rm.Reset()
 | 
								rm.Reset()
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GatherAndCompare the given metrics with the given Prometheus syntax expected value
 | 
					// GatherAndCompare the given metrics with the given Prometheus syntax expected value
 | 
				
			||||||
@@ -316,6 +317,120 @@ var (
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		[]string{priorityLevel, flowSchema},
 | 
							[]string{priorityLevel, flowSchema},
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
						apiserverNominalConcurrencyLimits = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "nominal_limit_seats",
 | 
				
			||||||
 | 
								Help:           "Nominal number of execution seats configured for each priority level",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverMinimumConcurrencyLimits = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "lower_limit_seats",
 | 
				
			||||||
 | 
								Help:           "Configured lower bound on number of execution seats available to each priority level",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverMaximumConcurrencyLimits = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "upper_limit_seats",
 | 
				
			||||||
 | 
								Help:           "Configured upper bound on number of execution seats available to each priority level",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						ApiserverSeatDemands = NewTimingRatioHistogramVec(
 | 
				
			||||||
 | 
							&compbasemetrics.TimingHistogramOpts{
 | 
				
			||||||
 | 
								Namespace: namespace,
 | 
				
			||||||
 | 
								Subsystem: subsystem,
 | 
				
			||||||
 | 
								Name:      "demand_seats",
 | 
				
			||||||
 | 
								Help:      "Observations, at the end of every nanosecond, of (the number of seats each priority level could use) / (nominal number of seats for that level)",
 | 
				
			||||||
 | 
								// Rationale for the bucket boundaries:
 | 
				
			||||||
 | 
								// For 0--1, evenly spaced and not too many;
 | 
				
			||||||
 | 
								// For 1--2, roughly powers of sqrt(sqrt(2));
 | 
				
			||||||
 | 
								// For 2--6, roughly powers of sqrt(2);
 | 
				
			||||||
 | 
								// We need coverage over 1, but do not want too many buckets.
 | 
				
			||||||
 | 
								Buckets:        []float64{0.2, 0.4, 0.6, 0.8, 1, 1.2, 1.4, 1.7, 2, 2.8, 4, 6},
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							priorityLevel,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverSeatDemandHighWatermarks = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "demand_seats_high_watermark",
 | 
				
			||||||
 | 
								Help:           "High watermark, over last adjustment period, of demand_seats",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverSeatDemandAverages = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "demand_seats_average",
 | 
				
			||||||
 | 
								Help:           "Time-weighted average, over last adjustment period, of demand_seats",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverSeatDemandStandardDeviations = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "demand_seats_stdev",
 | 
				
			||||||
 | 
								Help:           "Time-weighted standard deviation, over last adjustment period, of demand_seats",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverSeatDemandSmootheds = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "demand_seats_smoothed",
 | 
				
			||||||
 | 
								Help:           "Smoothed seat demands",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverSeatDemandTargets = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "target_seats",
 | 
				
			||||||
 | 
								Help:           "Seat allocation targets",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						apiserverFairFracs = compbasemetrics.NewGauge(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "seat_fair_frac",
 | 
				
			||||||
 | 
								Help:           "Fair fraction of server's concurrency to allocate to each priority level that can use it",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						apiserverCurrentConcurrencyLimits = compbasemetrics.NewGaugeVec(
 | 
				
			||||||
 | 
							&compbasemetrics.GaugeOpts{
 | 
				
			||||||
 | 
								Namespace:      namespace,
 | 
				
			||||||
 | 
								Subsystem:      subsystem,
 | 
				
			||||||
 | 
								Name:           "current_limit_seats",
 | 
				
			||||||
 | 
								Help:           "current derived number of execution seats available to each priority level",
 | 
				
			||||||
 | 
								StabilityLevel: compbasemetrics.ALPHA,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							[]string{priorityLevel},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	metrics = Registerables{
 | 
						metrics = Registerables{
 | 
				
			||||||
		apiserverRejectedRequestsTotal,
 | 
							apiserverRejectedRequestsTotal,
 | 
				
			||||||
@@ -336,10 +451,21 @@ var (
 | 
				
			|||||||
		apiserverEpochAdvances,
 | 
							apiserverEpochAdvances,
 | 
				
			||||||
		apiserverWorkEstimatedSeats,
 | 
							apiserverWorkEstimatedSeats,
 | 
				
			||||||
		apiserverDispatchWithNoAccommodation,
 | 
							apiserverDispatchWithNoAccommodation,
 | 
				
			||||||
 | 
							apiserverNominalConcurrencyLimits,
 | 
				
			||||||
 | 
							apiserverMinimumConcurrencyLimits,
 | 
				
			||||||
 | 
							apiserverMaximumConcurrencyLimits,
 | 
				
			||||||
 | 
							apiserverSeatDemandHighWatermarks,
 | 
				
			||||||
 | 
							apiserverSeatDemandAverages,
 | 
				
			||||||
 | 
							apiserverSeatDemandStandardDeviations,
 | 
				
			||||||
 | 
							apiserverSeatDemandSmootheds,
 | 
				
			||||||
 | 
							apiserverSeatDemandTargets,
 | 
				
			||||||
 | 
							apiserverFairFracs,
 | 
				
			||||||
 | 
							apiserverCurrentConcurrencyLimits,
 | 
				
			||||||
	}.
 | 
						}.
 | 
				
			||||||
		Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
 | 
							Append(PriorityLevelExecutionSeatsGaugeVec.metrics()...).
 | 
				
			||||||
		Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
 | 
							Append(PriorityLevelConcurrencyGaugeVec.metrics()...).
 | 
				
			||||||
		Append(readWriteConcurrencyGaugeVec.metrics()...)
 | 
							Append(readWriteConcurrencyGaugeVec.metrics()...).
 | 
				
			||||||
 | 
							Append(ApiserverSeatDemands.metrics()...)
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type indexOnce struct {
 | 
					type indexOnce struct {
 | 
				
			||||||
@@ -457,3 +583,23 @@ func ObserveWorkEstimatedSeats(priorityLevel, flowSchema string, seats int) {
 | 
				
			|||||||
func AddDispatchWithNoAccommodation(priorityLevel, flowSchema string) {
 | 
					func AddDispatchWithNoAccommodation(priorityLevel, flowSchema string) {
 | 
				
			||||||
	apiserverDispatchWithNoAccommodation.WithLabelValues(priorityLevel, flowSchema).Inc()
 | 
						apiserverDispatchWithNoAccommodation.WithLabelValues(priorityLevel, flowSchema).Inc()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func SetPriorityLevelConfiguration(priorityLevel string, nominalCL, minCL, maxCL int) {
 | 
				
			||||||
 | 
						apiserverNominalConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(nominalCL))
 | 
				
			||||||
 | 
						apiserverMinimumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(minCL))
 | 
				
			||||||
 | 
						apiserverMaximumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(maxCL))
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NotePriorityLevelConcurrencyAdjustment(priorityLevel string, seatDemandHWM, seatDemandAvg, seatDemandStdev, seatDemandSmoothed float64 /* TODO: seatDemandTarget float64, currentCL int */) {
 | 
				
			||||||
 | 
						apiserverSeatDemandHighWatermarks.WithLabelValues(priorityLevel).Set(seatDemandHWM)
 | 
				
			||||||
 | 
						apiserverSeatDemandAverages.WithLabelValues(priorityLevel).Set(seatDemandAvg)
 | 
				
			||||||
 | 
						apiserverSeatDemandStandardDeviations.WithLabelValues(priorityLevel).Set(seatDemandStdev)
 | 
				
			||||||
 | 
						apiserverSeatDemandSmootheds.WithLabelValues(priorityLevel).Set(seatDemandSmoothed)
 | 
				
			||||||
 | 
						// TODO: the following once new API is available
 | 
				
			||||||
 | 
						// apiserverSeatDemandTargets.WithLabelValues(priorityLevel).Set(seatDemandTarget)
 | 
				
			||||||
 | 
						// apiserverCurrentConcurrencyLimits.WithLabelValues(priorityLevel).Set(currentCL)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func SetFairFrac(fairFrac float64) {
 | 
				
			||||||
 | 
						apiserverFairFracs.Set(fairFrac)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -0,0 +1,56 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2022 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package metrics
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type unionGauge []Gauge
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ Gauge = unionGauge(nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewUnionGauge constructs a Gauge that delegates to all of the given Gauges
 | 
				
			||||||
 | 
					func NewUnionGauge(elts ...Gauge) Gauge {
 | 
				
			||||||
 | 
						return unionGauge(elts)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ug unionGauge) Set(x float64) {
 | 
				
			||||||
 | 
						for _, gauge := range ug {
 | 
				
			||||||
 | 
							gauge.Set(x)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ug unionGauge) Add(x float64) {
 | 
				
			||||||
 | 
						for _, gauge := range ug {
 | 
				
			||||||
 | 
							gauge.Add(x)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ug unionGauge) Inc() {
 | 
				
			||||||
 | 
						for _, gauge := range ug {
 | 
				
			||||||
 | 
							gauge.Inc()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ug unionGauge) Dec() {
 | 
				
			||||||
 | 
						for _, gauge := range ug {
 | 
				
			||||||
 | 
							gauge.Dec()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (ug unionGauge) SetToCurrentTime() {
 | 
				
			||||||
 | 
						for _, gauge := range ug {
 | 
				
			||||||
 | 
							gauge.SetToCurrentTime()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user