Merge pull request #125115 from serathius/progress-notify-timer
Improve progress notify to have more correct and predictable period of triggering
This commit is contained in:
		@@ -140,30 +140,41 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type immediateTickerFactory struct{}
 | 
					type immediateTickerFactory struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *immediateTickerFactory) NewTicker(d time.Duration) clock.Ticker {
 | 
					func (t *immediateTickerFactory) NewTimer(d time.Duration) clock.Timer {
 | 
				
			||||||
	return &immediateTicker{stopCh: make(chan struct{})}
 | 
						timer := immediateTicker{
 | 
				
			||||||
 | 
							c: make(chan time.Time),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						timer.Reset(d)
 | 
				
			||||||
 | 
						return &timer
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type immediateTicker struct {
 | 
					type immediateTicker struct {
 | 
				
			||||||
	stopCh chan struct{}
 | 
						c chan time.Time
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *immediateTicker) Reset(d time.Duration) (active bool) {
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-t.c:
 | 
				
			||||||
 | 
							active = true
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							t.c <- time.Now()
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
						return active
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *immediateTicker) C() <-chan time.Time {
 | 
					func (t *immediateTicker) C() <-chan time.Time {
 | 
				
			||||||
	ch := make(chan time.Time)
 | 
						return t.c
 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		for {
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case ch <- time.Now():
 | 
					 | 
				
			||||||
			case <-t.stopCh:
 | 
					 | 
				
			||||||
				return
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	return ch
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (t *immediateTicker) Stop() {
 | 
					func (t *immediateTicker) Stop() bool {
 | 
				
			||||||
	close(t.stopCh)
 | 
						select {
 | 
				
			||||||
 | 
						case <-t.c:
 | 
				
			||||||
 | 
							return true
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
 | 
					func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -42,14 +42,14 @@ func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester
 | 
				
			|||||||
		requestWatchProgress: requestWatchProgress,
 | 
							requestWatchProgress: requestWatchProgress,
 | 
				
			||||||
		contextMetadata:      contextMetadata,
 | 
							contextMetadata:      contextMetadata,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pr.cond = sync.NewCond(pr.mux.RLocker())
 | 
						pr.cond = sync.NewCond(&pr.mux)
 | 
				
			||||||
	return pr
 | 
						return pr
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type WatchProgressRequester func(ctx context.Context) error
 | 
					type WatchProgressRequester func(ctx context.Context) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type TickerFactory interface {
 | 
					type TickerFactory interface {
 | 
				
			||||||
	NewTicker(time.Duration) clock.Ticker
 | 
						NewTimer(time.Duration) clock.Timer
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// conditionalProgressRequester will request progress notification if there
 | 
					// conditionalProgressRequester will request progress notification if there
 | 
				
			||||||
@@ -59,7 +59,7 @@ type conditionalProgressRequester struct {
 | 
				
			|||||||
	requestWatchProgress WatchProgressRequester
 | 
						requestWatchProgress WatchProgressRequester
 | 
				
			||||||
	contextMetadata      metadata.MD
 | 
						contextMetadata      metadata.MD
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mux     sync.RWMutex
 | 
						mux     sync.Mutex
 | 
				
			||||||
	cond    *sync.Cond
 | 
						cond    *sync.Cond
 | 
				
			||||||
	waiting int
 | 
						waiting int
 | 
				
			||||||
	stopped bool
 | 
						stopped bool
 | 
				
			||||||
@@ -78,12 +78,12 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 | 
				
			|||||||
		pr.stopped = true
 | 
							pr.stopped = true
 | 
				
			||||||
		pr.cond.Signal()
 | 
							pr.cond.Signal()
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	ticker := pr.clock.NewTicker(progressRequestPeriod)
 | 
						timer := pr.clock.NewTimer(progressRequestPeriod)
 | 
				
			||||||
	defer ticker.Stop()
 | 
						defer timer.Stop()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		stopped := func() bool {
 | 
							stopped := func() bool {
 | 
				
			||||||
			pr.mux.RLock()
 | 
								pr.mux.Lock()
 | 
				
			||||||
			defer pr.mux.RUnlock()
 | 
								defer pr.mux.Unlock()
 | 
				
			||||||
			for pr.waiting == 0 && !pr.stopped {
 | 
								for pr.waiting == 0 && !pr.stopped {
 | 
				
			||||||
				pr.cond.Wait()
 | 
									pr.cond.Wait()
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -94,15 +94,17 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-ticker.C():
 | 
							case <-timer.C():
 | 
				
			||||||
			shouldRequest := func() bool {
 | 
								shouldRequest := func() bool {
 | 
				
			||||||
				pr.mux.RLock()
 | 
									pr.mux.Lock()
 | 
				
			||||||
				defer pr.mux.RUnlock()
 | 
									defer pr.mux.Unlock()
 | 
				
			||||||
				return pr.waiting > 0 && !pr.stopped
 | 
									return pr.waiting > 0 && !pr.stopped
 | 
				
			||||||
			}()
 | 
								}()
 | 
				
			||||||
			if !shouldRequest {
 | 
								if !shouldRequest {
 | 
				
			||||||
 | 
									timer.Reset(0)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								timer.Reset(progressRequestPeriod)
 | 
				
			||||||
			err := pr.requestWatchProgress(ctx)
 | 
								err := pr.requestWatchProgress(ctx)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				klog.V(4).InfoS("Error requesting bookmark", "err", err)
 | 
									klog.V(4).InfoS("Error requesting bookmark", "err", err)
 | 
				
			||||||
@@ -124,5 +126,4 @@ func (pr *conditionalProgressRequester) Remove() {
 | 
				
			|||||||
	pr.mux.Lock()
 | 
						pr.mux.Lock()
 | 
				
			||||||
	defer pr.mux.Unlock()
 | 
						defer pr.mux.Unlock()
 | 
				
			||||||
	pr.waiting -= 1
 | 
						pr.waiting -= 1
 | 
				
			||||||
	pr.cond.Signal()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user