Make object transformation concurrent to remove watch cache scalability issue for conversion webhook
Test by enabling consistent list from cache in the storage version migrator stress test, which uses a conversion webhook that bottlenecks events coming to the watch cache. Set concurrency to 10, based on the maximum/average transform latency observed when running the stress test. In my testing the max was about 60-100ms, while the average was 6-10ms.
This commit is contained in:
		 Marek Siarkowicz
					Marek Siarkowicz
				
			
				
					committed by
					
						 Marek Siarkowicz
						Marek Siarkowicz
					
				
			
			
				
	
			
			
			 Marek Siarkowicz
						Marek Siarkowicz
					
				
			
						parent
						
							93a10a7569
						
					
				
				
					commit
					bb686f2033
				
			| @@ -46,8 +46,9 @@ import ( | ||||
|  | ||||
| const ( | ||||
| 	// We have set a buffer in order to reduce times of context switches. | ||||
| 	incomingBufSize = 100 | ||||
| 	outgoingBufSize = 100 | ||||
| 	incomingBufSize         = 100 | ||||
| 	outgoingBufSize         = 100 | ||||
| 	// processEventConcurrency sizes the processing queue used by | ||||
| 	// concurrentProcessEvents (the queue capacity is processEventConcurrency-1). | ||||
| 	// Per the commit message, 10 was chosen from the maximum/average transform | ||||
| 	// latency observed in a stress test. | ||||
| 	processEventConcurrency = 10 | ||||
| ) | ||||
|  | ||||
| // defaultWatcherMaxLimit is used to facilitate construction tests | ||||
| @@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo | ||||
| 	go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents) | ||||
|  | ||||
| 	var resultChanWG sync.WaitGroup | ||||
| 	resultChanWG.Add(1) | ||||
| 	go wc.processEvent(&resultChanWG) | ||||
| 	wc.processEvents(&resultChanWG) | ||||
|  | ||||
| 	select { | ||||
| 	case err := <-wc.errChan: | ||||
| @@ -424,10 +424,17 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd | ||||
| 	close(watchClosedCh) | ||||
| } | ||||
|  | ||||
| // processEvent processes events from etcd watcher and sends results to resultChan. | ||||
| func (wc *watchChan) processEvent(wg *sync.WaitGroup) { | ||||
| // processEvents starts processing of events from the etcd watcher and sends results to resultChan. | ||||
| // It does not block: all work happens in goroutines registered on wg. | ||||
| // When the ConcurrentWatchObjectDecode feature gate is enabled it delegates to | ||||
| // concurrentProcessEvents; otherwise it adds one to wg and runs | ||||
| // serialProcessEvents in a single goroutine. | ||||
| func (wc *watchChan) processEvents(wg *sync.WaitGroup) { | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) { | ||||
| 		wc.concurrentProcessEvents(wg) | ||||
| 	} else { | ||||
| 		wg.Add(1) | ||||
| 		go wc.serialProcessEvents(wg) | ||||
| 	} | ||||
| } | ||||
| func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) { | ||||
| 	defer wg.Done() | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case e := <-wc.incomingEventChan: | ||||
| @@ -435,7 +442,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) { | ||||
| 			if res == nil { | ||||
| 				continue | ||||
| 			} | ||||
| 			if len(wc.resultChan) == outgoingBufSize { | ||||
| 			if len(wc.resultChan) == cap(wc.resultChan) { | ||||
| 				klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource) | ||||
| 			} | ||||
| 			// If user couldn't receive results fast enough, we also block incoming events from watcher. | ||||
| @@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // concurrentProcessEvents wires wc.incomingEventChan to wc.resultChan through a | ||||
| // concurrentOrderedEventProcessing that applies wc.transform to each event. | ||||
| // It starts two goroutines, both registered on wg: one that schedules the | ||||
| // per-event processing and one that collects the results. | ||||
| func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { | ||||
| 	p := concurrentOrderedEventProcessing{ | ||||
| 		input:           wc.incomingEventChan, | ||||
| 		processFunc:     wc.transform, | ||||
| 		output:          wc.resultChan, | ||||
| 		// NOTE(review): capacity is processEventConcurrency-1, apparently because the | ||||
| 		// collector holds one more dequeued response channel while it waits for | ||||
| 		// that event's result - confirm. | ||||
| 		processingQueue: make(chan chan *watch.Event, processEventConcurrency-1), | ||||
|  | ||||
| 		objectType:    wc.watcher.objectType, | ||||
| 		groupResource: wc.watcher.groupResource, | ||||
| 	} | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		// wg is passed along because the scheduler registers every worker goroutine on it. | ||||
| 		p.scheduleEventProcessing(wc.ctx, wg) | ||||
| 	}() | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		p.collectEventProcessing(wc.ctx) | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| // concurrentOrderedEventProcessing runs processFunc on events from input in | ||||
| // separate goroutines and forwards the non-nil results to output in the order | ||||
| // the events were read from input. | ||||
| type concurrentOrderedEventProcessing struct { | ||||
| 	// input is the channel events are read from by scheduleEventProcessing. | ||||
| 	input       chan *event | ||||
| 	// processFunc is called once per event, in that event's own goroutine. | ||||
| 	processFunc func(*event) *watch.Event | ||||
| 	// output receives the dereferenced non-nil results from collectEventProcessing. | ||||
| 	output      chan watch.Event | ||||
|  | ||||
| 	// processingQueue carries one response channel per scheduled event, enqueued in | ||||
| 	// input order; collectEventProcessing drains it in that same order. | ||||
| 	processingQueue chan chan *watch.Event | ||||
| 	// Metadata for logging | ||||
| 	objectType    string | ||||
| 	groupResource schema.GroupResource | ||||
| } | ||||
|  | ||||
| // scheduleEventProcessing reads events from p.input and, for each one, enqueues a | ||||
| // fresh response channel on p.processingQueue and starts a goroutine that runs | ||||
| // p.processFunc on the event and writes the result to that channel. | ||||
| // Every worker goroutine is registered on wg. The loop returns when ctx is done. | ||||
| func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) { | ||||
| 	var e *event | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case e = <-p.input: | ||||
| 		} | ||||
| 		// Capacity 1; written at most once, by the worker goroutine started below. | ||||
| 		processingResponse := make(chan *watch.Event, 1) | ||||
| 		// Enqueue before starting the worker so the queue order matches the order in | ||||
| 		// which events were read from input. This send blocks while the queue is full. | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case p.processingQueue <- processingResponse: | ||||
| 		} | ||||
| 		wg.Add(1) | ||||
| 		go func(e *event, response chan<- *watch.Event) { | ||||
| 			defer wg.Done() | ||||
| 			// The send operand is evaluated on entering the select, so processFunc | ||||
| 			// runs to completion before either case is chosen. | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 			case response <- p.processFunc(e): | ||||
| 			} | ||||
| 		}(e, processingResponse) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // collectEventProcessing takes response channels from p.processingQueue one at a | ||||
| // time, waits for the result on each, and sends non-nil results to p.output. | ||||
| // Because responses are awaited in queue order, results are sent in the order | ||||
| // they were scheduled even if a later event finishes processing first. | ||||
| // The loop returns when ctx is done. | ||||
| func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { | ||||
| 	var processingResponse chan *watch.Event | ||||
| 	var e *watch.Event | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case processingResponse = <-p.processingQueue: | ||||
| 		} | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case e = <-processingResponse: | ||||
| 		} | ||||
| 		// A nil result from processFunc is skipped; nothing is sent for that event. | ||||
| 		if e == nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		// NOTE(review): the check uses cap(p.output), but the logged "outgoingEvents" | ||||
| 		// value is the outgoingBufSize constant - confirm the two always agree. | ||||
| 		if len(p.output) == cap(p.output) { | ||||
| 			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource) | ||||
| 		} | ||||
| 		// If user couldn't receive results fast enough, we also block incoming events from watcher. | ||||
| 		// Because storing events in local will cause more memory usage. | ||||
| 		// The worst case would be closing the fast watcher. | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case p.output <- *e: | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (wc *watchChan) filter(obj runtime.Object) bool { | ||||
| 	if wc.internalPred.Empty() { | ||||
| 		return true | ||||
|   | ||||
| @@ -133,6 +133,12 @@ func TestEtcdWatchSemantics(t *testing.T) { | ||||
| 	storagetesting.RunWatchSemantics(ctx, t, store) | ||||
| } | ||||
|  | ||||
| // TestEtcdWatchSemanticsWithConcurrentDecode runs the same RunWatchSemantics suite as | ||||
| // TestEtcdWatchSemantics, with the ConcurrentWatchObjectDecode feature gate set to true. | ||||
| func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) { | ||||
| 	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true) | ||||
| 	ctx, store, _ := testSetup(t) | ||||
| 	storagetesting.RunWatchSemantics(ctx, t, store) | ||||
| } | ||||
|  | ||||
| func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) { | ||||
| 	ctx, store, _ := testSetup(t) | ||||
| 	storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user