Initial support for listing from in-memory cache.
This commit is contained in:
@@ -45,6 +45,11 @@ type CacherConfig struct {
|
||||
// An underlying storage.Versioner.
|
||||
Versioner Versioner
|
||||
|
||||
// Whether to serve Lists from in-memory cache.
|
||||
//
|
||||
// NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE!
|
||||
ListFromCache bool
|
||||
|
||||
// The Cache will be caching objects of a given Type and assumes that they
|
||||
// are all stored under ResourcePrefix directory in the underlying database.
|
||||
Type interface{}
|
||||
@@ -99,6 +104,11 @@ type Cacher struct {
|
||||
|
||||
// keyFunc is used to get a key in the underyling storage for a given object.
|
||||
keyFunc func(runtime.Object) (string, error)
|
||||
|
||||
// Whether to serve Lists from in-memory cache.
|
||||
//
|
||||
// NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE!
|
||||
ListFromCache bool
|
||||
}
|
||||
|
||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||
@@ -109,14 +119,15 @@ func NewCacher(config CacherConfig) *Cacher {
|
||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||
|
||||
cacher := &Cacher{
|
||||
usable: sync.RWMutex{},
|
||||
storage: config.Storage,
|
||||
watchCache: watchCache,
|
||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||
watcherIdx: 0,
|
||||
watchers: make(map[int]*cacheWatcher),
|
||||
versioner: config.Versioner,
|
||||
keyFunc: config.KeyFunc,
|
||||
usable: sync.RWMutex{},
|
||||
storage: config.Storage,
|
||||
watchCache: watchCache,
|
||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||
watcherIdx: 0,
|
||||
watchers: make(map[int]*cacheWatcher),
|
||||
versioner: config.Versioner,
|
||||
keyFunc: config.KeyFunc,
|
||||
ListFromCache: config.ListFromCache,
|
||||
}
|
||||
cacher.usable.Lock()
|
||||
// See startCaching method for why explanation on it.
|
||||
@@ -220,21 +231,19 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error {
|
||||
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
|
||||
}
|
||||
if !c.ListFromCache {
|
||||
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
|
||||
}
|
||||
|
||||
// ListFromMemory implements list operation (the same signature as List method)
|
||||
// but it serves the contents from memory.
|
||||
// Current we cannot use ListFromMemory() instead of List(), because it only
|
||||
// guarantees eventual consistency (e.g. it's possible for Get called right after
|
||||
// Create to return not-exist, before the change is propagate).
|
||||
// TODO: We may consider changing to use ListFromMemory in the future, but this
|
||||
// requires wider discussion as an "api semantic change".
|
||||
func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
|
||||
// Do NOT allow Watch to start when the underlying structures are not propagated.
|
||||
// To avoid situation when List is proceesed before the underlying
|
||||
// watchCache is propagated for the first time, we acquire and immediately
|
||||
// release the 'usable' lock.
|
||||
// We don't need to hold it all the time, because watchCache is thread-safe
|
||||
// and it would complicate already very difficult locking pattern.
|
||||
c.usable.RLock()
|
||||
defer c.usable.RUnlock()
|
||||
c.usable.RUnlock()
|
||||
|
||||
// List elements from cache, with at least 'resourceVersion'.
|
||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -243,15 +252,15 @@ func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
|
||||
if err != nil || listVal.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||
}
|
||||
filter := filterFunction(key, c.keyFunc, Everything)
|
||||
filterFunc := filterFunction(key, c.keyFunc, filter)
|
||||
|
||||
objs, resourceVersion := c.watchCache.ListWithVersion()
|
||||
objs, resourceVersion := c.watchCache.WaitUntilFreshAndList(resourceVersion)
|
||||
for _, obj := range objs {
|
||||
object, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
|
||||
}
|
||||
if filter(object) {
|
||||
if filterFunc(object) {
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
||||
}
|
||||
}
|
||||
|
@@ -37,7 +37,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
@@ -47,8 +46,9 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher {
|
||||
prefix := "pods"
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 10,
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
Storage: etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()),
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
ListFromCache: true,
|
||||
Type: &api.Pod{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||
@@ -65,18 +65,7 @@ func makeTestPod(name string) *api.Pod {
|
||||
}
|
||||
}
|
||||
|
||||
func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error {
|
||||
ready := func() (bool, error) {
|
||||
result, err := cacher.LastSyncResourceVersion()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return result == resourceVersion, nil
|
||||
}
|
||||
return wait.Poll(10*time.Millisecond, util.ForeverTestTimeout, ready)
|
||||
}
|
||||
|
||||
func TestListFromMemory(t *testing.T) {
|
||||
func TestList(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
prefixedKey := etcdtest.AddPrefix("pods")
|
||||
fakeClient.ExpectNotFoundGet(prefixedKey)
|
||||
@@ -146,12 +135,9 @@ func TestListFromMemory(t *testing.T) {
|
||||
for _, test := range testCases {
|
||||
fakeClient.WatchResponse <- test
|
||||
}
|
||||
if err := waitForUpToDateCache(cacher, 5); err != nil {
|
||||
t.Errorf("watch cache didn't propagated correctly: %v", err)
|
||||
}
|
||||
|
||||
result := &api.PodList{}
|
||||
if err := cacher.ListFromMemory("pods/ns", result); err != nil {
|
||||
if err := cacher.List(context.TODO(), "pods/ns", 5, storage.Everything, result); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if result.ListMeta.ResourceVersion != "5" {
|
||||
|
@@ -55,6 +55,10 @@ type watchCacheElement struct {
|
||||
type watchCache struct {
|
||||
sync.RWMutex
|
||||
|
||||
// Condition on which lists are waiting for the fresh enough
|
||||
// resource version.
|
||||
cond *sync.Cond
|
||||
|
||||
// Maximum size of history window.
|
||||
capacity int
|
||||
|
||||
@@ -84,7 +88,7 @@ type watchCache struct {
|
||||
}
|
||||
|
||||
func newWatchCache(capacity int) *watchCache {
|
||||
return &watchCache{
|
||||
wc := &watchCache{
|
||||
capacity: capacity,
|
||||
cache: make([]watchCacheElement, capacity),
|
||||
startIndex: 0,
|
||||
@@ -92,6 +96,8 @@ func newWatchCache(capacity int) *watchCache {
|
||||
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||
resourceVersion: 0,
|
||||
}
|
||||
wc.cond = sync.NewCond(wc.RLocker())
|
||||
return wc
|
||||
}
|
||||
|
||||
func (w *watchCache) Add(obj interface{}) error {
|
||||
@@ -169,6 +175,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||
}
|
||||
w.updateCache(resourceVersion, watchCacheEvent)
|
||||
w.resourceVersion = resourceVersion
|
||||
w.cond.Broadcast()
|
||||
return updateFunc(event.Object)
|
||||
}
|
||||
|
||||
@@ -188,8 +195,11 @@ func (w *watchCache) List() []interface{} {
|
||||
return w.store.List()
|
||||
}
|
||||
|
||||
func (w *watchCache) ListWithVersion() ([]interface{}, uint64) {
|
||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) {
|
||||
w.RLock()
|
||||
for w.resourceVersion < resourceVersion {
|
||||
w.cond.Wait()
|
||||
}
|
||||
defer w.RUnlock()
|
||||
return w.store.List(), w.resourceVersion
|
||||
}
|
||||
@@ -230,6 +240,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
||||
if w.onReplace != nil {
|
||||
w.onReplace()
|
||||
}
|
||||
w.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -230,6 +230,24 @@ func TestEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitUntilFreshAndList(t *testing.T) {
|
||||
store := newWatchCache(3)
|
||||
|
||||
// In background, update the store.
|
||||
go func() {
|
||||
store.Add(makeTestPod("foo", 2))
|
||||
store.Add(makeTestPod("bar", 5))
|
||||
}()
|
||||
|
||||
list, resourceVersion := store.WaitUntilFreshAndList(4)
|
||||
if resourceVersion != 5 {
|
||||
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
|
||||
}
|
||||
if len(list) != 2 {
|
||||
t.Errorf("unexpected list returned: %#v", list)
|
||||
}
|
||||
}
|
||||
|
||||
type testLW struct {
|
||||
ListFunc func() (runtime.Object, error)
|
||||
WatchFunc func(options api.ListOptions) (watch.Interface, error)
|
||||
@@ -244,7 +262,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
||||
store := newWatchCache(5)
|
||||
|
||||
{
|
||||
_, version := store.ListWithVersion()
|
||||
_, version := store.WaitUntilFreshAndList(0)
|
||||
if version != 0 {
|
||||
t.Errorf("unexpected resource version: %d", version)
|
||||
}
|
||||
@@ -264,7 +282,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
||||
r.ListAndWatch(util.NeverStop)
|
||||
|
||||
{
|
||||
_, version := store.ListWithVersion()
|
||||
_, version := store.WaitUntilFreshAndList(10)
|
||||
if version != 10 {
|
||||
t.Errorf("unexpected resource version: %d", version)
|
||||
}
|
||||
|
Reference in New Issue
Block a user