resource version parsing should all be in one place
This commit is contained in:
		| @@ -20,7 +20,6 @@ import ( | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"reflect" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| @@ -290,7 +289,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre | ||||
|  | ||||
| // Implements storage.Interface. | ||||
| func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) { | ||||
| 	watchRV, err := ParseWatchResourceVersion(resourceVersion) | ||||
| 	watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -361,7 +360,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob | ||||
| 	// If resourceVersion is specified, serve it from cache. | ||||
| 	// It's guaranteed that the returned value is at least that | ||||
| 	// fresh as the given resourceVersion. | ||||
| 	getRV, err := ParseListResourceVersion(resourceVersion) | ||||
| 	getRV, err := c.versioner.ParseListResourceVersion(resourceVersion) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -414,7 +413,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri | ||||
| 	// If resourceVersion is specified, serve it from cache. | ||||
| 	// It's guaranteed that the returned value is at least that | ||||
| 	// fresh as the given resourceVersion. | ||||
| 	listRV, err := ParseListResourceVersion(resourceVersion) | ||||
| 	listRV, err := c.versioner.ParseListResourceVersion(resourceVersion) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -483,7 +482,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p | ||||
| 	// If resourceVersion is specified, serve it from cache. | ||||
| 	// It's guaranteed that the returned value is at least that | ||||
| 	// fresh as the given resourceVersion. | ||||
| 	listRV, err := ParseListResourceVersion(resourceVersion) | ||||
| 	listRV, err := c.versioner.ParseListResourceVersion(resourceVersion) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -711,11 +710,7 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) { | ||||
| 	c.ready.wait() | ||||
|  | ||||
| 	resourceVersion := c.reflector.LastSyncResourceVersion() | ||||
| 	if resourceVersion == "" { | ||||
| 		return 0, nil | ||||
| 	} | ||||
|  | ||||
| 	return strconv.ParseUint(resourceVersion, 10, 64) | ||||
| 	return c.versioner.ParseListResourceVersion(resourceVersion) | ||||
| } | ||||
|  | ||||
| // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. | ||||
|   | ||||
| @@ -203,3 +203,9 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { | ||||
| func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { | ||||
| 	return 0, fmt.Errorf("unimplemented") | ||||
| } | ||||
| func (testVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) { | ||||
| 	return strconv.ParseUint(resourceVersion, 10, 64) | ||||
| } | ||||
| func (testVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) { | ||||
| 	return strconv.ParseUint(resourceVersion, 10, 64) | ||||
| } | ||||
|   | ||||
| @@ -58,6 +58,7 @@ go_library( | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/cache:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library", | ||||
|   | ||||
| @@ -21,6 +21,7 @@ import ( | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/api/meta" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/validation/field" | ||||
| 	"k8s.io/apiserver/pkg/storage" | ||||
| ) | ||||
|  | ||||
| @@ -81,6 +82,44 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e | ||||
| 	return strconv.ParseUint(version, 10, 64) | ||||
| } | ||||
|  | ||||
| // ParseWatchResourceVersion takes a resource version argument and converts it to | ||||
| // the etcd version we should pass to helper.Watch(). Because resourceVersion is | ||||
| // an opaque value, the default watch behavior for non-zero watch is to watch | ||||
| // the next value (if you pass "1", you will see updates from "2" onwards). | ||||
| func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) { | ||||
| 	if resourceVersion == "" || resourceVersion == "0" { | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 	version, err := strconv.ParseUint(resourceVersion, 10, 64) | ||||
| 	if err != nil { | ||||
| 		return 0, storage.NewInvalidError(field.ErrorList{ | ||||
| 			// Validation errors are supposed to return version-specific field | ||||
| 			// paths, but this is probably close enough. | ||||
| 			field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()), | ||||
| 		}) | ||||
| 	} | ||||
| 	return version, nil | ||||
| } | ||||
|  | ||||
| // ParseListResourceVersion takes a resource version argument and converts it to | ||||
| // the etcd version. | ||||
| // TODO: reevaluate whether it is really clearer to have both this and the | ||||
| // Watch version of this function, since they perform the same logic. | ||||
| func (a APIObjectVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) { | ||||
| 	if resourceVersion == "" { | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 	version, err := strconv.ParseUint(resourceVersion, 10, 64) | ||||
| 	if err != nil { | ||||
| 		return 0, storage.NewInvalidError(field.ErrorList{ | ||||
| 			// Validation errors are supposed to return version-specific field | ||||
| 			// paths, but this is probably close enough. | ||||
| 			field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()), | ||||
| 		}) | ||||
| 	} | ||||
| 	return version, nil | ||||
| } | ||||
|  | ||||
| // APIObjectVersioner implements Versioner | ||||
| var Versioner storage.Versioner = APIObjectVersioner{} | ||||
|  | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apiserver/pkg/storage" | ||||
| 	storagetesting "k8s.io/apiserver/pkg/storage/testing" | ||||
| ) | ||||
|  | ||||
| @@ -40,6 +41,43 @@ func TestObjectVersioner(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestEtcdParseResourceVersion(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		Version       string | ||||
| 		ExpectVersion uint64 | ||||
| 		Err           bool | ||||
| 	}{ | ||||
| 		{Version: "", ExpectVersion: 0}, | ||||
| 		{Version: "a", Err: true}, | ||||
| 		{Version: " ", Err: true}, | ||||
| 		{Version: "1", ExpectVersion: 1}, | ||||
| 		{Version: "10", ExpectVersion: 10}, | ||||
| 	} | ||||
|  | ||||
| 	v := APIObjectVersioner{} | ||||
| 	testFuncs := []func(string) (uint64, error){ | ||||
| 		v.ParseListResourceVersion, | ||||
| 		v.ParseWatchResourceVersion, | ||||
| 	} | ||||
|  | ||||
| 	for _, testCase := range testCases { | ||||
| 		for i, f := range testFuncs { | ||||
| 			version, err := f(testCase.Version) | ||||
| 			switch { | ||||
| 			case testCase.Err && err == nil: | ||||
| 				t.Errorf("%s[%v]: unexpected non-error", testCase.Version, i) | ||||
| 			case testCase.Err && !storage.IsInvalidError(err): | ||||
| 				t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err) | ||||
| 			case !testCase.Err && err != nil: | ||||
| 				t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err) | ||||
| 			} | ||||
| 			if version != testCase.ExpectVersion { | ||||
| 				t.Errorf("%s[%v]: expected version %d but was %d", testCase.Version, i, testCase.ExpectVersion, version) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestCompareResourceVersion(t *testing.T) { | ||||
| 	five := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}} | ||||
| 	six := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}} | ||||
|   | ||||
| @@ -235,7 +235,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri | ||||
| 	if ctx == nil { | ||||
| 		glog.Errorf("Context is nil") | ||||
| 	} | ||||
| 	watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) | ||||
| 	watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -250,7 +250,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion | ||||
| 	if ctx == nil { | ||||
| 		glog.Errorf("Context is nil") | ||||
| 	} | ||||
| 	watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) | ||||
| 	watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|   | ||||
| @@ -24,7 +24,6 @@ import ( | ||||
| 	"fmt" | ||||
| 	"path" | ||||
| 	"reflect" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| @@ -524,14 +523,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor | ||||
|  | ||||
| 	case s.pagingEnabled && pred.Limit > 0: | ||||
| 		if len(resourceVersion) > 0 { | ||||
| 			fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) | ||||
| 			fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion) | ||||
| 			if err != nil { | ||||
| 				return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) | ||||
| 			} | ||||
| 			if fromRV > 0 { | ||||
| 				options = append(options, clientv3.WithRev(fromRV)) | ||||
| 				options = append(options, clientv3.WithRev(int64(fromRV))) | ||||
| 			} | ||||
| 			returnedRV = fromRV | ||||
| 			returnedRV = int64(fromRV) | ||||
| 		} | ||||
|  | ||||
| 		rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) | ||||
| @@ -539,14 +538,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor | ||||
|  | ||||
| 	default: | ||||
| 		if len(resourceVersion) > 0 { | ||||
| 			fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) | ||||
| 			fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion) | ||||
| 			if err != nil { | ||||
| 				return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) | ||||
| 			} | ||||
| 			if fromRV > 0 { | ||||
| 				options = append(options, clientv3.WithRev(fromRV)) | ||||
| 				options = append(options, clientv3.WithRev(int64(fromRV))) | ||||
| 			} | ||||
| 			returnedRV = fromRV | ||||
| 			returnedRV = int64(fromRV) | ||||
| 		} | ||||
|  | ||||
| 		options = append(options, clientv3.WithPrefix()) | ||||
| @@ -666,7 +665,7 @@ func (s *store) WatchList(ctx context.Context, key string, resourceVersion strin | ||||
| } | ||||
|  | ||||
| func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) { | ||||
| 	rev, err := storage.ParseWatchResourceVersion(rv) | ||||
| 	rev, err := s.versioner.ParseWatchResourceVersion(rv) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|   | ||||
| @@ -19,7 +19,6 @@ package etcd3 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| @@ -186,7 +185,7 @@ func TestWatchFromZero(t *testing.T) { | ||||
| 	} | ||||
|  | ||||
| 	// Compact previous versions | ||||
| 	revToCompact, err := strconv.Atoi(out.ResourceVersion) | ||||
| 	revToCompact, err := store.versioner.ParseListResourceVersion(out.ResourceVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) | ||||
| 	} | ||||
| @@ -305,7 +304,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { | ||||
| 	var wres clientv3.WatchResponse | ||||
| 	wres = <-etcdW | ||||
|  | ||||
| 	watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion) | ||||
| 	watchedDeleteRev, err := store.versioner.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("ParseWatchResourceVersion failed: %v", err) | ||||
| 	} | ||||
|   | ||||
| @@ -28,7 +28,9 @@ import ( | ||||
| // Versioner abstracts setting and retrieving metadata fields from database response | ||||
| // onto the object ot list. It is required to maintain storage invariants - updating an | ||||
| // object twice with the same data except for the ResourceVersion and SelfLink must be | ||||
| // a no-op. | ||||
| // a no-op. A resourceVersion of type uint64 is a 'raw' resourceVersion, | ||||
| // intended to be sent directly to or from the backend. A resourceVersion of | ||||
| // type string is a 'safe' resourceVersion, intended for consumption by users. | ||||
| type Versioner interface { | ||||
| 	// UpdateObject sets storage metadata into an API object. Returns an error if the object | ||||
| 	// cannot be updated correctly. May return nil if the requested object does not need metadata | ||||
| @@ -45,6 +47,17 @@ type Versioner interface { | ||||
| 	// ObjectResourceVersion returns the resource version (for persistence) of the specified object. | ||||
| 	// Should return an error if the specified object does not have a persistable version. | ||||
| 	ObjectResourceVersion(obj runtime.Object) (uint64, error) | ||||
|  | ||||
| 	// ParseWatchResourceVersion takes a resource version argument and | ||||
| 	// converts it to the storage backend we should pass to helper.Watch(). | ||||
| 	// Because resourceVersion is an opaque value, the default watch | ||||
| 	// behavior for non-zero watch is to watch the next value (if you pass | ||||
| 	// "1", you will see updates from "2" onwards). | ||||
| 	ParseWatchResourceVersion(resourceVersion string) (uint64, error) | ||||
| 	// ParseListResourceVersion takes a resource version argument and | ||||
| 	// converts it to the storage backend version. Appropriate for | ||||
| 	// everything that's not intended as an argument for watch. | ||||
| 	ParseListResourceVersion(resourceVersion string) (uint64, error) | ||||
| } | ||||
|  | ||||
| // ResponseMeta contains information about the database metadata that is associated with | ||||
|   | ||||
| @@ -97,12 +97,13 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServe | ||||
| 	return server, storage | ||||
| } | ||||
|  | ||||
| func newTestCacher(s storage.Interface, cap int) *storage.Cacher { | ||||
| func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versioner) { | ||||
| 	prefix := "pods" | ||||
| 	v := etcdstorage.APIObjectVersioner{} | ||||
| 	config := storage.CacherConfig{ | ||||
| 		CacheCapacity:  cap, | ||||
| 		Storage:        s, | ||||
| 		Versioner:      etcdstorage.APIObjectVersioner{}, | ||||
| 		Versioner:      v, | ||||
| 		Type:           &example.Pod{}, | ||||
| 		ResourcePrefix: prefix, | ||||
| 		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, | ||||
| @@ -110,7 +111,7 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher { | ||||
| 		NewListFunc:    func() runtime.Object { return &example.PodList{} }, | ||||
| 		Codec:          codecs.LegacyCodec(examplev1.SchemeGroupVersion), | ||||
| 	} | ||||
| 	return storage.NewCacherFromConfig(config) | ||||
| 	return storage.NewCacherFromConfig(config), v | ||||
| } | ||||
|  | ||||
| func makeTestPod(name string) *example.Pod { | ||||
| @@ -139,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl | ||||
| func TestGet(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, _ := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	podFoo := makeTestPod("foo") | ||||
| @@ -170,7 +171,7 @@ func TestGet(t *testing.T) { | ||||
| func TestList(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, _ := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	podFoo := makeTestPod("foo") | ||||
| @@ -251,14 +252,14 @@ func TestList(t *testing.T) { | ||||
| func TestInfiniteList(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, v := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	podFoo := makeTestPod("foo") | ||||
| 	fooCreated := updatePod(t, etcdStorage, podFoo, nil) | ||||
|  | ||||
| 	// Set up List at fooCreated.ResourceVersion + 10 | ||||
| 	rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Unexpected error: %v", err) | ||||
| 	} | ||||
| @@ -307,7 +308,7 @@ func TestWatch(t *testing.T) { | ||||
| 	// Inject one list error to make sure we test the relist case. | ||||
| 	etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error | ||||
| 	cacher, _ := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	podFoo := makeTestPod("foo") | ||||
| @@ -382,7 +383,7 @@ func TestWatch(t *testing.T) { | ||||
| func TestWatcherTimeout(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, _ := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	// initialVersion is used to initate the watcher at the beginning of the world, | ||||
| @@ -424,7 +425,7 @@ func TestWatcherTimeout(t *testing.T) { | ||||
| func TestFiltering(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, _ := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	// Ensure that the cacher is initialized, before creating any pods, | ||||
| @@ -486,7 +487,7 @@ func TestFiltering(t *testing.T) { | ||||
| func TestStartingResourceVersion(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, v := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	// add 1 object | ||||
| @@ -494,7 +495,7 @@ func TestStartingResourceVersion(t *testing.T) { | ||||
| 	fooCreated := updatePod(t, etcdStorage, podFoo, nil) | ||||
|  | ||||
| 	// Set up Watch starting at fooCreated.ResourceVersion + 10 | ||||
| 	rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Unexpected error: %v", err) | ||||
| 	} | ||||
| @@ -517,7 +518,7 @@ func TestStartingResourceVersion(t *testing.T) { | ||||
| 	select { | ||||
| 	case e := <-watcher.ResultChan(): | ||||
| 		pod := e.Object.(*example.Pod) | ||||
| 		podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion) | ||||
| 		podRV, err := v.ParseWatchResourceVersion(pod.ResourceVersion) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("unexpected error: %v", err) | ||||
| 		} | ||||
| @@ -544,15 +545,15 @@ func TestEmptyWatchEventCache(t *testing.T) { | ||||
|  | ||||
| 	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) | ||||
|  | ||||
| 	cacher, v := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	// get rv of last pod created | ||||
| 	rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Unexpected error: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	// We now have a cacher with an empty cache of watch events and a resourceVersion of rv. | ||||
| 	// It should support establishing watches from rv and higher, but not older. | ||||
|  | ||||
| @@ -598,11 +599,11 @@ func TestEmptyWatchEventCache(t *testing.T) { | ||||
| func TestRandomWatchDeliver(t *testing.T) { | ||||
| 	server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) | ||||
| 	defer server.Terminate(t) | ||||
| 	cacher := newTestCacher(etcdStorage, 10) | ||||
| 	cacher, v := newTestCacher(etcdStorage, 10) | ||||
| 	defer cacher.Stop() | ||||
|  | ||||
| 	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) | ||||
| 	rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Unexpected error: %v", err) | ||||
| 	} | ||||
|   | ||||
| @@ -18,14 +18,12 @@ package storage | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync/atomic" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/api/meta" | ||||
| 	"k8s.io/apimachinery/pkg/api/validation/path" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/validation/field" | ||||
| ) | ||||
|  | ||||
| type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error) | ||||
| @@ -50,35 +48,6 @@ func NoTriggerPublisher(runtime.Object) []MatchValue { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // ParseWatchResourceVersion takes a resource version argument and converts it to | ||||
| // the etcd version we should pass to helper.Watch(). Because resourceVersion is | ||||
| // an opaque value, the default watch behavior for non-zero watch is to watch | ||||
| // the next value (if you pass "1", you will see updates from "2" onwards). | ||||
| func ParseWatchResourceVersion(resourceVersion string) (uint64, error) { | ||||
| 	if resourceVersion == "" || resourceVersion == "0" { | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 	version, err := strconv.ParseUint(resourceVersion, 10, 64) | ||||
| 	if err != nil { | ||||
| 		return 0, NewInvalidError(field.ErrorList{ | ||||
| 			// Validation errors are supposed to return version-specific field | ||||
| 			// paths, but this is probably close enough. | ||||
| 			field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()), | ||||
| 		}) | ||||
| 	} | ||||
| 	return version, nil | ||||
| } | ||||
|  | ||||
| // ParseListResourceVersion takes a resource version argument and converts it to | ||||
| // the etcd version. | ||||
| func ParseListResourceVersion(resourceVersion string) (uint64, error) { | ||||
| 	if resourceVersion == "" { | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 	version, err := strconv.ParseUint(resourceVersion, 10, 64) | ||||
| 	return version, err | ||||
| } | ||||
|  | ||||
| func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { | ||||
| 	meta, err := meta.Accessor(obj) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -22,40 +22,6 @@ import ( | ||||
| 	"testing" | ||||
| ) | ||||
|  | ||||
| func TestEtcdParseWatchResourceVersion(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		Version       string | ||||
| 		ExpectVersion uint64 | ||||
| 		Err           bool | ||||
| 	}{ | ||||
| 		{Version: "", ExpectVersion: 0}, | ||||
| 		{Version: "a", Err: true}, | ||||
| 		{Version: " ", Err: true}, | ||||
| 		{Version: "1", ExpectVersion: 1}, | ||||
| 		{Version: "10", ExpectVersion: 10}, | ||||
| 	} | ||||
| 	for _, testCase := range testCases { | ||||
| 		version, err := ParseWatchResourceVersion(testCase.Version) | ||||
| 		switch { | ||||
| 		case testCase.Err: | ||||
| 			if err == nil { | ||||
| 				t.Errorf("%s: unexpected non-error", testCase.Version) | ||||
| 				continue | ||||
| 			} | ||||
| 			if !IsInvalidError(err) { | ||||
| 				t.Errorf("%s: unexpected error: %v", testCase.Version, err) | ||||
| 				continue | ||||
| 			} | ||||
| 		case !testCase.Err && err != nil: | ||||
| 			t.Errorf("%s: unexpected error: %v", testCase.Version, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		if version != testCase.ExpectVersion { | ||||
| 			t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestHasPathPrefix(t *testing.T) { | ||||
| 	validTestcases := []struct { | ||||
| 		s      string | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Daniel Smith
					Daniel Smith