client-go/consistency-detector: change the signature of checkWatchListConsistencyIfRequested
the signature of the method was tightly connected to the reflector, making it difficult to use for anything other than a reflector. this simple refactor makes the method more generic.
This commit is contained in:
		| @@ -695,7 +695,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { | ||||
| 	// we utilize the temporaryStore to ensure independence from the current store implementation. | ||||
| 	// as of today, the store is implemented as a queue and will be drained by the higher-level | ||||
| 	// component as soon as it finishes replacing the content. | ||||
| 	checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore) | ||||
| 	checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) | ||||
|  | ||||
| 	if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { | ||||
| 		return nil, fmt.Errorf("unable to sync watch-list result: %v", err) | ||||
| @@ -933,6 +933,13 @@ func isWatchErrorRetriable(err error) bool { | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| // wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it. | ||||
| func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { | ||||
| 	return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) { | ||||
| 		return listFn(options) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event | ||||
| // which marks the end of the watch stream, has not been received within the defined tick interval. | ||||
| // | ||||
|   | ||||
| @@ -18,6 +18,7 @@ package cache | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
| @@ -32,42 +33,46 @@ import ( | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| var dataConsistencyDetectionEnabled = false | ||||
| var dataConsistencyDetectionForWatchListEnabled = false | ||||
|  | ||||
| func init() { | ||||
| 	dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) | ||||
| 	dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) | ||||
| } | ||||
|  | ||||
| // checkWatchListConsistencyIfRequested performs a data consistency check only when | ||||
| type retrieveItemsFunc[U any] func() []U | ||||
|  | ||||
| type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) | ||||
|  | ||||
| // checkWatchListDataConsistencyIfRequested performs a data consistency check only when | ||||
| // the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. | ||||
| // | ||||
| // The consistency check is meant to be enforced only in the CI, not in production. | ||||
| // The check ensures that data retrieved by the watch-list api call | ||||
| // is exactly the same as data received by the standard list api call. | ||||
| // is exactly the same as data received by the standard list api call against etcd. | ||||
| // | ||||
| // Note that this function will panic when data inconsistency is detected. | ||||
| // This is intentional because we want to catch it in the CI. | ||||
| func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { | ||||
| 	if !dataConsistencyDetectionEnabled { | ||||
| func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) { | ||||
| 	if !dataConsistencyDetectionForWatchListEnabled { | ||||
| 		return | ||||
| 	} | ||||
| 	checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store) | ||||
| 	// for informers we pass an empty ListOptions because | ||||
| 	// listFn might be wrapped for filtering during informer construction. | ||||
| 	checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) | ||||
| } | ||||
|  | ||||
| // checkWatchListConsistency exists solely for testing purposes. | ||||
| // we cannot use checkWatchListConsistencyIfRequested because | ||||
| // checkDataConsistency exists solely for testing purposes. | ||||
| // we cannot use checkWatchListDataConsistencyIfRequested because | ||||
| // it is guarded by an environmental variable. | ||||
| // we cannot manipulate the environmental variable because | ||||
| // it will affect other tests in this package. | ||||
| func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { | ||||
| 	klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity) | ||||
| 	opts := metav1.ListOptions{ | ||||
| 		ResourceVersion:      lastSyncedResourceVersion, | ||||
| 		ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||
| 	} | ||||
| func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) { | ||||
| 	klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity) | ||||
| 	listOptions.ResourceVersion = lastSyncedResourceVersion | ||||
| 	listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact | ||||
| 	var list runtime.Object | ||||
| 	err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) { | ||||
| 		list, err = listerWatcher.List(opts) | ||||
| 	err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) { | ||||
| 		list, err = listFn(ctx, listOptions) | ||||
| 		if err != nil { | ||||
| 			// the consistency check will only be enabled in the CI | ||||
| 			// and LIST calls in general will be retired by the client-go library | ||||
| @@ -78,7 +83,7 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err) | ||||
| 		klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -88,14 +93,14 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync | ||||
| 	} | ||||
|  | ||||
| 	listItems := toMetaObjectSliceOrDie(rawListItems) | ||||
| 	storeItems := toMetaObjectSliceOrDie(store.List()) | ||||
| 	retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) | ||||
|  | ||||
| 	sort.Sort(byUID(listItems)) | ||||
| 	sort.Sort(byUID(storeItems)) | ||||
| 	sort.Sort(byUID(retrievedItems)) | ||||
|  | ||||
| 	if !cmp.Equal(listItems, storeItems) { | ||||
| 		klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems)) | ||||
| 		msg := "data inconsistency detected for the watch-list feature, panicking!" | ||||
| 	if !cmp.Equal(listItems, retrievedItems) { | ||||
| 		klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems)) | ||||
| 		msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity) | ||||
| 		panic(msg) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package cache | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
|  | ||||
| @@ -25,62 +26,71 @@ import ( | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
| 	"k8s.io/utils/ptr" | ||||
| ) | ||||
|  | ||||
| func TestWatchListConsistency(t *testing.T) { | ||||
| func TestDataConsistencyChecker(t *testing.T) { | ||||
| 	scenarios := []struct { | ||||
| 		name string | ||||
|  | ||||
| 		podList      *v1.PodList | ||||
| 		storeContent []*v1.Pod | ||||
| 		podList        *v1.PodList | ||||
| 		storeContent   []*v1.Pod | ||||
| 		requestOptions metav1.ListOptions | ||||
|  | ||||
| 		expectedRequestOptions []metav1.ListOptions | ||||
| 		expectedListRequests   int | ||||
| 		expectPanic            bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "watchlist consistency check won't panic when data is consistent", | ||||
| 			name: "data consistency check won't panic when data is consistent", | ||||
| 			podList: &v1.PodList{ | ||||
| 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | ||||
| 				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, | ||||
| 			}, | ||||
| 			requestOptions:       metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, | ||||
| 			storeContent:         []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, | ||||
| 			expectedListRequests: 1, | ||||
| 			expectedRequestOptions: []metav1.ListOptions{ | ||||
| 				{ | ||||
| 					ResourceVersion:      "2", | ||||
| 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||
| 					TimeoutSeconds:       ptr.To(int64(39)), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		{ | ||||
| 			name: "watchlist consistency check won't panic when there is no data", | ||||
| 			name: "data consistency check won't panic when there is no data", | ||||
| 			podList: &v1.PodList{ | ||||
| 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | ||||
| 			}, | ||||
| 			requestOptions:       metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, | ||||
| 			expectedListRequests: 1, | ||||
| 			expectedRequestOptions: []metav1.ListOptions{ | ||||
| 				{ | ||||
| 					ResourceVersion:      "2", | ||||
| 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||
| 					TimeoutSeconds:       ptr.To(int64(39)), | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|  | ||||
| 		{ | ||||
| 			name: "watchlist consistency panics when data is inconsistent", | ||||
| 			name: "data consistency panics when data is inconsistent", | ||||
| 			podList: &v1.PodList{ | ||||
| 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | ||||
| 				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, | ||||
| 			}, | ||||
| 			requestOptions:       metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, | ||||
| 			storeContent:         []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, | ||||
| 			expectedListRequests: 1, | ||||
| 			expectedRequestOptions: []metav1.ListOptions{ | ||||
| 				{ | ||||
| 					ResourceVersion:      "2", | ||||
| 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||
| 					TimeoutSeconds:       ptr.To(int64(39)), | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectPanic: true, | ||||
| @@ -90,15 +100,18 @@ func TestWatchListConsistency(t *testing.T) { | ||||
| 	for _, scenario := range scenarios { | ||||
| 		t.Run(scenario.name, func(t *testing.T) { | ||||
| 			listWatcher, store, _, stopCh := testData() | ||||
| 			ctx := wait.ContextForChannel(stopCh) | ||||
| 			for _, obj := range scenario.storeContent { | ||||
| 				require.NoError(t, store.Add(obj)) | ||||
| 			} | ||||
| 			listWatcher.customListResponse = scenario.podList | ||||
|  | ||||
| 			if scenario.expectPanic { | ||||
| 				require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) }) | ||||
| 				require.Panics(t, func() { | ||||
| 					checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List) | ||||
| 				}) | ||||
| 			} else { | ||||
| 				checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) | ||||
| 				checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List) | ||||
| 			} | ||||
|  | ||||
| 			verifyListCounter(t, listWatcher, scenario.expectedListRequests) | ||||
| @@ -108,20 +121,18 @@ func TestWatchListConsistency(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	defer close(stopCh) | ||||
| 	checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil) | ||||
| 	ctx := context.TODO() | ||||
| 	checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil) | ||||
| } | ||||
|  | ||||
| func TestWatchListConsistencyRetry(t *testing.T) { | ||||
| func TestDataConsistencyCheckerRetry(t *testing.T) { | ||||
| 	store := NewStore(MetaNamespaceKeyFunc) | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	defer close(stopCh) | ||||
| 	ctx := context.TODO() | ||||
|  | ||||
| 	stopListErrorAfter := 5 | ||||
| 	errLister := &errorLister{stopErrorAfter: stopListErrorAfter} | ||||
|  | ||||
| 	checkWatchListConsistency(stopCh, "", "", errLister, store) | ||||
| 	checkDataConsistency(ctx, "", "", wrapListFuncWithContext(errLister.List), metav1.ListOptions{}, store.List) | ||||
| 	require.Equal(t, errLister.listCounter, errLister.stopErrorAfter) | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Lukasz Szaszkiewicz
					Lukasz Szaszkiewicz