Merge pull request #125335 from p0lyn0mial/upstream-consistency-decector-handles-legacy-case
client-go/consistencydetector: handles the watch cache legacy case
This commit is contained in:
		@@ -46,8 +46,9 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
 | 
			
		||||
	listOptions.ResourceVersion = lastSyncedResourceVersion
 | 
			
		||||
	listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
 | 
			
		||||
 | 
			
		||||
	retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
 | 
			
		||||
	listOptions = prepareListCallOptions(lastSyncedResourceVersion, listOptions, len(retrievedItems))
 | 
			
		||||
	var list runtime.Object
 | 
			
		||||
	err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
 | 
			
		||||
		list, err = listFn(ctx, listOptions)
 | 
			
		||||
@@ -69,9 +70,7 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err) // this should never happen
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	listItems := toMetaObjectSliceOrDie(rawListItems)
 | 
			
		||||
	retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
 | 
			
		||||
 | 
			
		||||
	sort.Sort(byUID(listItems))
 | 
			
		||||
	sort.Sort(byUID(retrievedItems))
 | 
			
		||||
@@ -85,24 +84,49 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
 | 
			
		||||
 | 
			
		||||
// canFormAdditionalListCall ensures that we can form a valid LIST requests
 | 
			
		||||
// for checking data consistency.
 | 
			
		||||
func canFormAdditionalListCall(resourceVersion string, options metav1.ListOptions) bool {
 | 
			
		||||
func canFormAdditionalListCall(lastSyncedResourceVersion string, listOptions metav1.ListOptions) bool {
 | 
			
		||||
	// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
 | 
			
		||||
	// we need to make sure that the continuation hasn't been set
 | 
			
		||||
	// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38
 | 
			
		||||
	if len(options.Continue) > 0 {
 | 
			
		||||
	if len(listOptions.Continue) > 0 {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
 | 
			
		||||
	// we need to make sure that the RV is valid because the validation code forbids RV == "0"
 | 
			
		||||
	// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44
 | 
			
		||||
	if resourceVersion == "0" {
 | 
			
		||||
	if lastSyncedResourceVersion == "0" {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// prepareListCallOptions changes the input list options so that
 | 
			
		||||
// the list call goes directly to etcd
 | 
			
		||||
func prepareListCallOptions(lastSyncedResourceVersion string, listOptions metav1.ListOptions, retrievedItemsCount int) metav1.ListOptions {
 | 
			
		||||
	// this is our legacy case:
 | 
			
		||||
	//
 | 
			
		||||
	// the watch cache skips the Limit if the ResourceVersion was set to "0"
 | 
			
		||||
	// thus, to compare with data retrieved directly from etcd
 | 
			
		||||
	// we need to skip the limit to for the list call as well.
 | 
			
		||||
	//
 | 
			
		||||
	// note that when the number of retrieved items is less than the request limit,
 | 
			
		||||
	// it means either the watch cache is disabled, or there is not enough data.
 | 
			
		||||
	// in both cases, we can use the limit because we will be able to compare
 | 
			
		||||
	// the data with the items retrieved from etcd.
 | 
			
		||||
	if listOptions.ResourceVersion == "0" && listOptions.Limit > 0 && int64(retrievedItemsCount) > listOptions.Limit {
 | 
			
		||||
		listOptions.Limit = 0
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// set the RV and RVM so that we get the snapshot of data
 | 
			
		||||
	// directly from etcd.
 | 
			
		||||
	listOptions.ResourceVersion = lastSyncedResourceVersion
 | 
			
		||||
	listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
 | 
			
		||||
 | 
			
		||||
	return listOptions
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type byUID []metav1.Object
 | 
			
		||||
 | 
			
		||||
func (a byUID) Len() int           { return len(a) }
 | 
			
		||||
 
 | 
			
		||||
@@ -60,6 +60,59 @@ func TestDataConsistencyChecker(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			name: "legacy, the limit is removed from the list options when it wasn't honored by the watch cache",
 | 
			
		||||
			listResponse: &v1.PodList{
 | 
			
		||||
				ListMeta: metav1.ListMeta{ResourceVersion: "2"},
 | 
			
		||||
				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
 | 
			
		||||
			},
 | 
			
		||||
			requestOptions:       metav1.ListOptions{ResourceVersion: "0", Limit: 2},
 | 
			
		||||
			retrievedItems:       []*v1.Pod{makePod("p1", "1"), makePod("p2", "2"), makePod("p3", "3")},
 | 
			
		||||
			expectedListRequests: 1,
 | 
			
		||||
			expectedRequestOptions: []metav1.ListOptions{
 | 
			
		||||
				{
 | 
			
		||||
					ResourceVersion:      "2",
 | 
			
		||||
					ResourceVersionMatch: metav1.ResourceVersionMatchExact,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			name: "the limit is NOT removed from the list options for non-legacy request",
 | 
			
		||||
			listResponse: &v1.PodList{
 | 
			
		||||
				ListMeta: metav1.ListMeta{ResourceVersion: "2"},
 | 
			
		||||
				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
 | 
			
		||||
			},
 | 
			
		||||
			requestOptions:       metav1.ListOptions{ResourceVersion: "2", Limit: 2},
 | 
			
		||||
			retrievedItems:       []*v1.Pod{makePod("p1", "1"), makePod("p2", "2"), makePod("p3", "3")},
 | 
			
		||||
			expectedListRequests: 1,
 | 
			
		||||
			expectedRequestOptions: []metav1.ListOptions{
 | 
			
		||||
				{
 | 
			
		||||
					Limit:                2,
 | 
			
		||||
					ResourceVersion:      "2",
 | 
			
		||||
					ResourceVersionMatch: metav1.ResourceVersionMatchExact,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			name: "legacy, the limit is NOT removed from the list options when the watch cache is disabled",
 | 
			
		||||
			listResponse: &v1.PodList{
 | 
			
		||||
				ListMeta: metav1.ListMeta{ResourceVersion: "2"},
 | 
			
		||||
				Items:    []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3")},
 | 
			
		||||
			},
 | 
			
		||||
			requestOptions:       metav1.ListOptions{ResourceVersion: "0", Limit: 5},
 | 
			
		||||
			retrievedItems:       []*v1.Pod{makePod("p1", "1"), makePod("p2", "2"), makePod("p3", "3")},
 | 
			
		||||
			expectedListRequests: 1,
 | 
			
		||||
			expectedRequestOptions: []metav1.ListOptions{
 | 
			
		||||
				{
 | 
			
		||||
					Limit:                5,
 | 
			
		||||
					ResourceVersion:      "2",
 | 
			
		||||
					ResourceVersionMatch: metav1.ResourceVersionMatchExact,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			name: "data consistency check won't panic when there is no data",
 | 
			
		||||
			listResponse: &v1.PodList{
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user