storage/cacher: dispatchEvents use progressRequester
This commit is contained in:
		@@ -905,7 +905,23 @@ func (c *Cacher) dispatchEvents() {
 | 
				
			|||||||
	bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
 | 
						bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
 | 
				
			||||||
	defer bookmarkTimer.Stop()
 | 
						defer bookmarkTimer.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// The internal informer populates the RV as soon as it conducts
 | 
				
			||||||
 | 
						// The first successful sync with the underlying store.
 | 
				
			||||||
 | 
						// The cache must wait until this first sync is completed to be deemed ready.
 | 
				
			||||||
 | 
						// Since we cannot send a bookmark when the lastProcessedResourceVersion is 0,
 | 
				
			||||||
 | 
						// we poll aggressively for the first RV before entering the dispatch loop.
 | 
				
			||||||
	lastProcessedResourceVersion := uint64(0)
 | 
						lastProcessedResourceVersion := uint64(0)
 | 
				
			||||||
 | 
						if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true, func(_ context.Context) (bool, error) {
 | 
				
			||||||
 | 
							if rv := c.watchCache.getResourceVersion(); rv != 0 {
 | 
				
			||||||
 | 
								lastProcessedResourceVersion = rv
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}); err != nil {
 | 
				
			||||||
 | 
							// given the function above never returns error,
 | 
				
			||||||
 | 
							// the non-empty error means that the stopCh was closed
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case event, ok := <-c.incoming:
 | 
							case event, ok := <-c.incoming:
 | 
				
			||||||
@@ -929,29 +945,6 @@ func (c *Cacher) dispatchEvents() {
 | 
				
			|||||||
			metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
 | 
								metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
 | 
				
			||||||
		case <-bookmarkTimer.C():
 | 
							case <-bookmarkTimer.C():
 | 
				
			||||||
			bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
 | 
								bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
 | 
				
			||||||
			// Never send a bookmark event if we did not see an event here, this is fine
 | 
					 | 
				
			||||||
			// because we don't provide any guarantees on sending bookmarks.
 | 
					 | 
				
			||||||
			//
 | 
					 | 
				
			||||||
			// Just pop closed watchers and requeue others if needed.
 | 
					 | 
				
			||||||
			//
 | 
					 | 
				
			||||||
			// TODO(#115478): rework the following logic
 | 
					 | 
				
			||||||
			//  in a way that would allow more
 | 
					 | 
				
			||||||
			//  efficient cleanup of closed watchers
 | 
					 | 
				
			||||||
			if lastProcessedResourceVersion == 0 {
 | 
					 | 
				
			||||||
				func() {
 | 
					 | 
				
			||||||
					c.Lock()
 | 
					 | 
				
			||||||
					defer c.Unlock()
 | 
					 | 
				
			||||||
					for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
 | 
					 | 
				
			||||||
						for _, watcher := range watchers {
 | 
					 | 
				
			||||||
							if watcher.stopped {
 | 
					 | 
				
			||||||
								continue
 | 
					 | 
				
			||||||
							}
 | 
					 | 
				
			||||||
							c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
 | 
					 | 
				
			||||||
						}
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				}()
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			bookmarkEvent := &watchCacheEvent{
 | 
								bookmarkEvent := &watchCacheEvent{
 | 
				
			||||||
				Type:            watch.Bookmark,
 | 
									Type:            watch.Bookmark,
 | 
				
			||||||
				Object:          c.newFunc(),
 | 
									Object:          c.newFunc(),
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1902,16 +1902,13 @@ func BenchmarkCacher_GetList(b *testing.B) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
 | 
					// TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived makes sure that
 | 
				
			||||||
// a bookmark event will be delivered after the cacher has seen an event.
 | 
					// a bookmark event will be delivered even if the cacher has not received an event.
 | 
				
			||||||
// Previously the watchers have been removed from the "want bookmark" queue.
 | 
					func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) {
 | 
				
			||||||
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
 | 
					 | 
				
			||||||
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
 | 
						featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
 | 
				
			||||||
	backingStorage := &dummyStorage{}
 | 
						backingStorage := &dummyStorage{}
 | 
				
			||||||
	cacher, _, err := newTestCacher(backingStorage)
 | 
						cacher, _, err := newTestCacher(backingStorage)
 | 
				
			||||||
	if err != nil {
 | 
						require.NoError(t, err, "failed to create cacher")
 | 
				
			||||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer cacher.Stop()
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// wait until cacher is initialized.
 | 
						// wait until cacher is initialized.
 | 
				
			||||||
@@ -1929,29 +1926,10 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
 | 
				
			|||||||
	require.NoError(t, err, "failed to create watch: %v")
 | 
						require.NoError(t, err, "failed to create watch: %v")
 | 
				
			||||||
	defer w.Stop()
 | 
						defer w.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers.
 | 
					 | 
				
			||||||
	// We do that every ~1s, so waiting 2 seconds seems enough.
 | 
					 | 
				
			||||||
	time.Sleep(2 * time.Second)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value.
 | 
					 | 
				
			||||||
	makePod := func(rv uint64) *example.Pod {
 | 
					 | 
				
			||||||
		return &example.Pod{
 | 
					 | 
				
			||||||
			ObjectMeta: metav1.ObjectMeta{
 | 
					 | 
				
			||||||
				Name:            fmt.Sprintf("pod-%d", rv),
 | 
					 | 
				
			||||||
				Namespace:       "ns",
 | 
					 | 
				
			||||||
				ResourceVersion: fmt.Sprintf("%d", rv),
 | 
					 | 
				
			||||||
				Annotations:     map[string]string{},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	err = cacher.watchCache.Add(makePod(102))
 | 
					 | 
				
			||||||
	require.NoError(t, err)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	verifyEvents(t, w, []watch.Event{
 | 
						verifyEvents(t, w, []watch.Event{
 | 
				
			||||||
		{Type: watch.Added, Object: makePod(102)},
 | 
					 | 
				
			||||||
		{Type: watch.Bookmark, Object: &example.Pod{
 | 
							{Type: watch.Bookmark, Object: &example.Pod{
 | 
				
			||||||
			ObjectMeta: metav1.ObjectMeta{
 | 
								ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
				ResourceVersion: "102",
 | 
									ResourceVersion: "100",
 | 
				
			||||||
				Annotations:     map[string]string{metav1.InitialEventsAnnotationKey: "true"},
 | 
									Annotations:     map[string]string{metav1.InitialEventsAnnotationKey: "true"},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		}},
 | 
							}},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -641,6 +641,12 @@ func (w *watchCache) Resync() error {
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (w *watchCache) getResourceVersion() uint64 {
 | 
				
			||||||
 | 
						w.RLock()
 | 
				
			||||||
 | 
						defer w.RUnlock()
 | 
				
			||||||
 | 
						return w.resourceVersion
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (w *watchCache) currentCapacity() int {
 | 
					func (w *watchCache) currentCapacity() int {
 | 
				
			||||||
	w.RLock()
 | 
						w.RLock()
 | 
				
			||||||
	defer w.RUnlock()
 | 
						defer w.RUnlock()
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user