storage/cacher: ensure the cache is at the Most Recent ResourceVersion when streaming was requested
This commit is contained in:
		@@ -513,6 +513,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
 | 
						if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
 | 
				
			||||||
		opts.SendInitialEvents = nil
 | 
							opts.SendInitialEvents = nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						// TODO: we should eventually get rid of this legacy case
 | 
				
			||||||
	if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
 | 
						if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
 | 
				
			||||||
		return c.storage.Watch(ctx, key, opts)
 | 
							return c.storage.Watch(ctx, key, opts)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -557,14 +558,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
	//   watchers on our watcher having a processing hiccup
 | 
						//   watchers on our watcher having a processing hiccup
 | 
				
			||||||
	chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
 | 
						chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Determine a function that computes the bookmarkAfterResourceVersion
 | 
						// Determine the ResourceVersion to which the watch cache must be synchronized
 | 
				
			||||||
	bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts)
 | 
						requiredResourceVersion, err := c.getWatchCacheResourceVersion(ctx, requestedWatchRV, opts)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return newErrWatcher(err), nil
 | 
							return newErrWatcher(err), nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Determine a function that computes the watchRV we should start from
 | 
						// Determine a function that computes the bookmarkAfterResourceVersion
 | 
				
			||||||
	startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts)
 | 
						bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(requestedWatchRV, requiredResourceVersion, opts)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return newErrWatcher(err), nil
 | 
							return newErrWatcher(err), nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -596,7 +597,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
	// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
 | 
						// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
 | 
				
			||||||
	// it is safe to release the lock after the method finishes because we don't require
 | 
						// it is safe to release the lock after the method finishes because we don't require
 | 
				
			||||||
	// any atomicity between the call to the method and further calls that actually get the events.
 | 
						// any atomicity between the call to the method and further calls that actually get the events.
 | 
				
			||||||
	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
 | 
						err = c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requiredResourceVersion, opts)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return newErrWatcher(err), nil
 | 
							return newErrWatcher(err), nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -609,13 +610,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
	c.watchCache.RLock()
 | 
						c.watchCache.RLock()
 | 
				
			||||||
	defer c.watchCache.RUnlock()
 | 
						defer c.watchCache.RUnlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	startWatchRV := startWatchResourceVersionFn()
 | 
					 | 
				
			||||||
	var cacheInterval *watchCacheInterval
 | 
						var cacheInterval *watchCacheInterval
 | 
				
			||||||
	if forceAllEvents {
 | 
						cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, opts)
 | 
				
			||||||
		cacheInterval, err = c.watchCache.getIntervalFromStoreLocked()
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		// To match the uncached watch implementation, once we have passed authn/authz/admission,
 | 
							// To match the uncached watch implementation, once we have passed authn/authz/admission,
 | 
				
			||||||
		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
 | 
							// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
 | 
				
			||||||
@@ -657,7 +653,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 | 
				
			|||||||
		return newImmediateCloseWatcher(), nil
 | 
							return newImmediateCloseWatcher(), nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go watcher.processInterval(ctx, cacheInterval, startWatchRV)
 | 
						go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
 | 
				
			||||||
	return watcher, nil
 | 
						return watcher, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1249,59 +1245,61 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
 | 
				
			|||||||
// spits a ResourceVersion after which the bookmark event will be delivered.
 | 
					// spits a ResourceVersion after which the bookmark event will be delivered.
 | 
				
			||||||
//
 | 
					//
 | 
				
			||||||
