Merge pull request #124446 from p0lyn0mial/watch-list-consistency-detector-more-generic
client-go/consistency-detector: change the signature of checkWatchListConsistencyIfRequested
This commit is contained in:
		| @@ -695,7 +695,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { | |||||||
| 	// we utilize the temporaryStore to ensure independence from the current store implementation. | 	// we utilize the temporaryStore to ensure independence from the current store implementation. | ||||||
| 	// as of today, the store is implemented as a queue and will be drained by the higher-level | 	// as of today, the store is implemented as a queue and will be drained by the higher-level | ||||||
| 	// component as soon as it finishes replacing the content. | 	// component as soon as it finishes replacing the content. | ||||||
| 	checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore) | 	checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List) | ||||||
|  |  | ||||||
| 	if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { | 	if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { | ||||||
| 		return nil, fmt.Errorf("unable to sync watch-list result: %v", err) | 		return nil, fmt.Errorf("unable to sync watch-list result: %v", err) | ||||||
| @@ -933,6 +933,13 @@ func isWatchErrorRetriable(err error) bool { | |||||||
| 	return false | 	return false | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // wrapListFuncWithContext wraps a ListFunc in a function that accepts a context and ignores it. | ||||||
|  | func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { | ||||||
|  | 	return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) { | ||||||
|  | 		return listFn(options) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // initialEventsEndBookmarkTicker is a ticker that produces a warning if the bookmark event | // initialEventsEndBookmarkTicker is a ticker that produces a warning if the bookmark event | ||||||
| // which marks the end of the watch stream, has not been received within the defined tick interval. | // which marks the end of the watch stream, has not been received within the defined tick interval. | ||||||
| // | // | ||||||
|   | |||||||
| @@ -18,6 +18,7 @@ package cache | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"fmt" | ||||||
| 	"os" | 	"os" | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| @@ -32,42 +33,46 @@ import ( | |||||||
| 	"k8s.io/klog/v2" | 	"k8s.io/klog/v2" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var dataConsistencyDetectionEnabled = false | var dataConsistencyDetectionForWatchListEnabled = false | ||||||
|  |  | ||||||
| func init() { | func init() { | ||||||
| 	dataConsistencyDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) | 	dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")) | ||||||
| } | } | ||||||
|  |  | ||||||
| // checkWatchListConsistencyIfRequested performs a data consistency check only when | type retrieveItemsFunc[U any] func() []U | ||||||
|  |  | ||||||
|  | type listFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error) | ||||||
|  |  | ||||||
|  | // checkWatchListDataConsistencyIfRequested performs a data consistency check only when | ||||||
| // the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. | // the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. | ||||||
| // | // | ||||||
| // The consistency check is meant to be enforced only in the CI, not in production. | // The consistency check is meant to be enforced only in the CI, not in production. | ||||||
| // The check ensures that data retrieved by the watch-list api call | // The check ensures that data retrieved by the watch-list api call | ||||||
| // is exactly the same as data received by the standard list api call. | // is exactly the same as data received by the standard list api call against etcd. | ||||||
| // | // | ||||||
| // Note that this function will panic when data inconsistency is detected. | // Note that this function will panic when data inconsistency is detected. | ||||||
| // This is intentional because we want to catch it in the CI. | // This is intentional because we want to catch it in the CI. | ||||||
| func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { | func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], retrieveItemsFn retrieveItemsFunc[U]) { | ||||||
| 	if !dataConsistencyDetectionEnabled { | 	if !dataConsistencyDetectionForWatchListEnabled { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store) | 	// for informers we pass an empty ListOptions because | ||||||
|  | 	// listFn might be wrapped for filtering during informer construction. | ||||||
|  | 	checkDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn) | ||||||
| } | } | ||||||
|  |  | ||||||
| // checkWatchListConsistency exists solely for testing purposes. | // checkDataConsistency exists solely for testing purposes. | ||||||
| // we cannot use checkWatchListConsistencyIfRequested because | // we cannot use checkWatchListDataConsistencyIfRequested because | ||||||
| // it is guarded by an environment variable. | // it is guarded by an environment variable. | ||||||
| // we cannot manipulate the environment variable because | // we cannot manipulate the environment variable because | ||||||
| // it will affect other tests in this package. | // it will affect other tests in this package. | ||||||
| func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) { | func checkDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn listFunc[T], listOptions metav1.ListOptions, retrieveItemsFn retrieveItemsFunc[U]) { | ||||||
| 	klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity) | 	klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity) | ||||||
| 	opts := metav1.ListOptions{ | 	listOptions.ResourceVersion = lastSyncedResourceVersion | ||||||
| 		ResourceVersion:      lastSyncedResourceVersion, | 	listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact | ||||||
| 		ResourceVersionMatch: metav1.ResourceVersionMatchExact, |  | ||||||
| 	} |  | ||||||
| 	var list runtime.Object | 	var list runtime.Object | ||||||
| 	err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) { | 	err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) { | ||||||
| 		list, err = listerWatcher.List(opts) | 		list, err = listFn(ctx, listOptions) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			// the consistency check will only be enabled in the CI | 			// the consistency check will only be enabled in the CI | ||||||
| 			// and LIST calls in general will be retried by the client-go library | 			// and LIST calls in general will be retried by the client-go library | ||||||
| @@ -78,7 +83,7 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync | |||||||
| 		return true, nil | 		return true, nil | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err) | 		klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -88,14 +93,14 @@ func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSync | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	listItems := toMetaObjectSliceOrDie(rawListItems) | 	listItems := toMetaObjectSliceOrDie(rawListItems) | ||||||
| 	storeItems := toMetaObjectSliceOrDie(store.List()) | 	retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn()) | ||||||
|  |  | ||||||
| 	sort.Sort(byUID(listItems)) | 	sort.Sort(byUID(listItems)) | ||||||
| 	sort.Sort(byUID(storeItems)) | 	sort.Sort(byUID(retrievedItems)) | ||||||
|  |  | ||||||
| 	if !cmp.Equal(listItems, storeItems) { | 	if !cmp.Equal(listItems, retrievedItems) { | ||||||
| 		klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems)) | 		klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems)) | ||||||
| 		msg := "data inconsistency detected for the watch-list feature, panicking!" | 		msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity) | ||||||
| 		panic(msg) | 		panic(msg) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -17,6 +17,7 @@ limitations under the License. | |||||||
| package cache | package cache | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"testing" | 	"testing" | ||||||
|  |  | ||||||
| @@ -25,62 +26,71 @@ import ( | |||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/runtime" | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	"k8s.io/apimachinery/pkg/watch" | 	"k8s.io/apimachinery/pkg/watch" | ||||||
|  | 	"k8s.io/utils/ptr" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestWatchListConsistency(t *testing.T) { | func TestDataConsistencyChecker(t *testing.T) { | ||||||
| 	scenarios := []struct { | 	scenarios := []struct { | ||||||
| 		name string | 		name string | ||||||
|  |  | ||||||
| 		podList        *v1.PodList | 		podList        *v1.PodList | ||||||
| 		storeContent   []*v1.Pod | 		storeContent   []*v1.Pod | ||||||
|  | 		requestOptions metav1.ListOptions | ||||||
|  |  | ||||||
| 		expectedRequestOptions []metav1.ListOptions | 		expectedRequestOptions []metav1.ListOptions | ||||||
| 		expectedListRequests   int | 		expectedListRequests   int | ||||||
| 		expectPanic            bool | 		expectPanic            bool | ||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			name: "watchlist consistency check won't panic when data is consistent", | 			name: "data consistency check won't panic when data is consistent", | ||||||
| 			podList: &v1.PodList{ | 			podList: &v1.PodList{ | ||||||
| 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | ||||||
| 				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, | 				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")}, | ||||||
| 			}, | 			}, | ||||||
|  | 			requestOptions:       metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, | ||||||
| 			storeContent:         []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, | 			storeContent:         []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, | ||||||
| 			expectedListRequests: 1, | 			expectedListRequests: 1, | ||||||
| 			expectedRequestOptions: []metav1.ListOptions{ | 			expectedRequestOptions: []metav1.ListOptions{ | ||||||
| 				{ | 				{ | ||||||
| 					ResourceVersion:      "2", | 					ResourceVersion:      "2", | ||||||
| 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||||
|  | 					TimeoutSeconds:       ptr.To(int64(39)), | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
|  |  | ||||||
| 		{ | 		{ | ||||||
| 			name: "watchlist consistency check won't panic when there is no data", | 			name: "data consistency check won't panic when there is no data", | ||||||
| 			podList: &v1.PodList{ | 			podList: &v1.PodList{ | ||||||
| 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | ||||||
| 			}, | 			}, | ||||||
|  | 			requestOptions:       metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, | ||||||
| 			expectedListRequests: 1, | 			expectedListRequests: 1, | ||||||
| 			expectedRequestOptions: []metav1.ListOptions{ | 			expectedRequestOptions: []metav1.ListOptions{ | ||||||
| 				{ | 				{ | ||||||
| 					ResourceVersion:      "2", | 					ResourceVersion:      "2", | ||||||
| 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||||
|  | 					TimeoutSeconds:       ptr.To(int64(39)), | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
|  |  | ||||||
| 		{ | 		{ | ||||||
| 			name: "watchlist consistency panics when data is inconsistent", | 			name: "data consistency panics when data is inconsistent", | ||||||
| 			podList: &v1.PodList{ | 			podList: &v1.PodList{ | ||||||
| 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | 				ListMeta: metav1.ListMeta{ResourceVersion: "2"}, | ||||||
| 				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, | 				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")}, | ||||||
| 			}, | 			}, | ||||||
|  | 			requestOptions:       metav1.ListOptions{TimeoutSeconds: ptr.To(int64(39))}, | ||||||
| 			storeContent:         []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, | 			storeContent:         []*v1.Pod{makePod("p1", "1"), makePod("p2", "2")}, | ||||||
| 			expectedListRequests: 1, | 			expectedListRequests: 1, | ||||||
| 			expectedRequestOptions: []metav1.ListOptions{ | 			expectedRequestOptions: []metav1.ListOptions{ | ||||||
| 				{ | 				{ | ||||||
| 					ResourceVersion:      "2", | 					ResourceVersion:      "2", | ||||||
| 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | 					ResourceVersionMatch: metav1.ResourceVersionMatchExact, | ||||||
|  | 					TimeoutSeconds:       ptr.To(int64(39)), | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			expectPanic: true, | 			expectPanic: true, | ||||||
| @@ -90,15 +100,18 @@ func TestWatchListConsistency(t *testing.T) { | |||||||
| 	for _, scenario := range scenarios { | 	for _, scenario := range scenarios { | ||||||
| 		t.Run(scenario.name, func(t *testing.T) { | 		t.Run(scenario.name, func(t *testing.T) { | ||||||
| 			listWatcher, store, _, stopCh := testData() | 			listWatcher, store, _, stopCh := testData() | ||||||
|  | 			ctx := wait.ContextForChannel(stopCh) | ||||||
| 			for _, obj := range scenario.storeContent { | 			for _, obj := range scenario.storeContent { | ||||||
| 				require.NoError(t, store.Add(obj)) | 				require.NoError(t, store.Add(obj)) | ||||||
| 			} | 			} | ||||||
| 			listWatcher.customListResponse = scenario.podList | 			listWatcher.customListResponse = scenario.podList | ||||||
|  |  | ||||||
| 			if scenario.expectPanic { | 			if scenario.expectPanic { | ||||||
| 				require.Panics(t, func() { checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) }) | 				require.Panics(t, func() { | ||||||
|  | 					checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List) | ||||||
|  | 				}) | ||||||
| 			} else { | 			} else { | ||||||
| 				checkWatchListConsistency(stopCh, "", scenario.podList.ResourceVersion, listWatcher, store) | 				checkDataConsistency(ctx, "", scenario.podList.ResourceVersion, wrapListFuncWithContext(listWatcher.List), scenario.requestOptions, store.List) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			verifyListCounter(t, listWatcher, scenario.expectedListRequests) | 			verifyListCounter(t, listWatcher, scenario.expectedListRequests) | ||||||
| @@ -108,20 +121,18 @@ func TestWatchListConsistency(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { | func TestDriveWatchLisConsistencyIfRequired(t *testing.T) { | ||||||
| 	stopCh := make(chan struct{}) | 	ctx := context.TODO() | ||||||
| 	defer close(stopCh) | 	checkWatchListDataConsistencyIfRequested[runtime.Object, runtime.Object](ctx, "", "", nil, nil) | ||||||
| 	checkWatchListConsistencyIfRequested(stopCh, "", "", nil, nil) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestWatchListConsistencyRetry(t *testing.T) { | func TestDataConsistencyCheckerRetry(t *testing.T) { | ||||||
| 	store := NewStore(MetaNamespaceKeyFunc) | 	store := NewStore(MetaNamespaceKeyFunc) | ||||||
| 	stopCh := make(chan struct{}) | 	ctx := context.TODO() | ||||||
| 	defer close(stopCh) |  | ||||||
|  |  | ||||||
| 	stopListErrorAfter := 5 | 	stopListErrorAfter := 5 | ||||||
| 	errLister := &errorLister{stopErrorAfter: stopListErrorAfter} | 	errLister := &errorLister{stopErrorAfter: stopListErrorAfter} | ||||||
|  |  | ||||||
| 	checkWatchListConsistency(stopCh, "", "", errLister, store) | 	checkDataConsistency(ctx, "", "", wrapListFuncWithContext(errLister.List), metav1.ListOptions{}, store.List) | ||||||
| 	require.Equal(t, errLister.listCounter, errLister.stopErrorAfter) | 	require.Equal(t, errLister.listCounter, errLister.stopErrorAfter) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot