allow handler to join after the informer has started
This commit is contained in:
		@@ -88,6 +88,10 @@ type sharedIndexInformer struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	started     bool
 | 
						started     bool
 | 
				
			||||||
	startedLock sync.Mutex
 | 
						startedLock sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// blockDeltas gives a way to stop all event distribution so that a late event handler
 | 
				
			||||||
 | 
						// can safely join the shared informer.
 | 
				
			||||||
 | 
						blockDeltas sync.Mutex
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
 | 
					// dummyController hides the fact that a SharedInformer is different from a dedicated one
 | 
				
			||||||
@@ -199,16 +203,35 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
 | 
				
			|||||||
	s.startedLock.Lock()
 | 
						s.startedLock.Lock()
 | 
				
			||||||
	defer s.startedLock.Unlock()
 | 
						defer s.startedLock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if s.started {
 | 
						if !s.started {
 | 
				
			||||||
		return fmt.Errorf("informer has already started")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		listener := newProcessListener(handler)
 | 
							listener := newProcessListener(handler)
 | 
				
			||||||
		s.processor.listeners = append(s.processor.listeners, listener)
 | 
							s.processor.listeners = append(s.processor.listeners, listener)
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// in order to safely join, we have to
 | 
				
			||||||
 | 
						// 1. stop sending add/update/delete notifications
 | 
				
			||||||
 | 
						// 2. do a list against the store
 | 
				
			||||||
 | 
						// 3. send synthetic "Add" events to the new handler
 | 
				
			||||||
 | 
						// 4. unblock
 | 
				
			||||||
 | 
						s.blockDeltas.Lock()
 | 
				
			||||||
 | 
						defer s.blockDeltas.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						listener := newProcessListener(handler)
 | 
				
			||||||
 | 
						s.processor.listeners = append(s.processor.listeners, listener)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						items := s.indexer.List()
 | 
				
			||||||
 | 
						for i := range items {
 | 
				
			||||||
 | 
							listener.add(addNotification{newObj: items[i]})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
 | 
					func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
 | 
				
			||||||
 | 
						s.blockDeltas.Lock()
 | 
				
			||||||
 | 
						defer s.blockDeltas.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// from oldest to newest
 | 
						// from oldest to newest
 | 
				
			||||||
	for _, d := range obj.(cache.Deltas) {
 | 
						for _, d := range obj.(cache.Deltas) {
 | 
				
			||||||
		switch d.Type {
 | 
							switch d.Type {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user