Add a simple cache for objects stored in etcd.
This commit is contained in:
@@ -26,6 +26,7 @@ import (
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
@@ -42,6 +43,16 @@ type EtcdHelper struct {
|
||||
Versioner EtcdVersioner
|
||||
// prefix for all etcd keys
|
||||
PathPrefix string
|
||||
|
||||
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
|
||||
// to resourceVersion.
|
||||
// This depends on etcd's indexes being globally unique across all objects/types. This will
|
||||
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
|
||||
// support multi-object transaction that will result in many objects with the same index.
|
||||
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
|
||||
// TODO: Measure how much this cache helps after the conversion code is optimized.
|
||||
cache map[uint64]runtime.Object
|
||||
mutex *sync.RWMutex
|
||||
}
|
||||
|
||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||
@@ -52,6 +63,8 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHe
|
||||
Codec: codec,
|
||||
Versioner: APIObjectVersioner{},
|
||||
PathPrefix: prefix,
|
||||
cache: make(map[uint64]runtime.Object),
|
||||
mutex: new(sync.RWMutex),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,19 +134,74 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
||||
}
|
||||
continue
|
||||
}
|
||||
obj := reflect.New(v.Type().Elem())
|
||||
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||
return err
|
||||
if obj, found := h.getFromCache(node.ModifiedIndex); found {
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||
} else {
|
||||
obj := reflect.New(v.Type().Elem())
|
||||
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||
return err
|
||||
}
|
||||
if h.Versioner != nil {
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
if node.ModifiedIndex != 0 {
|
||||
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
|
||||
}
|
||||
}
|
||||
if h.Versioner != nil {
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
|
||||
// their Node.ModifiedIndex, which is unique across all types.
|
||||
// All implementations must be thread-safe.
|
||||
type etcdCache interface {
|
||||
getFromCache(index uint64) (runtime.Object, bool)
|
||||
addToCache(index uint64, obj runtime.Object)
|
||||
}
|
||||
|
||||
const maxEtcdCacheEntries int = 50000
|
||||
|
||||
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
|
||||
var obj runtime.Object
|
||||
func() {
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
obj = h.cache[index]
|
||||
}()
|
||||
if obj != nil {
|
||||
// We should not return the object itself to avoid poluting the cache if someone
|
||||
// modifies returned values.
|
||||
objCopy, err := conversion.DeepCopy(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
||||
return nil, false
|
||||
}
|
||||
return objCopy.(runtime.Object), true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
|
||||
objCopy, err := conversion.DeepCopy(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
||||
return
|
||||
}
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
h.cache[index] = objCopy.(runtime.Object)
|
||||
if len(h.cache) > maxEtcdCacheEntries {
|
||||
var randomKey uint64
|
||||
for randomKey = range h.cache {
|
||||
break
|
||||
}
|
||||
delete(h.cache, randomKey)
|
||||
}
|
||||
}
|
||||
|
||||
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
|
||||
// definition) and extracts a go object per etcd node into a slice with the resource version.
|
||||
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
||||
|
Reference in New Issue
Block a user