Merge pull request #125299 from karlkfi/karl-reflector-fix-2
Improve Reflector unit tests
This commit is contained in:
		@@ -322,3 +322,21 @@ func (pw *ProxyWatcher) ResultChan() <-chan Event {
 | 
				
			|||||||
func (pw *ProxyWatcher) StopChan() <-chan struct{} {
 | 
					func (pw *ProxyWatcher) StopChan() <-chan struct{} {
 | 
				
			||||||
	return pw.stopCh
 | 
						return pw.stopCh
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// MockWatcher implements watch.Interface with mockable functions.
 | 
				
			||||||
 | 
					type MockWatcher struct {
 | 
				
			||||||
 | 
						StopFunc       func()
 | 
				
			||||||
 | 
						ResultChanFunc func() <-chan Event
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ Interface = &MockWatcher{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Stop calls StopFunc
 | 
				
			||||||
 | 
					func (mw MockWatcher) Stop() {
 | 
				
			||||||
 | 
						mw.StopFunc()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ResultChan calls ResultChanFunc
 | 
				
			||||||
 | 
					func (mw MockWatcher) ResultChan() <-chan Event {
 | 
				
			||||||
 | 
						return mw.ResultChanFunc()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,6 +28,7 @@ import (
 | 
				
			|||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/stretchr/testify/assert"
 | 
				
			||||||
	"github.com/stretchr/testify/require"
 | 
						"github.com/stretchr/testify/require"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
@@ -97,19 +98,35 @@ func TestRunUntil(t *testing.T) {
 | 
				
			|||||||
			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
 | 
								return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	go r.Run(stopCh)
 | 
						doneCh := make(chan struct{})
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							defer close(doneCh)
 | 
				
			||||||
 | 
							r.Run(stopCh)
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
	// Synchronously add a dummy pod into the watch channel so we
 | 
						// Synchronously add a dummy pod into the watch channel so we
 | 
				
			||||||
	// know the RunUntil go routine is in the watch handler.
 | 
						// know the RunUntil go routine is in the watch handler.
 | 
				
			||||||
	fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
 | 
						fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	close(stopCh)
 | 
						close(stopCh)
 | 
				
			||||||
 | 
						resultCh := fw.ResultChan()
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
	case _, ok := <-fw.ResultChan():
 | 
							case <-doneCh:
 | 
				
			||||||
		if ok {
 | 
								if resultCh == nil {
 | 
				
			||||||
			t.Errorf("Watch channel left open after stopping the watch")
 | 
									return // both closed
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								doneCh = nil
 | 
				
			||||||
 | 
							case _, ok := <-resultCh:
 | 
				
			||||||
 | 
								if ok {
 | 
				
			||||||
 | 
									t.Fatalf("Watch channel left open after stopping the watch")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if doneCh == nil {
 | 
				
			||||||
 | 
									return // both closed
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								resultCh = nil
 | 
				
			||||||
		case <-time.After(wait.ForeverTestTimeout):
 | 
							case <-time.After(wait.ForeverTestTimeout):
 | 
				
			||||||
		t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
 | 
								t.Fatalf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
 | 
				
			||||||
		break
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -126,24 +143,59 @@ func TestReflectorResyncChan(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TestEstablishedWatchStoppedAfterStopCh ensures that
 | 
					// TestReflectorWatchStoppedBefore ensures that neither List nor Watch are
 | 
				
			||||||
// an established watch will be closed right after
 | 
					// called if the stop channel is closed before Reflector.watch is called.
 | 
				
			||||||
// the StopCh was also closed.
 | 
					func TestReflectorWatchStoppedBefore(t *testing.T) {
 | 
				
			||||||
func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) {
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
	ctx, ctxCancel := context.WithCancel(context.TODO())
 | 
						close(stopCh)
 | 
				
			||||||
	ctxCancel()
 | 
					
 | 
				
			||||||
 | 
						lw := &ListWatch{
 | 
				
			||||||
 | 
							ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
 | 
				
			||||||
 | 
								t.Fatal("ListFunc called unexpectedly")
 | 
				
			||||||
 | 
								return nil, nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
								// If WatchFunc is never called, the watcher it returns doesn't need to be stopped.
 | 
				
			||||||
 | 
								t.Fatal("WatchFunc called unexpectedly")
 | 
				
			||||||
 | 
								return nil, nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						target := NewReflector(lw, &v1.Pod{}, nil, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err := target.watch(nil, stopCh, nil)
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if
 | 
				
			||||||
 | 
					// the stop channel is closed after Reflector.watch has started watching.
 | 
				
			||||||
 | 
					func TestReflectorWatchStoppedAfter(t *testing.T) {
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var watchers []*watch.FakeWatcher
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						lw := &ListWatch{
 | 
				
			||||||
 | 
							ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
 | 
				
			||||||
 | 
								t.Fatal("ListFunc called unexpectedly")
 | 
				
			||||||
 | 
								return nil, nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
								// Simulate the stop channel being closed after watching has started
 | 
				
			||||||
 | 
								go func() {
 | 
				
			||||||
 | 
									time.Sleep(10 * time.Millisecond)
 | 
				
			||||||
 | 
									close(stopCh)
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
								// Use a fake watcher that never sends events
 | 
				
			||||||
			w := watch.NewFake()
 | 
								w := watch.NewFake()
 | 
				
			||||||
	require.False(t, w.IsStopped())
 | 
								watchers = append(watchers, w)
 | 
				
			||||||
 | 
								return w, nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						target := NewReflector(lw, &v1.Pod{}, nil, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// w is stopped when the stopCh is closed
 | 
						err := target.watch(nil, stopCh, nil)
 | 
				
			||||||
	target := NewReflector(nil, &v1.Pod{}, nil, 0)
 | 
					 | 
				
			||||||
	err := target.watch(w, ctx.Done(), nil)
 | 
					 | 
				
			||||||
	require.NoError(t, err)
 | 
					 | 
				
			||||||
	require.True(t, w.IsStopped())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// noop when the w is nil and the ctx is closed
 | 
					 | 
				
			||||||
	err = target.watch(nil, ctx.Done(), nil)
 | 
					 | 
				
			||||||
	require.NoError(t, err)
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
						require.Equal(t, 1, len(watchers))
 | 
				
			||||||
 | 
						require.True(t, watchers[0].IsStopped())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func BenchmarkReflectorResyncChanMany(b *testing.B) {
 | 
					func BenchmarkReflectorResyncChanMany(b *testing.B) {
 | 
				
			||||||
@@ -158,22 +210,148 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestReflectorWatchHandlerError(t *testing.T) {
 | 
					// TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when
 | 
				
			||||||
 | 
					// stopCh is already closed before handleWatch was called. It also ensures that
 | 
				
			||||||
 | 
					// ResultChan is only called once and that Stop is called after ResultChan.
 | 
				
			||||||
 | 
					func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
 | 
				
			||||||
	s := NewStore(MetaNamespaceKeyFunc)
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
						g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
				
			||||||
	fw := watch.NewFake()
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						// Simulate the watch channel being closed before the watchHandler is called
 | 
				
			||||||
 | 
						close(stopCh)
 | 
				
			||||||
 | 
						var calls []string
 | 
				
			||||||
 | 
						resultCh := make(chan watch.Event)
 | 
				
			||||||
 | 
						fw := watch.MockWatcher{
 | 
				
			||||||
 | 
							StopFunc: func() {
 | 
				
			||||||
 | 
								calls = append(calls, "Stop")
 | 
				
			||||||
 | 
								close(resultCh)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							ResultChanFunc: func() <-chan watch.Event {
 | 
				
			||||||
 | 
								calls = append(calls, "ResultChan")
 | 
				
			||||||
 | 
								return resultCh
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected non-error")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Ensure the watcher methods are called exactly once in this exact order.
 | 
				
			||||||
 | 
						// TODO(karlkfi): Fix watchHandler to call Stop()
 | 
				
			||||||
 | 
						// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
 | 
				
			||||||
 | 
						assert.Equal(t, []string{"ResultChan"}, calls)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when
 | 
				
			||||||
 | 
					// stopCh is closed after handleWatch was called. It also ensures that
 | 
				
			||||||
 | 
					// ResultChan is only called once and that Stop is called after ResultChan.
 | 
				
			||||||
 | 
					func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
 | 
				
			||||||
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
						g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
				
			||||||
 | 
						var calls []string
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						resultCh := make(chan watch.Event)
 | 
				
			||||||
 | 
						fw := watch.MockWatcher{
 | 
				
			||||||
 | 
							StopFunc: func() {
 | 
				
			||||||
 | 
								calls = append(calls, "Stop")
 | 
				
			||||||
 | 
								close(resultCh)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							ResultChanFunc: func() <-chan watch.Event {
 | 
				
			||||||
 | 
								calls = append(calls, "ResultChan")
 | 
				
			||||||
 | 
								resultCh = make(chan watch.Event)
 | 
				
			||||||
 | 
								// Simulate the watch handler being stopped asynchronously by the
 | 
				
			||||||
 | 
								// caller, after watching has started.
 | 
				
			||||||
			go func() {
 | 
								go func() {
 | 
				
			||||||
		fw.Stop()
 | 
									time.Sleep(10 * time.Millisecond)
 | 
				
			||||||
 | 
									close(stopCh)
 | 
				
			||||||
			}()
 | 
								}()
 | 
				
			||||||
 | 
								return resultCh
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected non-error")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Ensure the watcher methods are called exactly once in this exact order.
 | 
				
			||||||
 | 
						// TODO(karlkfi): Fix watchHandler to call Stop()
 | 
				
			||||||
 | 
						// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
 | 
				
			||||||
 | 
						assert.Equal(t, []string{"ResultChan"}, calls)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
 | 
				
			||||||
 | 
					// stops when the result channel is closed before handleWatch was called.
 | 
				
			||||||
 | 
					func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
 | 
				
			||||||
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
						g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
				
			||||||
 | 
						var calls []string
 | 
				
			||||||
 | 
						resultCh := make(chan watch.Event)
 | 
				
			||||||
 | 
						fw := watch.MockWatcher{
 | 
				
			||||||
 | 
							StopFunc: func() {
 | 
				
			||||||
 | 
								calls = append(calls, "Stop")
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							ResultChanFunc: func() <-chan watch.Event {
 | 
				
			||||||
 | 
								calls = append(calls, "ResultChan")
 | 
				
			||||||
 | 
								return resultCh
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Simulate the result channel being closed by the producer before handleWatch is called.
 | 
				
			||||||
 | 
						close(resultCh)
 | 
				
			||||||
	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
 | 
						err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
		t.Errorf("unexpected non-error")
 | 
							t.Errorf("unexpected non-error")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						// Ensure the watcher methods are called exactly once in this exact order.
 | 
				
			||||||
 | 
						// TODO(karlkfi): Fix watchHandler to call Stop()
 | 
				
			||||||
 | 
						// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
 | 
				
			||||||
 | 
						assert.Equal(t, []string{"ResultChan"}, calls)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
 | 
				
			||||||
 | 
					// stops when the result channel is closed after handleWatch has started watching.
 | 
				
			||||||
 | 
					func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
 | 
				
			||||||
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
 | 
						g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
				
			||||||
 | 
						var calls []string
 | 
				
			||||||
 | 
						resultCh := make(chan watch.Event)
 | 
				
			||||||
 | 
						fw := watch.MockWatcher{
 | 
				
			||||||
 | 
							StopFunc: func() {
 | 
				
			||||||
 | 
								calls = append(calls, "Stop")
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							ResultChanFunc: func() <-chan watch.Event {
 | 
				
			||||||
 | 
								calls = append(calls, "ResultChan")
 | 
				
			||||||
 | 
								resultCh = make(chan watch.Event)
 | 
				
			||||||
 | 
								// Simulate the result channel being closed by the producer, after
 | 
				
			||||||
 | 
								// watching has started.
 | 
				
			||||||
 | 
								go func() {
 | 
				
			||||||
 | 
									time.Sleep(10 * time.Millisecond)
 | 
				
			||||||
 | 
									close(resultCh)
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
								return resultCh
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected non-error")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Ensure the watcher methods are called exactly once in this exact order.
 | 
				
			||||||
 | 
						// TODO(karlkfi): Fix watchHandler to call Stop()
 | 
				
			||||||
 | 
						// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
 | 
				
			||||||
 | 
						assert.Equal(t, []string{"ResultChan"}, calls)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestReflectorWatchHandler(t *testing.T) {
 | 
					func TestReflectorWatchHandler(t *testing.T) {
 | 
				
			||||||
	s := NewStore(MetaNamespaceKeyFunc)
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
						g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
				
			||||||
 | 
						// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
 | 
				
			||||||
 | 
						// watching after all the events have been consumed. This avoids race
 | 
				
			||||||
 | 
						// conditions which can happen if the producer calls Stop(), instead of the
 | 
				
			||||||
 | 
						// consumer.
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						setLastSyncResourceVersion := func(rv string) {
 | 
				
			||||||
 | 
							g.setLastSyncResourceVersion(rv)
 | 
				
			||||||
 | 
							if rv == "32" {
 | 
				
			||||||
 | 
								close(stopCh)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	fw := watch.NewFake()
 | 
						fw := watch.NewFake()
 | 
				
			||||||
	s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
 | 
						s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
 | 
				
			||||||
	s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
 | 
						s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
 | 
				
			||||||
@@ -184,8 +362,8 @@ func TestReflectorWatchHandler(t *testing.T) {
 | 
				
			|||||||
		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
 | 
							fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
 | 
				
			||||||
		fw.Stop()
 | 
							fw.Stop()
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
 | 
						err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
 | 
				
			||||||
	if err != nil {
 | 
						if !errors.Is(err, errorStopRequested) {
 | 
				
			||||||
		t.Errorf("unexpected error %v", err)
 | 
							t.Errorf("unexpected error %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -193,6 +371,7 @@ func TestReflectorWatchHandler(t *testing.T) {
 | 
				
			|||||||
		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
 | 
							return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Validate that the Store was updated by the events
 | 
				
			||||||
	table := []struct {
 | 
						table := []struct {
 | 
				
			||||||
		Pod    *v1.Pod
 | 
							Pod    *v1.Pod
 | 
				
			||||||
		exists bool
 | 
							exists bool
 | 
				
			||||||
@@ -215,12 +394,7 @@ func TestReflectorWatchHandler(t *testing.T) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// RV should send the last version we see.
 | 
						// Validate that setLastSyncResourceVersion was called with the RV from the last event.
 | 
				
			||||||
	if e, a := "32", g.LastSyncResourceVersion(); e != a {
 | 
					 | 
				
			||||||
		t.Errorf("expected %v, got %v", e, a)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// last sync resource version should be the last version synced with store
 | 
					 | 
				
			||||||
	if e, a := "32", g.LastSyncResourceVersion(); e != a {
 | 
						if e, a := "32", g.LastSyncResourceVersion(); e != a {
 | 
				
			||||||
		t.Errorf("expected %v, got %v", e, a)
 | 
							t.Errorf("expected %v, got %v", e, a)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -230,8 +404,8 @@ func TestReflectorStopWatch(t *testing.T) {
 | 
				
			|||||||
	s := NewStore(MetaNamespaceKeyFunc)
 | 
						s := NewStore(MetaNamespaceKeyFunc)
 | 
				
			||||||
	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
						g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
 | 
				
			||||||
	fw := watch.NewFake()
 | 
						fw := watch.NewFake()
 | 
				
			||||||
	stopWatch := make(chan struct{}, 1)
 | 
						stopWatch := make(chan struct{})
 | 
				
			||||||
	stopWatch <- struct{}{}
 | 
						close(stopWatch)
 | 
				
			||||||
	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
 | 
						err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
 | 
				
			||||||
	if err != errorStopRequested {
 | 
						if err != errorStopRequested {
 | 
				
			||||||
		t.Errorf("expected stop error, got %q", err)
 | 
							t.Errorf("expected stop error, got %q", err)
 | 
				
			||||||
@@ -361,6 +535,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		watchRet, watchErr := item.events, item.watchErr
 | 
							watchRet, watchErr := item.events, item.watchErr
 | 
				
			||||||
 | 
							stopCh := make(chan struct{})
 | 
				
			||||||
		lw := &testLW{
 | 
							lw := &testLW{
 | 
				
			||||||
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
								WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
				if watchErr != nil {
 | 
									if watchErr != nil {
 | 
				
			||||||
@@ -372,7 +547,13 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
 | 
				
			|||||||
					for _, e := range watchRet {
 | 
										for _, e := range watchRet {
 | 
				
			||||||
						fw.Action(e.Type, e.Object)
 | 
											fw.Action(e.Type, e.Object)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					fw.Stop()
 | 
										// Because FakeWatcher doesn't buffer events, it's safe to
 | 
				
			||||||
 | 
										// close the stop channel immediately without missing events.
 | 
				
			||||||
 | 
										// But usually, the event producer would instead close the
 | 
				
			||||||
 | 
										// result channel, and wait for the consumer to stop the
 | 
				
			||||||
 | 
										// watcher, to avoid race conditions.
 | 
				
			||||||
 | 
										// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
 | 
				
			||||||
 | 
										close(stopCh)
 | 
				
			||||||
				}()
 | 
									}()
 | 
				
			||||||
				return fw, nil
 | 
									return fw, nil
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
@@ -381,7 +562,16 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
 | 
				
			|||||||
			},
 | 
								},
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		r := NewReflector(lw, &v1.Pod{}, s, 0)
 | 
							r := NewReflector(lw, &v1.Pod{}, s, 0)
 | 
				
			||||||
		r.ListAndWatch(wait.NeverStop)
 | 
							err := r.ListAndWatch(stopCh)
 | 
				
			||||||
 | 
							if item.listErr != nil && !errors.Is(err, item.listErr) {
 | 
				
			||||||
 | 
								t.Errorf("unexpected ListAndWatch error: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if item.watchErr != nil && !errors.Is(err, item.watchErr) {
 | 
				
			||||||
 | 
								t.Errorf("unexpected ListAndWatch error: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if item.listErr == nil && item.watchErr == nil {
 | 
				
			||||||
 | 
								assert.NoError(t, err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user