informer: fix race against Run and SetTransform/SetWatchErrorHandler
`SetWatchErrorHandler` claims it will fail if Run() has already started. But if they are called concurrently, it will actually trigger a data race. With this PR: ``` 62702 runs so far, 0 failures (100.00% pass rate). 59.152682ms avg, 189.068387ms max, 26.623785ms min ``` Without this PR: ``` 5012 runs so far, 38 failures (99.25% pass rate). 58.675502ms avg, 186.018084ms max, 29.468104ms min ```
This commit is contained in:
		@@ -459,29 +459,30 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 | 
			
		||||
		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 | 
			
		||||
		KnownObjects:          s.indexer,
 | 
			
		||||
		EmitDeltaTypeReplaced: true,
 | 
			
		||||
		Transformer:           s.transform,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	cfg := &Config{
 | 
			
		||||
		Queue:             fifo,
 | 
			
		||||
		ListerWatcher:     s.listerWatcher,
 | 
			
		||||
		ObjectType:        s.objectType,
 | 
			
		||||
		ObjectDescription: s.objectDescription,
 | 
			
		||||
		FullResyncPeriod:  s.resyncCheckPeriod,
 | 
			
		||||
		RetryOnError:      false,
 | 
			
		||||
		ShouldResync:      s.processor.shouldResync,
 | 
			
		||||
 | 
			
		||||
		Process:           s.HandleDeltas,
 | 
			
		||||
		WatchErrorHandler: s.watchErrorHandler,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	func() {
 | 
			
		||||
		s.startedLock.Lock()
 | 
			
		||||
		defer s.startedLock.Unlock()
 | 
			
		||||
 | 
			
		||||
		fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 | 
			
		||||
			KnownObjects:          s.indexer,
 | 
			
		||||
			EmitDeltaTypeReplaced: true,
 | 
			
		||||
			Transformer:           s.transform,
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
		cfg := &Config{
 | 
			
		||||
			Queue:             fifo,
 | 
			
		||||
			ListerWatcher:     s.listerWatcher,
 | 
			
		||||
			ObjectType:        s.objectType,
 | 
			
		||||
			ObjectDescription: s.objectDescription,
 | 
			
		||||
			FullResyncPeriod:  s.resyncCheckPeriod,
 | 
			
		||||
			RetryOnError:      false,
 | 
			
		||||
			ShouldResync:      s.processor.shouldResync,
 | 
			
		||||
 | 
			
		||||
			Process:           s.HandleDeltas,
 | 
			
		||||
			WatchErrorHandler: s.watchErrorHandler,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		s.controller = New(cfg)
 | 
			
		||||
		s.controller.(*controller).clock = s.clock
 | 
			
		||||
		s.started = true
 | 
			
		||||
 
 | 
			
		||||
@@ -393,6 +393,33 @@ func TestSharedInformerErrorHandling(t *testing.T) {
 | 
			
		||||
	close(stop)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestSharedInformerStartRace is a regression test to ensure there is no race between
 | 
			
		||||
// Run and SetWatchErrorHandler, and Run and SetTransform.
 | 
			
		||||
func TestSharedInformerStartRace(t *testing.T) {
 | 
			
		||||
	source := fcache.NewFakeControllerSource()
 | 
			
		||||
	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
 | 
			
		||||
	stop := make(chan struct{})
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case <-stop:
 | 
			
		||||
				return
 | 
			
		||||
			default:
 | 
			
		||||
			}
 | 
			
		||||
			// Set dummy functions, just to test for race
 | 
			
		||||
			informer.SetTransform(func(i interface{}) (interface{}, error) {
 | 
			
		||||
				return i, nil
 | 
			
		||||
			})
 | 
			
		||||
			informer.SetWatchErrorHandler(func(r *Reflector, err error) {
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	go informer.Run(stop)
 | 
			
		||||
 | 
			
		||||
	close(stop)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSharedInformerTransformer(t *testing.T) {
 | 
			
		||||
	// source simulates an apiserver object endpoint.
 | 
			
		||||
	source := fcache.NewFakeControllerSource()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user