Store keys in watchCache store
This commit is contained in:
@@ -183,7 +183,7 @@ type Cacher struct {
|
||||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||
watchCache := newWatchCache(config.CacheCapacity)
|
||||
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
|
||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||
|
||||
// Give this error when it is constructed rather than when you get the
|
||||
@@ -390,12 +390,12 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
elem, ok := obj.(*storeElement)
|
||||
if !ok {
|
||||
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
|
||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||
}
|
||||
if filter(object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||
if filter(elem.Object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
||||
|
@@ -49,6 +49,23 @@ type watchCacheEvent struct {
|
||||
ResourceVersion uint64
|
||||
}
|
||||
|
||||
// Computing a key of an object is generally non-trivial (it performs
|
||||
// e.g. validation underneath). To avoid computing it multiple times
|
||||
// (to serve the event in different List/Watch requests), in the
|
||||
// underlying store we are keeping pair (key, object).
|
||||
type storeElement struct {
|
||||
Key string
|
||||
Object runtime.Object
|
||||
}
|
||||
|
||||
func storeElementKey(obj interface{}) (string, error) {
|
||||
elem, ok := obj.(*storeElement)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("not a storeElement: %v", obj)
|
||||
}
|
||||
return elem.Key, nil
|
||||
}
|
||||
|
||||
// watchCacheElement is a single "watch event" stored in a cache.
|
||||
// It contains the resource version of the object and the object
|
||||
// itself.
|
||||
@@ -72,6 +89,9 @@ type watchCache struct {
|
||||
// Maximum size of history window.
|
||||
capacity int
|
||||
|
||||
// keyFunc is used to get a key in the underlying storage for a given object.
|
||||
keyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// cache is used a cyclic buffer - its first element (with the smallest
|
||||
// resourceVersion) is defined by startIndex, its last element is defined
|
||||
// by endIndex (if cache is full it will be startIndex + capacity).
|
||||
@@ -100,13 +120,14 @@ type watchCache struct {
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
func newWatchCache(capacity int) *watchCache {
|
||||
func newWatchCache(capacity int, keyFunc func(runtime.Object) (string, error)) *watchCache {
|
||||
wc := &watchCache{
|
||||
capacity: capacity,
|
||||
keyFunc: keyFunc,
|
||||
cache: make([]watchCacheElement, capacity),
|
||||
startIndex: 0,
|
||||
endIndex: 0,
|
||||
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
store: cache.NewStore(storeElementKey),
|
||||
resourceVersion: 0,
|
||||
clock: clock.RealClock{},
|
||||
}
|
||||
@@ -114,6 +135,7 @@ func newWatchCache(capacity int) *watchCache {
|
||||
return wc
|
||||
}
|
||||
|
||||
// Add takes runtime.Object as an argument.
|
||||
func (w *watchCache) Add(obj interface{}) error {
|
||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||
if err != nil {
|
||||
@@ -121,10 +143,11 @@ func (w *watchCache) Add(obj interface{}) error {
|
||||
}
|
||||
event := watch.Event{Type: watch.Added, Object: object}
|
||||
|
||||
f := func(obj runtime.Object) error { return w.store.Add(obj) }
|
||||
f := func(elem *storeElement) error { return w.store.Add(elem) }
|
||||
return w.processEvent(event, resourceVersion, f)
|
||||
}
|
||||
|
||||
// Update takes runtime.Object as an argument.
|
||||
func (w *watchCache) Update(obj interface{}) error {
|
||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||
if err != nil {
|
||||
@@ -132,10 +155,11 @@ func (w *watchCache) Update(obj interface{}) error {
|
||||
}
|
||||
event := watch.Event{Type: watch.Modified, Object: object}
|
||||
|
||||
f := func(obj runtime.Object) error { return w.store.Update(obj) }
|
||||
f := func(elem *storeElement) error { return w.store.Update(elem) }
|
||||
return w.processEvent(event, resourceVersion, f)
|
||||
}
|
||||
|
||||
// Delete takes runtime.Object as an argument.
|
||||
func (w *watchCache) Delete(obj interface{}) error {
|
||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||
if err != nil {
|
||||
@@ -143,7 +167,7 @@ func (w *watchCache) Delete(obj interface{}) error {
|
||||
}
|
||||
event := watch.Event{Type: watch.Deleted, Object: object}
|
||||
|
||||
f := func(obj runtime.Object) error { return w.store.Delete(obj) }
|
||||
f := func(elem *storeElement) error { return w.store.Delete(elem) }
|
||||
return w.processEvent(event, resourceVersion, f)
|
||||
}
|
||||
|
||||
@@ -170,16 +194,22 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
|
||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||
}
|
||||
|
||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
|
||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
||||
key, err := w.keyFunc(event.Object)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't compute key: %v", err)
|
||||
}
|
||||
elem := &storeElement{Key: key, Object: event.Object}
|
||||
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
previous, exists, err := w.store.Get(event.Object)
|
||||
previous, exists, err := w.store.Get(elem)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var prevObject runtime.Object
|
||||
if exists {
|
||||
prevObject = previous.(runtime.Object)
|
||||
prevObject = previous.(*storeElement).Object
|
||||
}
|
||||
watchCacheEvent := watchCacheEvent{
|
||||
Type: event.Type,
|
||||
@@ -193,7 +223,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||
w.updateCache(resourceVersion, &watchCacheEvent)
|
||||
w.resourceVersion = resourceVersion
|
||||
w.cond.Broadcast()
|
||||
return updateFunc(event.Object)
|
||||
return updateFunc(elem)
|
||||
}
|
||||
|
||||
// Assumes that lock is already held for write.
|
||||
@@ -206,12 +236,14 @@ func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent)
|
||||
w.endIndex++
|
||||
}
|
||||
|
||||
// List returns list of pointers to <storeElement> objects.
|
||||
func (w *watchCache) List() []interface{} {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.List()
|
||||
}
|
||||
|
||||
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
|
||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
|
||||
startTime := w.clock.Now()
|
||||
go func() {
|
||||
@@ -249,30 +281,56 @@ func (w *watchCache) ListKeys() []string {
|
||||
return w.store.ListKeys()
|
||||
}
|
||||
|
||||
// Get takes runtime.Object as a parameter. However, it returns
|
||||
// pointer to <storeElement>.
|
||||
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
|
||||
}
|
||||
key, err := w.keyFunc(object)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
|
||||
}
|
||||
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.Get(obj)
|
||||
return w.store.Get(&storeElement{Key: key, Object: object})
|
||||
}
|
||||
|
||||
// GetByKey returns pointer to <storeElement>.
|
||||
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.store.GetByKey(key)
|
||||
}
|
||||
|
||||
// Replace takes slice of runtime.Object as a paramater.
|
||||
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
||||
version, err := parseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
toReplace := make([]interface{}, 0, len(objs))
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
|
||||
}
|
||||
key, err := w.keyFunc(object)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't compute key: %v", err)
|
||||
}
|
||||
toReplace = append(toReplace, &storeElement{Key: key, Object: object})
|
||||
}
|
||||
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
w.startIndex = 0
|
||||
w.endIndex = 0
|
||||
if err := w.store.Replace(objs, resourceVersion); err != nil {
|
||||
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
w.resourceVersion = version
|
||||
@@ -311,7 +369,15 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
|
||||
allItems := w.store.List()
|
||||
result := make([]watchCacheEvent, len(allItems))
|
||||
for i, item := range allItems {
|
||||
result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)}
|
||||
elem, ok := item.(*storeElement)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("not a storeElement: %v", elem)
|
||||
}
|
||||
result[i] = watchCacheEvent{
|
||||
Type: watch.Added,
|
||||
Object: elem.Object,
|
||||
ResourceVersion: w.resourceVersion,
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
@@ -44,7 +44,10 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
|
||||
|
||||
// newTestWatchCache just adds a fake clock.
|
||||
func newTestWatchCache(capacity int) *watchCache {
|
||||
wc := newWatchCache(capacity)
|
||||
keyFunc := func(obj runtime.Object) (string, error) {
|
||||
return NamespaceKeyFunc("prefix", obj)
|
||||
}
|
||||
wc := newWatchCache(capacity, keyFunc)
|
||||
wc.clock = clock.NewFakeClock(time.Now())
|
||||
return wc
|
||||
}
|
||||
@@ -60,7 +63,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
||||
if item, ok, _ := store.Get(pod1); !ok {
|
||||
t.Errorf("didn't find pod")
|
||||
} else {
|
||||
if !api.Semantic.DeepEqual(pod1, item) {
|
||||
if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod1}, item) {
|
||||
t.Errorf("expected %v, got %v", pod1, item)
|
||||
}
|
||||
}
|
||||
@@ -71,7 +74,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
||||
if item, ok, _ := store.Get(pod2); !ok {
|
||||
t.Errorf("didn't find pod")
|
||||
} else {
|
||||
if !api.Semantic.DeepEqual(pod2, item) {
|
||||
if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod2}, item) {
|
||||
t.Errorf("expected %v, got %v", pod1, item)
|
||||
}
|
||||
}
|
||||
@@ -90,7 +93,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
||||
{
|
||||
podNames := sets.String{}
|
||||
for _, item := range store.List() {
|
||||
podNames.Insert(item.(*api.Pod).ObjectMeta.Name)
|
||||
podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
|
||||
}
|
||||
if !podNames.HasAll("pod1", "pod2", "pod3") {
|
||||
t.Errorf("missing pods, found %v", podNames)
|
||||
@@ -108,7 +111,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
||||
{
|
||||
podNames := sets.String{}
|
||||
for _, item := range store.List() {
|
||||
podNames.Insert(item.(*api.Pod).ObjectMeta.Name)
|
||||
podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
|
||||
}
|
||||
if !podNames.HasAll("pod4", "pod5") {
|
||||
t.Errorf("missing pods, found %v", podNames)
|
||||
|
Reference in New Issue
Block a user