Merge pull request #91177 from MikeSpreitzer/more-concurrency-details
Introduce more metrics on concurrency
This commit is contained in:
@@ -122,7 +122,7 @@ var (
|
||||
Help: "Number of requests dropped with 'Try again later' response",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"requestKind"},
|
||||
[]string{"request_kind"},
|
||||
)
|
||||
// TLSHandshakeErrors is a number of requests dropped with 'TLS handshake error from' error
|
||||
TLSHandshakeErrors = compbasemetrics.NewCounter(
|
||||
@@ -166,7 +166,15 @@ var (
|
||||
Help: "Maximal number of currently used inflight request limit of this apiserver per request kind in last second.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"requestKind"},
|
||||
[]string{"request_kind"},
|
||||
)
|
||||
currentInqueueRequests = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Name: "apiserver_current_inqueue_requests",
|
||||
Help: "Maximal number of queued requests in this apiserver per request kind in last second.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"request_kind"},
|
||||
)
|
||||
|
||||
requestTerminationsTotal = compbasemetrics.NewCounterVec(
|
||||
@@ -191,6 +199,7 @@ var (
|
||||
WatchEvents,
|
||||
WatchEventsSizes,
|
||||
currentInflightRequests,
|
||||
currentInqueueRequests,
|
||||
requestTerminationsTotal,
|
||||
}
|
||||
|
||||
@@ -231,6 +240,11 @@ const (
|
||||
ReadOnlyKind = "readOnly"
|
||||
// MutatingKind is a string identifying mutating request kind
|
||||
MutatingKind = "mutating"
|
||||
|
||||
// WaitingPhase is the phase value for a request waiting in a queue
|
||||
WaitingPhase = "waiting"
|
||||
// ExecutingPhase is the phase value for an executing request
|
||||
ExecutingPhase = "executing"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -261,9 +275,19 @@ func Reset() {
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateInflightRequestMetrics(nonmutating, mutating int) {
|
||||
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
|
||||
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
|
||||
// UpdateInflightRequestMetrics reports concurrency metrics classified by
|
||||
// mutating vs Readonly.
|
||||
func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) {
|
||||
for _, kc := range []struct {
|
||||
kind string
|
||||
count int
|
||||
}{{ReadOnlyKind, nonmutating}, {MutatingKind, mutating}} {
|
||||
if phase == ExecutingPhase {
|
||||
currentInflightRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
|
||||
} else {
|
||||
currentInqueueRequests.WithLabelValues(kc.kind).Set(float64(kc.count))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RecordRequestTermination records that the request was terminated early as part of a resource
|
||||
|
@@ -62,6 +62,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//vendor/k8s.io/klog/v2:go_default_library",
|
||||
],
|
||||
|
@@ -27,6 +27,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -50,13 +51,17 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
|
||||
klog.Errorf(err.Error())
|
||||
}
|
||||
|
||||
// requestWatermark is used to trak maximal usage of inflight requests.
|
||||
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling
|
||||
type requestWatermark struct {
|
||||
phase string
|
||||
readOnlyObserver, mutatingObserver fcmetrics.TimedObserver
|
||||
lock sync.Mutex
|
||||
readOnlyWatermark, mutatingWatermark int
|
||||
}
|
||||
|
||||
func (w *requestWatermark) recordMutating(mutatingVal int) {
|
||||
w.mutatingObserver.Set(float64(mutatingVal))
|
||||
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
@@ -66,6 +71,8 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
|
||||
}
|
||||
|
||||
func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
|
||||
w.readOnlyObserver.Set(float64(readOnlyVal))
|
||||
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
@@ -74,9 +81,14 @@ func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
|
||||
}
|
||||
}
|
||||
|
||||
var watermark = &requestWatermark{}
|
||||
// watermark tracks requests being executed (not waiting in a queue)
|
||||
var watermark = &requestWatermark{
|
||||
phase: metrics.ExecutingPhase,
|
||||
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.ReadOnlyKind}).RequestsExecuting,
|
||||
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{metrics.MutatingKind}).RequestsExecuting,
|
||||
}
|
||||
|
||||
func startRecordingUsage() {
|
||||
func startRecordingUsage(watermark *requestWatermark) {
|
||||
go func() {
|
||||
wait.Forever(func() {
|
||||
watermark.lock.Lock()
|
||||
@@ -86,7 +98,7 @@ func startRecordingUsage() {
|
||||
watermark.mutatingWatermark = 0
|
||||
watermark.lock.Unlock()
|
||||
|
||||
metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
|
||||
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
|
||||
}, inflightUsageMetricUpdatePeriod)
|
||||
}()
|
||||
}
|
||||
@@ -100,7 +112,7 @@ func WithMaxInFlightLimit(
|
||||
mutatingLimit int,
|
||||
longRunningRequestCheck apirequest.LongRunningRequestCheck,
|
||||
) http.Handler {
|
||||
startOnce.Do(startRecordingUsage)
|
||||
startOnce.Do(func() { startRecordingUsage(watermark) })
|
||||
if nonMutatingLimit == 0 && mutatingLimit == 0 {
|
||||
return handler
|
||||
}
|
||||
@@ -108,9 +120,11 @@ func WithMaxInFlightLimit(
|
||||
var mutatingChan chan bool
|
||||
if nonMutatingLimit != 0 {
|
||||
nonMutatingChan = make(chan bool, nonMutatingLimit)
|
||||
watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit))
|
||||
}
|
||||
if mutatingLimit != 0 {
|
||||
mutatingChan = make(chan bool, mutatingLimit)
|
||||
watermark.mutatingObserver.SetX1(float64(mutatingLimit))
|
||||
}
|
||||
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -141,21 +155,22 @@ func WithMaxInFlightLimit(
|
||||
|
||||
select {
|
||||
case c <- true:
|
||||
var mutatingLen, readOnlyLen int
|
||||
// We note the concurrency level both while the
|
||||
// request is being served and after it is done being
|
||||
// served, because both states contribute to the
|
||||
// sampled stats on concurrency.
|
||||
if isMutatingRequest {
|
||||
mutatingLen = len(mutatingChan)
|
||||
watermark.recordMutating(len(c))
|
||||
} else {
|
||||
readOnlyLen = len(nonMutatingChan)
|
||||
watermark.recordReadOnly(len(c))
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-c
|
||||
if isMutatingRequest {
|
||||
watermark.recordMutating(mutatingLen)
|
||||
watermark.recordMutating(len(c))
|
||||
} else {
|
||||
watermark.recordReadOnly(readOnlyLen)
|
||||
watermark.recordReadOnly(len(c))
|
||||
}
|
||||
|
||||
}()
|
||||
handler.ServeHTTP(w, r)
|
||||
|
||||
|
@@ -24,8 +24,10 @@ import (
|
||||
|
||||
fcv1a1 "k8s.io/api/flowcontrol/v1alpha1"
|
||||
apitypes "k8s.io/apimachinery/pkg/types"
|
||||
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@@ -53,7 +55,15 @@ func GetClassification(ctx context.Context) *PriorityAndFairnessClassification {
|
||||
return ctx.Value(priorityAndFairnessKey).(*PriorityAndFairnessClassification)
|
||||
}
|
||||
|
||||
var atomicMutatingLen, atomicNonMutatingLen int32
|
||||
// waitingMark tracks requests waiting rather than being executed
|
||||
var waitingMark = &requestWatermark{
|
||||
phase: epmetrics.WaitingPhase,
|
||||
readOnlyObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.ReadOnlyKind}).RequestsWaiting,
|
||||
mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting,
|
||||
}
|
||||
|
||||
var atomicMutatingExecuting, atomicReadOnlyExecuting int32
|
||||
var atomicMutatingWaiting, atomicReadOnlyWaiting int32
|
||||
|
||||
// WithPriorityAndFairness limits the number of in-flight
|
||||
// requests in a fine-grained way.
|
||||
@@ -66,7 +76,10 @@ func WithPriorityAndFairness(
|
||||
klog.Warningf("priority and fairness support not found, skipping")
|
||||
return handler
|
||||
}
|
||||
startOnce.Do(startRecordingUsage)
|
||||
startOnce.Do(func() {
|
||||
startRecordingUsage(watermark)
|
||||
startRecordingUsage(waitingMark)
|
||||
})
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
||||
@@ -98,22 +111,23 @@ func WithPriorityAndFairness(
|
||||
|
||||
var served bool
|
||||
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
|
||||
noteExecutingDelta := func(delta int32) {
|
||||
if isMutatingRequest {
|
||||
watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
|
||||
} else {
|
||||
watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
|
||||
}
|
||||
}
|
||||
noteWaitingDelta := func(delta int32) {
|
||||
if isMutatingRequest {
|
||||
waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
|
||||
} else {
|
||||
waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
|
||||
}
|
||||
}
|
||||
execute := func() {
|
||||
var mutatingLen, readOnlyLen int
|
||||
if isMutatingRequest {
|
||||
mutatingLen = int(atomic.AddInt32(&atomicMutatingLen, 1))
|
||||
} else {
|
||||
readOnlyLen = int(atomic.AddInt32(&atomicNonMutatingLen, 1))
|
||||
}
|
||||
defer func() {
|
||||
if isMutatingRequest {
|
||||
atomic.AddInt32(&atomicMutatingLen, -11)
|
||||
watermark.recordMutating(mutatingLen)
|
||||
} else {
|
||||
atomic.AddInt32(&atomicNonMutatingLen, -1)
|
||||
watermark.recordReadOnly(readOnlyLen)
|
||||
}
|
||||
}()
|
||||
noteExecutingDelta(1)
|
||||
defer noteExecutingDelta(-1)
|
||||
served = true
|
||||
innerCtx := context.WithValue(ctx, priorityAndFairnessKey, classification)
|
||||
innerReq := r.Clone(innerCtx)
|
||||
@@ -122,10 +136,15 @@ func WithPriorityAndFairness(
|
||||
handler.ServeHTTP(w, innerReq)
|
||||
}
|
||||
digest := utilflowcontrol.RequestDigest{requestInfo, user}
|
||||
fcIfc.Handle(ctx, digest, note, execute)
|
||||
fcIfc.Handle(ctx, digest, note, func(inQueue bool) {
|
||||
if inQueue {
|
||||
noteWaitingDelta(1)
|
||||
} else {
|
||||
noteWaitingDelta(-1)
|
||||
}
|
||||
}, execute)
|
||||
if !served {
|
||||
tooManyRequests(r, w)
|
||||
return
|
||||
}
|
||||
|
||||
})
|
||||
|
@@ -85,6 +85,7 @@ go_test(
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/format:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library",
|
||||
|
@@ -85,6 +85,7 @@ type RequestDigest struct {
|
||||
// "Locked" means that the caller must hold the configController lock.
|
||||
type configController struct {
|
||||
queueSetFactory fq.QueueSetFactory
|
||||
obsPairGenerator metrics.TimedObserverPairGenerator
|
||||
|
||||
// configQueue holds `(interface{})(0)` when the configuration
|
||||
// objects need to be reprocessed.
|
||||
@@ -144,6 +145,9 @@ type priorityLevelState struct {
|
||||
// number of goroutines between Controller::Match and calling the
|
||||
// returned StartFunction
|
||||
numPending int
|
||||
|
||||
// Observers tracking number waiting, executing
|
||||
obsPair metrics.TimedObserverPair
|
||||
}
|
||||
|
||||
// NewTestableController is extra flexible to facilitate testing
|
||||
@@ -152,104 +156,106 @@ func newTestableController(
|
||||
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
|
||||
serverConcurrencyLimit int,
|
||||
requestWaitLimit time.Duration,
|
||||
obsPairGenerator metrics.TimedObserverPairGenerator,
|
||||
queueSetFactory fq.QueueSetFactory,
|
||||
) *configController {
|
||||
cfgCtl := &configController{
|
||||
cfgCtlr := &configController{
|
||||
queueSetFactory: queueSetFactory,
|
||||
obsPairGenerator: obsPairGenerator,
|
||||
serverConcurrencyLimit: serverConcurrencyLimit,
|
||||
requestWaitLimit: requestWaitLimit,
|
||||
flowcontrolClient: flowcontrolClient,
|
||||
priorityLevelStates: make(map[string]*priorityLevelState),
|
||||
}
|
||||
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit)
|
||||
cfgCtl.initializeConfigController(informerFactory)
|
||||
cfgCtlr.initializeConfigController(informerFactory)
|
||||
// ensure the data structure reflects the mandatory config
|
||||
cfgCtl.lockAndDigestConfigObjects(nil, nil)
|
||||
return cfgCtl
|
||||
cfgCtlr.lockAndDigestConfigObjects(nil, nil)
|
||||
return cfgCtlr
|
||||
}
|
||||
|
||||
// initializeConfigController sets up the controller that processes
|
||||
// config API objects.
|
||||
func (cfgCtl *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
|
||||
cfgCtl.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
|
||||
func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
|
||||
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
|
||||
fci := informerFactory.Flowcontrol().V1alpha1()
|
||||
pli := fci.PriorityLevelConfigurations()
|
||||
fsi := fci.FlowSchemas()
|
||||
cfgCtl.plLister = pli.Lister()
|
||||
cfgCtl.plInformerSynced = pli.Informer().HasSynced
|
||||
cfgCtl.fsLister = fsi.Lister()
|
||||
cfgCtl.fsInformerSynced = fsi.Informer().HasSynced
|
||||
cfgCtlr.plLister = pli.Lister()
|
||||
cfgCtlr.plInformerSynced = pli.Informer().HasSynced
|
||||
cfgCtlr.fsLister = fsi.Lister()
|
||||
cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced
|
||||
pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
pl := obj.(*fctypesv1a1.PriorityLevelConfiguration)
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
newPL := newObj.(*fctypesv1a1.PriorityLevelConfiguration)
|
||||
oldPL := oldObj.(*fctypesv1a1.PriorityLevelConfiguration)
|
||||
if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
|
||||
}})
|
||||
fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
fs := obj.(*fctypesv1a1.FlowSchema)
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
newFS := newObj.(*fctypesv1a1.FlowSchema)
|
||||
oldFS := oldObj.(*fctypesv1a1.FlowSchema)
|
||||
if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) {
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
|
||||
}})
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) Run(stopCh <-chan struct{}) error {
|
||||
defer cfgCtl.configQueue.ShutDown()
|
||||
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
|
||||
defer cfgCtlr.configQueue.ShutDown()
|
||||
klog.Info("Starting API Priority and Fairness config controller")
|
||||
if ok := cache.WaitForCacheSync(stopCh, cfgCtl.plInformerSynced, cfgCtl.fsInformerSynced); !ok {
|
||||
if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
|
||||
return fmt.Errorf("Never achieved initial sync")
|
||||
}
|
||||
klog.Info("Running API Priority and Fairness config worker")
|
||||
wait.Until(cfgCtl.runWorker, time.Second, stopCh)
|
||||
wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
|
||||
klog.Info("Shutting down API Priority and Fairness config worker")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) runWorker() {
|
||||
for cfgCtl.processNextWorkItem() {
|
||||
func (cfgCtlr *configController) runWorker() {
|
||||
for cfgCtlr.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) processNextWorkItem() bool {
|
||||
obj, shutdown := cfgCtl.configQueue.Get()
|
||||
func (cfgCtlr *configController) processNextWorkItem() bool {
|
||||
obj, shutdown := cfgCtlr.configQueue.Get()
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
|
||||
func(obj interface{}) {
|
||||
defer cfgCtl.configQueue.Done(obj)
|
||||
if !cfgCtl.syncOne() {
|
||||
cfgCtl.configQueue.AddRateLimited(obj)
|
||||
defer cfgCtlr.configQueue.Done(obj)
|
||||
if !cfgCtlr.syncOne() {
|
||||
cfgCtlr.configQueue.AddRateLimited(obj)
|
||||
} else {
|
||||
cfgCtl.configQueue.Forget(obj)
|
||||
cfgCtlr.configQueue.Forget(obj)
|
||||
}
|
||||
}(obj)
|
||||
|
||||
@@ -259,19 +265,19 @@ func (cfgCtl *configController) processNextWorkItem() bool {
|
||||
// syncOne attempts to sync all the API Priority and Fairness config
|
||||
// objects. It either succeeds and returns `true` or logs an error
|
||||
// and returns `false`.
|
||||
func (cfgCtl *configController) syncOne() bool {
|
||||
func (cfgCtlr *configController) syncOne() bool {
|
||||
all := labels.Everything()
|
||||
newPLs, err := cfgCtl.plLister.List(all)
|
||||
newPLs, err := cfgCtlr.plLister.List(all)
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error())
|
||||
return false
|
||||
}
|
||||
newFSs, err := cfgCtl.fsLister.List(all)
|
||||
newFSs, err := cfgCtlr.fsLister.List(all)
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to list FlowSchema objects: %s", err.Error())
|
||||
return false
|
||||
}
|
||||
err = cfgCtl.digestConfigObjects(newPLs, newFSs)
|
||||
err = cfgCtlr.digestConfigObjects(newPLs, newFSs)
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
@@ -288,7 +294,7 @@ func (cfgCtl *configController) syncOne() bool {
|
||||
// FlowSchemas --- with the work dvided among the passes according to
|
||||
// those dependencies.
|
||||
type cfgMeal struct {
|
||||
cfgCtl *configController
|
||||
cfgCtlr *configController
|
||||
|
||||
newPLStates map[string]*priorityLevelState
|
||||
|
||||
@@ -315,9 +321,9 @@ type fsStatusUpdate struct {
|
||||
}
|
||||
|
||||
// digestConfigObjects is given all the API objects that configure
|
||||
// cfgCtl and writes its consequent new configState.
|
||||
func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error {
|
||||
fsStatusUpdates := cfgCtl.lockAndDigestConfigObjects(newPLs, newFSs)
|
||||
// cfgCtlr and writes its consequent new configState.
|
||||
func (cfgCtlr *configController) digestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) error {
|
||||
fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
|
||||
var errs []error
|
||||
for _, fsu := range fsStatusUpdates {
|
||||
enc, err := json.Marshal(fsu.condition)
|
||||
@@ -326,7 +332,7 @@ func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.Priori
|
||||
panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
|
||||
}
|
||||
klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
|
||||
_, err = cfgCtl.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status")
|
||||
_, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status")
|
||||
if err != nil {
|
||||
errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
|
||||
}
|
||||
@@ -337,11 +343,11 @@ func (cfgCtl *configController) digestConfigObjects(newPLs []*fctypesv1a1.Priori
|
||||
return apierrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1.PriorityLevelConfiguration, newFSs []*fctypesv1a1.FlowSchema) []fsStatusUpdate {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
meal := cfgMeal{
|
||||
cfgCtl: cfgCtl,
|
||||
cfgCtlr: cfgCtlr,
|
||||
newPLStates: make(map[string]*priorityLevelState),
|
||||
}
|
||||
|
||||
@@ -351,16 +357,16 @@ func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1
|
||||
|
||||
// Supply missing mandatory PriorityLevelConfiguration objects
|
||||
if !meal.haveExemptPL {
|
||||
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtl.requestWaitLimit)
|
||||
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
|
||||
}
|
||||
if !meal.haveCatchAllPL {
|
||||
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtl.requestWaitLimit)
|
||||
meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
|
||||
}
|
||||
|
||||
meal.finishQueueSetReconfigsLocked()
|
||||
|
||||
// The new config has been constructed
|
||||
cfgCtl.priorityLevelStates = meal.newPLStates
|
||||
cfgCtlr.priorityLevelStates = meal.newPLStates
|
||||
klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
|
||||
return meal.fsStatusUpdates
|
||||
}
|
||||
@@ -369,11 +375,11 @@ func (cfgCtl *configController) lockAndDigestConfigObjects(newPLs []*fctypesv1a1
|
||||
// Pretend broken ones do not exist.
|
||||
func (meal *cfgMeal) digestNewPLsLocked(newPLs []*fctypesv1a1.PriorityLevelConfiguration) {
|
||||
for _, pl := range newPLs {
|
||||
state := meal.cfgCtl.priorityLevelStates[pl.Name]
|
||||
state := meal.cfgCtlr.priorityLevelStates[pl.Name]
|
||||
if state == nil {
|
||||
state = &priorityLevelState{}
|
||||
state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
|
||||
}
|
||||
qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, state.queues, pl, meal.cfgCtl.requestWaitLimit)
|
||||
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
|
||||
if err != nil {
|
||||
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
|
||||
continue
|
||||
@@ -439,7 +445,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
|
||||
fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
|
||||
}
|
||||
|
||||
meal.cfgCtl.flowSchemas = fsSeq
|
||||
meal.cfgCtlr.flowSchemas = fsSeq
|
||||
if klog.V(5).Enabled() {
|
||||
for _, fs := range fsSeq {
|
||||
klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs))
|
||||
@@ -453,7 +459,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*fctypesv1a1.FlowSchema) {
|
||||
// queues, otherwise start the quiescing process if that has not
|
||||
// already been started.
|
||||
func (meal *cfgMeal) processOldPLsLocked() {
|
||||
for plName, plState := range meal.cfgCtl.priorityLevelStates {
|
||||
for plName, plState := range meal.cfgCtlr.priorityLevelStates {
|
||||
if meal.newPLStates[plName] != nil {
|
||||
// Still desired and already updated
|
||||
continue
|
||||
@@ -476,9 +482,9 @@ func (meal *cfgMeal) processOldPLsLocked() {
|
||||
}
|
||||
}
|
||||
var err error
|
||||
plState.qsCompleter, err = qscOfPL(meal.cfgCtl.queueSetFactory, plState.queues, plState.pl, meal.cfgCtl.requestWaitLimit)
|
||||
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
|
||||
if err != nil {
|
||||
// This can not happen because qscOfPL already approved this config
|
||||
// This can not happen because queueSetCompleterForPL already approved this config
|
||||
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
|
||||
}
|
||||
if plState.pl.Spec.Limited != nil {
|
||||
@@ -509,7 +515,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
|
||||
// The use of math.Ceil here means that the results might sum
|
||||
// to a little more than serverConcurrencyLimit but the
|
||||
// difference will be negligible.
|
||||
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtl.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
|
||||
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
|
||||
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
|
||||
|
||||
if plState.queues == nil {
|
||||
@@ -521,10 +527,11 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
|
||||
}
|
||||
}
|
||||
|
||||
// qscOfPL returns a pointer to an appropriate QueuingConfig or nil
|
||||
// if no limiting is called for. Returns nil and an error if the given
|
||||
// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
|
||||
// given priority level configuration. Returns nil if that config
|
||||
// does not call for limiting. Returns nil and an error if the given
|
||||
// object is malformed in a way that is a problem for this package.
|
||||
func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) (fq.QueueSetCompleter, error) {
|
||||
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
|
||||
if (pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
|
||||
return nil, errors.New("broken union structure at the top")
|
||||
}
|
||||
@@ -553,7 +560,7 @@ func qscOfPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *fctypesv1a1.Priorit
|
||||
if queues != nil {
|
||||
qsc, err = queues.BeginConfigChange(qcQS)
|
||||
} else {
|
||||
qsc, err = qsf.BeginConstruction(qcQS)
|
||||
qsc, err = qsf.BeginConstruction(qcQS, intPair)
|
||||
}
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, fmt.Sprintf("priority level %q has QueuingConfiguration %#+v, which is invalid", pl.Name, qcAPI))
|
||||
@@ -594,9 +601,11 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *fctypesv1a1.FlowSchema, isDangl
|
||||
}
|
||||
|
||||
// imaginePL adds a priority level based on one of the mandatory ones
|
||||
// that does not actually exist (right now) as a real API object.
|
||||
func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
|
||||
klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
|
||||
qsCompleter, err := qscOfPL(meal.cfgCtl.queueSetFactory, nil, proto, requestWaitLimit)
|
||||
obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
|
||||
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
|
||||
if err != nil {
|
||||
// This can not happen because proto is one of the mandatory
|
||||
// objects and these are not erroneous
|
||||
@@ -605,6 +614,7 @@ func (meal *cfgMeal) imaginePL(proto *fctypesv1a1.PriorityLevelConfiguration, re
|
||||
meal.newPLStates[proto.Name] = &priorityLevelState{
|
||||
pl: proto,
|
||||
qsCompleter: qsCompleter,
|
||||
obsPair: obsPair,
|
||||
}
|
||||
if proto.Spec.Limited != nil {
|
||||
meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
|
||||
@@ -624,14 +634,14 @@ func (immediateRequest) Finish(execute func()) bool {
|
||||
// The returned bool indicates whether the request is exempt from
|
||||
// limitation. The startWaitingTime is when the request started
|
||||
// waiting in its queue, or `Time{}` if this did not happen.
|
||||
func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDigest) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
|
||||
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
|
||||
klog.V(7).Infof("startRequest(%#+v)", rd)
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
for _, fs := range cfgCtl.flowSchemas {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
for _, fs := range cfgCtlr.flowSchemas {
|
||||
if matchesFlowSchema(rd, fs) {
|
||||
plName := fs.Spec.PriorityLevelConfiguration.Name
|
||||
plState := cfgCtl.priorityLevelStates[plName]
|
||||
plState := cfgCtlr.priorityLevelStates[plName]
|
||||
if plState.pl.Spec.Type == fctypesv1a1.PriorityLevelEnablementExempt {
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, fs.Name, fs.Spec.DistinguisherMethod, plName)
|
||||
return fs, plState.pl, true, immediateRequest{}, time.Time{}
|
||||
@@ -649,9 +659,9 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
|
||||
}
|
||||
startWaitingTime = time.Now()
|
||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, fs.Name, fs.Spec.DistinguisherMethod, plName, numQueues)
|
||||
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User)
|
||||
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, fs.Name, rd.RequestInfo, rd.User, queueNoteFn)
|
||||
if idle {
|
||||
cfgCtl.maybeReapLocked(plName, plState)
|
||||
cfgCtlr.maybeReapLocked(plName, plState)
|
||||
}
|
||||
return fs, plState.pl, false, req, startWaitingTime
|
||||
}
|
||||
@@ -660,7 +670,7 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
|
||||
// FlowSchema that matches everything. If somehow control reaches
|
||||
// here, panic with some relevant information.
|
||||
var catchAll *fctypesv1a1.FlowSchema
|
||||
for _, fs := range cfgCtl.flowSchemas {
|
||||
for _, fs := range cfgCtlr.flowSchemas {
|
||||
if fs.Name == fctypesv1a1.FlowSchemaNameCatchAll {
|
||||
catchAll = fs
|
||||
}
|
||||
@@ -669,10 +679,10 @@ func (cfgCtl *configController) startRequest(ctx context.Context, rd RequestDige
|
||||
}
|
||||
|
||||
// Call this after getting a clue that the given priority level is undesired and idle
|
||||
func (cfgCtl *configController) maybeReap(plName string) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
plState := cfgCtl.priorityLevelStates[plName]
|
||||
func (cfgCtlr *configController) maybeReap(plName string) {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
plState := cfgCtlr.priorityLevelStates[plName]
|
||||
if plState == nil {
|
||||
klog.V(7).Infof("plName=%s, plState==nil", plName)
|
||||
return
|
||||
@@ -685,17 +695,17 @@ func (cfgCtl *configController) maybeReap(plName string) {
|
||||
}
|
||||
}
|
||||
klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
}
|
||||
|
||||
// Call this if both (1) plState.queues is non-nil and reported being
|
||||
// idle, and (2) cfgCtl's lock has not been released since then.
|
||||
func (cfgCtl *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
|
||||
// idle, and (2) cfgCtlr's lock has not been released since then.
|
||||
func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
|
||||
if !(plState.quiescing && plState.numPending == 0) {
|
||||
return
|
||||
}
|
||||
klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
|
||||
cfgCtl.configQueue.Add(0)
|
||||
cfgCtlr.configQueue.Add(0)
|
||||
}
|
||||
|
||||
// computeFlowDistinguisher extracts the flow distinguisher according to the given method
|
||||
|
@@ -34,20 +34,20 @@ const (
|
||||
queryIncludeRequestDetails = "includeRequestDetails"
|
||||
)
|
||||
|
||||
func (cfgCtl *configController) Install(c *mux.PathRecorderMux) {
|
||||
func (cfgCtlr *configController) Install(c *mux.PathRecorderMux) {
|
||||
// TODO(yue9944882): handle "Accept" header properly
|
||||
// debugging dumps a CSV content for three levels of granularity
|
||||
// 1. row per priority-level
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtl.dumpPriorityLevels)
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_priority_levels", cfgCtlr.dumpPriorityLevels)
|
||||
// 2. row per queue
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtl.dumpQueues)
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_queues", cfgCtlr.dumpQueues)
|
||||
// 3. row per request
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtl.dumpRequests)
|
||||
c.UnlistedHandleFunc("/debug/api_priority_and_fairness/dump_requests", cfgCtlr.dumpRequests)
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||
columnHeaders := []string{
|
||||
"PriorityLevelName", // 1
|
||||
@@ -59,7 +59,7 @@ func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *htt
|
||||
}
|
||||
tabPrint(tabWriter, rowForHeaders(columnHeaders))
|
||||
endline(tabWriter)
|
||||
for _, plState := range cfgCtl.priorityLevelStates {
|
||||
for _, plState := range cfgCtlr.priorityLevelStates {
|
||||
if plState.queues == nil {
|
||||
tabPrint(tabWriter, row(
|
||||
plState.pl.Name, // 1
|
||||
@@ -93,9 +93,9 @@ func (cfgCtl *configController) dumpPriorityLevels(w http.ResponseWriter, r *htt
|
||||
runtime.HandleError(tabWriter.Flush())
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
func (cfgCtlr *configController) dumpQueues(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
tabWriter := tabwriter.NewWriter(w, 8, 0, 1, ' ', 0)
|
||||
columnHeaders := []string{
|
||||
"PriorityLevelName", // 1
|
||||
@@ -106,7 +106,7 @@ func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
tabPrint(tabWriter, rowForHeaders(columnHeaders))
|
||||
endline(tabWriter)
|
||||
for _, plState := range cfgCtl.priorityLevelStates {
|
||||
for _, plState := range cfgCtlr.priorityLevelStates {
|
||||
if plState.queues == nil {
|
||||
tabPrint(tabWriter, row(
|
||||
plState.pl.Name, // 1
|
||||
@@ -133,9 +133,9 @@ func (cfgCtl *configController) dumpQueues(w http.ResponseWriter, r *http.Reques
|
||||
runtime.HandleError(tabWriter.Flush())
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtl.lock.Lock()
|
||||
defer cfgCtl.lock.Unlock()
|
||||
func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Request) {
|
||||
cfgCtlr.lock.Lock()
|
||||
defer cfgCtlr.lock.Unlock()
|
||||
|
||||
includeRequestDetails := len(r.URL.Query().Get(queryIncludeRequestDetails)) > 0
|
||||
|
||||
@@ -161,7 +161,7 @@ func (cfgCtl *configController) dumpRequests(w http.ResponseWriter, r *http.Requ
|
||||
}))
|
||||
}
|
||||
endline(tabWriter)
|
||||
for _, plState := range cfgCtl.priorityLevelStates {
|
||||
for _, plState := range cfgCtlr.priorityLevelStates {
|
||||
if plState.queues == nil {
|
||||
tabPrint(tabWriter, row(
|
||||
plState.pl.Name, // 1
|
||||
|
@@ -45,6 +45,7 @@ type Interface interface {
|
||||
Handle(ctx context.Context,
|
||||
requestDigest RequestDigest,
|
||||
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
|
||||
queueNoteFn fq.QueueNoteFn,
|
||||
execFn func(),
|
||||
)
|
||||
|
||||
@@ -72,6 +73,7 @@ func New(
|
||||
flowcontrolClient,
|
||||
serverConcurrencyLimit,
|
||||
requestWaitLimit,
|
||||
metrics.PriorityLevelConcurrencyObserverPairGenerator,
|
||||
fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
|
||||
)
|
||||
}
|
||||
@@ -82,15 +84,17 @@ func NewTestable(
|
||||
flowcontrolClient fcclientv1a1.FlowcontrolV1alpha1Interface,
|
||||
serverConcurrencyLimit int,
|
||||
requestWaitLimit time.Duration,
|
||||
obsPairGenerator metrics.TimedObserverPairGenerator,
|
||||
queueSetFactory fq.QueueSetFactory,
|
||||
) Interface {
|
||||
return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, queueSetFactory)
|
||||
return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, obsPairGenerator, queueSetFactory)
|
||||
}
|
||||
|
||||
func (cfgCtl *configController) Handle(ctx context.Context, requestDigest RequestDigest,
|
||||
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
|
||||
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
|
||||
queueNoteFn fq.QueueNoteFn,
|
||||
execFn func()) {
|
||||
fs, pl, isExempt, req, startWaitingTime := cfgCtl.startRequest(ctx, requestDigest)
|
||||
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
|
||||
queued := startWaitingTime != time.Time{}
|
||||
noteFn(fs, pl)
|
||||
if req == nil {
|
||||
@@ -117,6 +121,6 @@ func (cfgCtl *configController) Handle(ctx context.Context, requestDigest Reques
|
||||
}
|
||||
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, queued=%v, Finish() => idle=%v", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt, queued, idle)
|
||||
if idle {
|
||||
cfgCtl.maybeReap(pl.Name)
|
||||
cfgCtlr.maybeReap(pl.Name)
|
||||
}
|
||||
}
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
"k8s.io/client-go/informers"
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
fcclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
|
||||
@@ -50,16 +51,16 @@ var mandPLs = func() map[string]*fcv1a1.PriorityLevelConfiguration {
|
||||
return ans
|
||||
}()
|
||||
|
||||
type ctlTestState struct {
|
||||
type ctlrTestState struct {
|
||||
t *testing.T
|
||||
cfgCtl *configController
|
||||
cfgCtlr *configController
|
||||
fcIfc fcclient.FlowcontrolV1alpha1Interface
|
||||
existingPLs map[string]*fcv1a1.PriorityLevelConfiguration
|
||||
existingFSs map[string]*fcv1a1.FlowSchema
|
||||
heldRequestsMap map[string][]heldRequest
|
||||
requestWG sync.WaitGroup
|
||||
lock sync.Mutex
|
||||
queues map[string]*ctlTestQueueSet
|
||||
queues map[string]*ctlrTestQueueSet
|
||||
}
|
||||
|
||||
type heldRequest struct {
|
||||
@@ -67,45 +68,45 @@ type heldRequest struct {
|
||||
finishCh chan struct{}
|
||||
}
|
||||
|
||||
var _ fq.QueueSetFactory = (*ctlTestState)(nil)
|
||||
var _ fq.QueueSetFactory = (*ctlrTestState)(nil)
|
||||
|
||||
type ctlTestQueueSetCompleter struct {
|
||||
cts *ctlTestState
|
||||
cqs *ctlTestQueueSet
|
||||
type ctlrTestQueueSetCompleter struct {
|
||||
cts *ctlrTestState
|
||||
cqs *ctlrTestQueueSet
|
||||
qc fq.QueuingConfig
|
||||
}
|
||||
|
||||
type ctlTestQueueSet struct {
|
||||
cts *ctlTestState
|
||||
type ctlrTestQueueSet struct {
|
||||
cts *ctlrTestState
|
||||
qc fq.QueuingConfig
|
||||
dc fq.DispatchingConfig
|
||||
countActive int
|
||||
}
|
||||
|
||||
type ctlTestRequest struct {
|
||||
cqs *ctlTestQueueSet
|
||||
type ctlrTestRequest struct {
|
||||
cqs *ctlrTestQueueSet
|
||||
qsName string
|
||||
descr1, descr2 interface{}
|
||||
}
|
||||
|
||||
func (cts *ctlTestState) BeginConstruction(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
|
||||
return ctlTestQueueSetCompleter{cts, nil, qc}, nil
|
||||
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
|
||||
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
|
||||
}
|
||||
|
||||
func (cqs *ctlTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
|
||||
return ctlTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
|
||||
func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSetCompleter, error) {
|
||||
return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
|
||||
}
|
||||
|
||||
func (cqs *ctlTestQueueSet) Dump(bool) debug.QueueSetDump {
|
||||
func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
|
||||
return debug.QueueSetDump{}
|
||||
}
|
||||
|
||||
func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
|
||||
func (cqc ctlrTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSet {
|
||||
cqc.cts.lock.Lock()
|
||||
defer cqc.cts.lock.Unlock()
|
||||
qs := cqc.cqs
|
||||
if qs == nil {
|
||||
qs = &ctlTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc}
|
||||
qs = &ctlrTestQueueSet{cts: cqc.cts, qc: cqc.qc, dc: dc}
|
||||
cqc.cts.queues[cqc.qc.Name] = qs
|
||||
} else {
|
||||
qs.qc, qs.dc = cqc.qc, dc
|
||||
@@ -113,22 +114,22 @@ func (cqc ctlTestQueueSetCompleter) Complete(dc fq.DispatchingConfig) fq.QueueSe
|
||||
return qs
|
||||
}
|
||||
|
||||
func (cqs *ctlTestQueueSet) IsIdle() bool {
|
||||
func (cqs *ctlrTestQueueSet) IsIdle() bool {
|
||||
cqs.cts.lock.Lock()
|
||||
defer cqs.cts.lock.Unlock()
|
||||
klog.V(7).Infof("For %p QS %s, countActive==%d", cqs, cqs.qc.Name, cqs.countActive)
|
||||
return cqs.countActive == 0
|
||||
}
|
||||
|
||||
func (cqs *ctlTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req fq.Request, idle bool) {
|
||||
func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
|
||||
cqs.cts.lock.Lock()
|
||||
defer cqs.cts.lock.Unlock()
|
||||
cqs.countActive++
|
||||
cqs.cts.t.Logf("Queued %q %#+v %#+v for %p QS=%s, countActive:=%d", fsName, descr1, descr2, cqs, cqs.qc.Name, cqs.countActive)
|
||||
return &ctlTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
|
||||
return &ctlrTestRequest{cqs, cqs.qc.Name, descr1, descr2}, false
|
||||
}
|
||||
|
||||
func (ctr *ctlTestRequest) Finish(execute func()) bool {
|
||||
func (ctr *ctlrTestRequest) Finish(execute func()) bool {
|
||||
execute()
|
||||
ctr.cqs.cts.lock.Lock()
|
||||
defer ctr.cqs.cts.lock.Unlock()
|
||||
@@ -137,13 +138,13 @@ func (ctr *ctlTestRequest) Finish(execute func()) bool {
|
||||
return ctr.cqs.countActive == 0
|
||||
}
|
||||
|
||||
func (cts *ctlTestState) getQueueSetNames() sets.String {
|
||||
func (cts *ctlrTestState) getQueueSetNames() sets.String {
|
||||
cts.lock.Lock()
|
||||
defer cts.lock.Unlock()
|
||||
return sets.StringKeySet(cts.queues)
|
||||
}
|
||||
|
||||
func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String {
|
||||
func (cts *ctlrTestState) getNonIdleQueueSetNames() sets.String {
|
||||
cts.lock.Lock()
|
||||
defer cts.lock.Unlock()
|
||||
ans := sets.NewString()
|
||||
@@ -155,14 +156,14 @@ func (cts *ctlTestState) getNonIdleQueueSetNames() sets.String {
|
||||
return ans
|
||||
}
|
||||
|
||||
func (cts *ctlTestState) hasNonIdleQueueSet(name string) bool {
|
||||
func (cts *ctlrTestState) hasNonIdleQueueSet(name string) bool {
|
||||
cts.lock.Lock()
|
||||
defer cts.lock.Unlock()
|
||||
qs := cts.queues[name]
|
||||
return qs != nil && qs.countActive > 0
|
||||
}
|
||||
|
||||
func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) {
|
||||
func (cts *ctlrTestState) addHeldRequest(plName string, rd RequestDigest, finishCh chan struct{}) {
|
||||
cts.lock.Lock()
|
||||
defer cts.lock.Unlock()
|
||||
hrs := cts.heldRequestsMap[plName]
|
||||
@@ -171,7 +172,7 @@ func (cts *ctlTestState) addHeldRequest(plName string, rd RequestDigest, finishC
|
||||
cts.t.Logf("Holding %#+v for %s, count:=%d", rd, plName, len(hrs))
|
||||
}
|
||||
|
||||
func (cts *ctlTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) {
|
||||
func (cts *ctlrTestState) popHeldRequest() (plName string, hr *heldRequest, nCount int) {
|
||||
cts.lock.Lock()
|
||||
defer cts.lock.Unlock()
|
||||
var hrs []heldRequest
|
||||
@@ -219,21 +220,22 @@ func TestConfigConsumer(t *testing.T) {
|
||||
clientset := clientsetfake.NewSimpleClientset()
|
||||
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
|
||||
flowcontrolClient := clientset.FlowcontrolV1alpha1()
|
||||
cts := &ctlTestState{t: t,
|
||||
cts := &ctlrTestState{t: t,
|
||||
fcIfc: flowcontrolClient,
|
||||
existingFSs: map[string]*fcv1a1.FlowSchema{},
|
||||
existingPLs: map[string]*fcv1a1.PriorityLevelConfiguration{},
|
||||
heldRequestsMap: map[string][]heldRequest{},
|
||||
queues: map[string]*ctlTestQueueSet{},
|
||||
queues: map[string]*ctlrTestQueueSet{},
|
||||
}
|
||||
ctl := newTestableController(
|
||||
ctlr := newTestableController(
|
||||
informerFactory,
|
||||
flowcontrolClient,
|
||||
100, // server concurrency limit
|
||||
time.Minute, // request wait limit
|
||||
metrics.PriorityLevelConcurrencyObserverPairGenerator,
|
||||
cts,
|
||||
)
|
||||
cts.cfgCtl = ctl
|
||||
cts.cfgCtlr = ctlr
|
||||
persistingPLNames := sets.NewString()
|
||||
trialStep := fmt.Sprintf("trial%d-0", i)
|
||||
_, _, desiredPLNames, newBadPLNames := genPLs(rng, trialStep, persistingPLNames, 0)
|
||||
@@ -290,7 +292,7 @@ func TestConfigConsumer(t *testing.T) {
|
||||
for _, newFS := range newFSs {
|
||||
t.Logf("For %s, digesting newFS=%s", trialStep, fcfmt.Fmt(newFS))
|
||||
}
|
||||
_ = ctl.lockAndDigestConfigObjects(newPLs, newFSs)
|
||||
_ = ctlr.lockAndDigestConfigObjects(newPLs, newFSs)
|
||||
}
|
||||
for plName, hr, nCount := cts.popHeldRequest(); hr != nil; plName, hr, nCount = cts.popHeldRequest() {
|
||||
desired := desiredPLNames.Has(plName) || mandPLs[plName] != nil
|
||||
@@ -302,9 +304,9 @@ func TestConfigConsumer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) {
|
||||
func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*fcv1a1.FlowSchema) {
|
||||
t := cts.t
|
||||
ctl := cts.cfgCtl
|
||||
ctlr := cts.cfgCtlr
|
||||
fs := ftr.fs
|
||||
expectedPLName := fs.Spec.PriorityLevelConfiguration.Name
|
||||
ctx := context.Background()
|
||||
@@ -320,17 +322,18 @@ func checkNewFS(cts *ctlTestState, rng *rand.Rand, trialName string, ftr *fsTest
|
||||
startWG.Add(1)
|
||||
go func(matches, isResource bool, rdu RequestDigest) {
|
||||
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
|
||||
ctl.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) {
|
||||
ctlr.Handle(ctx, rdu, func(matchFS *fcv1a1.FlowSchema, matchPL *fcv1a1.PriorityLevelConfiguration) {
|
||||
matchIsExempt := matchPL.Spec.Type == fcv1a1.PriorityLevelEnablementExempt
|
||||
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
|
||||
if e, a := expectedMatch, matchFS.Name == fs.Name; e != a {
|
||||
t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, e, a, matchFS.Name, catchAlls)
|
||||
if a := matchFS.Name == fs.Name; expectedMatch != a {
|
||||
t.Errorf("Fail at %s/%s: rd=%#+v, expectedMatch=%v, actualMatch=%v, matchFSName=%q, catchAlls=%#+v", trialName, fs.Name, rdu, expectedMatch, a, matchFS.Name, catchAlls)
|
||||
}
|
||||
if matchFS.Name == fs.Name {
|
||||
if e, a := fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name; e != a {
|
||||
t.Errorf("Fail at %s/%s: e=%v, a=%v", trialName, fs.Name, e, a)
|
||||
if fs.Spec.PriorityLevelConfiguration.Name != matchPL.Name {
|
||||
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
|
||||
}
|
||||
}
|
||||
}, func(inQueue bool) {
|
||||
}, func() {
|
||||
startWG.Done()
|
||||
_ = <-finishCh
|
||||
|
@@ -1,12 +1,19 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["interface.go"],
|
||||
srcs = [
|
||||
"integrator.go",
|
||||
"interface.go",
|
||||
],
|
||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
||||
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
@@ -27,3 +34,10 @@ filegroup(
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["integrator_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library"],
|
||||
)
|
||||
|
@@ -0,0 +1,180 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package fairqueuing
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
)
|
||||
|
||||
// Integrator computes the moments of some variable X over time as
|
||||
// read from a particular clock. The integrals start when the
|
||||
// Integrator is created, and ends at the latest operation on the
|
||||
// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and
|
||||
// ignores attempts to change X1.
|
||||
type Integrator interface {
|
||||
metrics.TimedObserver
|
||||
|
||||
GetResults() IntegratorResults
|
||||
|
||||
// Return the results of integrating to now, and reset integration to start now
|
||||
Reset() IntegratorResults
|
||||
}
|
||||
|
||||
// IntegratorResults holds statistical abstracts of the integration
|
||||
type IntegratorResults struct {
|
||||
Duration float64 //seconds
|
||||
Average float64 //time-weighted
|
||||
Deviation float64 //standard deviation: sqrt(avg((value-avg)^2))
|
||||
Min, Max float64
|
||||
}
|
||||
|
||||
// Equal tests for semantic equality.
|
||||
// This considers all NaN values to be equal to each other.
|
||||
func (x *IntegratorResults) Equal(y *IntegratorResults) bool {
|
||||
return x == y || x != nil && y != nil && x.Duration == y.Duration && x.Min == y.Min && x.Max == y.Max && (x.Average == y.Average || math.IsNaN(x.Average) && math.IsNaN(y.Average)) && (x.Deviation == y.Deviation || math.IsNaN(x.Deviation) && math.IsNaN(y.Deviation))
|
||||
}
|
||||
|
||||
type integrator struct {
|
||||
clock clock.PassiveClock
|
||||
sync.Mutex
|
||||
lastTime time.Time
|
||||
x float64
|
||||
moments Moments
|
||||
min, max float64
|
||||
}
|
||||
|
||||
// NewIntegrator makes one that uses the given clock
|
||||
func NewIntegrator(clock clock.PassiveClock) Integrator {
|
||||
return &integrator{
|
||||
clock: clock,
|
||||
lastTime: clock.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (igr *integrator) SetX1(x1 float64) {
|
||||
}
|
||||
|
||||
func (igr *integrator) Set(x float64) {
|
||||
igr.Lock()
|
||||
igr.setLocked(x)
|
||||
igr.Unlock()
|
||||
}
|
||||
|
||||
func (igr *integrator) setLocked(x float64) {
|
||||
igr.updateLocked()
|
||||
igr.x = x
|
||||
if x < igr.min {
|
||||
igr.min = x
|
||||
}
|
||||
if x > igr.max {
|
||||
igr.max = x
|
||||
}
|
||||
}
|
||||
|
||||
func (igr *integrator) Add(deltaX float64) {
|
||||
igr.Lock()
|
||||
igr.setLocked(igr.x + deltaX)
|
||||
igr.Unlock()
|
||||
}
|
||||
|
||||
func (igr *integrator) updateLocked() {
|
||||
now := igr.clock.Now()
|
||||
dt := now.Sub(igr.lastTime).Seconds()
|
||||
igr.lastTime = now
|
||||
igr.moments = igr.moments.Add(ConstantMoments(dt, igr.x))
|
||||
}
|
||||
|
||||
func (igr *integrator) GetResults() IntegratorResults {
|
||||
igr.Lock()
|
||||
defer igr.Unlock()
|
||||
return igr.getResultsLocked()
|
||||
}
|
||||
|
||||
func (igr *integrator) Reset() IntegratorResults {
|
||||
igr.Lock()
|
||||
defer igr.Unlock()
|
||||
results := igr.getResultsLocked()
|
||||
igr.moments = Moments{}
|
||||
igr.min = igr.x
|
||||
igr.max = igr.x
|
||||
return results
|
||||
}
|
||||
|
||||
func (igr *integrator) getResultsLocked() (results IntegratorResults) {
|
||||
igr.updateLocked()
|
||||
results.Min, results.Max = igr.min, igr.max
|
||||
results.Duration = igr.moments.ElapsedSeconds
|
||||
results.Average, results.Deviation = igr.moments.AvgAndStdDev()
|
||||
return
|
||||
}
|
||||
|
||||
// Moments are the integrals of the 0, 1, and 2 powers of some
|
||||
// variable X over some range of time.
|
||||
type Moments struct {
|
||||
ElapsedSeconds float64 // integral of dt
|
||||
IntegralX float64 // integral of x dt
|
||||
IntegralXX float64 // integral of x*x dt
|
||||
}
|
||||
|
||||
// ConstantMoments is for a constant X
|
||||
func ConstantMoments(dt, x float64) Moments {
|
||||
return Moments{
|
||||
ElapsedSeconds: dt,
|
||||
IntegralX: x * dt,
|
||||
IntegralXX: x * x * dt,
|
||||
}
|
||||
}
|
||||
|
||||
// Add combines over two ranges of time
|
||||
func (igr Moments) Add(ogr Moments) Moments {
|
||||
return Moments{
|
||||
ElapsedSeconds: igr.ElapsedSeconds + ogr.ElapsedSeconds,
|
||||
IntegralX: igr.IntegralX + ogr.IntegralX,
|
||||
IntegralXX: igr.IntegralXX + ogr.IntegralXX,
|
||||
}
|
||||
}
|
||||
|
||||
// Sub finds the difference between a range of time and a subrange
|
||||
func (igr Moments) Sub(ogr Moments) Moments {
|
||||
return Moments{
|
||||
ElapsedSeconds: igr.ElapsedSeconds - ogr.ElapsedSeconds,
|
||||
IntegralX: igr.IntegralX - ogr.IntegralX,
|
||||
IntegralXX: igr.IntegralXX - ogr.IntegralXX,
|
||||
}
|
||||
}
|
||||
|
||||
// AvgAndStdDev returns the average and standard devation
|
||||
func (igr Moments) AvgAndStdDev() (float64, float64) {
|
||||
if igr.ElapsedSeconds <= 0 {
|
||||
return math.NaN(), math.NaN()
|
||||
}
|
||||
avg := igr.IntegralX / igr.ElapsedSeconds
|
||||
// standard deviation is sqrt( average( (x - xbar)^2 ) )
|
||||
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
|
||||
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
|
||||
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
|
||||
variance := igr.IntegralXX/igr.ElapsedSeconds - avg*avg
|
||||
if variance >= 0 {
|
||||
return avg, math.Sqrt(variance)
|
||||
}
|
||||
return avg, math.NaN()
|
||||
}
|
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package fairqueuing
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
func TestIntegrator(t *testing.T) {
|
||||
now := time.Now()
|
||||
clk := clock.NewFakeClock(now)
|
||||
igr := NewIntegrator(clk)
|
||||
igr.Add(3)
|
||||
clk.Step(time.Second)
|
||||
results := igr.GetResults()
|
||||
rToo := igr.Reset()
|
||||
if e := (IntegratorResults{Duration: time.Second.Seconds(), Average: 3, Deviation: 0, Min: 0, Max: 3}); !e.Equal(&results) {
|
||||
t.Errorf("expected %#+v, got %#+v", e, results)
|
||||
}
|
||||
if !results.Equal(&rToo) {
|
||||
t.Errorf("expected %#+v, got %#+v", results, rToo)
|
||||
}
|
||||
igr.Set(2)
|
||||
results = igr.GetResults()
|
||||
if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) {
|
||||
t.Errorf("expected %#+v, got %#+v", e, results)
|
||||
}
|
||||
clk.Step(time.Millisecond)
|
||||
igr.Add(-1)
|
||||
clk.Step(time.Millisecond)
|
||||
results = igr.GetResults()
|
||||
if e := (IntegratorResults{Duration: 2 * time.Millisecond.Seconds(), Average: 1.5, Deviation: 0.5, Min: 1, Max: 3}); !e.Equal(&results) {
|
||||
t.Errorf("expected %#+v, got %#+v", e, results)
|
||||
}
|
||||
}
|
@@ -21,6 +21,7 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
)
|
||||
|
||||
// QueueSetFactory is used to create QueueSet objects. Creation, like
|
||||
@@ -30,7 +31,7 @@ import (
|
||||
// before committing to a concurrency allotment for the second.
|
||||
type QueueSetFactory interface {
|
||||
// BeginConstruction does the first phase of creating a QueueSet
|
||||
BeginConstruction(QueuingConfig) (QueueSetCompleter, error)
|
||||
BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
|
||||
}
|
||||
|
||||
// QueueSetCompleter finishes the two-step process of creating or
|
||||
@@ -79,7 +80,7 @@ type QueueSet interface {
|
||||
// was idle at the moment of the return. Otherwise idle==false
|
||||
// and the client must call the Finish method of the Request
|
||||
// exactly once.
|
||||
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (req Request, idle bool)
|
||||
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
|
||||
|
||||
// Dump saves and returns the instant internal state of the queue-set.
|
||||
// Note that dumping process will stop the queue-set from proceeding
|
||||
@@ -88,6 +89,9 @@ type QueueSet interface {
|
||||
Dump(includeRequestDetails bool) debug.QueueSetDump
|
||||
}
|
||||
|
||||
// QueueNoteFn is called when a request enters and leaves a queue
|
||||
type QueueNoteFn func(inQueue bool)
|
||||
|
||||
// Request represents the remainder of the handling of one request
|
||||
type Request interface {
|
||||
// Finish determines whether to execute or reject the request and
|
||||
|
@@ -31,6 +31,7 @@ go_test(
|
||||
srcs = ["queueset_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
|
||||
|
@@ -49,6 +49,7 @@ type queueSetFactory struct {
|
||||
// the fields `factory` and `theSet` is non-nil.
|
||||
type queueSetCompleter struct {
|
||||
factory *queueSetFactory
|
||||
obsPair metrics.TimedObserverPair
|
||||
theSet *queueSet
|
||||
qCfg fq.QueuingConfig
|
||||
dealer *shufflesharding.Dealer
|
||||
@@ -67,6 +68,7 @@ type queueSet struct {
|
||||
clock clock.PassiveClock
|
||||
counter counter.GoRoutineCounter
|
||||
estimatedServiceTime float64
|
||||
obsPair metrics.TimedObserverPair
|
||||
|
||||
lock sync.Mutex
|
||||
|
||||
@@ -116,13 +118,14 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter)
|
||||
}
|
||||
}
|
||||
|
||||
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
|
||||
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
|
||||
dealer, err := checkConfig(qCfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &queueSetCompleter{
|
||||
factory: qsf,
|
||||
obsPair: obsPair,
|
||||
qCfg: qCfg,
|
||||
dealer: dealer}, nil
|
||||
}
|
||||
@@ -148,6 +151,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
|
||||
clock: qsc.factory.clock,
|
||||
counter: qsc.factory.counter,
|
||||
estimatedServiceTime: 60,
|
||||
obsPair: qsc.obsPair,
|
||||
qCfg: qsc.qCfg,
|
||||
virtualTime: 0,
|
||||
lastRealTime: qsc.factory.clock.Now(),
|
||||
@@ -203,6 +207,12 @@ func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shuffleshard
|
||||
qs.qCfg = qCfg
|
||||
qs.dCfg = dCfg
|
||||
qs.dealer = dealer
|
||||
qll := qCfg.QueueLengthLimit
|
||||
if qll < 1 {
|
||||
qll = 1
|
||||
}
|
||||
qs.obsPair.RequestsWaiting.SetX1(float64(qll))
|
||||
qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
|
||||
|
||||
qs.dispatchAsMuchAsPossibleLocked()
|
||||
}
|
||||
@@ -222,7 +232,7 @@ const (
|
||||
// executing at each point where there is a change in that quantity,
|
||||
// because the metrics --- and only the metrics --- track that
|
||||
// quantity per FlowSchema.
|
||||
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
||||
qs.lockAndSyncTime()
|
||||
defer qs.lock.Unlock()
|
||||
var req *request
|
||||
@@ -247,7 +257,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
||||
// 3) Reject current request if there is not enough concurrency shares and
|
||||
// we are at max queue length
|
||||
// 4) If not rejected, create a request and enqueue
|
||||
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2)
|
||||
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
|
||||
// req == nil means that the request was rejected - no remaining
|
||||
// concurrency shares and at max queue length already
|
||||
if req == nil {
|
||||
@@ -295,6 +305,12 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
||||
return req, false
|
||||
}
|
||||
|
||||
func (req *request) NoteQueued(inQueue bool) {
|
||||
if req.queueNoteFn != nil {
|
||||
req.queueNoteFn(inQueue)
|
||||
}
|
||||
}
|
||||
|
||||
func (req *request) Finish(execFn func()) bool {
|
||||
exec, idle := req.wait()
|
||||
if !exec {
|
||||
@@ -399,7 +415,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
|
||||
// returns the enqueud request on a successful enqueue
|
||||
// returns nil in the case that there is no available concurrency or
|
||||
// the queuelengthlimit has been reached
|
||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
|
||||
// Start with the shuffle sharding, to pick a queue.
|
||||
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
||||
queue := qs.queues[queueIdx]
|
||||
@@ -420,6 +436,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
||||
queue: queue,
|
||||
descr1: descr1,
|
||||
descr2: descr2,
|
||||
queueNoteFn: queueNoteFn,
|
||||
}
|
||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||
return nil
|
||||
@@ -463,6 +480,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
||||
// get index for timed out requests
|
||||
timeoutIdx = i
|
||||
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
|
||||
req.NoteQueued(false)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
@@ -475,6 +493,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
|
||||
queue.requests = reqs[removeIdx:]
|
||||
// decrement the # of requestsEnqueued
|
||||
qs.totRequestsWaiting -= removeIdx
|
||||
qs.obsPair.RequestsWaiting.Add(float64(-removeIdx))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -498,16 +517,19 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
||||
// enqueues a request into its queue.
|
||||
func (qs *queueSet) enqueueLocked(request *request) {
|
||||
queue := request.queue
|
||||
now := qs.clock.Now()
|
||||
if len(queue.requests) == 0 && queue.requestsExecuting == 0 {
|
||||
// the queue’s virtual start time is set to the virtual time.
|
||||
queue.virtualStart = qs.virtualTime
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
|
||||
}
|
||||
}
|
||||
queue.Enqueue(request)
|
||||
qs.totRequestsWaiting++
|
||||
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1)
|
||||
request.NoteQueued(true)
|
||||
qs.obsPair.RequestsWaiting.Add(1)
|
||||
}
|
||||
|
||||
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
|
||||
@@ -541,6 +563,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
|
||||
req.decision.SetLocked(decisionExecute)
|
||||
qs.totRequestsExecuting++
|
||||
metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1)
|
||||
qs.obsPair.RequestsExecuting.Add(1)
|
||||
if klog.V(5).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
|
||||
}
|
||||
@@ -570,7 +593,10 @@ func (qs *queueSet) dispatchLocked() bool {
|
||||
qs.totRequestsExecuting++
|
||||
queue.requestsExecuting++
|
||||
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1)
|
||||
request.NoteQueued(false)
|
||||
metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1)
|
||||
qs.obsPair.RequestsWaiting.Add(-1)
|
||||
qs.obsPair.RequestsExecuting.Add(1)
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
|
||||
}
|
||||
@@ -599,6 +625,8 @@ func (qs *queueSet) cancelWait(req *request) {
|
||||
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
|
||||
qs.totRequestsWaiting--
|
||||
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
|
||||
req.NoteQueued(false)
|
||||
qs.obsPair.RequestsWaiting.Add(-1)
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -650,17 +678,19 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
|
||||
// previously dispatched request has completed it's service. This
|
||||
// callback updates important state in the queueSet
|
||||
func (qs *queueSet) finishRequestLocked(r *request) {
|
||||
now := qs.clock.Now()
|
||||
qs.totRequestsExecuting--
|
||||
metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1)
|
||||
qs.obsPair.RequestsExecuting.Add(-1)
|
||||
|
||||
if r.queue == nil {
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
S := qs.clock.Since(r.startTime).Seconds()
|
||||
S := now.Sub(r.startTime).Seconds()
|
||||
|
||||
// When a request finishes being served, and the actual service time was S,
|
||||
// the queue’s virtual start time is decremented by G - S.
|
||||
@@ -670,7 +700,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
|
||||
r.queue.requestsExecuting--
|
||||
|
||||
if klog.V(6).Enabled() {
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
|
||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
|
||||
}
|
||||
|
||||
// If there are more queues than desired and this one has no
|
||||
|
@@ -26,10 +26,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -128,7 +129,7 @@ type uniformScenario struct {
|
||||
expectAllRequests bool
|
||||
evalInqueueMetrics, evalExecutingMetrics bool
|
||||
rejectReason string
|
||||
clk *clock.FakeEventClock
|
||||
clk *testclock.FakeEventClock
|
||||
counter counter.GoRoutineCounter
|
||||
}
|
||||
|
||||
@@ -137,7 +138,7 @@ func (us uniformScenario) exercise(t *testing.T) {
|
||||
t: t,
|
||||
uniformScenario: us,
|
||||
startTime: time.Now(),
|
||||
integrators: make([]test.Integrator, len(us.clients)),
|
||||
integrators: make([]fq.Integrator, len(us.clients)),
|
||||
executions: make([]int32, len(us.clients)),
|
||||
rejects: make([]int32, len(us.clients)),
|
||||
}
|
||||
@@ -152,7 +153,7 @@ type uniformScenarioState struct {
|
||||
uniformScenario
|
||||
startTime time.Time
|
||||
doSplit bool
|
||||
integrators []test.Integrator
|
||||
integrators []fq.Integrator
|
||||
failedCount uint64
|
||||
expectedInqueue, expectedExecuting string
|
||||
executions, rejects []int32
|
||||
@@ -164,7 +165,7 @@ func (uss *uniformScenarioState) exercise() {
|
||||
metrics.Reset()
|
||||
}
|
||||
for i, uc := range uss.clients {
|
||||
uss.integrators[i] = test.NewIntegrator(uss.clk)
|
||||
uss.integrators[i] = fq.NewIntegrator(uss.clk)
|
||||
fsName := fmt.Sprintf("client%d", i)
|
||||
uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flowSchema=%q,priorityLevel=%q} 0%s`, fsName, uss.name, "\n")
|
||||
for j := 0; j < uc.nThreads; j++ {
|
||||
@@ -193,7 +194,7 @@ type uniformScenarioThread struct {
|
||||
i, j int
|
||||
nCalls int
|
||||
uc uniformClient
|
||||
igr test.Integrator
|
||||
igr fq.Integrator
|
||||
fsName string
|
||||
}
|
||||
|
||||
@@ -223,7 +224,7 @@ func (ust *uniformScenarioThread) callK(k int) {
|
||||
if k >= ust.nCalls {
|
||||
return
|
||||
}
|
||||
req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k})
|
||||
req, idle := ust.uss.qs.StartRequest(context.Background(), ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
|
||||
ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
|
||||
if req == nil {
|
||||
atomic.AddUint64(&ust.uss.failedCount, 1)
|
||||
@@ -236,7 +237,8 @@ func (ust *uniformScenarioThread) callK(k int) {
|
||||
var executed bool
|
||||
idle2 := req.Finish(func() {
|
||||
executed = true
|
||||
ust.uss.t.Logf("%s: %d, %d, %d executing", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k)
|
||||
execStart := ust.uss.clk.Now()
|
||||
ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k)
|
||||
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
|
||||
ust.igr.Add(1)
|
||||
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
|
||||
@@ -339,7 +341,7 @@ func (uss *uniformScenarioState) finalReview() {
|
||||
}
|
||||
}
|
||||
|
||||
func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
|
||||
func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
|
||||
dunch := make(chan struct{})
|
||||
clk.EventAfterDuration(func(time.Time) {
|
||||
counter.Add(1)
|
||||
@@ -359,8 +361,8 @@ func init() {
|
||||
func TestNoRestraint(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{})
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -384,7 +386,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestUniformFlowsHandSize1",
|
||||
@@ -393,7 +395,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
|
||||
HandSize: 1,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -420,7 +422,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestUniformFlowsHandSize3",
|
||||
@@ -429,7 +431,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
|
||||
HandSize: 3,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -455,7 +457,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "DiffFlowsExpectEqual",
|
||||
@@ -464,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
|
||||
HandSize: 1,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -491,7 +493,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "DiffFlowsExpectUnequal",
|
||||
@@ -500,7 +502,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
|
||||
HandSize: 1,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -527,7 +529,7 @@ func TestWindup(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestWindup",
|
||||
@@ -536,7 +538,7 @@ func TestWindup(t *testing.T) {
|
||||
HandSize: 1,
|
||||
RequestWaitLimit: 10 * time.Minute,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -562,13 +564,13 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestDifferentFlowsWithoutQueuing",
|
||||
DesiredNumQueues: 0,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -594,7 +596,7 @@ func TestTimeout(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestTimeout",
|
||||
@@ -603,7 +605,7 @@ func TestTimeout(t *testing.T) {
|
||||
HandSize: 1,
|
||||
RequestWaitLimit: 0,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -629,7 +631,7 @@ func TestContextCancel(t *testing.T) {
|
||||
metrics.Register()
|
||||
metrics.Reset()
|
||||
now := time.Now()
|
||||
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestContextCancel",
|
||||
@@ -638,18 +640,26 @@ func TestContextCancel(t *testing.T) {
|
||||
HandSize: 1,
|
||||
RequestWaitLimit: 15 * time.Second,
|
||||
}
|
||||
qsc, err := qsf.BeginConstruction(qCfg)
|
||||
qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
|
||||
counter.Add(1) // account for the goroutine running this test
|
||||
ctx1 := context.Background()
|
||||
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one")
|
||||
b2i := map[bool]int{false: 0, true: 1}
|
||||
var qnc [2][2]int32
|
||||
req1, _ := qs.StartRequest(ctx1, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
|
||||
if req1 == nil {
|
||||
t.Error("Request rejected")
|
||||
return
|
||||
}
|
||||
if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 {
|
||||
t.Errorf("Got %d calls to queueNoteFn1(false), expected 1", a)
|
||||
}
|
||||
if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 {
|
||||
t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a)
|
||||
}
|
||||
var executed1 bool
|
||||
idle1 := req1.Finish(func() {
|
||||
executed1 = true
|
||||
@@ -657,11 +667,17 @@ func TestContextCancel(t *testing.T) {
|
||||
tBefore := time.Now()
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 {
|
||||
t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a)
|
||||
}
|
||||
if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 {
|
||||
t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a)
|
||||
}
|
||||
// account for unblocking the goroutine that waits on cancelation
|
||||
counter.Add(1)
|
||||
cancel2()
|
||||
}()
|
||||
req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two")
|
||||
req2, idle2a := qs.StartRequest(ctx2, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
|
||||
if idle2a {
|
||||
t.Error("2nd StartRequest returned idle")
|
||||
}
|
||||
@@ -672,6 +688,9 @@ func TestContextCancel(t *testing.T) {
|
||||
if idle2b {
|
||||
t.Error("2nd Finish returned idle")
|
||||
}
|
||||
if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 {
|
||||
t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a)
|
||||
}
|
||||
}
|
||||
tAfter := time.Now()
|
||||
dt := tAfter.Sub(tBefore)
|
||||
@@ -686,3 +705,7 @@ func TestContextCancel(t *testing.T) {
|
||||
t.Error("Not idle at the end")
|
||||
}
|
||||
}
|
||||
|
||||
func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
|
||||
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
|
||||
}
|
||||
|
@@ -22,6 +22,7 @@ import (
|
||||
|
||||
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
||||
)
|
||||
|
||||
@@ -58,6 +59,8 @@ type request struct {
|
||||
|
||||
// Indicates whether client has called Request::Wait()
|
||||
waitStarted bool
|
||||
|
||||
queueNoteFn fq.QueueNoteFn
|
||||
}
|
||||
|
||||
// queue is an array of requests with additional metadata required for
|
||||
|
@@ -2,17 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"integrator.go",
|
||||
"no-restraint.go",
|
||||
],
|
||||
srcs = ["no-restraint.go"],
|
||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
|
||||
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@@ -1,116 +0,0 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
// Integrator computes the integral of some variable X over time as
|
||||
// read from a particular clock. The integral starts when the
|
||||
// Integrator is created, and ends at the latest operation on the
|
||||
// Integrator.
|
||||
type Integrator interface {
|
||||
Set(float64) // set the value of X
|
||||
Add(float64) // add the given quantity to X
|
||||
GetResults() IntegratorResults
|
||||
Reset() IntegratorResults // restart the integration from now
|
||||
}
|
||||
|
||||
// IntegratorResults holds statistical abstracts of the integration
|
||||
type IntegratorResults struct {
|
||||
Duration float64 //seconds
|
||||
Average float64
|
||||
Deviation float64 //sqrt(avg((value-avg)^2))
|
||||
}
|
||||
|
||||
type integrator struct {
|
||||
sync.Mutex
|
||||
clk clock.PassiveClock
|
||||
lastTime time.Time
|
||||
x float64
|
||||
integrals [3]float64 // integral of x^0, x^1, and x^2
|
||||
}
|
||||
|
||||
// NewIntegrator makes one that uses the given clock
|
||||
func NewIntegrator(clk clock.PassiveClock) Integrator {
|
||||
return &integrator{
|
||||
clk: clk,
|
||||
lastTime: clk.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (igr *integrator) Set(x float64) {
|
||||
igr.Lock()
|
||||
igr.updateLocked()
|
||||
igr.x = x
|
||||
igr.Unlock()
|
||||
}
|
||||
|
||||
func (igr *integrator) Add(deltaX float64) {
|
||||
igr.Lock()
|
||||
igr.updateLocked()
|
||||
igr.x += deltaX
|
||||
igr.Unlock()
|
||||
}
|
||||
|
||||
func (igr *integrator) updateLocked() {
|
||||
now := igr.clk.Now()
|
||||
dt := now.Sub(igr.lastTime).Seconds()
|
||||
igr.lastTime = now
|
||||
igr.integrals[0] += dt
|
||||
igr.integrals[1] += dt * igr.x
|
||||
igr.integrals[2] += dt * igr.x * igr.x
|
||||
}
|
||||
|
||||
func (igr *integrator) GetResults() (results IntegratorResults) {
|
||||
igr.Lock()
|
||||
defer func() { igr.Unlock() }()
|
||||
return igr.getResultsLocked()
|
||||
}
|
||||
|
||||
func (igr *integrator) getResultsLocked() (results IntegratorResults) {
|
||||
igr.updateLocked()
|
||||
results.Duration = igr.integrals[0]
|
||||
if results.Duration <= 0 {
|
||||
results.Average = math.NaN()
|
||||
results.Deviation = math.NaN()
|
||||
return
|
||||
}
|
||||
results.Average = igr.integrals[1] / igr.integrals[0]
|
||||
// Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration )
|
||||
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
|
||||
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
|
||||
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
|
||||
variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average
|
||||
if variance > 0 {
|
||||
results.Deviation = math.Sqrt(variance)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (igr *integrator) Reset() (results IntegratorResults) {
|
||||
igr.Lock()
|
||||
defer func() { igr.Unlock() }()
|
||||
results = igr.getResultsLocked()
|
||||
igr.integrals = [3]float64{0, 0, 0}
|
||||
return
|
||||
}
|
@@ -21,6 +21,7 @@ import (
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
)
|
||||
|
||||
// NewNoRestraintFactory makes a QueueSetFactory that produces
|
||||
@@ -38,7 +39,7 @@ type noRestraint struct{}
|
||||
|
||||
type noRestraintRequest struct{}
|
||||
|
||||
func (noRestraintFactory) BeginConstruction(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
|
||||
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
|
||||
return noRestraintCompleter{}, nil
|
||||
}
|
||||
|
||||
@@ -54,7 +55,7 @@ func (noRestraint) IsIdle() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}) (fq.Request, bool) {
|
||||
func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
||||
return noRestraintRequest{}, false
|
||||
}
|
||||
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
)
|
||||
|
||||
var noRestraintQSF = fqtesting.NewNoRestraintFactory()
|
||||
@@ -55,7 +56,7 @@ func genPL(rng *rand.Rand, name string) *fcv1a1.PriorityLevelConfiguration {
|
||||
HandSize: hs,
|
||||
QueueLengthLimit: 5}
|
||||
}
|
||||
_, err := qscOfPL(noRestraintQSF, nil, plc, time.Minute)
|
||||
_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@@ -2,14 +2,20 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["metrics.go"],
|
||||
srcs = [
|
||||
"metrics.go",
|
||||
"sample_and_watermark.go",
|
||||
"timed_observer.go",
|
||||
],
|
||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
|
||||
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
|
||||
"//vendor/k8s.io/klog/v2:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
basemetricstestutil "k8s.io/component-base/metrics/testutil"
|
||||
@@ -32,8 +33,11 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
requestKind = "request_kind"
|
||||
priorityLevel = "priorityLevel"
|
||||
flowSchema = "flowSchema"
|
||||
phase = "phase"
|
||||
mark = "mark"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -69,6 +73,14 @@ func GatherAndCompare(expected string, metricNames ...string) error {
|
||||
return basemetricstestutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...)
|
||||
}
|
||||
|
||||
// Registerables is a slice of Registerable
|
||||
type Registerables []compbasemetrics.Registerable
|
||||
|
||||
// Append adds more
|
||||
func (rs Registerables) Append(more ...compbasemetrics.Registerable) Registerables {
|
||||
return append(rs, more...)
|
||||
}
|
||||
|
||||
var (
|
||||
apiserverRejectedRequestsTotal = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
@@ -88,6 +100,47 @@ var (
|
||||
},
|
||||
[]string{priorityLevel, flowSchema},
|
||||
)
|
||||
|
||||
// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
|
||||
PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "priority_level_request_count_samples",
|
||||
Help: "Periodic observations of the number of requests",
|
||||
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "priority_level_request_count_watermarks",
|
||||
Help: "Watermarks of the number of requests",
|
||||
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{priorityLevel})
|
||||
|
||||
// ReadWriteConcurrencyObserverPairGenerator creates pairs that observe concurrency broken down by mutating vs readonly
|
||||
ReadWriteConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "read_vs_write_request_count_samples",
|
||||
Help: "Periodic observations of the number of requests",
|
||||
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "read_vs_write_request_count_watermarks",
|
||||
Help: "Watermarks of the number of requests",
|
||||
Buckets: []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{requestKind})
|
||||
|
||||
apiserverCurrentInqueueRequests = compbasemetrics.NewGaugeVec(
|
||||
&compbasemetrics.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
@@ -145,7 +198,7 @@ var (
|
||||
},
|
||||
[]string{priorityLevel, flowSchema},
|
||||
)
|
||||
metrics = []compbasemetrics.Registerable{
|
||||
metrics = Registerables{
|
||||
apiserverRejectedRequestsTotal,
|
||||
apiserverDispatchedRequestsTotal,
|
||||
apiserverCurrentInqueueRequests,
|
||||
@@ -154,7 +207,9 @@ var (
|
||||
apiserverCurrentExecutingRequests,
|
||||
apiserverRequestWaitingSeconds,
|
||||
apiserverRequestExecutionSeconds,
|
||||
}
|
||||
}.
|
||||
Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
|
||||
Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
|
||||
)
|
||||
|
||||
// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
|
||||
|
@@ -0,0 +1,192 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
labelNameMark = "mark"
|
||||
labelValueLo = "low"
|
||||
labelValueHi = "high"
|
||||
labelNamePhase = "phase"
|
||||
labelValueWaiting = "waiting"
|
||||
labelValueExecuting = "executing"
|
||||
)
|
||||
|
||||
// SampleAndWaterMarkPairGenerator makes pairs of TimedObservers that
|
||||
// track samples and watermarks.
|
||||
type SampleAndWaterMarkPairGenerator struct {
|
||||
urGenerator SampleAndWaterMarkObserverGenerator
|
||||
}
|
||||
|
||||
var _ TimedObserverPairGenerator = SampleAndWaterMarkPairGenerator{}
|
||||
|
||||
// NewSampleAndWaterMarkHistogramsPairGenerator makes a new pair generator
|
||||
func NewSampleAndWaterMarkHistogramsPairGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkPairGenerator {
|
||||
return SampleAndWaterMarkPairGenerator{
|
||||
urGenerator: NewSampleAndWaterMarkHistogramsGenerator(clock, samplePeriod, sampleOpts, waterMarkOpts, append([]string{labelNamePhase}, labelNames...)),
|
||||
}
|
||||
}
|
||||
|
||||
// Generate makes a new pair
|
||||
func (spg SampleAndWaterMarkPairGenerator) Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair {
|
||||
return TimedObserverPair{
|
||||
RequestsWaiting: spg.urGenerator.Generate(0, waiting1, append([]string{labelValueWaiting}, labelValues...)),
|
||||
RequestsExecuting: spg.urGenerator.Generate(0, executing1, append([]string{labelValueExecuting}, labelValues...)),
|
||||
}
|
||||
}
|
||||
|
||||
func (spg SampleAndWaterMarkPairGenerator) metrics() Registerables {
|
||||
return spg.urGenerator.metrics()
|
||||
}
|
||||
|
||||
// SampleAndWaterMarkObserverGenerator creates TimedObservers that
|
||||
// populate histograms of samples and low- and high-water-marks. The
|
||||
// generator has a samplePeriod, and the histograms get an observation
|
||||
// every samplePeriod.
|
||||
type SampleAndWaterMarkObserverGenerator struct {
|
||||
*sampleAndWaterMarkObserverGenerator
|
||||
}
|
||||
|
||||
type sampleAndWaterMarkObserverGenerator struct {
|
||||
clock clock.PassiveClock
|
||||
samplePeriod time.Duration
|
||||
samples *compbasemetrics.HistogramVec
|
||||
waterMarks *compbasemetrics.HistogramVec
|
||||
}
|
||||
|
||||
var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil)
|
||||
|
||||
// NewSampleAndWaterMarkHistogramsGenerator makes a new one
|
||||
func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
|
||||
return SampleAndWaterMarkObserverGenerator{
|
||||
&sampleAndWaterMarkObserverGenerator{
|
||||
clock: clock,
|
||||
samplePeriod: samplePeriod,
|
||||
samples: compbasemetrics.NewHistogramVec(sampleOpts, labelNames),
|
||||
waterMarks: compbasemetrics.NewHistogramVec(waterMarkOpts, append([]string{labelNameMark}, labelNames...)),
|
||||
}}
|
||||
}
|
||||
|
||||
func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 {
|
||||
return when.UnixNano() / int64(swg.samplePeriod)
|
||||
}
|
||||
|
||||
// Generate makes a new TimedObserver
|
||||
func (swg *sampleAndWaterMarkObserverGenerator) Generate(x, x1 float64, labelValues []string) TimedObserver {
|
||||
relX := x / x1
|
||||
when := swg.clock.Now()
|
||||
return &sampleAndWaterMarkHistograms{
|
||||
sampleAndWaterMarkObserverGenerator: swg,
|
||||
labelValues: labelValues,
|
||||
loLabelValues: append([]string{labelValueLo}, labelValues...),
|
||||
hiLabelValues: append([]string{labelValueHi}, labelValues...),
|
||||
x1: x1,
|
||||
sampleAndWaterMarkAccumulator: sampleAndWaterMarkAccumulator{
|
||||
lastSet: when,
|
||||
lastSetInt: swg.quantize(when),
|
||||
x: x,
|
||||
relX: relX,
|
||||
loRelX: relX,
|
||||
hiRelX: relX,
|
||||
}}
|
||||
}
|
||||
|
||||
func (swg *sampleAndWaterMarkObserverGenerator) metrics() Registerables {
|
||||
return Registerables{swg.samples, swg.waterMarks}
|
||||
}
|
||||
|
||||
type sampleAndWaterMarkHistograms struct {
|
||||
*sampleAndWaterMarkObserverGenerator
|
||||
labelValues []string
|
||||
loLabelValues, hiLabelValues []string
|
||||
|
||||
sync.Mutex
|
||||
x1 float64
|
||||
sampleAndWaterMarkAccumulator
|
||||
}
|
||||
|
||||
type sampleAndWaterMarkAccumulator struct {
|
||||
lastSet time.Time
|
||||
lastSetInt int64 // lastSet / samplePeriod
|
||||
x float64
|
||||
relX float64 // x / x1
|
||||
loRelX, hiRelX float64
|
||||
}
|
||||
|
||||
var _ TimedObserver = (*sampleAndWaterMarkHistograms)(nil)
|
||||
|
||||
func (saw *sampleAndWaterMarkHistograms) Add(deltaX float64) {
|
||||
saw.innerSet(func() {
|
||||
saw.x += deltaX
|
||||
})
|
||||
}
|
||||
|
||||
func (saw *sampleAndWaterMarkHistograms) Set(x float64) {
|
||||
saw.innerSet(func() {
|
||||
saw.x = x
|
||||
})
|
||||
}
|
||||
|
||||
func (saw *sampleAndWaterMarkHistograms) SetX1(x1 float64) {
|
||||
saw.innerSet(func() {
|
||||
saw.x1 = x1
|
||||
})
|
||||
}
|
||||
|
||||
func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) {
|
||||
saw.Lock()
|
||||
when := saw.clock.Now()
|
||||
whenInt := saw.quantize(when)
|
||||
acc := saw.sampleAndWaterMarkAccumulator
|
||||
wellOrdered := !when.Before(acc.lastSet)
|
||||
if wellOrdered {
|
||||
updateXOrX1()
|
||||
saw.relX = saw.x / saw.x1
|
||||
if acc.lastSetInt < whenInt {
|
||||
saw.loRelX, saw.hiRelX = acc.relX, acc.relX
|
||||
saw.lastSetInt = whenInt
|
||||
}
|
||||
if saw.relX < saw.loRelX {
|
||||
saw.loRelX = saw.relX
|
||||
} else if saw.relX > saw.hiRelX {
|
||||
saw.hiRelX = saw.relX
|
||||
}
|
||||
saw.lastSet = when
|
||||
}
|
||||
saw.Unlock()
|
||||
if !wellOrdered {
|
||||
lastSetS := acc.lastSet.Format(time.RFC3339Nano)
|
||||
whenS := when.Format(time.RFC3339Nano)
|
||||
klog.Fatalf("Time went backwards from %s to %s for labelValues=%#+v", lastSetS, whenS, saw.labelValues)
|
||||
panic(append([]string{lastSetS, whenS}, saw.labelValues...))
|
||||
}
|
||||
for acc.lastSetInt < whenInt {
|
||||
saw.samples.WithLabelValues(saw.labelValues...).Observe(acc.relX)
|
||||
saw.waterMarks.WithLabelValues(saw.loLabelValues...).Observe(acc.loRelX)
|
||||
saw.waterMarks.WithLabelValues(saw.hiLabelValues...).Observe(acc.hiRelX)
|
||||
acc.lastSetInt++
|
||||
acc.loRelX, acc.hiRelX = acc.relX, acc.relX
|
||||
}
|
||||
}
|
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package metrics
|
||||
|
||||
// TimedObserver gets informed about the values assigned to a variable
|
||||
// `X float64` over time, and reports on the ratio `X/X1`.
|
||||
type TimedObserver interface {
|
||||
// Add notes a change to the variable
|
||||
Add(deltaX float64)
|
||||
|
||||
// Set notes a setting of the variable
|
||||
Set(x float64)
|
||||
|
||||
// SetX1 changes the value to use for X1
|
||||
SetX1(x1 float64)
|
||||
}
|
||||
|
||||
// TimedObserverGenerator creates related observers that are
|
||||
// differentiated by a series of label values
|
||||
type TimedObserverGenerator interface {
|
||||
Generate(x, x1 float64, labelValues []string) TimedObserver
|
||||
}
|
||||
|
||||
// TimedObserverPair is a corresponding pair of observers, one for the
|
||||
// number of requests waiting in queue(s) and one for the number of
|
||||
// requests being executed
|
||||
type TimedObserverPair struct {
|
||||
// RequestsWaiting is given observations of the number of currently queued requests
|
||||
RequestsWaiting TimedObserver
|
||||
|
||||
// RequestsExecuting is given observations of the number of requests currently executing
|
||||
RequestsExecuting TimedObserver
|
||||
}
|
||||
|
||||
// TimedObserverPairGenerator generates pairs
|
||||
type TimedObserverPairGenerator interface {
|
||||
Generate(waiting1, executing1 float64, labelValues []string) TimedObserverPair
|
||||
}
|
Reference in New Issue
Block a user