Merge pull request #117641 from wojtek-t/cleanup_cacher_tests_847
Refactor some watchcache tests
This commit is contained in:
		@@ -62,7 +62,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	// set the size of the buffer of w.result to 0, so that the writes to
 | 
						// set the size of the buffer of w.result to 0, so that the writes to
 | 
				
			||||||
	// w.result is blocked.
 | 
						// w.result is blocked.
 | 
				
			||||||
	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
						w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 | 
						go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 | 
				
			||||||
	w.Stop()
 | 
						w.Stop()
 | 
				
			||||||
	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 | 
						if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 | 
				
			||||||
@@ -182,7 +182,7 @@ TestCase:
 | 
				
			|||||||
			testCase.events[j].ResourceVersion = uint64(j) + 1
 | 
								testCase.events[j].ResourceVersion = uint64(j) + 1
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
							w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
		go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
 | 
							go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		ch := w.ResultChan()
 | 
							ch := w.ResultChan()
 | 
				
			||||||
@@ -219,7 +219,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 | 
				
			|||||||
	// timeout to zero and run the Stop goroutine concurrently.
 | 
						// timeout to zero and run the Stop goroutine concurrently.
 | 
				
			||||||
	// May sure that the watch will not be blocked on Stop.
 | 
						// May sure that the watch will not be blocked on Stop.
 | 
				
			||||||
	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 | 
						for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 | 
				
			||||||
		w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
							w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
		go w.Stop()
 | 
							go w.Stop()
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case <-done:
 | 
							case <-done:
 | 
				
			||||||
