Implement ConsistentListFromCache feature gate
Request bookmark every 100ms when there is at least one request blocked on revision not present in watch cache.
This commit is contained in:
		@@ -208,6 +208,13 @@ const (
 | 
			
		||||
	//
 | 
			
		||||
	// Allow the API server to stream individual items instead of chunking
 | 
			
		||||
	WatchList featuregate.Feature = "WatchList"
 | 
			
		||||
 | 
			
		||||
	// owner: @serathius
 | 
			
		||||
	// kep: http://kep.k8s.io/2340
 | 
			
		||||
	// alpha: v1.28
 | 
			
		||||
	//
 | 
			
		||||
	// Allow the API server to serve consistent lists from cache
 | 
			
		||||
	ConsistentListFromCache featuregate.Feature = "ConsistentListFromCache"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
@@ -264,4 +271,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 | 
			
		||||
	InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
 | 
			
		||||
	WatchList: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
 | 
			
		||||
	ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -726,9 +726,10 @@ func shouldDelegateList(opts storage.ListOptions) bool {
 | 
			
		||||
	pred := opts.Predicate
 | 
			
		||||
	match := opts.ResourceVersionMatch
 | 
			
		||||
	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
 | 
			
		||||
	consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
 | 
			
		||||
 | 
			
		||||
	// Serve consistent reads from storage
 | 
			
		||||
	consistentReadFromStorage := resourceVersion == ""
 | 
			
		||||
	// Serve consistent reads from storage if ConsistentListFromCache is disabled
 | 
			
		||||
	consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
 | 
			
		||||
	// Watch cache doesn't support continuations, so serve them from etcd.
 | 
			
		||||
	hasContinuation := pagingEnabled && len(pred.Continue) > 0
 | 
			
		||||
	// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
 | 
			
		||||
@@ -762,19 +763,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
 | 
			
		||||
		return c.storage.GetList(ctx, key, opts, listObj)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// If resourceVersion is specified, serve it from cache.
 | 
			
		||||
	// It's guaranteed that the returned value is at least that
 | 
			
		||||
	// fresh as the given resourceVersion.
 | 
			
		||||
	listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if listRV == 0 && !c.ready.check() {
 | 
			
		||||
		// If Cacher is not yet initialized and we don't require any specific
 | 
			
		||||
		// minimal resource version, simply forward the request to storage.
 | 
			
		||||
		return c.storage.GetList(ctx, key, opts, listObj)
 | 
			
		||||
	}
 | 
			
		||||
	if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
 | 
			
		||||
		listRV, err = c.getCurrentResourceVersionFromStorage(ctx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, span := tracing.Start(ctx, "cacher list",
 | 
			
		||||
		attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
 | 
			
		||||
 
 | 
			
		||||
@@ -31,9 +31,12 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/apis/example"
 | 
			
		||||
	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
 | 
			
		||||
	"k8s.io/apiserver/pkg/features"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
 | 
			
		||||
	storagetesting "k8s.io/apiserver/pkg/storage/testing"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
			
		||||
	"k8s.io/utils/clock"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -150,6 +153,13 @@ func TestList(t *testing.T) {
 | 
			
		||||
	storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestListWithListFromCache(t *testing.T) {
 | 
			
		||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
 | 
			
		||||
	ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
 | 
			
		||||
	t.Cleanup(terminate)
 | 
			
		||||
	storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestListWithoutPaging(t *testing.T) {
 | 
			
		||||
	ctx, cacher, terminate := testSetup(t, withoutPaging)
 | 
			
		||||
	t.Cleanup(terminate)
 | 
			
		||||
 
 | 
			
		||||
@@ -162,8 +162,7 @@ func TestGetListCacheBypass(t *testing.T) {
 | 
			
		||||
		opts         storage.ListOptions
 | 
			
		||||
		expectBypass bool
 | 
			
		||||
	}
 | 
			
		||||
	testCases := []testCase{
 | 
			
		||||
		{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
 | 
			
		||||
	commonTestCases := []testCase{
 | 
			
		||||
		{opts: storage.ListOptions{ResourceVersion: "0"}, expectBypass: false},
 | 
			
		||||
		{opts: storage.ListOptions{ResourceVersion: "1"}, expectBypass: false},
 | 
			
		||||
 | 
			
		||||
@@ -180,9 +179,25 @@ func TestGetListCacheBypass(t *testing.T) {
 | 
			
		||||
		{opts: storage.ListOptions{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tc := range testCases {
 | 
			
		||||
		testGetListCacheBypass(t, tc.opts, tc.expectBypass)
 | 
			
		||||
	}
 | 
			
		||||
	t.Run("ConsistentListFromStorage", func(t *testing.T) {
 | 
			
		||||
		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
 | 
			
		||||
		testCases := append(commonTestCases,
 | 
			
		||||
			testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
 | 
			
		||||
		)
 | 
			
		||||
		for _, tc := range testCases {
 | 
			
		||||
			testGetListCacheBypass(t, tc.opts, tc.expectBypass)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	})
 | 
			
		||||
	t.Run("ConsistentListFromCache", func(t *testing.T) {
 | 
			
		||||
		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
 | 
			
		||||
		testCases := append(commonTestCases,
 | 
			
		||||
			testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
 | 
			
		||||
		)
 | 
			
		||||
		for _, tc := range testCases {
 | 
			
		||||
			testGetListCacheBypass(t, tc.opts, tc.expectBypass)
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectBypass bool) {
 | 
			
		||||
@@ -200,7 +215,23 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
 | 
			
		||||
		t.Fatalf("unexpected error waiting for the cache to be ready")
 | 
			
		||||
	}
 | 
			
		||||
	// Inject error to underlying layer and check if cacher is not bypassed.
 | 
			
		||||
	backingStorage.injectError(errDummy)
 | 
			
		||||
	backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
 | 
			
		||||
		currentResourceVersion := "42"
 | 
			
		||||
		switch {
 | 
			
		||||
		// request made by getCurrentResourceVersionFromStorage by checking Limit
 | 
			
		||||
		case key == cacher.resourcePrefix:
 | 
			
		||||
			podList := listObj.(*example.PodList)
 | 
			
		||||
			podList.ResourceVersion = currentResourceVersion
 | 
			
		||||
			return nil
 | 
			
		||||
		// request made by storage.GetList with revision from original request or
 | 
			
		||||
		// returned by getCurrentResourceVersionFromStorage
 | 
			
		||||
		case opts.ResourceVersion == options.ResourceVersion || opts.ResourceVersion == currentResourceVersion:
 | 
			
		||||
			return errDummy
 | 
			
		||||
		default:
 | 
			
		||||
			t.Fatalf("Unexpected request %+v", opts)
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	err = cacher.GetList(context.TODO(), "pods/ns", options, result)
 | 
			
		||||
	if err != nil && err != errDummy {
 | 
			
		||||
		t.Fatalf("Unexpected error for List request with options: %v, err: %v", options, err)
 | 
			
		||||
 
 | 
			
		||||
@@ -30,8 +30,10 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	"k8s.io/apiserver/pkg/features"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/cacher/metrics"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/component-base/tracing"
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
@@ -412,6 +414,7 @@ func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
 | 
			
		||||
		w.Lock()
 | 
			
		||||
		defer w.Unlock()
 | 
			
		||||
		w.resourceVersion = rv
 | 
			
		||||
		w.cond.Broadcast()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Avoid calling event handler under lock.
 | 
			
		||||
@@ -490,7 +493,14 @@ func (s sortableStoreElements) Swap(i, j int) {
 | 
			
		||||
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
 | 
			
		||||
// with their ResourceVersion and the name of the index, if any, that was used.
 | 
			
		||||
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
 | 
			
		||||
	err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
			
		||||
	var err error
 | 
			
		||||
	if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
 | 
			
		||||
		w.waitingUntilFresh.Add()
 | 
			
		||||
		err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
			
		||||
		w.waitingUntilFresh.Remove()
 | 
			
		||||
	} else {
 | 
			
		||||
		err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
			
		||||
	}
 | 
			
		||||
	defer w.RUnlock()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, 0, "", err
 | 
			
		||||
@@ -513,6 +523,12 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
 | 
			
		||||
	return result, rv, index, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *watchCache) notFresh(resourceVersion uint64) bool {
 | 
			
		||||
	w.RLock()
 | 
			
		||||
	defer w.RUnlock()
 | 
			
		||||
	return resourceVersion > w.resourceVersion
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
 | 
			
		||||
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
 | 
			
		||||
	err := w.waitUntilFreshAndBlock(ctx, resourceVersion)
 | 
			
		||||
 
 | 
			
		||||
@@ -34,8 +34,11 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	"k8s.io/apiserver/pkg/features"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	featuregatetesting "k8s.io/component-base/featuregate/testing"
 | 
			
		||||
	"k8s.io/utils/clock"
 | 
			
		||||
	testingclock "k8s.io/utils/clock/testing"
 | 
			
		||||
)
 | 
			
		||||
@@ -516,6 +519,33 @@ func TestWaitUntilFreshAndList(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWaitUntilFreshAndListFromCache(t *testing.T) {
 | 
			
		||||
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
	// In background, update the store.
 | 
			
		||||
	go func() {
 | 
			
		||||
		store.Add(makeTestPod("pod1", 2))
 | 
			
		||||
		store.bookmarkRevision <- 3
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// list from future revision. Requires watch cache to request bookmark to get it.
 | 
			
		||||
	list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if resourceVersion != 3 {
 | 
			
		||||
		t.Errorf("unexpected resourceVersion: %v, expected: 6", resourceVersion)
 | 
			
		||||
	}
 | 
			
		||||
	if len(list) != 1 {
 | 
			
		||||
		t.Errorf("unexpected list returned: %#v", list)
 | 
			
		||||
	}
 | 
			
		||||
	if indexUsed != "" {
 | 
			
		||||
		t.Errorf("Used index %q but expected none to be used", indexUsed)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWaitUntilFreshAndGet(t *testing.T) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
@@ -544,31 +574,51 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
	defer store.Stop()
 | 
			
		||||
	fc := store.clock.(*testingclock.FakeClock)
 | 
			
		||||
 | 
			
		||||
	// In background, step clock after the below call starts the timer.
 | 
			
		||||
	go func() {
 | 
			
		||||
		for !fc.HasWaiters() {
 | 
			
		||||
			time.Sleep(time.Millisecond)
 | 
			
		||||
		}
 | 
			
		||||
		fc.Step(blockTimeout)
 | 
			
		||||
 | 
			
		||||
		// Add an object to make sure the test would
 | 
			
		||||
		// eventually fail instead of just waiting
 | 
			
		||||
		// forever.
 | 
			
		||||
		time.Sleep(30 * time.Second)
 | 
			
		||||
		store.Add(makeTestPod("bar", 5))
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	_, _, _, err := store.WaitUntilFreshAndList(ctx, 5, nil)
 | 
			
		||||
	if !errors.IsTimeout(err) {
 | 
			
		||||
		t.Errorf("expected timeout error but got: %v", err)
 | 
			
		||||
	tcs := []struct {
 | 
			
		||||
		name                    string
 | 
			
		||||
		ConsistentListFromCache bool
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:                    "FromStorage",
 | 
			
		||||
			ConsistentListFromCache: false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:                    "FromCache",
 | 
			
		||||
			ConsistentListFromCache: true,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	if !storage.IsTooLargeResourceVersion(err) {
 | 
			
		||||
		t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
 | 
			
		||||
	for _, tc := range tcs {
 | 
			
		||||
		t.Run(tc.name, func(t *testing.T) {
 | 
			
		||||
			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache)()
 | 
			
		||||
			ctx := context.Background()
 | 
			
		||||
			store := newTestWatchCache(3, &cache.Indexers{})
 | 
			
		||||
			defer store.Stop()
 | 
			
		||||
			fc := store.clock.(*testingclock.FakeClock)
 | 
			
		||||
 | 
			
		||||
			// In background, step clock after the below call starts the timer.
 | 
			
		||||
			go func() {
 | 
			
		||||
				for !fc.HasWaiters() {
 | 
			
		||||
					time.Sleep(time.Millisecond)
 | 
			
		||||
				}
 | 
			
		||||
				store.Add(makeTestPod("foo", 2))
 | 
			
		||||
				store.bookmarkRevision <- 3
 | 
			
		||||
				fc.Step(blockTimeout)
 | 
			
		||||
 | 
			
		||||
				// Add an object to make sure the test would
 | 
			
		||||
				// eventually fail instead of just waiting
 | 
			
		||||
				// forever.
 | 
			
		||||
				time.Sleep(30 * time.Second)
 | 
			
		||||
				store.Add(makeTestPod("bar", 4))
 | 
			
		||||
			}()
 | 
			
		||||
 | 
			
		||||
			_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil)
 | 
			
		||||
			if !errors.IsTimeout(err) {
 | 
			
		||||
				t.Errorf("expected timeout error but got: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			if !storage.IsTooLargeResourceVersion(err) {
 | 
			
		||||
				t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -149,9 +149,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
 | 
			
		||||
	resourceVersion := opts.ResourceVersion
 | 
			
		||||
	match := opts.ResourceVersionMatch
 | 
			
		||||
	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
 | 
			
		||||
	consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
 | 
			
		||||
 | 
			
		||||
	// Serve consistent reads from storage
 | 
			
		||||
	consistentReadFromStorage := resourceVersion == ""
 | 
			
		||||
	// Serve consistent reads from storage if ConsistentListFromCache is disabled
 | 
			
		||||
	consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
 | 
			
		||||
	// Watch cache doesn't support continuations, so serve them from etcd.
 | 
			
		||||
	hasContinuation := pagingEnabled && len(opts.Continue) > 0
 | 
			
		||||
	// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user