// The returned function must be called under the watchCache lock.
 | 
					// The returned function must be called under the watchCache lock.
 | 
				
			||||||
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
 | 
					func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion, requiredResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
 | 
				
			||||||
	if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks {
 | 
						if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks {
 | 
				
			||||||
		return func() uint64 { return 0 }, nil
 | 
							return func() uint64 { return 0 }, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
// getStartResourceVersionForWatchLockedFunc returns a function that
 | 
					 | 
				
			||||||
// spits a ResourceVersion the watch will be started from.
 | 
					 | 
				
			||||||
// Depending on the input parameters the semantics of the returned ResourceVersion are:
 | 
					 | 
				
			||||||
//   - start at Exact (return parsedWatchResourceVersion)
 | 
					 | 
				
			||||||
//   - start at Most Recent (return an RV from etcd)
 | 
					 | 
				
			||||||
//   - start at Any (return the current watchCache's RV)
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// The returned function must be called under the watchCache lock.
 | 
					 | 
				
			||||||
func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
 | 
					 | 
				
			||||||
	if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
 | 
					 | 
				
			||||||
		return func() uint64 { return parsedWatchResourceVersion }, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// getCommonResourceVersionLockedFunc a helper that simply computes a ResourceVersion
 | 
					 | 
				
			||||||
// based on the input parameters. Please examine callers of this method to get more context.
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// The returned function must be called under the watchCache lock.
 | 
					 | 
				
			||||||
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
 | 
					 | 
				
			||||||
	switch {
 | 
						switch {
 | 
				
			||||||
	case len(opts.ResourceVersion) == 0:
 | 
						case len(opts.ResourceVersion) == 0:
 | 
				
			||||||
		rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
 | 
							return func() uint64 { return requiredResourceVersion }, nil
 | 
				
			||||||
		if err != nil {
 | 
						case parsedResourceVersion == 0:
 | 
				
			||||||
			return nil, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return func() uint64 { return rv }, nil
 | 
					 | 
				
			||||||
	case parsedWatchResourceVersion == 0:
 | 
					 | 
				
			||||||
		// here we assume that watchCache locked is already held
 | 
							// here we assume that watchCache locked is already held
 | 
				
			||||||
		return func() uint64 { return c.watchCache.resourceVersion }, nil
 | 
							return func() uint64 { return c.watchCache.resourceVersion }, nil
 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
		return func() uint64 { return parsedWatchResourceVersion }, nil
 | 
							return func() uint64 { return parsedResourceVersion }, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getWatchCacheResourceVersion returns a ResourceVersion to which the watch cache must be synchronized to
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Depending on the input parameters, the semantics of the returned ResourceVersion are:
 | 
				
			||||||
 | 
					//   - must be at Exact RV (when parsedWatchResourceVersion > 0)
 | 
				
			||||||
 | 
					//   - can be at Any RV (when parsedWatchResourceVersion = 0)
 | 
				
			||||||
 | 
					//   - must be at Most Recent RV (return an RV from etcd)
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// note that the above semantic is enforced by the API validation (defined elsewhere):
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//	if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
 | 
				
			||||||
 | 
					//	if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks
 | 
				
			||||||
 | 
					func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
 | 
				
			||||||
 | 
						if len(opts.ResourceVersion) != 0 {
 | 
				
			||||||
 | 
							return parsedWatchResourceVersion, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
 | 
				
			||||||
 | 
						return rv, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
 | 
					// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
 | 
				
			||||||
// as fresh as given requestedWatchRV if sendInitialEvents was requested.
 | 
					// as fresh as given requestedWatchRV if sendInitialEvents was requested.
 | 
				
			||||||
// Additionally, it instructs the caller whether it should ask for
 | 
					// otherwise, we allow for establishing the connection because the clients
 | 
				
			||||||
// all events from the cache (full state) or not.
 | 
					// can wait for events without unnecessary blocking.
 | 
				
			||||||
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) {
 | 
					func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) error {
 | 
				
			||||||
	if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
 | 
						if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
 | 
				
			||||||
 | 
							// TODO(p0lyn0mial): adapt the following logic once
 | 
				
			||||||
 | 
							//   https://github.com/kubernetes/kubernetes/pull/123264 merges
 | 
				
			||||||
 | 
							if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && c.watchCache.notFresh(requestedWatchRV) {
 | 
				
			||||||
 | 
								c.watchCache.waitingUntilFresh.Add()
 | 
				
			||||||
 | 
								defer c.watchCache.waitingUntilFresh.Remove()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// TODO(p0lyn0mial): add a metric to track the number of times we have failed while waiting
 | 
				
			||||||
		err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
 | 
							err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
 | 
				
			||||||
		defer c.watchCache.RUnlock()
 | 
							defer c.watchCache.RUnlock()
 | 
				
			||||||
		return err == nil, err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return false, nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// errWatcher implements watch.Interface to return a single error
 | 
					// errWatcher implements watch.Interface to return a single error
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -32,6 +32,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
						apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
				
			||||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/api/meta"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/fields"
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/labels"
 | 
						"k8s.io/apimachinery/pkg/labels"
 | 
				
			||||||
@@ -85,12 +86,25 @@ type dummyStorage struct {
 | 
				
			|||||||
	err       error
 | 
						err       error
 | 
				
			||||||
	getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
 | 
						getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
 | 
				
			||||||
	watchFn   func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
 | 
						watchFn   func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// use getRequestWatchProgressCounter when reading
 | 
				
			||||||
 | 
						// the value of the counter
 | 
				
			||||||
 | 
						requestWatchProgressCounter int
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
 | 
					func (d *dummyStorage) RequestWatchProgress(ctx context.Context) error {
 | 
				
			||||||
 | 
						d.Lock()
 | 
				
			||||||
 | 
						defer d.Unlock()
 | 
				
			||||||
 | 
						d.requestWatchProgressCounter++
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (d *dummyStorage) getRequestWatchProgressCounter() int {
 | 
				
			||||||
 | 
						d.RLock()
 | 
				
			||||||
 | 
						defer d.RUnlock()
 | 
				
			||||||
 | 
						return d.requestWatchProgressCounter
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type dummyWatch struct {
 | 
					type dummyWatch struct {
 | 
				
			||||||
	ch chan watch.Event
 | 
						ch chan watch.Event
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -1605,40 +1619,143 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 | 
					func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 | 
				
			||||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 | 
				
			||||||
	backingStorage := &dummyStorage{}
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						scenarios := []struct {
 | 
				
			||||||
 | 
							name               string
 | 
				
			||||||
 | 
							opts               storage.ListOptions
 | 
				
			||||||
 | 
							backingStorage     *dummyStorage
 | 
				
			||||||
 | 
							verifyBackingStore func(t *testing.T, s *dummyStorage)
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=105",
 | 
				
			||||||
 | 
								opts: storage.ListOptions{
 | 
				
			||||||
 | 
									Predicate: func() storage.SelectionPredicate {
 | 
				
			||||||
 | 
										p := storage.Everything
 | 
				
			||||||
 | 
										p.AllowWatchBookmarks = true
 | 
				
			||||||
 | 
										return p
 | 
				
			||||||
 | 
									}(),
 | 
				
			||||||
 | 
									SendInitialEvents: pointer.Bool(true),
 | 
				
			||||||
 | 
									ResourceVersion:   "105",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								verifyBackingStore: func(t *testing.T, s *dummyStorage) {
 | 
				
			||||||
 | 
									require.NotEqual(t, 0, s.requestWatchProgressCounter, "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "legacy: allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
 | 
				
			||||||
 | 
								opts: storage.ListOptions{
 | 
				
			||||||
 | 
									Predicate: func() storage.SelectionPredicate {
 | 
				
			||||||
 | 
										p := storage.Everything
 | 
				
			||||||
 | 
										p.AllowWatchBookmarks = false
 | 
				
			||||||
 | 
										return p
 | 
				
			||||||
 | 
									}(),
 | 
				
			||||||
 | 
									SendInitialEvents: pointer.Bool(true),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								backingStorage: func() *dummyStorage {
 | 
				
			||||||
 | 
									hasBeenPrimed := false
 | 
				
			||||||
 | 
									s := &dummyStorage{}
 | 
				
			||||||
 | 
									s.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 | 
				
			||||||
 | 
										listAccessor, err := meta.ListAccessor(listObj)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											return err
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										// the first call to this function
 | 
				
			||||||
 | 
										// primes the cacher
 | 
				
			||||||
 | 
										if !hasBeenPrimed {
 | 
				
			||||||
 | 
											listAccessor.SetResourceVersion("100")
 | 
				
			||||||
 | 
											hasBeenPrimed = true
 | 
				
			||||||
 | 
											return nil
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										listAccessor.SetResourceVersion("105")
 | 
				
			||||||
 | 
										return nil
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return s
 | 
				
			||||||
 | 
								}(),
 | 
				
			||||||
 | 
								verifyBackingStore: func(t *testing.T, s *dummyStorage) {
 | 
				
			||||||
 | 
									require.NotEqual(t, 0, s.getRequestWatchProgressCounter(), "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
 | 
				
			||||||
 | 
								opts: storage.ListOptions{
 | 
				
			||||||
 | 
									Predicate: func() storage.SelectionPredicate {
 | 
				
			||||||
 | 
										p := storage.Everything
 | 
				
			||||||
 | 
										p.AllowWatchBookmarks = true
 | 
				
			||||||
 | 
										return p
 | 
				
			||||||
 | 
									}(),
 | 
				
			||||||
 | 
									SendInitialEvents: pointer.Bool(true),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								backingStorage: func() *dummyStorage {
 | 
				
			||||||
 | 
									hasBeenPrimed := false
 | 
				
			||||||
 | 
									s := &dummyStorage{}
 | 
				
			||||||
 | 
									s.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 | 
				
			||||||
 | 
										listAccessor, err := meta.ListAccessor(listObj)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											return err
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										// the first call to this function
 | 
				
			||||||
 | 
										// primes the cacher
 | 
				
			||||||
 | 
										if !hasBeenPrimed {
 | 
				
			||||||
 | 
											listAccessor.SetResourceVersion("100")
 | 
				
			||||||
 | 
											hasBeenPrimed = true
 | 
				
			||||||
 | 
											return nil
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										listAccessor.SetResourceVersion("105")
 | 
				
			||||||
 | 
										return nil
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return s
 | 
				
			||||||
 | 
								}(),
 | 
				
			||||||
 | 
								verifyBackingStore: func(t *testing.T, s *dummyStorage) {
 | 
				
			||||||
 | 
									require.NotEqual(t, 0, s.getRequestWatchProgressCounter(), "expected store.RequestWatchProgressCounter to be > 0. It looks like watch progress wasn't requested!")
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, scenario := range scenarios {
 | 
				
			||||||
 | 
							t.Run(scenario.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								var backingStorage *dummyStorage
 | 
				
			||||||
 | 
								if scenario.backingStorage != nil {
 | 
				
			||||||
 | 
									backingStorage = scenario.backingStorage
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									backingStorage = &dummyStorage{}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			cacher, _, err := newTestCacher(backingStorage)
 | 
								cacher, _, err := newTestCacher(backingStorage)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Fatalf("Couldn't create cacher: %v", err)
 | 
									t.Fatalf("Couldn't create cacher: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			defer cacher.Stop()
 | 
								defer cacher.Stop()
 | 
				
			||||||
 | 
								if err := cacher.ready.wait(context.Background()); err != nil {
 | 
				
			||||||
	opts := storage.ListOptions{
 | 
									t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
				
			||||||
		Predicate:         storage.Everything,
 | 
					 | 
				
			||||||
		SendInitialEvents: pointer.Bool(true),
 | 
					 | 
				
			||||||
		ResourceVersion:   "105",
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
	opts.Predicate.AllowWatchBookmarks = true
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	w, err := cacher.Watch(context.Background(), "pods/ns", opts)
 | 
								w, err := cacher.Watch(context.Background(), "pods/ns", scenario.opts)
 | 
				
			||||||
			require.NoError(t, err, "failed to create watch: %v")
 | 
								require.NoError(t, err, "failed to create watch: %v")
 | 
				
			||||||
			defer w.Stop()
 | 
								defer w.Stop()
 | 
				
			||||||
 | 
								var expectedErr *apierrors.StatusError
 | 
				
			||||||
 | 
								if !errors.As(storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds), &expectedErr) {
 | 
				
			||||||
 | 
									t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			verifyEvents(t, w, []watch.Event{
 | 
								verifyEvents(t, w, []watch.Event{
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
					Type: watch.Error,
 | 
										Type: watch.Error,
 | 
				
			||||||
					Object: &metav1.Status{
 | 
										Object: &metav1.Status{
 | 
				
			||||||
						Status:  metav1.StatusFailure,
 | 
											Status:  metav1.StatusFailure,
 | 
				
			||||||
				Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(),
 | 
											Message: expectedErr.Error(),
 | 
				
			||||||
				Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details,
 | 
											Details: expectedErr.ErrStatus.Details,
 | 
				
			||||||
						Reason:  metav1.StatusReasonTimeout,
 | 
											Reason:  metav1.StatusReasonTimeout,
 | 
				
			||||||
						Code:    504,
 | 
											Code:    504,
 | 
				
			||||||
					},
 | 
										},
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
			}, true)
 | 
								}, true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
								go func(t *testing.T) {
 | 
				
			||||||
		cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
 | 
									err := cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
 | 
				
			||||||
	}()
 | 
									require.NoError(t, err, "failed adding a pod to the watchCache")
 | 
				
			||||||
	w, err = cacher.Watch(context.Background(), "pods/ns", opts)
 | 
								}(t)
 | 
				
			||||||
 | 
								w, err = cacher.Watch(context.Background(), "pods/ns", scenario.opts)
 | 
				
			||||||
			require.NoError(t, err, "failed to create watch: %v")
 | 
								require.NoError(t, err, "failed to create watch: %v")
 | 
				
			||||||
			defer w.Stop()
 | 
								defer w.Stop()
 | 
				
			||||||
			verifyEvents(t, w, []watch.Event{
 | 
								verifyEvents(t, w, []watch.Event{
 | 
				
			||||||
@@ -1647,6 +1764,11 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 | 
				
			|||||||
					Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
 | 
										Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
			}, true)
 | 
								}, true)
 | 
				
			||||||
 | 
								if scenario.verifyBackingStore != nil {
 | 
				
			||||||
 | 
									scenario.verifyBackingStore(t, backingStorage)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type fakeStorage struct {
 | 
					type fakeStorage struct {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user