From 09e85983d84b5f6c3fed6c09dd0adcbdde7e9d5f Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 10 Jun 2024 10:14:38 +0200 Subject: [PATCH 1/3] apiserver/storage/cacher: cache supports pagination --- .../apiserver/pkg/storage/cacher/cacher.go | 39 ++++++++++-- .../storage/cacher/cacher_whitebox_test.go | 60 ++++++++++++++++++- 2 files changed, 92 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 70ccbad200d..1f0058237dc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -768,12 +768,26 @@ func shouldDelegateList(opts storage.ListOptions) bool { consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := len(pred.Continue) > 0 - // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. - hasLimit := pred.Limit > 0 && resourceVersion != "0" // Watch cache only supports ResourceVersionMatchNotOlderThan (default). - unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan + // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0" + unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch - return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch + return consistentReadFromStorage || hasContinuation || unsupportedMatch +} + +// computeListLimit determines whether the cacher should +// apply a limit to an incoming LIST request and returns its value. +// +// note that this function doesn't check RVM nor the Continuation token. +// these parameters are validated by the shouldDelegateList function. +// +// as of today, the limit is ignored for requests that set RV == 0 +func computeListLimit(opts storage.ListOptions) int64 { + if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" { + return 0 + } + return opts.Predicate.Limit } func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { @@ -883,13 +897,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio // the elements in ListObject are Struct type, making slice will bring excessive memory consumption. // so we try to delay this action as much as possible var selectedObjects []runtime.Object - for _, obj := range objs { + var lastSelectedObjectKey string + var hasMoreListItems bool + limit := computeListLimit(opts) + for i, obj := range objs { elem, ok := obj.(*storeElement) if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } if filter(elem.Key, elem.Labels, elem.Fields) { selectedObjects = append(selectedObjects, elem.Object) + lastSelectedObjectKey = elem.Key + } + if limit > 0 && int64(len(selectedObjects)) >= limit { + hasMoreListItems = i < len(objs)-1 + break } } if len(selectedObjects) == 0 { @@ -905,7 +927,12 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio } span.AddEvent("Filtered items", attribute.Int("count", listVal.Len())) if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { + continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts) + if err != nil { + return err + } + + if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil { return err } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 9ffe0510509..a5e62cbb689 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -201,7 +201,6 @@ func TestGetListCacheBypass(t *testing.T) { {opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true}, - {opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false}, {opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, @@ -214,6 +213,7 @@ func TestGetListCacheBypass(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) testCases := append(commonTestCases, testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true}, + testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, ) for _, tc := range testCases { testGetListCacheBypass(t, tc.opts, tc.expectBypass) @@ -233,6 +233,7 @@ func TestGetListCacheBypass(t *testing.T) { testCases := append(commonTestCases, testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false}, + testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false}, ) for _, tc := range testCases { testGetListCacheBypass(t, tc.opts, tc.expectBypass) @@ -2591,6 +2592,63 @@ func TestWatchStreamSeparation(t *testing.T) { } } +func TestComputeListLimit(t *testing.T) { + scenarios := []struct { + name string + opts storage.ListOptions + expectedLimit int64 + }{ + { + name: "limit is zero", + opts: storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 0, + }, + }, + expectedLimit: 0, + }, + { + name: "limit is positive, RV is unset", + opts: storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 1, + }, + ResourceVersion: "", + }, + expectedLimit: 1, + }, + { + name: "limit is positive, RV = 100", + opts: storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 1, + }, + ResourceVersion: "100", + }, + expectedLimit: 1, + }, + { + name: "legacy case: limit is positive, RV = 0", + opts: storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 1, + }, + ResourceVersion: "0", + }, + expectedLimit: 0, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + actualLimit := computeListLimit(scenario.opts) + if actualLimit != scenario.expectedLimit { + t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit) + } + }) + } +} + func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) { opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true} opts.Predicate.AllowWatchBookmarks = true From c259fe2342162a0c883845bfbdf8a838697fe085 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 17 Jun 2024 16:14:39 +0200 Subject: [PATCH 2/3] flowcontrol/request/list_work_estimator: sync shouldDelegateList --- .../pkg/util/flowcontrol/request/list_work_estimator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index d880469659c..a9e76141e93 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -173,10 +173,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := len(opts.Continue) > 0 - // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. - hasLimit := opts.Limit > 0 && resourceVersion != "0" // Watch cache only supports ResourceVersionMatchNotOlderThan (default). - unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan + // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list + isLegacyExactMatch := opts.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0" + unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch - return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch + return consistentReadFromStorage || hasContinuation || unsupportedMatch } From 2f9660db6b0ba37ff383559b1b0324c635f1eb66 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Wed, 26 Jun 2024 14:34:32 +0200 Subject: [PATCH 3/3] apiserver/storage/watchcache: WaitUntilFreshAndList supports path prefix --- .../apiserver/pkg/storage/cacher/cacher.go | 9 +++---- .../pkg/storage/cacher/watch_cache.go | 25 +++++++++++++++++-- .../pkg/storage/cacher/watch_cache_test.go | 16 ++++++------ 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1f0058237dc..cdbf028c3b5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -615,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // to compute watcher.forget function (which has to happen under lock). watcher := newCacheWatcher( chanSize, - filterWithAttrsFunction(key, pred), + filterWithAttrsAndPrefixFunction(key, pred), emptyFunc, c.versioner, deadline, @@ -809,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred } return nil, readResourceVersion, "", nil } - return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex(ctx)) + return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx)) } // GetList implements storage.Interface @@ -885,7 +885,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if listVal.Kind() != reflect.Slice { return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - filter := filterWithAttrsFunction(preparedKey, pred) objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) if err != nil { @@ -905,7 +904,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(elem.Key, elem.Labels, elem.Fields) { + if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) { selectedObjects = append(selectedObjects, elem.Object) lastSelectedObjectKey = elem.Key } @@ -1320,7 +1319,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, } } -func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc { +func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc { filterFunc := func(objKey string, label labels.Set, field fields.Set) bool { if !hasPathPrefix(objKey, key) { return false diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index c2c7efcbecb..34a48ac637d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -501,7 +501,29 @@ func (s sortableStoreElements) Swap(i, j int) { // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. -func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { +func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) { + items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues) + if err != nil { + return nil, 0, "", err + } + + var result []interface{} + for _, item := range items { + elem, ok := item.(*storeElement) + if !ok { + return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item) + } + if !hasPathPrefix(elem.Key, key) { + continue + } + result = append(result, item) + } + + sort.Sort(sortableStoreElements(result)) + return result, rv, index, nil +} + +func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { w.waitingUntilFresh.Add() @@ -511,7 +533,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion err = w.waitUntilFreshAndBlock(ctx, resourceVersion) } - defer func() { sort.Sort(sortableStoreElements(result)) }() defer w.RUnlock() if err != nil { return result, rv, index, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index e7093fd3e2f..27f695014ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -462,7 +462,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { }() // list by empty MatchValues. - list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil) + list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -481,7 +481,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { {IndexName: "l:label", Value: "value1"}, {IndexName: "f:spec.nodeName", Value: "node2"}, } - list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues) + list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -500,7 +500,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { {IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "f:spec.nodeName", Value: "node2"}, } - list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues) + list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -518,7 +518,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { matchValues = []storage.MatchValue{ {IndexName: "l:not-exist-label", Value: "whatever"}, } - list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues) + list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -546,7 +546,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { }() // list from future revision. Requires watch cache to request bookmark to get it. - list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil) + list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { store.Add(makeTestPod("bar", 4)) }() - _, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil) + _, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil) if !errors.IsTimeout(err) { t.Errorf("expected timeout error but got: %v", err) } @@ -655,7 +655,7 @@ func TestReflectorForWatchCache(t *testing.T) { defer store.Stop() { - _, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil) + _, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -678,7 +678,7 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(wait.NeverStop) { - _, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil) + _, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil) if err != nil { t.Fatalf("unexpected error: %v", err) }