Merge pull request #103664 from wojtek-t/pf_rename_width
Rename width to "work estimate" in P&F code
This commit is contained in:
		| @@ -222,9 +222,6 @@ type Config struct { | ||||
| 	// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig. | ||||
| 	MergedResourceConfig *serverstore.ResourceConfig | ||||
|  | ||||
| 	// RequestWidthEstimator is used to estimate the "width" of the incoming request(s). | ||||
| 	RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc | ||||
|  | ||||
| 	// lifecycleSignals provides access to the various signals | ||||
| 	// that happen during lifecycle of the apiserver. | ||||
| 	// it's intentionally marked private as it should never be overridden. | ||||
| @@ -352,9 +349,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config { | ||||
|  | ||||
| 		// Default to treating watch as a long-running operation | ||||
| 		// Generic API servers have no inherent long-running subresources | ||||
| 		LongRunningFunc:       genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), | ||||
| 		RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator, | ||||
| 		lifecycleSignals:      newLifecycleSignals(), | ||||
| 		LongRunningFunc:  genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), | ||||
| 		lifecycleSignals: newLifecycleSignals(), | ||||
|  | ||||
| 		APIServerID:           id, | ||||
| 		StorageVersionManager: storageversion.NewDefaultManager(), | ||||
| @@ -747,7 +743,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { | ||||
|  | ||||
| 	if c.FlowControl != nil { | ||||
| 		handler = filterlatency.TrackCompleted(handler) | ||||
| 		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, c.RequestWidthEstimator) | ||||
| 		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, flowcontrolrequest.DefaultWorkEstimator) | ||||
| 		handler = filterlatency.TrackStarted(handler, "priorityandfairness") | ||||
| 	} else { | ||||
| 		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc) | ||||
|   | ||||
| @@ -61,7 +61,7 @@ func WithPriorityAndFairness( | ||||
| 	handler http.Handler, | ||||
| 	longRunningRequestCheck apirequest.LongRunningRequestCheck, | ||||
| 	fcIfc utilflowcontrol.Interface, | ||||
| 	widthEstimator flowcontrolrequest.WidthEstimatorFunc, | ||||
| 	workEstimator flowcontrolrequest.WorkEstimatorFunc, | ||||
| ) http.Handler { | ||||
| 	if fcIfc == nil { | ||||
| 		klog.Warningf("priority and fairness support not found, skipping") | ||||
| @@ -122,11 +122,14 @@ func WithPriorityAndFairness( | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// find the estimated "width" of the request | ||||
| 		// TODO: Maybe just make it costEstimator and let it return additionalLatency too for the watch? | ||||
| 		// find the estimated amount of work of the request | ||||
| 		// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter. | ||||
| 		width := widthEstimator.EstimateWidth(r) | ||||
| 		digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width} | ||||
| 		workEstimate := workEstimator.EstimateWork(r) | ||||
| 		digest := utilflowcontrol.RequestDigest{ | ||||
| 			RequestInfo:  requestInfo, | ||||
| 			User:         user, | ||||
| 			WorkEstimate: workEstimate, | ||||
| 		} | ||||
|  | ||||
| 		if isWatchRequest { | ||||
| 			// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute(). | ||||
|   | ||||
| @@ -70,7 +70,9 @@ const ( | ||||
| 	decisionSkipFilter | ||||
| ) | ||||
|  | ||||
| var defaultRequestWidthEstimator = func(*http.Request) fcrequest.Width { return fcrequest.Width{Seats: 1} } | ||||
| var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate { | ||||
| 	return fcrequest.WorkEstimate{Seats: 1} | ||||
| } | ||||
|  | ||||
| type fakeApfFilter struct { | ||||
| 	mockDecision mockDecision | ||||
| @@ -165,7 +167,7 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int | ||||
|  | ||||
| 	apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		onExecute() | ||||
| 	}), longRunningRequestCheck, flowControlFilter, defaultRequestWidthEstimator) | ||||
| 	}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator) | ||||
|  | ||||
| 	handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{ | ||||
| @@ -649,7 +651,7 @@ func TestApfWithRequestDigest(t *testing.T) { | ||||
| 	reqDigestExpected := &utilflowcontrol.RequestDigest{ | ||||
| 		RequestInfo: &apirequest.RequestInfo{Verb: "get"}, | ||||
| 		User:        &user.DefaultInfo{Name: "foo"}, | ||||
| 		Width: fcrequest.Width{ | ||||
| 		WorkEstimate: fcrequest.WorkEstimate{ | ||||
| 			Seats: 5, | ||||
| 		}, | ||||
| 	} | ||||
| @@ -657,7 +659,7 @@ func TestApfWithRequestDigest(t *testing.T) { | ||||
| 	handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}), | ||||
| 		longRunningFunc, | ||||
| 		fakeFilter, | ||||
| 		func(_ *http.Request) fcrequest.Width { return reqDigestExpected.Width }, | ||||
| 		func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate }, | ||||
| 	) | ||||
|  | ||||
| 	w := httptest.NewRecorder() | ||||
| @@ -1171,7 +1173,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol. | ||||
| 	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} | ||||
| 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) | ||||
|  | ||||
| 	apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWidthEstimator) | ||||
| 	apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator) | ||||
|  | ||||
| 	// add the handler in the chain that adds the specified user to the request context | ||||
| 	handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
|   | ||||
| @@ -80,9 +80,9 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af | ||||
|  | ||||
| // RequestDigest holds necessary info from request for flow-control | ||||
| type RequestDigest struct { | ||||
| 	RequestInfo *request.RequestInfo | ||||
| 	User        user.Info | ||||
| 	Width       fcrequest.Width | ||||
| 	RequestInfo  *request.RequestInfo | ||||
| 	User         user.Info | ||||
| 	WorkEstimate fcrequest.WorkEstimate | ||||
| } | ||||
|  | ||||
| // `*configController` maintains eventual consistency with the API | ||||
| @@ -327,7 +327,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool { | ||||
|  | ||||
| 	func(obj interface{}) { | ||||
| 		defer cfgCtlr.configQueue.Done(obj) | ||||
| 		specificDelay, err := cfgCtlr.syncOne(map[string]string{}) | ||||
| 		specificDelay, err := cfgCtlr.syncOne() | ||||
| 		switch { | ||||
| 		case err != nil: | ||||
| 			klog.Error(err) | ||||
| @@ -346,7 +346,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool { | ||||
| // objects that configure API Priority and Fairness and updates the | ||||
| // local configController accordingly. | ||||
| // Only invoke this in the one and only worker goroutine | ||||
| func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) { | ||||
| func (cfgCtlr *configController) syncOne() (specificDelay time.Duration, err error) { | ||||
| 	klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt)) | ||||
| 	all := labels.Everything() | ||||
| 	newPLs, err := cfgCtlr.plLister.List(all) | ||||
| @@ -357,7 +357,7 @@ func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (speci | ||||
| 	if err != nil { | ||||
| 		return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err) | ||||
| 	} | ||||
| 	return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs) | ||||
| 	return cfgCtlr.digestConfigObjects(newPLs, newFSs) | ||||
| } | ||||
|  | ||||
| // cfgMeal is the data involved in the process of digesting the API | ||||
| @@ -398,7 +398,7 @@ type fsStatusUpdate struct { | ||||
| // digestConfigObjects is given all the API objects that configure | ||||
| // cfgCtlr and writes its consequent new configState. | ||||
| // Only invoke this in the one and only worker goroutine | ||||
| func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) { | ||||
| func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) (time.Duration, error) { | ||||
| 	fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs) | ||||
| 	var errs []error | ||||
| 	currResult := updateAttempt{ | ||||
| @@ -427,16 +427,15 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior | ||||
| 		fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas() | ||||
| 		patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))) | ||||
| 		patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager} | ||||
| 		patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") | ||||
| 		if err == nil { | ||||
| 			key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema) | ||||
| 			flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion | ||||
| 		} else if apierrors.IsNotFound(err) { | ||||
| 			// This object has been deleted.  A notification is coming | ||||
| 			// and nothing more needs to be done here. | ||||
| 			klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name) | ||||
| 		} else { | ||||
| 			errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err)) | ||||
| 		_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") | ||||
| 		if err != nil { | ||||
| 			if apierrors.IsNotFound(err) { | ||||
| 				// This object has been deleted.  A notification is coming | ||||
| 				// and nothing more needs to be done here. | ||||
| 				klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name) | ||||
| 			} else { | ||||
| 				errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err)) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	cfgCtlr.addUpdateResult(currResult) | ||||
| @@ -809,7 +808,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig | ||||
| 	} | ||||
| 	startWaitingTime = time.Now() | ||||
| 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) | ||||
| 	req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) | ||||
| 	req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) | ||||
| 	if idle { | ||||
| 		cfgCtlr.maybeReapLocked(plName, plState) | ||||
| 	} | ||||
|   | ||||
| @@ -140,7 +140,7 @@ func (cqs *ctlrTestQueueSet) IsIdle() bool { | ||||
| 	return cqs.countActive == 0 | ||||
| } | ||||
|  | ||||
| func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { | ||||
| func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) { | ||||
| 	cqs.cts.lock.Lock() | ||||
| 	defer cqs.cts.lock.Unlock() | ||||
| 	cqs.countActive++ | ||||
|   | ||||
| @@ -81,7 +81,7 @@ type QueueSet interface { | ||||
| 	// was idle at the moment of the return.  Otherwise idle==false | ||||
| 	// and the client must call the Finish method of the Request | ||||
| 	// exactly once. | ||||
| 	StartRequest(ctx context.Context, width *request.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) | ||||
| 	StartRequest(ctx context.Context, width *request.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool) | ||||
|  | ||||
| 	// UpdateObservations makes sure any time-based statistics have | ||||
| 	// caught up with the current clock reading | ||||
|   | ||||
| @@ -154,7 +154,7 @@ func TestFIFOSeatsSum(t *testing.T) { | ||||
| 	list := newRequestFIFO() | ||||
|  | ||||
| 	newRequest := func(width uint) *request { | ||||
| 		return &request{width: fcrequest.Width{Seats: width}} | ||||
| 		return &request{workEstimate: fcrequest.WorkEstimate{Seats: width}} | ||||
| 	} | ||||
| 	arrival := []*request{newRequest(1), newRequest(2), newRequest(3)} | ||||
| 	removeFn := make([]removeFromFIFOFunc, 0) | ||||
|   | ||||
| @@ -235,7 +235,7 @@ const ( | ||||
| // executing at each point where there is a change in that quantity, | ||||
| // because the metrics --- and only the metrics --- track that | ||||
| // quantity per FlowSchema. | ||||
| func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { | ||||
| func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { | ||||
| 	qs.lockAndSyncTime() | ||||
| 	defer qs.lock.Unlock() | ||||
| 	var req *request | ||||
| @@ -244,13 +244,13 @@ func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, ha | ||||
| 	// Step 0: | ||||
| 	// Apply only concurrency limit, if zero queues desired | ||||
| 	if qs.qCfg.DesiredNumQueues < 1 { | ||||
| 		if !qs.canAccommodateSeatsLocked(int(width.Seats)) { | ||||
| 		if !qs.canAccommodateSeatsLocked(int(workEstimate.Seats)) { | ||||
| 			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", | ||||
| 				qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) | ||||
| 				qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) | ||||
| 			metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") | ||||
| 			return nil, qs.isIdleLocked() | ||||
| 		} | ||||
| 		req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2) | ||||
| 		req = qs.dispatchSansQueueLocked(ctx, workEstimate, flowDistinguisher, fsName, descr1, descr2) | ||||
| 		return req, false | ||||
| 	} | ||||
|  | ||||
| @@ -261,7 +261,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, ha | ||||
| 	// 3) Reject current request if there is not enough concurrency shares and | ||||
| 	// we are at max queue length | ||||
| 	// 4) If not rejected, create a request and enqueue | ||||
| 	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) | ||||
| 	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) | ||||
| 	// req == nil means that the request was rejected - no remaining | ||||
| 	// concurrency shares and at max queue length already | ||||
| 	if req == nil { | ||||
| @@ -316,7 +316,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, ha | ||||
|  | ||||
| // Seats returns the number of seats this request requires. | ||||
| func (req *request) Seats() int { | ||||
| 	return int(req.width.Seats) | ||||
| 	return int(req.workEstimate.Seats) | ||||
| } | ||||
|  | ||||
| func (req *request) NoteQueued(inQueue bool) { | ||||
| @@ -437,7 +437,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 { | ||||
| // returns the enqueud request on a successful enqueue | ||||
| // returns nil in the case that there is no available concurrency or | ||||
| // the queuelengthlimit has been reached | ||||
| func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { | ||||
| func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { | ||||
| 	// Start with the shuffle sharding, to pick a queue. | ||||
| 	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) | ||||
| 	queue := qs.queues[queueIdx] | ||||
| @@ -459,7 +459,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte | ||||
| 		descr1:            descr1, | ||||
| 		descr2:            descr2, | ||||
| 		queueNoteFn:       queueNoteFn, | ||||
| 		width:             *width, | ||||
| 		workEstimate:      *workEstimate, | ||||
| 	} | ||||
| 	if ok := qs.rejectOrEnqueueLocked(req); !ok { | ||||
| 		return nil | ||||
| @@ -476,7 +476,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte | ||||
| 	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. | ||||
| 	qs.dealer.Deal(hashValue, func(queueIdx int) { | ||||
| 		// TODO: Consider taking into account `additional latency` of requests | ||||
| 		// in addition to their widths. | ||||
| 		// in addition to their seats. | ||||
| 		// Ideally, this should be based on projected completion time in the | ||||
| 		// virtual world of the youngest request in the queue. | ||||
| 		thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum() | ||||
| @@ -579,7 +579,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqrequest.Width, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { | ||||
| func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { | ||||
| 	// does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world | ||||
| 	now := qs.clock.Now() | ||||
| 	req := &request{ | ||||
| @@ -592,7 +592,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqreques | ||||
| 		arrivalTime:       now, | ||||
| 		descr1:            descr1, | ||||
| 		descr2:            descr2, | ||||
| 		width:             *width, | ||||
| 		workEstimate:      *workEstimate, | ||||
| 	} | ||||
| 	req.decision.SetLocked(decisionExecute) | ||||
| 	qs.totRequestsExecuting++ | ||||
|   | ||||
| @@ -227,7 +227,7 @@ func (ust *uniformScenarioThread) callK(k int) { | ||||
| 	if k >= ust.nCalls { | ||||
| 		return | ||||
| 	} | ||||
| 	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.Width{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) | ||||
| 	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) | ||||
| 	ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) | ||||
| 	if req == nil { | ||||
| 		atomic.AddUint64(&ust.uss.failedCount, 1) | ||||
| @@ -672,7 +672,7 @@ func TestContextCancel(t *testing.T) { | ||||
| 	ctx1 := context.Background() | ||||
| 	b2i := map[bool]int{false: 0, true: 1} | ||||
| 	var qnc [2][2]int32 | ||||
| 	req1, _ := qs.StartRequest(ctx1, &fcrequest.Width{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) | ||||
| 	req1, _ := qs.StartRequest(ctx1, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) | ||||
| 	if req1 == nil { | ||||
| 		t.Error("Request rejected") | ||||
| 		return | ||||
| @@ -700,7 +700,7 @@ func TestContextCancel(t *testing.T) { | ||||
| 			counter.Add(1) | ||||
| 			cancel2() | ||||
| 		}() | ||||
| 		req2, idle2a := qs.StartRequest(ctx2, &fcrequest.Width{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) | ||||
| 		req2, idle2a := qs.StartRequest(ctx2, &fcrequest.WorkEstimate{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) | ||||
| 		if idle2a { | ||||
| 			t.Error("2nd StartRequest returned idle") | ||||
| 		} | ||||
| @@ -759,7 +759,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) { | ||||
| 	} | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| 	req, _ := qs.StartRequest(ctx, &fcrequest.Width{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {}) | ||||
| 	req, _ := qs.StartRequest(ctx, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {}) | ||||
| 	if req == nil { | ||||
| 		t.Fatal("expected a Request object from StartRequest, but got nil") | ||||
| 	} | ||||
| @@ -812,13 +812,13 @@ func TestSelectQueueLocked(t *testing.T) { | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 1}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 1}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 1}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 1}}, | ||||
| 					), | ||||
| 				}, | ||||
| 			}, | ||||
| @@ -835,7 +835,7 @@ func TestSelectQueueLocked(t *testing.T) { | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 1}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 1}}, | ||||
| 					), | ||||
| 				}, | ||||
| 			}, | ||||
| @@ -852,13 +852,13 @@ func TestSelectQueueLocked(t *testing.T) { | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 50}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 50}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 25}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 25}}, | ||||
| 					), | ||||
| 				}, | ||||
| 			}, | ||||
| @@ -875,13 +875,13 @@ func TestSelectQueueLocked(t *testing.T) { | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 10}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 10}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 25}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 25}}, | ||||
| 					), | ||||
| 				}, | ||||
| 			}, | ||||
| @@ -898,13 +898,13 @@ func TestSelectQueueLocked(t *testing.T) { | ||||
| 				{ | ||||
| 					virtualStart: 200, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 10}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 10}}, | ||||
| 					), | ||||
| 				}, | ||||
| 				{ | ||||
| 					virtualStart: 100, | ||||
| 					requests: newFIFO( | ||||
| 						&request{width: fcrequest.Width{Seats: 25}}, | ||||
| 						&request{workEstimate: fcrequest.WorkEstimate{Seats: 25}}, | ||||
| 					), | ||||
| 				}, | ||||
| 			}, | ||||
|   | ||||
| @@ -44,8 +44,8 @@ type request struct { | ||||
| 	// startTime is the real time when the request began executing | ||||
| 	startTime time.Time | ||||
|  | ||||
| 	// width of the request | ||||
| 	width fcrequest.Width | ||||
| 	// estimated amount of work of the request | ||||
| 	workEstimate fcrequest.WorkEstimate | ||||
|  | ||||
| 	// decision gets set to a `requestDecision` indicating what to do | ||||
| 	// with this request.  It gets set exactly once, when the request | ||||
|   | ||||
| @@ -56,7 +56,7 @@ func (noRestraint) IsIdle() bool { | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| func (noRestraint) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { | ||||
| func (noRestraint) StartRequest(ctx context.Context, workEstimate *fcrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { | ||||
| 	return noRestraintRequest{}, false | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -102,8 +102,8 @@ func TestLiterals(t *testing.T) { | ||||
| 			Name:              "eman", | ||||
| 			Parts:             []string{"goodrscs", "eman"}, | ||||
| 		}, | ||||
| 		User:  ui, | ||||
| 		Width: fcrequest.Width{Seats: 1}, | ||||
| 		User:         ui, | ||||
| 		WorkEstimate: fcrequest.WorkEstimate{Seats: 1}, | ||||
| 	} | ||||
| 	reqRU := RequestDigest{ | ||||
| 		RequestInfo: &request.RequestInfo{ | ||||
| @@ -118,8 +118,8 @@ func TestLiterals(t *testing.T) { | ||||
| 			Name:              "eman", | ||||
| 			Parts:             []string{"goodrscs", "eman"}, | ||||
| 		}, | ||||
| 		User:  ui, | ||||
| 		Width: fcrequest.Width{Seats: 1}, | ||||
| 		User:         ui, | ||||
| 		WorkEstimate: fcrequest.WorkEstimate{Seats: 1}, | ||||
| 	} | ||||
| 	reqN := RequestDigest{ | ||||
| 		RequestInfo: &request.RequestInfo{ | ||||
| @@ -127,8 +127,8 @@ func TestLiterals(t *testing.T) { | ||||
| 			Path:              "/openapi/v2", | ||||
| 			Verb:              "goodverb", | ||||
| 		}, | ||||
| 		User:  ui, | ||||
| 		Width: fcrequest.Width{Seats: 1}, | ||||
| 		User:         ui, | ||||
| 		WorkEstimate: fcrequest.WorkEstimate{Seats: 1}, | ||||
| 	} | ||||
| 	checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{ | ||||
| 		Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser, | ||||
|   | ||||
| @@ -20,28 +20,28 @@ import ( | ||||
| 	"net/http" | ||||
| ) | ||||
|  | ||||
| type Width struct { | ||||
| type WorkEstimate struct { | ||||
| 	// Seats represents the number of seats associated with this request | ||||
| 	Seats uint | ||||
| } | ||||
|  | ||||
| // DefaultWidthEstimator returns returns '1' as the "width" | ||||
| // of the given request. | ||||
| // DefaultWorkEstimator returns estimation with default number of seats | ||||
| // of 1. | ||||
| // | ||||
| // TODO: when we plumb in actual "width" handling for different | ||||
| // TODO: when we plumb in actual work estimate handling for different | ||||
| //  type of request(s) this function will iterate through a chain | ||||
| //  of widthEstimator instance(s). | ||||
| func DefaultWidthEstimator(_ *http.Request) Width { | ||||
| 	return Width{ | ||||
| //  of workEstimator instance(s). | ||||
| func DefaultWorkEstimator(_ *http.Request) WorkEstimate { | ||||
| 	return WorkEstimate{ | ||||
| 		Seats: 1, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WidthEstimatorFunc returns the estimated "width" of a given request. | ||||
| // WorkEstimatorFunc returns the estimated work of a given request. | ||||
| // This function will be used by the Priority & Fairness filter to | ||||
| // estimate the "width" of incoming requests. | ||||
| type WidthEstimatorFunc func(*http.Request) Width | ||||
| // estimate the work of of incoming requests. | ||||
| type WorkEstimatorFunc func(*http.Request) WorkEstimate | ||||
|  | ||||
| func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) Width { | ||||
| func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { | ||||
| 	return e(r) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot