paginate initial list inside the storage watcher
Signed-off-by: wackxu <xushiwei5@huawei.com>
This commit is contained in:
		@@ -24,6 +24,7 @@ import (
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	grpccodes "google.golang.org/grpc/codes"
 | 
			
		||||
	grpcstatus "google.golang.org/grpc/status"
 | 
			
		||||
@@ -48,6 +49,9 @@ const (
 | 
			
		||||
	outgoingBufSize = 100
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// defaultWatcherMaxLimit is used to facilitate construction tests
 | 
			
		||||
var defaultWatcherMaxLimit int64 = maxLimit
 | 
			
		||||
 | 
			
		||||
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
 | 
			
		||||
var fatalOnDecodeError = false
 | 
			
		||||
 | 
			
		||||
@@ -211,17 +215,58 @@ func (wc *watchChan) RequestWatchProgress() error {
 | 
			
		||||
func (wc *watchChan) sync() error {
 | 
			
		||||
	opts := []clientv3.OpOption{}
 | 
			
		||||
	if wc.recursive {
 | 
			
		||||
		opts = append(opts, clientv3.WithPrefix())
 | 
			
		||||
		opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit))
 | 
			
		||||
		rangeEnd := clientv3.GetPrefixRangeEnd(wc.key)
 | 
			
		||||
		opts = append(opts, clientv3.WithRange(rangeEnd))
 | 
			
		||||
	}
 | 
			
		||||
	getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	var lastKey []byte
 | 
			
		||||
	var withRev int64
 | 
			
		||||
	var getResp *clientv3.GetResponse
 | 
			
		||||
 | 
			
		||||
	metricsOp := "get"
 | 
			
		||||
	if wc.recursive {
 | 
			
		||||
		metricsOp = "list"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	preparedKey := wc.key
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		startTime := time.Now()
 | 
			
		||||
		getResp, err = wc.watcher.client.KV.Get(wc.ctx, preparedKey, opts...)
 | 
			
		||||
		metrics.RecordEtcdRequest(metricsOp, wc.watcher.groupResource.String(), err, startTime)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
			return interpretListError(err, true, preparedKey, wc.key)
 | 
			
		||||
		}
 | 
			
		||||
	wc.initialRev = getResp.Header.Revision
 | 
			
		||||
	for _, kv := range getResp.Kvs {
 | 
			
		||||
 | 
			
		||||
		if len(getResp.Kvs) == 0 && getResp.More {
 | 
			
		||||
			return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// send items from the response until no more results
 | 
			
		||||
		for i, kv := range getResp.Kvs {
 | 
			
		||||
			lastKey = kv.Key
 | 
			
		||||
			wc.sendEvent(parseKV(kv))
 | 
			
		||||
			// free kv early. Long lists can take O(seconds) to decode.
 | 
			
		||||
			getResp.Kvs[i] = nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if withRev == 0 {
 | 
			
		||||
			wc.initialRev = getResp.Header.Revision
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// no more results remain
 | 
			
		||||
		if !getResp.More {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		preparedKey = string(lastKey) + "\x00"
 | 
			
		||||
		if withRev == 0 {
 | 
			
		||||
			withRev = getResp.Header.Revision
 | 
			
		||||
			opts = append(opts, clientv3.WithRev(withRev))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func logWatchChannelErr(err error) {
 | 
			
		||||
 
 | 
			
		||||
@@ -20,13 +20,15 @@ import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	clientv3 "go.etcd.io/etcd/client/v3"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	"k8s.io/apiserver/pkg/apis/example"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage"
 | 
			
		||||
	"k8s.io/apiserver/pkg/storage/etcd3/testserver"
 | 
			
		||||
	storagetesting "k8s.io/apiserver/pkg/storage/testing"
 | 
			
		||||
@@ -158,3 +160,156 @@ func TestWatchErrorWhenNoNewFunc(t *testing.T) {
 | 
			
		||||
		t.Fatalf("unexpected err = %v, expected = %v", err, expectedError)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWatchChanSync(t *testing.T) {
 | 
			
		||||
	testCases := []struct {
 | 
			
		||||
		name             string
 | 
			
		||||
		watchKey         string
 | 
			
		||||
		watcherMaxLimit  int64
 | 
			
		||||
		expectEventCount int
 | 
			
		||||
		expectGetCount   int
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:            "None of the current objects match watchKey: sync with empty page",
 | 
			
		||||
			watchKey:        "/pods/test/",
 | 
			
		||||
			watcherMaxLimit: 1,
 | 
			
		||||
			expectGetCount:  1,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:             "The number of current objects is less than defaultWatcherMaxLimit: sync with one page",
 | 
			
		||||
			watchKey:         "/pods/",
 | 
			
		||||
			watcherMaxLimit:  3,
 | 
			
		||||
			expectEventCount: 2,
 | 
			
		||||
			expectGetCount:   1,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:             "a new item added to etcd before returning a second page is not returned: sync with two page",
 | 
			
		||||
			watchKey:         "/pods/",
 | 
			
		||||
			watcherMaxLimit:  1,
 | 
			
		||||
			expectEventCount: 2,
 | 
			
		||||
			expectGetCount:   2,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, testCase := range testCases {
 | 
			
		||||
		t.Run(testCase.name, func(t *testing.T) {
 | 
			
		||||
			defaultWatcherMaxLimit = testCase.watcherMaxLimit
 | 
			
		||||
 | 
			
		||||
			origCtx, store, _ := testSetup(t)
 | 
			
		||||
			initList, err := initStoreData(origCtx, store)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			kvWrapper := newEtcdClientKVWrapper(store.client.KV)
 | 
			
		||||
			kvWrapper.getReactors = append(kvWrapper.getReactors, func() {
 | 
			
		||||
				barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}}
 | 
			
		||||
				podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name)
 | 
			
		||||
				storedObj := &example.Pod{}
 | 
			
		||||
 | 
			
		||||
				err := store.Create(context.Background(), podKey, barThird, storedObj, 0)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					t.Errorf("failed to create object: %v", err)
 | 
			
		||||
				}
 | 
			
		||||
			})
 | 
			
		||||
 | 
			
		||||
			store.client.KV = kvWrapper
 | 
			
		||||
 | 
			
		||||
			w := store.watcher.createWatchChan(
 | 
			
		||||
				origCtx,
 | 
			
		||||
				testCase.watchKey,
 | 
			
		||||
				0,
 | 
			
		||||
				true,
 | 
			
		||||
				false,
 | 
			
		||||
				storage.Everything)
 | 
			
		||||
 | 
			
		||||
			err = w.sync()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// close incomingEventChan so we can read incomingEventChan non-blocking
 | 
			
		||||
			close(w.incomingEventChan)
 | 
			
		||||
 | 
			
		||||
			eventsReceived := 0
 | 
			
		||||
			for event := range w.incomingEventChan {
 | 
			
		||||
				eventsReceived++
 | 
			
		||||
				storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if eventsReceived != testCase.expectEventCount {
 | 
			
		||||
				t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if kvWrapper.getCallCounter != testCase.expectGetCount {
 | 
			
		||||
				t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NOTE: it's not thread-safe
 | 
			
		||||
type etcdClientKVWrapper struct {
 | 
			
		||||
	clientv3.KV
 | 
			
		||||
	// keeps track of the number of times Get method is called
 | 
			
		||||
	getCallCounter int
 | 
			
		||||
	// getReactors is called after the etcd KV's get function is executed.
 | 
			
		||||
	getReactors []func()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newEtcdClientKVWrapper(kv clientv3.KV) *etcdClientKVWrapper {
 | 
			
		||||
	return &etcdClientKVWrapper{
 | 
			
		||||
		KV:             kv,
 | 
			
		||||
		getCallCounter: 0,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ecw *etcdClientKVWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
 | 
			
		||||
	resp, err := ecw.KV.Get(ctx, key, opts...)
 | 
			
		||||
	ecw.getCallCounter++
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(ecw.getReactors) > 0 {
 | 
			
		||||
		reactor := ecw.getReactors[0]
 | 
			
		||||
		ecw.getReactors = ecw.getReactors[1:]
 | 
			
		||||
		reactor()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return resp, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initStoreData(ctx context.Context, store storage.Interface) ([]interface{}, error) {
 | 
			
		||||
	barFirst := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "first", Name: "bar"}}
 | 
			
		||||
	barSecond := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "second", Name: "bar"}}
 | 
			
		||||
 | 
			
		||||
	preset := []struct {
 | 
			
		||||
		key       string
 | 
			
		||||
		obj       *example.Pod
 | 
			
		||||
		storedObj *example.Pod
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			key: fmt.Sprintf("/pods/%s/%s", barFirst.Namespace, barFirst.Name),
 | 
			
		||||
			obj: barFirst,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			key: fmt.Sprintf("/pods/%s/%s", barSecond.Namespace, barSecond.Name),
 | 
			
		||||
			obj: barSecond,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i, ps := range preset {
 | 
			
		||||
		preset[i].storedObj = &example.Pod{}
 | 
			
		||||
		err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed to create object: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var created []interface{}
 | 
			
		||||
	for _, item := range preset {
 | 
			
		||||
		created = append(created, item.key)
 | 
			
		||||
	}
 | 
			
		||||
	return created, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user