Order methods in EtcdHelper

This commit is contained in:
Wojciech Tyczynski
2015-07-24 09:19:08 +02:00
parent fdb3f45077
commit 98d598693b

View File

@@ -70,175 +70,101 @@ func init() {
metrics.Register()
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
result, err := h.Client.Get(key, true, true)
// Implements StorageInterface.
func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
key = h.prefixEtcdKey(key)
data, err := h.Codec.Encode(obj)
if err != nil {
index, ok := etcdErrorIndex(err)
if !ok {
index = 0
}
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, err
return err
}
if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
}
return result.Node.Nodes, result.EtcdIndex, nil
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
for _, node := range nodes {
if node.Dir {
trace.Step("Decoding dir " + node.Key + " START")
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
return err
}
trace.Step("Decoding dir " + node.Key + " END")
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex); found {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.Versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
}
}
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
const maxEtcdCacheEntries int = 50000
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := h.Copier.Copy(obj.(runtime.Object))
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
}
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
metrics.ObserveCacheMiss()
return nil, false
}
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.Copier.Copy(obj)
response, err := h.Client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return
return err
}
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
metrics.ObserveNewEntry()
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(key)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
trace.Step("Etcd node listed")
create := true
if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
create = false
startTime := time.Now()
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
}
}
if create {
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.Client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, index); err != nil {
return err
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return nil
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
startTime := time.Now()
response, err := h.Client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.Client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
if IsEtcdNotFound(err) {
return nil
}
return err
}
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
return err
}
}
return nil
_, err := h.Client.Delete(key, recursive)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
// Implements StorageInterface.
@@ -292,102 +218,124 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
return body, node, err
}
// Implements StorageInterface.
func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
key = h.prefixEtcdKey(key)
data, err := h.Codec.Encode(obj)
func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.Client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
if IsEtcdNotFound(err) {
return nil
}
return err
}
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
if err := h.Versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
return err
}
}
startTime := time.Now()
response, err := h.Client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
return nil
}
// Implements StorageInterface.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
key = h.prefixEtcdKey(key)
startTime := time.Now()
_, err := h.Client.Delete(key, recursive)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
startTime := time.Now()
response, err := h.Client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
create := true
if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
create = false
startTime := time.Now()
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
for _, node := range nodes {
if node.Dir {
trace.Step("Decoding dir " + node.Key + " START")
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
return err
}
trace.Step("Decoding dir " + node.Key + " END")
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex); found {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.Versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
}
}
}
if create {
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.Client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(key)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
trace.Step("Etcd node listed")
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, index); err != nil {
return err
}
}
return nil
}
return err
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
result, err := h.Client.Get(key, true, true)
if err != nil {
index, ok := etcdErrorIndex(err)
if !ok {
index = 0
}
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, err
}
}
return result.Node.Nodes, result.EtcdIndex, nil
}
type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error)
@@ -484,3 +432,53 @@ func (h *EtcdHelper) prefixEtcdKey(key string) string {
return path.Join("/", h.PathPrefix, key)
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
const maxEtcdCacheEntries int = 50000
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := h.Copier.Copy(obj.(runtime.Object))
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
}
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
metrics.ObserveCacheMiss()
return nil, false
}
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.Copier.Copy(obj)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return
}
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
metrics.ObserveNewEntry()
}
}