diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index bc6500909c3..d913154e000 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -482,6 +482,13 @@ func (c *Cacher) Delete( // Watch implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { pred := opts.Predicate + // If the resourceVersion is unset, ensure that the rv + // from which the watch is being served, is the latest + // one. "latest" is ensured by serving the watch from + // the underlying storage. + if opts.ResourceVersion == "" { + return c.storage.Watch(ctx, key, opts) + } watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index c15ff5e8645..a06ab757e6e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -277,6 +277,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { } type dummyStorage struct { + sync.RWMutex err error } @@ -306,12 +307,21 @@ func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ * return fmt.Errorf("unimplemented") } func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { - return newDummyWatch(), nil + d.RLock() + defer d.RUnlock() + + return newDummyWatch(), d.err } func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error { + d.RLock() + defer d.RUnlock() + return d.err } func (d *dummyStorage) GetList(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { + d.RLock() + defer d.RUnlock() + podList := listObj.(*example.PodList) podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} return d.err @@ -322,6 +332,12 @@ func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.O func (d *dummyStorage) Count(_ string) (int64, error) { return 0, fmt.Errorf("unimplemented") } +func (d *dummyStorage) injectError(err error) { + d.Lock() + defer d.Unlock() + + d.err = err +} func TestGetListCacheBypass(t *testing.T) { backingStorage := &dummyStorage{} @@ -342,7 +358,7 @@ func TestGetListCacheBypass(t *testing.T) { } // Inject error to underlying layer and check if cacher is not bypassed. - backingStorage.err = errDummy + backingStorage.injectError(errDummy) err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: pred, @@ -381,7 +397,7 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { } // Inject error to underlying layer and check if cacher is not bypassed. - backingStorage.err = errDummy + backingStorage.injectError(errDummy) err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ ResourceVersion: "0", Predicate: pred, @@ -415,7 +431,7 @@ func TestGetCacheBypass(t *testing.T) { } // Inject error to underlying layer and check if cacher is not bypassed. - backingStorage.err = errDummy + backingStorage.injectError(errDummy) err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ IgnoreNotFound: true, ResourceVersion: "0", @@ -433,6 +449,37 @@ func TestGetCacheBypass(t *testing.T) { } } +func TestWatchCacheBypass(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + // Inject error to underlying layer and check if cacher is not bypassed. + backingStorage.injectError(errDummy) + _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + ResourceVersion: "0", + }) + if err != nil { + t.Errorf("Watch with RV=0 should be served from cache: %v", err) + } + + // With unset RV, check if cacher is bypassed. + _, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{ + ResourceVersion: "", + }) + if err != errDummy { + t.Errorf("Watch with unset RV should bypass cacher: %v", err) + } +} + func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage)