Implement conditionalProgressRequester that allows requesting watch progress notification if watch cache is not fresh
This commit is contained in:
		@@ -104,7 +104,7 @@ type Config struct {
 | 
			
		||||
 | 
			
		||||
	Codec runtime.Codec
 | 
			
		||||
 | 
			
		||||
	Clock clock.Clock
 | 
			
		||||
	Clock clock.WithTicker
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type watchersMap map[int]*cacheWatcher
 | 
			
		||||
@@ -329,6 +329,10 @@ type Cacher struct {
 | 
			
		||||
	expiredBookmarkWatchers []*cacheWatcher
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
 | 
			
		||||
	return c.storage.RequestWatchProgress(ctx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
 | 
			
		||||
// its internal cache and updating its cache in the background based on the
 | 
			
		||||
// given configuration.
 | 
			
		||||
@@ -397,9 +401,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 | 
			
		||||
		// so that future reuse does not get a spurious timeout.
 | 
			
		||||
		<-cacher.timer.C
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock)
 | 
			
		||||
	watchCache := newWatchCache(
 | 
			
		||||
		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
 | 
			
		||||
		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
 | 
			
		||||
	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 | 
			
		||||
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 | 
			
		||||
 | 
			
		||||
@@ -419,6 +423,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 | 
			
		||||
	cacher.reflector = reflector
 | 
			
		||||
 | 
			
		||||
	go cacher.dispatchEvents()
 | 
			
		||||
	go progressRequester.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	cacher.stopWg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
 
 | 
			
		||||
@@ -328,7 +328,7 @@ type setupOptions struct {
 | 
			
		||||
	keyFunc        func(runtime.Object) (string, error)
 | 
			
		||||
	indexerFuncs   map[string]storage.IndexerFunc
 | 
			
		||||
	pagingEnabled  bool
 | 
			
		||||
	clock          clock.Clock
 | 
			
		||||
	clock          clock.WithTicker
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type setupOption func(*setupOptions)
 | 
			
		||||
 
 | 
			
		||||
@@ -90,6 +90,10 @@ type dummyStorage struct {
 | 
			
		||||
	watchFn   func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type dummyWatch struct {
 | 
			
		||||
	ch chan watch.Event
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -196,6 +196,10 @@ type watchCache struct {
 | 
			
		||||
 | 
			
		||||
	// For testing cache interval invalidation.
 | 
			
		||||
	indexValidator indexValidator
 | 
			
		||||
 | 
			
		||||
	// Requests progress notification if there are requests waiting for watch
 | 
			
		||||
	// to be fresh
 | 
			
		||||
	waitingUntilFresh *conditionalProgressRequester
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newWatchCache(
 | 
			
		||||
@@ -204,8 +208,9 @@ func newWatchCache(
 | 
			
		||||
	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
 | 
			
		||||
	versioner storage.Versioner,
 | 
			
		||||
	indexers *cache.Indexers,
 | 
			
		||||
	clock clock.Clock,
 | 
			
		||||
	groupResource schema.GroupResource) *watchCache {
 | 
			
		||||
	clock clock.WithTicker,
 | 
			
		||||
	groupResource schema.GroupResource,
 | 
			
		||||
	progressRequester *conditionalProgressRequester) *watchCache {
 | 
			
		||||
	wc := &watchCache{
 | 
			
		||||
		capacity:            defaultLowerBoundCapacity,
 | 
			
		||||
		keyFunc:             keyFunc,
 | 
			
		||||
@@ -222,6 +227,7 @@ func newWatchCache(
 | 
			
		||||
		clock:               clock,
 | 
			
		||||
		versioner:           versioner,
 | 
			
		||||
		groupResource:       groupResource,
 | 
			
		||||
		waitingUntilFresh:   progressRequester,
 | 
			
		||||
	}
 | 
			
		||||
	metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
 | 
			
		||||
	wc.cond = sync.NewCond(wc.RLocker())
 | 
			
		||||
 
 | 
			
		||||
@@ -287,6 +287,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
 | 
			
		||||
	for _, c := range cases {
 | 
			
		||||
		t.Run(c.name, func(t *testing.T) {
 | 
			
		||||
			wc := newTestWatchCache(capacity, &cache.Indexers{})
 | 
			
		||||
			defer wc.Stop()
 | 
			
		||||
			for i := 0; i < c.eventsAddedToWatchcache; i++ {
 | 
			
		||||
				wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
 | 
			
		||||
			}
 | 
			
		||||
 
 | 
			
		||||
@@ -68,6 +68,9 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
 | 
			
		||||
 | 
			
		||||
type testWatchCache struct {
 | 
			
		||||
	*watchCache
 | 
			
		||||
 | 
			
		||||
	bookmarkRevision chan int64
 | 
			
		||||
	stopCh           chan struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
 | 
			
		||||
@@ -112,7 +115,13 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 | 
			
		||||
	}
 | 
			
		||||
	versioner := storage.APIObjectVersioner{}
 | 
			
		||||
	mockHandler := func(*watchCacheEvent) {}
 | 
			
		||||
	wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
 | 
			
		||||
	wc := &testWatchCache{}
 | 
			
		||||
	wc.bookmarkRevision = make(chan int64, 1)
 | 
			
		||||
	wc.stopCh = make(chan struct{})
 | 
			
		||||
	clock := testingclock.NewFakeClock(time.Now())
 | 
			
		||||
	pr := newConditionalProgressRequester(wc.RequestWatchProgress, clock)
 | 
			
		||||
	go pr.Run(wc.stopCh)
 | 
			
		||||
	wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock, schema.GroupResource{Resource: "pods"}, pr)
 | 
			
		||||
	// To preserve behavior of tests that assume a given capacity,
 | 
			
		||||
	// resize it to th expected size.
 | 
			
		||||
	wc.capacity = capacity
 | 
			
		||||
@@ -120,11 +129,28 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 | 
			
		||||
	wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
 | 
			
		||||
	wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
 | 
			
		||||
 | 
			
		||||
	return &testWatchCache{watchCache: wc}
 | 
			
		||||
	return wc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *testWatchCache) RequestWatchProgress(ctx context.Context) error {
 | 
			
		||||
	go func() {
 | 
			
		||||
		select {
 | 
			
		||||
		case rev := <-w.bookmarkRevision:
 | 
			
		||||
			w.UpdateResourceVersion(fmt.Sprintf("%d", rev))
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *testWatchCache) Stop() {
 | 
			
		||||
	close(w.stopCh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWatchCacheBasic(t *testing.T) {
 | 
			
		||||
	store := newTestWatchCache(2, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
 | 
			
		||||
	// Test Add/Update/Delete.
 | 
			
		||||
	pod1 := makeTestPod("pod", 1)
 | 
			
		||||
@@ -202,6 +228,7 @@ func TestWatchCacheBasic(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestEvents(t *testing.T) {
 | 
			
		||||
	store := newTestWatchCache(5, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
 | 
			
		||||
	// no dynamic-size cache to fit old tests.
 | 
			
		||||
	store.lowerBoundCapacity = 5
 | 
			
		||||
@@ -326,6 +353,7 @@ func TestEvents(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func TestMarker(t *testing.T) {
 | 
			
		||||
	store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
 | 
			
		||||
	// First thing that is called when propagated from storage is Replace.
 | 
			
		||||
	store.Replace([]interface{}{
 | 
			
		||||
@@ -380,7 +408,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
 | 
			
		||||
			return []string{pod.Spec.NodeName}, nil
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
	// In background, update the store.
 | 
			
		||||
	go func() {
 | 
			
		||||
		store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"}))
 | 
			
		||||
@@ -463,6 +491,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
 | 
			
		||||
func TestWaitUntilFreshAndGet(t *testing.T) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
 | 
			
		||||
	// In background, update the store.
 | 
			
		||||
	go func() {
 | 
			
		||||
@@ -489,6 +518,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
 | 
			
		||||
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
	fc := store.clock.(*testingclock.FakeClock)
 | 
			
		||||
 | 
			
		||||
	// In background, step clock after the below call starts the timer.
 | 
			
		||||
@@ -529,6 +559,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
 | 
			
		||||
func TestReflectorForWatchCache(t *testing.T) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	store := newTestWatchCache(5, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
 | 
			
		||||
@@ -792,6 +823,7 @@ func TestDynamicCache(t *testing.T) {
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		t.Run(test.name, func(t *testing.T) {
 | 
			
		||||
			store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
 | 
			
		||||
			defer store.Stop()
 | 
			
		||||
			store.cache = make([]*watchCacheEvent, test.cacheCapacity)
 | 
			
		||||
			store.startIndex = test.startIndex
 | 
			
		||||
			store.lowerBoundCapacity = test.lowerBoundCapacity
 | 
			
		||||
@@ -840,6 +872,7 @@ func checkCacheElements(cache *testWatchCache) bool {
 | 
			
		||||
 | 
			
		||||
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
 | 
			
		||||
	store := newTestWatchCache(2, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
 | 
			
		||||
	now := store.clock.Now()
 | 
			
		||||
	addEvent := func(key string, rv uint64, t time.Time) {
 | 
			
		||||
@@ -988,6 +1021,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
 | 
			
		||||
	for _, test := range testCases {
 | 
			
		||||
		t.Run(test.name, func(t *testing.T) {
 | 
			
		||||
			store := newTestWatchCache(test.capacity, &cache.Indexers{})
 | 
			
		||||
			defer store.Stop()
 | 
			
		||||
			got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
 | 
			
		||||
			if got != test.expected {
 | 
			
		||||
				t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected)
 | 
			
		||||
@@ -998,6 +1032,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
func BenchmarkWatchCache_updateCache(b *testing.B) {
 | 
			
		||||
	store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
	store.cache = store.cache[:0]
 | 
			
		||||
	store.upperBoundCapacity = defaultUpperBoundCapacity
 | 
			
		||||
	loadEventWithDuration(store, defaultUpperBoundCapacity, 0)
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,117 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package cacher
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
	"k8s.io/utils/clock"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// progressRequestPeriod determines period of requesting progress
 | 
			
		||||
	// from etcd when there is a request waiting for watch cache to be fresh.
 | 
			
		||||
	progressRequestPeriod = 100 * time.Millisecond
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock clock.WithTicker) *conditionalProgressRequester {
 | 
			
		||||
	pr := &conditionalProgressRequester{
 | 
			
		||||
		clock:                clock,
 | 
			
		||||
		requestWatchProgress: requestWatchProgress,
 | 
			
		||||
	}
 | 
			
		||||
	pr.cond = sync.NewCond(pr.mux.RLocker())
 | 
			
		||||
	return pr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type WatchProgressRequester func(ctx context.Context) error
 | 
			
		||||
 | 
			
		||||
// conditionalProgressRequester will request progress notification if there
 | 
			
		||||
// is a request waiting for watch cache to be fresh.
 | 
			
		||||
type conditionalProgressRequester struct {
 | 
			
		||||
	clock                clock.WithTicker
 | 
			
		||||
	requestWatchProgress WatchProgressRequester
 | 
			
		||||
 | 
			
		||||
	mux     sync.RWMutex
 | 
			
		||||
	cond    *sync.Cond
 | 
			
		||||
	waiting int
 | 
			
		||||
	stopped bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
 | 
			
		||||
	ctx := wait.ContextForChannel(stopCh)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer utilruntime.HandleCrash()
 | 
			
		||||
		<-stopCh
 | 
			
		||||
		pr.mux.Lock()
 | 
			
		||||
		defer pr.mux.Unlock()
 | 
			
		||||
		pr.stopped = true
 | 
			
		||||
		pr.cond.Signal()
 | 
			
		||||
	}()
 | 
			
		||||
	ticker := pr.clock.NewTicker(progressRequestPeriod)
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
	for {
 | 
			
		||||
		stopped := func() bool {
 | 
			
		||||
			pr.mux.RLock()
 | 
			
		||||
			defer pr.mux.RUnlock()
 | 
			
		||||
			for pr.waiting == 0 && !pr.stopped {
 | 
			
		||||
				pr.cond.Wait()
 | 
			
		||||
			}
 | 
			
		||||
			return pr.stopped
 | 
			
		||||
		}()
 | 
			
		||||
		if stopped {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ticker.C():
 | 
			
		||||
			shouldRequest := func() bool {
 | 
			
		||||
				pr.mux.RLock()
 | 
			
		||||
				defer pr.mux.RUnlock()
 | 
			
		||||
				return pr.waiting > 0 && !pr.stopped
 | 
			
		||||
			}()
 | 
			
		||||
			if !shouldRequest {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			err := pr.requestWatchProgress(ctx)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				klog.V(4).InfoS("Error requesting bookmark", "err", err)
 | 
			
		||||
			}
 | 
			
		||||
		case <-stopCh:
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pr *conditionalProgressRequester) Add() {
 | 
			
		||||
	pr.mux.Lock()
 | 
			
		||||
	defer pr.mux.Unlock()
 | 
			
		||||
	pr.waiting += 1
 | 
			
		||||
	pr.cond.Signal()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pr *conditionalProgressRequester) Remove() {
 | 
			
		||||
	pr.mux.Lock()
 | 
			
		||||
	defer pr.mux.Unlock()
 | 
			
		||||
	pr.waiting -= 1
 | 
			
		||||
	pr.cond.Signal()
 | 
			
		||||
}
 | 
			
		||||
@@ -0,0 +1,129 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package cacher
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
	"k8s.io/klog/v2/ktesting"
 | 
			
		||||
	"k8s.io/utils/clock"
 | 
			
		||||
	testingclock "k8s.io/utils/clock/testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	pollPeriod      = time.Millisecond
 | 
			
		||||
	minimalNoChange = 20 * time.Millisecond
 | 
			
		||||
	pollTimeout     = 5 * time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestConditionalProgressRequester(t *testing.T) {
 | 
			
		||||
	_, ctx := ktesting.NewTestContext(t)
 | 
			
		||||
	logger := klog.FromContext(ctx)
 | 
			
		||||
 | 
			
		||||
	clock := testingclock.NewFakeClock(time.Now())
 | 
			
		||||
	pr := newTestConditionalProgressRequester(clock)
 | 
			
		||||
	stopCh := make(chan struct{})
 | 
			
		||||
	go pr.Run(stopCh)
 | 
			
		||||
	var wantRequestsSent int32
 | 
			
		||||
	var requestsSent int32
 | 
			
		||||
 | 
			
		||||
	logger.Info("No progress requests if no-one is waiting")
 | 
			
		||||
	clock.Step(progressRequestPeriod * 2)
 | 
			
		||||
 | 
			
		||||
	if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
 | 
			
		||||
		requestsSent = pr.progressRequestsSentCount.Load()
 | 
			
		||||
		return requestsSent == wantRequestsSent
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger.Info("Adding allows progress request to be sent every period")
 | 
			
		||||
	pr.Add()
 | 
			
		||||
	for wantRequestsSent < 10 {
 | 
			
		||||
		clock.Step(progressRequestPeriod)
 | 
			
		||||
		wantRequestsSent++
 | 
			
		||||
 | 
			
		||||
		if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
 | 
			
		||||
			requestsSent = pr.progressRequestsSentCount.Load()
 | 
			
		||||
			return requestsSent == wantRequestsSent
 | 
			
		||||
		}); err != nil {
 | 
			
		||||
			t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	pr.Remove()
 | 
			
		||||
 | 
			
		||||
	logger.Info("No progress requests if no-one is waiting")
 | 
			
		||||
	clock.Step(progressRequestPeriod * 2)
 | 
			
		||||
	if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
 | 
			
		||||
		requestsSent = pr.progressRequestsSentCount.Load()
 | 
			
		||||
		return requestsSent == wantRequestsSent
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger.Info("No progress after stopping")
 | 
			
		||||
	close(stopCh)
 | 
			
		||||
	if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
 | 
			
		||||
		requestsSent = pr.progressRequestsSentCount.Load()
 | 
			
		||||
		return requestsSent == wantRequestsSent
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
 | 
			
		||||
	}
 | 
			
		||||
	pr.Add()
 | 
			
		||||
	clock.Step(progressRequestPeriod * 2)
 | 
			
		||||
	if err := pollConditionNoChange(pollPeriod, minimalNoChange, pollTimeout, func() bool {
 | 
			
		||||
		requestsSent = pr.progressRequestsSentCount.Load()
 | 
			
		||||
		return requestsSent == wantRequestsSent
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		t.Errorf("Failed to wait progress requests, err: %s, want: %d , got %d", err, wantRequestsSent, requestsSent)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newTestConditionalProgressRequester(clock clock.WithTicker) *testConditionalProgressRequester {
 | 
			
		||||
	pr := &testConditionalProgressRequester{}
 | 
			
		||||
	pr.conditionalProgressRequester = newConditionalProgressRequester(pr.RequestWatchProgress, clock)
 | 
			
		||||
	return pr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type testConditionalProgressRequester struct {
 | 
			
		||||
	*conditionalProgressRequester
 | 
			
		||||
	progressRequestsSentCount atomic.Int32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pr *testConditionalProgressRequester) RequestWatchProgress(ctx context.Context) error {
 | 
			
		||||
	pr.progressRequestsSentCount.Add(1)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func pollConditionNoChange(interval, stable, timeout time.Duration, condition func() bool) error {
 | 
			
		||||
	passCounter := 0
 | 
			
		||||
	requiredNumberOfPasses := int(stable/interval) + 1
 | 
			
		||||
	return wait.Poll(interval, timeout, func() (done bool, err error) {
 | 
			
		||||
		if condition() {
 | 
			
		||||
			passCounter++
 | 
			
		||||
		} else {
 | 
			
		||||
			passCounter = 0
 | 
			
		||||
		}
 | 
			
		||||
		return passCounter >= requiredNumberOfPasses, nil
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
@@ -85,6 +85,12 @@ type store struct {
 | 
			
		||||
	leaseManager        *leaseManager
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *store) RequestWatchProgress(ctx context.Context) error {
 | 
			
		||||
	// Use watchContext to match ctx metadata provided when creating the watch.
 | 
			
		||||
	// In best case scenario we would use the same context that watch was created, but there is no way access it from watchCache.
 | 
			
		||||
	return s.client.RequestProgress(s.watchContext(ctx))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type objState struct {
 | 
			
		||||
	obj   runtime.Object
 | 
			
		||||
	meta  *storage.ResponseMeta
 | 
			
		||||
 
 | 
			
		||||
@@ -215,6 +215,10 @@ func (wc *watchChan) ResultChan() <-chan watch.Event {
 | 
			
		||||
	return wc.resultChan
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wc *watchChan) RequestWatchProgress() error {
 | 
			
		||||
	return wc.watcher.client.RequestProgress(wc.ctx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// sync tries to retrieve existing data and send them to process.
 | 
			
		||||
// The revision to watch will be set to the revision in response.
 | 
			
		||||
// All events sent will have isCreated=true
 | 
			
		||||
 
 | 
			
		||||
@@ -236,6 +236,21 @@ type Interface interface {
 | 
			
		||||
 | 
			
		||||
	// Count returns number of different entries under the key (generally being path prefix).
 | 
			
		||||
	Count(key string) (int64, error)
 | 
			
		||||
 | 
			
		||||
	// RequestWatchProgress requests the a watch stream progress status be sent in the
 | 
			
		||||
	// watch response stream as soon as possible.
 | 
			
		||||
	// Used for monitor watch progress even if watching resources with no changes.
 | 
			
		||||
	//
 | 
			
		||||
	// If watch is lagging, progress status might:
 | 
			
		||||
	// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version.
 | 
			
		||||
	// * not be delivered at all. It's recommended to poll request progress periodically.
 | 
			
		||||
	//
 | 
			
		||||
	// Note: Only watches with matching context grpc metadata will be notified.
 | 
			
		||||
	// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
 | 
			
		||||
	//
 | 
			
		||||
	// TODO: Remove when storage.Interface will be separate from etc3.store.
 | 
			
		||||
	// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
 | 
			
		||||
	RequestWatchProgress(ctx context.Context) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetOptions provides the options that may be provided for storage get operations.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user