Merge pull request #123674 from serathius/non-recursive
Fix non-recursive list returning "resource version too high" error when consistent list from cache is enabled
This commit is contained in:
		@@ -182,9 +182,20 @@ func TestListWithListFromCache(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestGetListNonRecursive(t *testing.T) {
 | 
					func TestGetListNonRecursive(t *testing.T) {
 | 
				
			||||||
	ctx, cacher, terminate := testSetup(t)
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
 | 
				
			||||||
 | 
						ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
 | 
				
			||||||
	t.Cleanup(terminate)
 | 
						t.Cleanup(terminate)
 | 
				
			||||||
	storagetesting.RunTestGetListNonRecursive(ctx, t, cacher)
 | 
						storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestGetListNonRecursiveWithConsistentListFromCache(t *testing.T) {
 | 
				
			||||||
 | 
						defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
 | 
				
			||||||
 | 
						ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
 | 
				
			||||||
 | 
						t.Cleanup(terminate)
 | 
				
			||||||
 | 
						// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
 | 
				
			||||||
 | 
						// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix.
 | 
				
			||||||
 | 
						time.Sleep(100 * time.Millisecond)
 | 
				
			||||||
 | 
						storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func checkStorageCalls(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
 | 
					func checkStorageCalls(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -536,7 +536,14 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
 | 
					// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
 | 
				
			||||||
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
 | 
					func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
 | 
				
			||||||
	err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
						var err error
 | 
				
			||||||
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
 | 
				
			||||||
 | 
							w.waitingUntilFresh.Add()
 | 
				
			||||||
 | 
							err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
				
			||||||
 | 
							w.waitingUntilFresh.Remove()
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	defer w.RUnlock()
 | 
						defer w.RUnlock()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, false, 0, err
 | 
							return nil, false, 0, err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -162,8 +162,8 @@ func TestPreconditionalDeleteWithSuggestionPass(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestGetListNonRecursive(t *testing.T) {
 | 
					func TestGetListNonRecursive(t *testing.T) {
 | 
				
			||||||
	ctx, store, _ := testSetup(t)
 | 
						ctx, store, client := testSetup(t)
 | 
				
			||||||
	storagetesting.RunTestGetListNonRecursive(ctx, t, store)
 | 
						storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client), store)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type storeWithPrefixTransformer struct {
 | 
					type storeWithPrefixTransformer struct {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1341,7 +1341,7 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, [
 | 
				
			|||||||
	return initialRV, created, nil
 | 
						return initialRV, created, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage.Interface) {
 | 
					func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Compaction, store storage.Interface) {
 | 
				
			||||||
	key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
 | 
						key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
 | 
				
			||||||
	prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
 | 
						prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -1354,7 +1354,11 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
 | 
				
			|||||||
		}, nil); err != nil {
 | 
							}, nil); err != nil {
 | 
				
			||||||
		t.Fatalf("update failed: %v", err)
 | 
							t.Fatalf("update failed: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
 | 
						objRV, _ := strconv.Atoi(storedObj.ResourceVersion)
 | 
				
			||||||
 | 
						// Use compact to increase etcd global revision without changes to any resources.
 | 
				
			||||||
 | 
						// The increase in resources version comes from Kubernetes compaction updating hidden key.
 | 
				
			||||||
 | 
						// Used to test consistent List to confirm it returns latest etcd revision.
 | 
				
			||||||
 | 
						compaction(ctx, t, prevStoredObj.ResourceVersion)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	tests := []struct {
 | 
						tests := []struct {
 | 
				
			||||||
		name                 string
 | 
							name                 string
 | 
				
			||||||
@@ -1388,13 +1392,13 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
 | 
				
			|||||||
		key:         key,
 | 
							key:         key,
 | 
				
			||||||
		pred:        storage.Everything,
 | 
							pred:        storage.Everything,
 | 
				
			||||||
		expectedOut: []example.Pod{*storedObj},
 | 
							expectedOut: []example.Pod{*storedObj},
 | 
				
			||||||
		rv:          fmt.Sprintf("%d", currentRV),
 | 
							rv:          fmt.Sprintf("%d", objRV),
 | 
				
			||||||
	}, {
 | 
						}, {
 | 
				
			||||||
		name:        "existing key, resourceVersion=current, resourceVersionMatch=notOlderThan",
 | 
							name:        "existing key, resourceVersion=current, resourceVersionMatch=notOlderThan",
 | 
				
			||||||
		key:         key,
 | 
							key:         key,
 | 
				
			||||||
		pred:        storage.Everything,
 | 
							pred:        storage.Everything,
 | 
				
			||||||
		expectedOut: []example.Pod{*storedObj},
 | 
							expectedOut: []example.Pod{*storedObj},
 | 
				
			||||||
		rv:          fmt.Sprintf("%d", currentRV),
 | 
							rv:          fmt.Sprintf("%d", objRV),
 | 
				
			||||||
		rvMatch:     metav1.ResourceVersionMatchNotOlderThan,
 | 
							rvMatch:     metav1.ResourceVersionMatchNotOlderThan,
 | 
				
			||||||
	}, {
 | 
						}, {
 | 
				
			||||||
		name:                 "existing key, resourceVersion=previous, resourceVersionMatch=notOlderThan",
 | 
							name:                 "existing key, resourceVersion=previous, resourceVersionMatch=notOlderThan",
 | 
				
			||||||
@@ -1408,7 +1412,7 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
 | 
				
			|||||||
		key:         key,
 | 
							key:         key,
 | 
				
			||||||
		pred:        storage.Everything,
 | 
							pred:        storage.Everything,
 | 
				
			||||||
		expectedOut: []example.Pod{*storedObj},
 | 
							expectedOut: []example.Pod{*storedObj},
 | 
				
			||||||
		rv:          fmt.Sprintf("%d", currentRV),
 | 
							rv:          fmt.Sprintf("%d", objRV),
 | 
				
			||||||
		rvMatch:     metav1.ResourceVersionMatchExact,
 | 
							rvMatch:     metav1.ResourceVersionMatchExact,
 | 
				
			||||||
	}, {
 | 
						}, {
 | 
				
			||||||
		name:        "existing key, resourceVersion=previous, resourceVersionMatch=exact",
 | 
							name:        "existing key, resourceVersion=previous, resourceVersionMatch=exact",
 | 
				
			||||||
@@ -1453,7 +1457,7 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
 | 
				
			|||||||
			},
 | 
								},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		expectedOut: []example.Pod{},
 | 
							expectedOut: []example.Pod{},
 | 
				
			||||||
		rv:          fmt.Sprintf("%d", currentRV),
 | 
							rv:          fmt.Sprintf("%d", objRV),
 | 
				
			||||||
	}}
 | 
						}}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, tt := range tests {
 | 
						for _, tt := range tests {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user