From f10b0205e760116630ce3db93c2ef7ec6061dc4f Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 18 Oct 2016 11:07:56 +0200 Subject: [PATCH] Store keys in watchCache store --- pkg/storage/cacher.go | 10 ++-- pkg/storage/watch_cache.go | 90 ++++++++++++++++++++++++++++----- pkg/storage/watch_cache_test.go | 13 +++-- 3 files changed, 91 insertions(+), 22 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 06a21591e98..98626f0dc4e 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -183,7 +183,7 @@ type Cacher struct { // internal cache and updating its cache in the background based on the given // configuration. func NewCacherFromConfig(config CacherConfig) *Cacher { - watchCache := newWatchCache(config.CacheCapacity) + watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) // Give this error when it is constructed rather than when you get the @@ -390,12 +390,12 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p } trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs))) for _, obj := range objs { - object, ok := obj.(runtime.Object) + elem, ok := obj.(*storeElement) if !ok { - return fmt.Errorf("non runtime.Object returned from storage: %v", obj) + return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if filter(object) { - listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) + if filter(elem.Object) { + listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len())) diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index c6641e56178..76f56d50160 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -49,6 +49,23 @@ type watchCacheEvent struct { ResourceVersion uint64 } +// Computing a key of an object is generally non-trivial (it performs +// e.g. validation underneath). To avoid computing it multiple times +// (to serve the event in different List/Watch requests), in the +// underlying store we are keeping pair (key, object). +type storeElement struct { + Key string + Object runtime.Object +} + +func storeElementKey(obj interface{}) (string, error) { + elem, ok := obj.(*storeElement) + if !ok { + return "", fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Key, nil +} + // watchCacheElement is a single "watch event" stored in a cache. // It contains the resource version of the object and the object // itself. @@ -72,6 +89,9 @@ type watchCache struct { // Maximum size of history window. capacity int + // keyFunc is used to get a key in the underlying storage for a given object. + keyFunc func(runtime.Object) (string, error) + // cache is used a cyclic buffer - its first element (with the smallest // resourceVersion) is defined by startIndex, its last element is defined // by endIndex (if cache is full it will be startIndex + capacity). @@ -100,13 +120,14 @@ type watchCache struct { clock clock.Clock } -func newWatchCache(capacity int) *watchCache { +func newWatchCache(capacity int, keyFunc func(runtime.Object) (string, error)) *watchCache { wc := &watchCache{ capacity: capacity, + keyFunc: keyFunc, cache: make([]watchCacheElement, capacity), startIndex: 0, endIndex: 0, - store: cache.NewStore(cache.MetaNamespaceKeyFunc), + store: cache.NewStore(storeElementKey), resourceVersion: 0, clock: clock.RealClock{}, } @@ -114,6 +135,7 @@ func newWatchCache(capacity int) *watchCache { return wc } +// Add takes runtime.Object as an argument. func (w *watchCache) Add(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { @@ -121,10 +143,11 @@ func (w *watchCache) Add(obj interface{}) error { } event := watch.Event{Type: watch.Added, Object: object} - f := func(obj runtime.Object) error { return w.store.Add(obj) } + f := func(elem *storeElement) error { return w.store.Add(elem) } return w.processEvent(event, resourceVersion, f) } +// Update takes runtime.Object as an argument. func (w *watchCache) Update(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { @@ -132,10 +155,11 @@ func (w *watchCache) Update(obj interface{}) error { } event := watch.Event{Type: watch.Modified, Object: object} - f := func(obj runtime.Object) error { return w.store.Update(obj) } + f := func(elem *storeElement) error { return w.store.Update(elem) } return w.processEvent(event, resourceVersion, f) } +// Delete takes runtime.Object as an argument. func (w *watchCache) Delete(obj interface{}) error { object, resourceVersion, err := objectToVersionedRuntimeObject(obj) if err != nil { @@ -143,7 +167,7 @@ func (w *watchCache) Delete(obj interface{}) error { } event := watch.Event{Type: watch.Deleted, Object: object} - f := func(obj runtime.Object) error { return w.store.Delete(obj) } + f := func(elem *storeElement) error { return w.store.Delete(elem) } return w.processEvent(event, resourceVersion, f) } @@ -170,16 +194,22 @@ func parseResourceVersion(resourceVersion string) (uint64, error) { return strconv.ParseUint(resourceVersion, 10, 64) } -func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { +func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { + key, err := w.keyFunc(event.Object) + if err != nil { + return fmt.Errorf("couldn't compute key: %v", err) + } + elem := &storeElement{Key: key, Object: event.Object} + w.Lock() defer w.Unlock() - previous, exists, err := w.store.Get(event.Object) + previous, exists, err := w.store.Get(elem) if err != nil { return err } var prevObject runtime.Object if exists { - prevObject = previous.(runtime.Object) + prevObject = previous.(*storeElement).Object } watchCacheEvent := watchCacheEvent{ Type: event.Type, @@ -193,7 +223,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd w.updateCache(resourceVersion, &watchCacheEvent) w.resourceVersion = resourceVersion w.cond.Broadcast() - return updateFunc(event.Object) + return updateFunc(elem) } // Assumes that lock is already held for write. @@ -206,12 +236,14 @@ func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) w.endIndex++ } +// List returns list of pointers to objects. func (w *watchCache) List() []interface{} { w.RLock() defer w.RUnlock() return w.store.List() } +// WaitUntilFreshAndList returns list of pointers to objects. func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) { startTime := w.clock.Now() go func() { @@ -249,30 +281,56 @@ func (w *watchCache) ListKeys() []string { return w.store.ListKeys() } +// Get takes runtime.Object as a parameter. However, it returns +// pointer to . func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) { + object, ok := obj.(runtime.Object) + if !ok { + return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) + } + key, err := w.keyFunc(object) + if err != nil { + return nil, false, fmt.Errorf("couldn't compute key: %v", err) + } + w.RLock() defer w.RUnlock() - return w.store.Get(obj) + return w.store.Get(&storeElement{Key: key, Object: object}) } +// GetByKey returns pointer to . func (w *watchCache) GetByKey(key string) (interface{}, bool, error) { w.RLock() defer w.RUnlock() return w.store.GetByKey(key) } +// Replace takes slice of runtime.Object as a paramater. func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { version, err := parseResourceVersion(resourceVersion) if err != nil { return err } + toReplace := make([]interface{}, 0, len(objs)) + for _, obj := range objs { + object, ok := obj.(runtime.Object) + if !ok { + return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj) + } + key, err := w.keyFunc(object) + if err != nil { + return fmt.Errorf("couldn't compute key: %v", err) + } + toReplace = append(toReplace, &storeElement{Key: key, Object: object}) + } + w.Lock() defer w.Unlock() w.startIndex = 0 w.endIndex = 0 - if err := w.store.Replace(objs, resourceVersion); err != nil { + if err := w.store.Replace(toReplace, resourceVersion); err != nil { return err } w.resourceVersion = version @@ -311,7 +369,15 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa allItems := w.store.List() result := make([]watchCacheEvent, len(allItems)) for i, item := range allItems { - result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)} + elem, ok := item.(*storeElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", elem) + } + result[i] = watchCacheEvent{ + Type: watch.Added, + Object: elem.Object, + ResourceVersion: w.resourceVersion, + } } return result, nil } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index 7843a9d398e..57628c362b1 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -44,7 +44,10 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod { // newTestWatchCache just adds a fake clock. func newTestWatchCache(capacity int) *watchCache { - wc := newWatchCache(capacity) + keyFunc := func(obj runtime.Object) (string, error) { + return NamespaceKeyFunc("prefix", obj) + } + wc := newWatchCache(capacity, keyFunc) wc.clock = clock.NewFakeClock(time.Now()) return wc } @@ -60,7 +63,7 @@ func TestWatchCacheBasic(t *testing.T) { if item, ok, _ := store.Get(pod1); !ok { t.Errorf("didn't find pod") } else { - if !api.Semantic.DeepEqual(pod1, item) { + if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod1}, item) { t.Errorf("expected %v, got %v", pod1, item) } } @@ -71,7 +74,7 @@ func TestWatchCacheBasic(t *testing.T) { if item, ok, _ := store.Get(pod2); !ok { t.Errorf("didn't find pod") } else { - if !api.Semantic.DeepEqual(pod2, item) { + if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod2}, item) { t.Errorf("expected %v, got %v", pod1, item) } } @@ -90,7 +93,7 @@ func TestWatchCacheBasic(t *testing.T) { { podNames := sets.String{} for _, item := range store.List() { - podNames.Insert(item.(*api.Pod).ObjectMeta.Name) + podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name) } if !podNames.HasAll("pod1", "pod2", "pod3") { t.Errorf("missing pods, found %v", podNames) @@ -108,7 +111,7 @@ func TestWatchCacheBasic(t *testing.T) { { podNames := sets.String{} for _, item := range store.List() { - podNames.Insert(item.(*api.Pod).ObjectMeta.Name) + podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name) } if !podNames.HasAll("pod4", "pod5") { t.Errorf("missing pods, found %v", podNames)