Refactor Reflector ListAndWatch
- Extract watchWithResync to simplify ListAndWatch - Wrap watchHandler with two variants, one for WatchList and one for just Watch. - Replace a bool pointer arg with a bool arg and bool return, to improve readability. - Use errors.Is to satisfy the linter - Use %w to wrap the store.Replace error, to allow unwrapping.
This commit is contained in:
		| @@ -366,12 +366,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name) | 	klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name) | ||||||
|  | 	return r.watchWithResync(w, stopCh) | ||||||
| 	resyncerrc := make(chan error, 1) |  | ||||||
| 	cancelCh := make(chan struct{}) |  | ||||||
| 	defer close(cancelCh) |  | ||||||
| 	go r.startResync(stopCh, cancelCh, resyncerrc) |  | ||||||
| 	return r.watch(w, stopCh, resyncerrc) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // startResync periodically calls r.store.Resync() method. | // startResync periodically calls r.store.Resync() method. | ||||||
| @@ -402,6 +397,15 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{} | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // watchWithResync runs watch with startResync in the background. | ||||||
|  | func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error { | ||||||
|  | 	resyncerrc := make(chan error, 1) | ||||||
|  | 	cancelCh := make(chan struct{}) | ||||||
|  | 	defer close(cancelCh) | ||||||
|  | 	go r.startResync(stopCh, cancelCh, resyncerrc) | ||||||
|  | 	return r.watch(w, stopCh, resyncerrc) | ||||||
|  | } | ||||||
|  |  | ||||||
| // watch simply starts a watch request with the server. | // watch simply starts a watch request with the server. | ||||||
| func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error { | func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error { | ||||||
| 	var err error | 	var err error | ||||||
| @@ -451,13 +455,14 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh) | 		err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, | ||||||
|  | 			r.clock, resyncerrc, stopCh) | ||||||
| 		// Ensure that watch will not be reused across iterations. | 		// Ensure that watch will not be reused across iterations. | ||||||
| 		w.Stop() | 		w.Stop() | ||||||
| 		w = nil | 		w = nil | ||||||
| 		retry.After(err) | 		retry.After(err) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			if err != errorStopRequested { | 			if !errors.Is(err, errorStopRequested) { | ||||||
| 				switch { | 				switch { | ||||||
| 				case isExpiredError(err): | 				case isExpiredError(err): | ||||||
| 					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already | 					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already | ||||||
| @@ -668,14 +673,12 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { | |||||||
| 			} | 			} | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 		bookmarkReceived := pointer.Bool(false) | 		watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, | ||||||
| 		err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, |  | ||||||
| 			func(rv string) { resourceVersion = rv }, | 			func(rv string) { resourceVersion = rv }, | ||||||
| 			bookmarkReceived, |  | ||||||
| 			r.clock, make(chan error), stopCh) | 			r.clock, make(chan error), stopCh) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			w.Stop() // stop and retry with clean state | 			w.Stop() // stop and retry with clean state | ||||||
| 			if err == errorStopRequested { | 			if errors.Is(err, errorStopRequested) { | ||||||
| 				return nil, nil | 				return nil, nil | ||||||
| 			} | 			} | ||||||
| 			if isErrorRetriableWithSideEffectsFn(err) { | 			if isErrorRetriableWithSideEffectsFn(err) { | ||||||
| @@ -683,7 +686,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { | |||||||
| 			} | 			} | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 		if *bookmarkReceived { | 		if watchListBookmarkReceived { | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -697,8 +700,8 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { | |||||||
| 	// component as soon as it finishes replacing the content. | 	// component as soon as it finishes replacing the content. | ||||||
| 	checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) | 	checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) | ||||||
|  |  | ||||||
| 	if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { | 	if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { | ||||||
| 		return nil, fmt.Errorf("unable to sync watch-list result: %v", err) | 		return nil, fmt.Errorf("unable to sync watch-list result: %w", err) | ||||||
| 	} | 	} | ||||||
| 	initTrace.Step("SyncWith done") | 	initTrace.Step("SyncWith done") | ||||||
| 	r.setLastSyncResourceVersion(resourceVersion) | 	r.setLastSyncResourceVersion(resourceVersion) | ||||||
| @@ -715,8 +718,12 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err | |||||||
| 	return r.store.Replace(found, resourceVersion) | 	return r.store.Replace(found, resourceVersion) | ||||||
| } | } | ||||||
|  |  | ||||||
| // watchHandler watches w and sets setLastSyncResourceVersion | // handleListWatch consumes events from w, updates the Store, and records the | ||||||
| func watchHandler(start time.Time, | // last seen ResourceVersion, to allow continuing from that ResourceVersion on | ||||||
|  | // retry. If successful, the watcher will be left open after receiving the | ||||||
|  | // initial set of objects, to allow watching for future events. | ||||||
|  | func handleListWatch( | ||||||
|  | 	start time.Time, | ||||||
| 	w watch.Interface, | 	w watch.Interface, | ||||||
| 	store Store, | 	store Store, | ||||||
| 	expectedType reflect.Type, | 	expectedType reflect.Type, | ||||||
| @@ -724,33 +731,77 @@ func watchHandler(start time.Time, | |||||||
| 	name string, | 	name string, | ||||||
| 	expectedTypeName string, | 	expectedTypeName string, | ||||||
| 	setLastSyncResourceVersion func(string), | 	setLastSyncResourceVersion func(string), | ||||||
| 	exitOnInitialEventsEndBookmark *bool, |  | ||||||
| 	clock clock.Clock, | 	clock clock.Clock, | ||||||
| 	errc chan error, | 	errCh chan error, | ||||||
|  | 	stopCh <-chan struct{}, | ||||||
|  | ) (bool, error) { | ||||||
|  | 	exitOnWatchListBookmarkReceived := true | ||||||
|  | 	return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, | ||||||
|  | 		setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // handleListWatch consumes events from w, updates the Store, and records the | ||||||
|  | // last seen ResourceVersion, to allow continuing from that ResourceVersion on | ||||||
|  | // retry. The watcher will always be stopped on exit. | ||||||
|  | func handleWatch( | ||||||
|  | 	start time.Time, | ||||||
|  | 	w watch.Interface, | ||||||
|  | 	store Store, | ||||||
|  | 	expectedType reflect.Type, | ||||||
|  | 	expectedGVK *schema.GroupVersionKind, | ||||||
|  | 	name string, | ||||||
|  | 	expectedTypeName string, | ||||||
|  | 	setLastSyncResourceVersion func(string), | ||||||
|  | 	clock clock.Clock, | ||||||
|  | 	errCh chan error, | ||||||
| 	stopCh <-chan struct{}, | 	stopCh <-chan struct{}, | ||||||
| ) error { | ) error { | ||||||
| 	eventCount := 0 | 	exitOnWatchListBookmarkReceived := false | ||||||
| 	initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnInitialEventsEndBookmark != nil) | 	_, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName, | ||||||
| 	defer initialEventsEndBookmarkWarningTicker.Stop() | 		setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh) | ||||||
| 	if exitOnInitialEventsEndBookmark != nil { | 	return err | ||||||
| 		// set it to false just in case somebody |  | ||||||
| 		// made it positive |  | ||||||
| 		*exitOnInitialEventsEndBookmark = false |  | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // handleAnyWatch consumes events from w, updates the Store, and records the last | ||||||
|  | // seen ResourceVersion, to allow continuing from that ResourceVersion on retry. | ||||||
|  | // If exitOnWatchListBookmarkReceived is true, the watch events will be consumed | ||||||
|  | // until a bookmark event is received with the WatchList annotation present. | ||||||
|  | // Returns true (watchListBookmarkReceived) if the WatchList bookmark was | ||||||
|  | // received, even if exitOnWatchListBookmarkReceived is false. | ||||||
|  | // The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is | ||||||
|  | // true and watchListBookmarkReceived is true. This allows the same watch stream | ||||||
|  | // to be re-used by the caller to continue watching for new events. | ||||||
|  | func handleAnyWatch(start time.Time, | ||||||
|  | 	w watch.Interface, | ||||||
|  | 	store Store, | ||||||
|  | 	expectedType reflect.Type, | ||||||
|  | 	expectedGVK *schema.GroupVersionKind, | ||||||
|  | 	name string, | ||||||
|  | 	expectedTypeName string, | ||||||
|  | 	setLastSyncResourceVersion func(string), | ||||||
|  | 	exitOnWatchListBookmarkReceived bool, | ||||||
|  | 	clock clock.Clock, | ||||||
|  | 	errCh chan error, | ||||||
|  | 	stopCh <-chan struct{}, | ||||||
|  | ) (bool, error) { | ||||||
|  | 	watchListBookmarkReceived := false | ||||||
|  | 	eventCount := 0 | ||||||
|  | 	initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived) | ||||||
|  | 	defer initialEventsEndBookmarkWarningTicker.Stop() | ||||||
|  |  | ||||||
| loop: | loop: | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-stopCh: | 		case <-stopCh: | ||||||
| 			return errorStopRequested | 			return watchListBookmarkReceived, errorStopRequested | ||||||
| 		case err := <-errc: | 		case err := <-errCh: | ||||||
| 			return err | 			return watchListBookmarkReceived, err | ||||||
| 		case event, ok := <-w.ResultChan(): | 		case event, ok := <-w.ResultChan(): | ||||||
| 			if !ok { | 			if !ok { | ||||||
| 				break loop | 				break loop | ||||||
| 			} | 			} | ||||||
| 			if event.Type == watch.Error { | 			if event.Type == watch.Error { | ||||||
| 				return apierrors.FromObject(event.Object) | 				return watchListBookmarkReceived, apierrors.FromObject(event.Object) | ||||||
| 			} | 			} | ||||||
| 			if expectedType != nil { | 			if expectedType != nil { | ||||||
| 				if e, a := expectedType, reflect.TypeOf(event.Object); e != a { | 				if e, a := expectedType, reflect.TypeOf(event.Object); e != a { | ||||||
| @@ -792,9 +843,7 @@ loop: | |||||||
| 			case watch.Bookmark: | 			case watch.Bookmark: | ||||||
| 				// A `Bookmark` means watch has synced here, just update the resourceVersion | 				// A `Bookmark` means watch has synced here, just update the resourceVersion | ||||||
| 				if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { | 				if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { | ||||||
| 					if exitOnInitialEventsEndBookmark != nil { | 					watchListBookmarkReceived = true | ||||||
| 						*exitOnInitialEventsEndBookmark = true |  | ||||||
| 					} |  | ||||||
| 				} | 				} | ||||||
| 			default: | 			default: | ||||||
| 				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) | 				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) | ||||||
| @@ -804,10 +853,10 @@ loop: | |||||||
| 				rvu.UpdateResourceVersion(resourceVersion) | 				rvu.UpdateResourceVersion(resourceVersion) | ||||||
| 			} | 			} | ||||||
| 			eventCount++ | 			eventCount++ | ||||||
| 			if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark { | 			if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { | ||||||
| 				watchDuration := clock.Since(start) | 				watchDuration := clock.Since(start) | ||||||
| 				klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) | 				klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) | ||||||
| 				return nil | 				return watchListBookmarkReceived, nil | ||||||
| 			} | 			} | ||||||
| 			initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) | 			initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now()) | ||||||
| 		case <-initialEventsEndBookmarkWarningTicker.C(): | 		case <-initialEventsEndBookmarkWarningTicker.C(): | ||||||
| @@ -817,10 +866,10 @@ loop: | |||||||
|  |  | ||||||
| 	watchDuration := clock.Since(start) | 	watchDuration := clock.Since(start) | ||||||
| 	if watchDuration < 1*time.Second && eventCount == 0 { | 	if watchDuration < 1*time.Second && eventCount == 0 { | ||||||
| 		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) | 		return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) | ||||||
| 	} | 	} | ||||||
| 	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) | 	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount) | ||||||
| 	return nil | 	return watchListBookmarkReceived, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // LastSyncResourceVersion is the resource version observed when last sync with the underlying store | // LastSyncResourceVersion is the resource version observed when last sync with the underlying store | ||||||
| @@ -962,14 +1011,14 @@ type initialEventsEndBookmarkTicker struct { | |||||||
| // Note that the caller controls whether to call t.C() and t.Stop(). | // Note that the caller controls whether to call t.C() and t.Stop(). | ||||||
| // | // | ||||||
| // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. | // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method. | ||||||
| func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker { | func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { | ||||||
| 	return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnInitialEventsEndBookmarkRequested) | 	return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived) | ||||||
| } | } | ||||||
|  |  | ||||||
| func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker { | func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker { | ||||||
| 	clockWithTicker, ok := c.(clock.WithTicker) | 	clockWithTicker, ok := c.(clock.WithTicker) | ||||||
| 	if !ok || !exitOnInitialEventsEndBookmarkRequested { | 	if !ok || !exitOnWatchListBookmarkReceived { | ||||||
| 		if exitOnInitialEventsEndBookmarkRequested { | 		if exitOnWatchListBookmarkReceived { | ||||||
| 			klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") | 			klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested") | ||||||
| 		} | 		} | ||||||
| 		return &initialEventsEndBookmarkTicker{ | 		return &initialEventsEndBookmarkTicker{ | ||||||
|   | |||||||
| @@ -231,7 +231,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) { | |||||||
| 			return resultCh | 			return resultCh | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) | 	err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		t.Errorf("unexpected non-error") | 		t.Errorf("unexpected non-error") | ||||||
| 	} | 	} | ||||||
| @@ -267,7 +267,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { | |||||||
| 			return resultCh | 			return resultCh | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) | 	err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopCh) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		t.Errorf("unexpected non-error") | 		t.Errorf("unexpected non-error") | ||||||
| 	} | 	} | ||||||
| @@ -295,7 +295,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| 	// Simulate the result channel being closed by the producer before handleWatch is called. | 	// Simulate the result channel being closed by the producer before handleWatch is called. | ||||||
| 	close(resultCh) | 	close(resultCh) | ||||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) | 	err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		t.Errorf("unexpected non-error") | 		t.Errorf("unexpected non-error") | ||||||
| 	} | 	} | ||||||
| @@ -328,7 +328,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { | |||||||
| 			return resultCh | 			return resultCh | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) | 	err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		t.Errorf("unexpected non-error") | 		t.Errorf("unexpected non-error") | ||||||
| 	} | 	} | ||||||
| @@ -362,8 +362,9 @@ func TestReflectorWatchHandler(t *testing.T) { | |||||||
| 		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) | 		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) | ||||||
| 		fw.Stop() | 		fw.Stop() | ||||||
| 	}() | 	}() | ||||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) | 	err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc, stopCh) | ||||||
| 	if !errors.Is(err, errorStopRequested) { | 	// TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) | ||||||
|  | 	if err != nil && !errors.Is(err, errorStopRequested) { | ||||||
| 		t.Errorf("unexpected error %v", err) | 		t.Errorf("unexpected error %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -406,7 +407,7 @@ func TestReflectorStopWatch(t *testing.T) { | |||||||
| 	fw := watch.NewFake() | 	fw := watch.NewFake() | ||||||
| 	stopWatch := make(chan struct{}) | 	stopWatch := make(chan struct{}) | ||||||
| 	close(stopWatch) | 	close(stopWatch) | ||||||
| 	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch) | 	err := handleWatch(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) | ||||||
| 	if err != errorStopRequested { | 	if err != errorStopRequested { | ||||||
| 		t.Errorf("expected stop error, got %q", err) | 		t.Errorf("expected stop error, got %q", err) | ||||||
| 	} | 	} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Karl Isenberg
					Karl Isenberg