@@ -231,7 +231,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 | 
				
			|||||||
	deadline := time.Now().Add(time.Hour)
 | 
						deadline := time.Now().Add(time.Hour)
 | 
				
			||||||
	// After that, verifies the cacheWatcher.process goroutine works correctly.
 | 
						// After that, verifies the cacheWatcher.process goroutine works correctly.
 | 
				
			||||||
	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 | 
						for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 | 
				
			||||||
		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
 | 
							w = newCacheWatcher(2, filter, emptyFunc, storage.APIObjectVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
		w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
 | 
							w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
 | 
				
			||||||
		ctx, cancel := context.WithDeadline(context.Background(), deadline)
 | 
							ctx, cancel := context.WithDeadline(context.Background(), deadline)
 | 
				
			||||||
		defer cancel()
 | 
							defer cancel()
 | 
				
			||||||
@@ -308,7 +308,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
 | 
				
			|||||||
	filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
 | 
						filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
 | 
				
			||||||
	forget := func(_ bool) {}
 | 
						forget := func(_ bool) {}
 | 
				
			||||||
	deadline := time.Now().Add(time.Minute)
 | 
						deadline := time.Now().Add(time.Minute)
 | 
				
			||||||
	w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
						w := newCacheWatcher(numObjects+1, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Simulate a situation when the last event will that was already in
 | 
						// Simulate a situation when the last event will that was already in
 | 
				
			||||||
	// the state, wasn't yet processed by cacher and will be delivered
 | 
						// the state, wasn't yet processed by cacher and will be delivered
 | 
				
			||||||
@@ -351,7 +351,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
 | 
				
			|||||||
	forget := func(bool) {}
 | 
						forget := func(bool) {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	newWatcher := func(deadline time.Time) *cacheWatcher {
 | 
						newWatcher := func(deadline time.Time) *cacheWatcher {
 | 
				
			||||||
		w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
							w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
		w.setBookmarkAfterResourceVersion(0)
 | 
							w.setBookmarkAfterResourceVersion(0)
 | 
				
			||||||
		return w
 | 
							return w
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -418,7 +418,7 @@ func TestCacheWatcherDraining(t *testing.T) {
 | 
				
			|||||||
		makeWatchCacheEvent(5),
 | 
							makeWatchCacheEvent(5),
 | 
				
			||||||
		makeWatchCacheEvent(6),
 | 
							makeWatchCacheEvent(6),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
						w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 | 
						go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 | 
				
			||||||
	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 | 
						if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 | 
				
			||||||
		t.Fatal("failed adding an even to the watcher")
 | 
							t.Fatal("failed adding an even to the watcher")
 | 
				
			||||||
@@ -459,7 +459,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
 | 
				
			|||||||
		makeWatchCacheEvent(5),
 | 
							makeWatchCacheEvent(5),
 | 
				
			||||||
		makeWatchCacheEvent(6),
 | 
							makeWatchCacheEvent(6),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
						w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 | 
						go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 | 
				
			||||||
	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 | 
						if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 | 
				
			||||||
		t.Fatal("failed adding an even to the watcher")
 | 
							t.Fatal("failed adding an even to the watcher")
 | 
				
			||||||
@@ -496,7 +496,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T
 | 
				
			|||||||
		{Object: &v1.Pod{}},
 | 
							{Object: &v1.Pod{}},
 | 
				
			||||||
		{Object: &v1.Pod{}},
 | 
							{Object: &v1.Pod{}},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
						w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
	w.setBookmarkAfterResourceVersion(10)
 | 
						w.setBookmarkAfterResourceVersion(10)
 | 
				
			||||||
	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 | 
						go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 | 
				
			||||||
	if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) {
 | 
						if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) {
 | 
				
			||||||
@@ -542,7 +542,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
 | 
				
			|||||||
		w.stopLocked()
 | 
							w.stopLocked()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}}
 | 
						initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}}
 | 
				
			||||||
	w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
						w = newCacheWatcher(2, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 | 
				
			||||||
	w.setBookmarkAfterResourceVersion(10)
 | 
						w.setBookmarkAfterResourceVersion(10)
 | 
				
			||||||
	go w.processInterval(ctx, intervalFromEvents(initEvents), 0)
 | 
						go w.processInterval(ctx, intervalFromEvents(initEvents), 0)
 | 
				
			||||||
	watchInitializationSignal.Wait()
 | 
						watchInitializationSignal.Wait()
 | 
				
			||||||
@@ -596,7 +596,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
 | 
					func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
 | 
				
			||||||
	newWatcher := func(id string, deadline time.Time) *cacheWatcher {
 | 
						newWatcher := func(id string, deadline time.Time) *cacheWatcher {
 | 
				
			||||||
		w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
 | 
							w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
 | 
				
			||||||
		w.setBookmarkAfterResourceVersion(10)
 | 
							w.setBookmarkAfterResourceVersion(10)
 | 
				
			||||||
		return w
 | 
							return w
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -33,7 +33,6 @@ import (
 | 
				
			|||||||
	"k8s.io/apimachinery/pkg/api/apitesting"
 | 
						"k8s.io/apimachinery/pkg/api/apitesting"
 | 
				
			||||||
	apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
						apiequality "k8s.io/apimachinery/pkg/api/equality"
 | 
				
			||||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/meta"
 | 
					 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/fields"
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/labels"
 | 
						"k8s.io/apimachinery/pkg/labels"
 | 
				
			||||||
@@ -58,42 +57,6 @@ import (
 | 
				
			|||||||
	"k8s.io/utils/pointer"
 | 
						"k8s.io/utils/pointer"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type testVersioner struct{}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
 | 
					 | 
				
			||||||
	return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, count *int64) error {
 | 
					 | 
				
			||||||
	listAccessor, err := meta.ListAccessor(obj)
 | 
					 | 
				
			||||||
	if err != nil || listAccessor == nil {
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
 | 
					 | 
				
			||||||
	listAccessor.SetContinue(continueValue)
 | 
					 | 
				
			||||||
	listAccessor.SetRemainingItemCount(count)
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
 | 
					 | 
				
			||||||
	return fmt.Errorf("unimplemented")
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
 | 
					 | 
				
			||||||
	accessor, err := meta.Accessor(obj)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return 0, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	version := accessor.GetResourceVersion()
 | 
					 | 
				
			||||||
	if len(version) == 0 {
 | 
					 | 
				
			||||||
		return 0, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return strconv.ParseUint(version, 10, 64)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
 | 
					 | 
				
			||||||
	if len(resourceVersion) == 0 {
 | 
					 | 
				
			||||||
		return 0, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return strconv.ParseUint(resourceVersion, 10, 64)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
	scheme   = runtime.NewScheme()
 | 
						scheme   = runtime.NewScheme()
 | 
				
			||||||
	codecs   = serializer.NewCodecFactory(scheme)
 | 
						codecs   = serializer.NewCodecFactory(scheme)
 | 
				
			||||||
@@ -111,7 +74,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 | 
				
			|||||||
	prefix := "pods"
 | 
						prefix := "pods"
 | 
				
			||||||
	config := Config{
 | 
						config := Config{
 | 
				
			||||||
		Storage:        s,
 | 
							Storage:        s,
 | 
				
			||||||
		Versioner:      testVersioner{},
 | 
							Versioner:      storage.APIObjectVersioner{},
 | 
				
			||||||
		GroupResource:  schema.GroupResource{Resource: "pods"},
 | 
							GroupResource:  schema.GroupResource{Resource: "pods"},
 | 
				
			||||||
		ResourcePrefix: prefix,
 | 
							ResourcePrefix: prefix,
 | 
				
			||||||
		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
 | 
							KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
 | 
				
			||||||
@@ -133,7 +96,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 | 
				
			|||||||
		Clock:       clock.RealClock{},
 | 
							Clock:       clock.RealClock{},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	cacher, err := NewCacherFromConfig(config)
 | 
						cacher, err := NewCacherFromConfig(config)
 | 
				
			||||||
	return cacher, testVersioner{}, err
 | 
						return cacher, storage.APIObjectVersioner{}, err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type dummyStorage struct {
 | 
					type dummyStorage struct {
 | 
				
			||||||
@@ -348,6 +311,90 @@ func TestWatchCacheBypass(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestEmptyWatchEventCache(t *testing.T) {
 | 
				
			||||||
 | 
						server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
 | 
				
			||||||
 | 
						defer server.Terminate(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// add a few objects
 | 
				
			||||||
 | 
						v := storage.APIObjectVersioner{}
 | 
				
			||||||
 | 
						lastRV := uint64(0)
 | 
				
			||||||
 | 
						for i := 0; i < 5; i++ {
 | 
				
			||||||
 | 
							pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}}
 | 
				
			||||||
 | 
							out := &example.Pod{}
 | 
				
			||||||
 | 
							key := computePodKey(pod)
 | 
				
			||||||
 | 
							if err := etcdStorage.Create(context.Background(), key, pod, out, 0); err != nil {
 | 
				
			||||||
 | 
								t.Fatalf("Create failed: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							var err error
 | 
				
			||||||
 | 
							if lastRV, err = v.ParseResourceVersion(out.ResourceVersion); err != nil {
 | 
				
			||||||
 | 
								t.Fatalf("Unexpected error: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cacher, _, err := newTestCacher(etcdStorage)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Couldn't create cacher: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Given that cacher is always initialized from the "current" version of etcd,
 | 
				
			||||||
 | 
						// we now have a cacher with an empty cache of watch events and a resourceVersion of rv.
 | 
				
			||||||
 | 
						// It should support establishing watches from rv and higher, but not older.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						expectedResourceExpiredError := apierrors.NewResourceExpired("").ErrStatus
 | 
				
			||||||
 | 
						tests := []struct {
 | 
				
			||||||
 | 
							name            string
 | 
				
			||||||
 | 
							resourceVersion uint64
 | 
				
			||||||
 | 
							expectedEvent   *watch.Event
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "RV-1",
 | 
				
			||||||
 | 
								resourceVersion: lastRV - 1,
 | 
				
			||||||
 | 
								expectedEvent:   &watch.Event{Type: watch.Error, Object: &expectedResourceExpiredError},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "RV",
 | 
				
			||||||
 | 
								resourceVersion: lastRV,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								name:            "RV+1",
 | 
				
			||||||
 | 
								resourceVersion: lastRV + 1,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, tt := range tests {
 | 
				
			||||||
 | 
							t.Run(tt.name, func(t *testing.T) {
 | 
				
			||||||
 | 
								opts := storage.ListOptions{
 | 
				
			||||||
 | 
									ResourceVersion: strconv.Itoa(int(tt.resourceVersion)),
 | 
				
			||||||
 | 
									Predicate:       storage.Everything,
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								watcher, err := cacher.Watch(context.Background(), "/pods/test-ns", opts)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									t.Fatalf("Failed to create watch: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								defer watcher.Stop()
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case event := <-watcher.ResultChan():
 | 
				
			||||||
 | 
									if tt.expectedEvent == nil {
 | 
				
			||||||
 | 
										t.Errorf("Unexpected event: type=%#v, object=%#v", event.Type, event.Object)
 | 
				
			||||||
 | 
										break
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									if e, a := tt.expectedEvent.Type, event.Type; e != a {
 | 
				
			||||||
 | 
										t.Errorf("Expected: %s, got: %s", e, a)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									if e, a := tt.expectedEvent.Object, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
 | 
				
			||||||
 | 
										t.Errorf("Expected: %#v, got: %#v", e, a)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								case <-time.After(3 * time.Second):
 | 
				
			||||||
 | 
									if tt.expectedEvent != nil {
 | 
				
			||||||
 | 
										t.Errorf("Failed to get an event")
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									// watch remained established successfully
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestWatchNotHangingOnStartupFailure(t *testing.T) {
 | 
					func TestWatchNotHangingOnStartupFailure(t *testing.T) {
 | 
				
			||||||
	// Configure cacher so that it can't initialize, because of
 | 
						// Configure cacher so that it can't initialize, because of
 | 
				
			||||||
	// constantly failing lists to the underlying storage.
 | 
						// constantly failing lists to the underlying storage.
 | 
				
			||||||
@@ -378,7 +425,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
					func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
				
			||||||
	backingStorage := &dummyStorage{}
 | 
						backingStorage := &dummyStorage{}
 | 
				
			||||||
	cacher, _, err := newTestCacher(backingStorage)
 | 
						cacher, v, err := newTestCacher(backingStorage)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
							t.Fatalf("Couldn't create cacher: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -448,7 +495,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
 | 
				
			|||||||
				shouldContinue = false
 | 
									shouldContinue = false
 | 
				
			||||||
				break
 | 
									break
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
 | 
								rv, err := v.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Errorf("unexpected parsing error: %v", err)
 | 
									t.Errorf("unexpected parsing error: %v", err)
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -35,6 +35,10 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func newPod() runtime.Object { return &example.Pod{} }
 | 
					func newPod() runtime.Object { return &example.Pod{} }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func computePodKey(obj *example.Pod) string {
 | 
				
			||||||
 | 
						return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
 | 
					func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
 | 
				
			||||||
	server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
 | 
						server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
 | 
				
			||||||
	storage := etcd3.New(
 | 
						storage := etcd3.New(
 | 
				
			||||||
@@ -61,7 +65,7 @@ func TestCacherListerWatcher(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	for _, obj := range objects {
 | 
						for _, obj := range objects {
 | 
				
			||||||
		out := &example.Pod{}
 | 
							out := &example.Pod{}
 | 
				
			||||||
		key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
 | 
							key := computePodKey(obj)
 | 
				
			||||||
		if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
 | 
							if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
 | 
				
			||||||
			t.Fatalf("Create failed: %v", err)
 | 
								t.Fatalf("Create failed: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -97,7 +101,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	for _, obj := range objects {
 | 
						for _, obj := range objects {
 | 
				
			||||||
		out := &example.Pod{}
 | 
							out := &example.Pod{}
 | 
				
			||||||
		key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
 | 
							key := computePodKey(obj)
 | 
				
			||||||
		if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
 | 
							if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
 | 
				
			||||||
			t.Fatalf("Create failed: %v", err)
 | 
								t.Fatalf("Create failed: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -112,10 +112,6 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
 | 
				
			|||||||
	return server, storage
 | 
						return server, storage
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versioner, error) {
 | 
					 | 
				
			||||||
	return newTestCacherWithClock(s, clock.RealClock{})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
 | 
					func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
 | 
				
			||||||
	prefix := "pods"
 | 
						prefix := "pods"
 | 
				
			||||||
	v := storage.APIObjectVersioner{}
 | 
						v := storage.APIObjectVersioner{}
 | 
				
			||||||
@@ -490,73 +486,6 @@ func TestWatchDeprecated(t *testing.T) {
 | 
				
			|||||||
	verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
 | 
						verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestEmptyWatchEventCache(t *testing.T) {
 | 
					 | 
				
			||||||
	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
 | 
					 | 
				
			||||||
	defer server.Terminate(t)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// add a few objects
 | 
					 | 
				
			||||||
	updatePod(t, etcdStorage, makeTestPod("pod1"), nil)
 | 
					 | 
				
			||||||
	updatePod(t, etcdStorage, makeTestPod("pod2"), nil)
 | 
					 | 
				
			||||||
	updatePod(t, etcdStorage, makeTestPod("pod3"), nil)
 | 
					 | 
				
			||||||
	updatePod(t, etcdStorage, makeTestPod("pod4"), nil)
 | 
					 | 
				
			||||||
	updatePod(t, etcdStorage, makeTestPod("pod5"), nil)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	cacher, v, err := newTestCacher(etcdStorage)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("Couldn't create cacher: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer cacher.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// get rv of last pod created
 | 
					 | 
				
			||||||
	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("Unexpected error: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
 | 
					 | 
				
			||||||
	// It should support establishing watches from rv and higher, but not older.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything})
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			t.Fatalf("Unexpected error: %v", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		defer watcher.Stop()
 | 
					 | 
				
			||||||
		expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
 | 
					 | 
				
			||||||
		verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything})
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			t.Fatalf("Unexpected error: %v", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		defer watcher.Stop()
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case e := <-watcher.ResultChan():
 | 
					 | 
				
			||||||
			t.Errorf("unexpected event %#v", e)
 | 
					 | 
				
			||||||
		case <-time.After(3 * time.Second):
 | 
					 | 
				
			||||||
			// watch from rv+1 remained established successfully
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything})
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			t.Fatalf("Unexpected error: %v", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		defer watcher.Stop()
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case e := <-watcher.ResultChan():
 | 
					 | 
				
			||||||
			t.Errorf("unexpected event %#v", e)
 | 
					 | 
				
			||||||
		case <-time.After(3 * time.Second):
 | 
					 | 
				
			||||||
			// watch from rv remained established successfully
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestWatchDispatchBookmarkEvents(t *testing.T) {
 | 
					func TestWatchDispatchBookmarkEvents(t *testing.T) {
 | 
				
			||||||
	ctx, cacher, terminate := testSetup(t)
 | 
						ctx, cacher, terminate := testSetup(t)
 | 
				
			||||||
	t.Cleanup(terminate)
 | 
						t.Cleanup(terminate)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user