Improve Reflector unit tests
- Add tests to confirm that Stop is always called. - Add TODOs to show were Stop is not currently being called (to fix in a future PR)
This commit is contained in:
		| @@ -322,3 +322,21 @@ func (pw *ProxyWatcher) ResultChan() <-chan Event { | ||||
| func (pw *ProxyWatcher) StopChan() <-chan struct{} { | ||||
| 	return pw.stopCh | ||||
| } | ||||
|  | ||||
| // MockWatcher implements watch.Interface with mockable functions. | ||||
| type MockWatcher struct { | ||||
| 	StopFunc       func() | ||||
| 	ResultChanFunc func() <-chan Event | ||||
| } | ||||
|  | ||||
| var _ Interface = &MockWatcher{} | ||||
|  | ||||
| // Stop calls StopFunc | ||||
| func (mw MockWatcher) Stop() { | ||||
| 	mw.StopFunc() | ||||
| } | ||||
|  | ||||
| // ResultChan calls ResultChanFunc | ||||
| func (mw MockWatcher) ResultChan() <-chan Event { | ||||
| 	return mw.ResultChanFunc() | ||||
| } | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
|  | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| @@ -97,19 +98,35 @@ func TestRunUntil(t *testing.T) { | ||||
| 			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	go r.Run(stopCh) | ||||
| 	doneCh := make(chan struct{}) | ||||
| 	go func() { | ||||
| 		defer close(doneCh) | ||||
| 		r.Run(stopCh) | ||||
| 	}() | ||||
| 	// Synchronously add a dummy pod into the watch channel so we | ||||
| 	// know the RunUntil go routine is in the watch handler. | ||||
| 	fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) | ||||
|  | ||||
| 	close(stopCh) | ||||
| 	select { | ||||
| 	case _, ok := <-fw.ResultChan(): | ||||
| 		if ok { | ||||
| 			t.Errorf("Watch channel left open after stopping the watch") | ||||
| 	resultCh := fw.ResultChan() | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-doneCh: | ||||
| 			if resultCh == nil { | ||||
| 				return // both closed | ||||
| 			} | ||||
| 			doneCh = nil | ||||
| 		case _, ok := <-resultCh: | ||||
| 			if ok { | ||||
| 				t.Fatalf("Watch channel left open after stopping the watch") | ||||
| 			} | ||||
| 			if doneCh == nil { | ||||
| 				return // both closed | ||||
| 			} | ||||
| 			resultCh = nil | ||||
| 		case <-time.After(wait.ForeverTestTimeout): | ||||
| 			t.Fatalf("the cancellation is at least %s late", wait.ForeverTestTimeout.String()) | ||||
| 		} | ||||
| 	case <-time.After(wait.ForeverTestTimeout): | ||||
| 		t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String()) | ||||
| 		break | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -126,26 +143,61 @@ func TestReflectorResyncChan(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TestEstablishedWatchStoppedAfterStopCh ensures that | ||||
| // an established watch will be closed right after | ||||
| // the StopCh was also closed. | ||||
| func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) { | ||||
| 	ctx, ctxCancel := context.WithCancel(context.TODO()) | ||||
| 	ctxCancel() | ||||
| 	w := watch.NewFake() | ||||
| 	require.False(t, w.IsStopped()) | ||||
| // TestReflectorWatchStoppedBefore ensures that neither List nor Watch are | ||||
| // called if the stop channel is closed before Reflector.watch is called. | ||||
| func TestReflectorWatchStoppedBefore(t *testing.T) { | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	close(stopCh) | ||||
|  | ||||
| 	// w is stopped when the stopCh is closed | ||||
| 	target := NewReflector(nil, &v1.Pod{}, nil, 0) | ||||
| 	err := target.watch(w, ctx.Done(), nil) | ||||
| 	require.NoError(t, err) | ||||
| 	require.True(t, w.IsStopped()) | ||||
| 	lw := &ListWatch{ | ||||
| 		ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { | ||||
| 			t.Fatal("ListFunc called unexpectedly") | ||||
| 			return nil, nil | ||||
| 		}, | ||||
| 		WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { | ||||
| 			// If WatchFunc is never called, the watcher it returns doesn't need to be stopped. | ||||
| 			t.Fatal("WatchFunc called unexpectedly") | ||||
| 			return nil, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	target := NewReflector(lw, &v1.Pod{}, nil, 0) | ||||
|  | ||||
| 	// noop when the w is nil and the ctx is closed | ||||
| 	err = target.watch(nil, ctx.Done(), nil) | ||||
| 	err := target.watch(nil, stopCh, nil) | ||||
| 	require.NoError(t, err) | ||||
| } | ||||
|  | ||||
| // TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if | ||||
| // the stop channel is closed after Reflector.watch has started watching. | ||||
| func TestReflectorWatchStoppedAfter(t *testing.T) { | ||||
| 	stopCh := make(chan struct{}) | ||||
|  | ||||
| 	var watchers []*watch.FakeWatcher | ||||
|  | ||||
| 	lw := &ListWatch{ | ||||
| 		ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { | ||||
| 			t.Fatal("ListFunc called unexpectedly") | ||||
| 			return nil, nil | ||||
| 		}, | ||||
| 		WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { | ||||
| 			// Simulate the stop channel being closed after watching has started | ||||
| 			go func() { | ||||
| 				time.Sleep(10 * time.Millisecond) | ||||
| 				close(stopCh) | ||||
| 			}() | ||||
| 			// Use a fake watcher that never sends events | ||||
| 			w := watch.NewFake() | ||||
| 			watchers = append(watchers, w) | ||||
| 			return w, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	target := NewReflector(lw, &v1.Pod{}, nil, 0) | ||||
|  | ||||
| 	err := target.watch(nil, stopCh, nil) | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, 1, len(watchers)) | ||||
| 	require.True(t, watchers[0].IsStopped()) | ||||
| } | ||||
|  | ||||
| func BenchmarkReflectorResyncChanMany(b *testing.B) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond) | ||||
| @@ -158,22 +210,148 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReflectorWatchHandlerError(t *testing.T) { | ||||
| // TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when | ||||
| // stopCh is already closed before handleWatch was called. It also ensures that | ||||
| // ResultChan is only called once and that Stop is called after ResultChan. | ||||
| func TestReflectorHandleWatchStoppedBefore(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) | ||||
| 	fw := watch.NewFake() | ||||
| 	go func() { | ||||
| 		fw.Stop() | ||||
| 	}() | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	// Simulate the watch channel being closed before the watchHandler is called | ||||
| 	close(stopCh) | ||||
| 	var calls []string | ||||
| 	resultCh := make(chan watch.Event) | ||||
| 	fw := watch.MockWatcher{ | ||||
| 		StopFunc: func() { | ||||
| 			calls = append(calls, "Stop") | ||||
| 			close(resultCh) | ||||
| 		}, | ||||
| 		ResultChanFunc: func() <-chan watch.Event { | ||||
| 			calls = append(calls, "ResultChan") | ||||
| 			return resultCh | ||||
| 		}, | ||||
| 	} | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) | ||||
| 	if err == nil { | ||||
| 		t.Errorf("unexpected non-error") | ||||
| 	} | ||||
| 	// Ensure the watcher methods are called exactly once in this exact order. | ||||
| 	// TODO(karlkfi): Fix watchHandler to call Stop() | ||||
| 	// assert.Equal(t, []string{"ResultChan", "Stop"}, calls) | ||||
| 	assert.Equal(t, []string{"ResultChan"}, calls) | ||||
| } | ||||
|  | ||||
| // TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when | ||||
| // stopCh is closed after handleWatch was called. It also ensures that | ||||
| // ResultChan is only called once and that Stop is called after ResultChan. | ||||
| func TestReflectorHandleWatchStoppedAfter(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) | ||||
| 	var calls []string | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	resultCh := make(chan watch.Event) | ||||
| 	fw := watch.MockWatcher{ | ||||
| 		StopFunc: func() { | ||||
| 			calls = append(calls, "Stop") | ||||
| 			close(resultCh) | ||||
| 		}, | ||||
| 		ResultChanFunc: func() <-chan watch.Event { | ||||
| 			calls = append(calls, "ResultChan") | ||||
| 			resultCh = make(chan watch.Event) | ||||
| 			// Simulate the watch handler being stopped asynchronously by the | ||||
| 			// caller, after watching has started. | ||||
| 			go func() { | ||||
| 				time.Sleep(10 * time.Millisecond) | ||||
| 				close(stopCh) | ||||
| 			}() | ||||
| 			return resultCh | ||||
| 		}, | ||||
| 	} | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) | ||||
| 	if err == nil { | ||||
| 		t.Errorf("unexpected non-error") | ||||
| 	} | ||||
| 	// Ensure the watcher methods are called exactly once in this exact order. | ||||
| 	// TODO(karlkfi): Fix watchHandler to call Stop() | ||||
| 	// assert.Equal(t, []string{"ResultChan", "Stop"}, calls) | ||||
| 	assert.Equal(t, []string{"ResultChan"}, calls) | ||||
| } | ||||
|  | ||||
| // TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch | ||||
| // stops when the result channel is closed before handleWatch was called. | ||||
| func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) | ||||
| 	var calls []string | ||||
| 	resultCh := make(chan watch.Event) | ||||
| 	fw := watch.MockWatcher{ | ||||
| 		StopFunc: func() { | ||||
| 			calls = append(calls, "Stop") | ||||
| 		}, | ||||
| 		ResultChanFunc: func() <-chan watch.Event { | ||||
| 			calls = append(calls, "ResultChan") | ||||
| 			return resultCh | ||||
| 		}, | ||||
| 	} | ||||
| 	// Simulate the result channel being closed by the producer before handleWatch is called. | ||||
| 	close(resultCh) | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) | ||||
| 	if err == nil { | ||||
| 		t.Errorf("unexpected non-error") | ||||
| 	} | ||||
| 	// Ensure the watcher methods are called exactly once in this exact order. | ||||
| 	// TODO(karlkfi): Fix watchHandler to call Stop() | ||||
| 	// assert.Equal(t, []string{"ResultChan", "Stop"}, calls) | ||||
| 	assert.Equal(t, []string{"ResultChan"}, calls) | ||||
| } | ||||
|  | ||||
| // TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch | ||||
| // stops when the result channel is closed after handleWatch has started watching. | ||||
| func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) | ||||
| 	var calls []string | ||||
| 	resultCh := make(chan watch.Event) | ||||
| 	fw := watch.MockWatcher{ | ||||
| 		StopFunc: func() { | ||||
| 			calls = append(calls, "Stop") | ||||
| 		}, | ||||
| 		ResultChanFunc: func() <-chan watch.Event { | ||||
| 			calls = append(calls, "ResultChan") | ||||
| 			resultCh = make(chan watch.Event) | ||||
| 			// Simulate the result channel being closed by the producer, after | ||||
| 			// watching has started. | ||||
| 			go func() { | ||||
| 				time.Sleep(10 * time.Millisecond) | ||||
| 				close(resultCh) | ||||
| 			}() | ||||
| 			return resultCh | ||||
| 		}, | ||||
| 	} | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) | ||||
| 	if err == nil { | ||||
| 		t.Errorf("unexpected non-error") | ||||
| 	} | ||||
| 	// Ensure the watcher methods are called exactly once in this exact order. | ||||
| 	// TODO(karlkfi): Fix watchHandler to call Stop() | ||||
| 	// assert.Equal(t, []string{"ResultChan", "Stop"}, calls) | ||||
| 	assert.Equal(t, []string{"ResultChan"}, calls) | ||||
| } | ||||
|  | ||||
| func TestReflectorWatchHandler(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) | ||||
| 	// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop | ||||
| 	// watching after all the events have been consumed. This avoids race | ||||
| 	// conditions which can happen if the producer calls Stop(), instead of the | ||||
| 	// consumer. | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	setLastSyncResourceVersion := func(rv string) { | ||||
| 		g.setLastSyncResourceVersion(rv) | ||||
| 		if rv == "32" { | ||||
| 			close(stopCh) | ||||
| 		} | ||||
| 	} | ||||
| 	fw := watch.NewFake() | ||||
| 	s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) | ||||
| 	s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) | ||||
| @@ -184,8 +362,8 @@ func TestReflectorWatchHandler(t *testing.T) { | ||||
| 		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) | ||||
| 		fw.Stop() | ||||
| 	}() | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) | ||||
| 	if err != nil { | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) | ||||
| 	if !errors.Is(err, errorStopRequested) { | ||||
| 		t.Errorf("unexpected error %v", err) | ||||
| 	} | ||||
|  | ||||
| @@ -193,6 +371,7 @@ func TestReflectorWatchHandler(t *testing.T) { | ||||
| 		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}} | ||||
| 	} | ||||
|  | ||||
| 	// Validate that the Store was updated by the events | ||||
| 	table := []struct { | ||||
| 		Pod    *v1.Pod | ||||
| 		exists bool | ||||
| @@ -215,12 +394,7 @@ func TestReflectorWatchHandler(t *testing.T) { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// RV should send the last version we see. | ||||
| 	if e, a := "32", g.LastSyncResourceVersion(); e != a { | ||||
| 		t.Errorf("expected %v, got %v", e, a) | ||||
| 	} | ||||
|  | ||||
| 	// last sync resource version should be the last version synced with store | ||||
| 	// Validate that setLastSyncResourceVersion was called with the RV from the last event. | ||||
| 	if e, a := "32", g.LastSyncResourceVersion(); e != a { | ||||
| 		t.Errorf("expected %v, got %v", e, a) | ||||
| 	} | ||||
| @@ -230,8 +404,8 @@ func TestReflectorStopWatch(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) | ||||
| 	fw := watch.NewFake() | ||||
| 	stopWatch := make(chan struct{}, 1) | ||||
| 	stopWatch <- struct{}{} | ||||
| 	stopWatch := make(chan struct{}) | ||||
| 	close(stopWatch) | ||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch) | ||||
| 	if err != errorStopRequested { | ||||
| 		t.Errorf("expected stop error, got %q", err) | ||||
| @@ -361,6 +535,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { | ||||
| 			} | ||||
| 		} | ||||
| 		watchRet, watchErr := item.events, item.watchErr | ||||
| 		stopCh := make(chan struct{}) | ||||
| 		lw := &testLW{ | ||||
| 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||
| 				if watchErr != nil { | ||||
| @@ -372,7 +547,13 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { | ||||
| 					for _, e := range watchRet { | ||||
| 						fw.Action(e.Type, e.Object) | ||||
| 					} | ||||
| 					fw.Stop() | ||||
| 					// Because FakeWatcher doesn't buffer events, it's safe to | ||||
| 					// close the stop channel immediately without missing events. | ||||
| 					// But usually, the event producer would instead close the | ||||
| 					// result channel, and wait for the consumer to stop the | ||||
| 					// watcher, to avoid race conditions. | ||||
| 					// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh) | ||||
| 					close(stopCh) | ||||
| 				}() | ||||
| 				return fw, nil | ||||
| 			}, | ||||
| @@ -381,7 +562,16 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { | ||||
| 			}, | ||||
| 		} | ||||
| 		r := NewReflector(lw, &v1.Pod{}, s, 0) | ||||
| 		r.ListAndWatch(wait.NeverStop) | ||||
| 		err := r.ListAndWatch(stopCh) | ||||
| 		if item.listErr != nil && !errors.Is(err, item.listErr) { | ||||
| 			t.Errorf("unexpected ListAndWatch error: %v", err) | ||||
| 		} | ||||
| 		if item.watchErr != nil && !errors.Is(err, item.watchErr) { | ||||
| 			t.Errorf("unexpected ListAndWatch error: %v", err) | ||||
| 		} | ||||
| 		if item.listErr == nil && item.watchErr == nil { | ||||
| 			assert.NoError(t, err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Karl Isenberg
					Karl Isenberg