Private EtcdHelper

This commit is contained in:
Wojciech Tyczynski
2015-07-24 13:09:49 +02:00
parent 5bd82ffe6d
commit 9d943df397
56 changed files with 471 additions and 482 deletions

View File

@@ -34,27 +34,26 @@ import (
"github.com/golang/glog"
)
func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
return EtcdHelper{
Client: client,
Codec: codec,
Versioner: APIObjectVersioner{},
Copier: api.Scheme,
PathPrefix: prefix,
func NewEtcdStorage(client EtcdClient, codec runtime.Codec, prefix string) StorageInterface {
return &etcdHelper{
client: client,
codec: codec,
versioner: APIObjectVersioner{},
copier: api.Scheme,
pathPrefix: prefix,
cache: util.NewCache(maxEtcdCacheEntries),
}
}
// EtcdHelper is the reference implementation of StorageInterface.
// TODO(wojtekt): Make it private and expose only StorageInterface to outside world.
type EtcdHelper struct {
Client EtcdClient
Codec runtime.Codec
Copier runtime.ObjectCopier
// etcdHelper is the reference implementation of StorageInterface.
type etcdHelper struct {
client EtcdClient
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
Versioner EtcdVersioner
versioner StorageVersioner
// prefix for all etcd keys
PathPrefix string
pathPrefix string
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
@@ -71,20 +70,30 @@ func init() {
}
// Implements StorageInterface.
func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
func (h *etcdHelper) Backends() []string {
return h.client.GetCluster()
}
// Implements StorageInterface.
func (h *etcdHelper) Versioner() StorageVersioner {
return h.versioner
}
// Implements StorageInterface.
func (h *etcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
key = h.prefixEtcdKey(key)
data, err := h.Codec.Encode(obj)
data, err := h.codec.Encode(obj)
if err != nil {
return err
}
if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
if h.versioner != nil {
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
}
startTime := time.Now()
response, err := h.Client.Create(key, string(data), ttl)
response, err := h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
@@ -99,20 +108,20 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
}
// Implements StorageInterface.
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
func (h *etcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.Codec.Encode(obj)
data, err := h.codec.Encode(obj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
create := true
if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
if h.versioner != nil {
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
create = false
startTime := time.Now()
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
response, err = h.client.CompareAndSwap(key, string(data), ttl, "", version)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
@@ -122,7 +131,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
if create {
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.Client.Create(key, string(data), ttl)
response, err = h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
@@ -140,14 +149,14 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
}
// Implements StorageInterface.
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
func (h *etcdHelper) DeleteObj(key string, out runtime.Object) error {
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
startTime := time.Now()
response, err := h.Client.Delete(key, false)
response, err := h.client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
@@ -159,16 +168,16 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
}
// Implements StorageInterface.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
func (h *etcdHelper) Delete(key string, recursive bool) error {
key = h.prefixEtcdKey(key)
startTime := time.Now()
_, err := h.Client.Delete(key, recursive)
_, err := h.client.Delete(key, recursive)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
func (h *etcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err
@@ -176,9 +185,9 @@ func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFoun
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl.
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
func (h *etcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
startTime := time.Now()
response, err := h.Client.Get(key, false, false)
response, err := h.client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !IsEtcdNotFound(err) {
@@ -188,7 +197,7 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignore
return body, node, response, err
}
func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
if response != nil {
if prevNode {
node = response.PrevNode
@@ -210,16 +219,16 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body = node.Value
err = h.Codec.DecodeInto([]byte(body), objPtr)
if h.Versioner != nil {
_ = h.Versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
err = h.codec.DecodeInto([]byte(body), objPtr)
if h.versioner != nil {
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
return body, node, err
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
func (h *etcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
@@ -228,7 +237,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.Client.Get(key, false, false)
response, err := h.client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
@@ -245,8 +254,8 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
return err
}
trace.Step("Object decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
return err
}
}
@@ -254,7 +263,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
@@ -275,12 +284,12 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.Versioner != nil {
if h.versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
_ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
@@ -293,7 +302,7 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
func (h *etcdHelper) ExtractToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
@@ -313,16 +322,16 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
return err
}
trace.Step("Node list decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, index); err != nil {
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
}
return nil
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
result, err := h.Client.Get(key, true, true)
func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
result, err := h.client.Get(key, true, true)
if err != nil {
index, ok := etcdErrorIndex(err)
if !ok {
@@ -349,7 +358,7 @@ func SimpleUpdate(fn SimpleEtcdUpdateFunc) StorageUpdateFunc {
}
// Implements StorageInterface.
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error {
func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error {
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
@@ -391,7 +400,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
ttl = *newTTL
}
data, err := h.Codec.Encode(ret)
data, err := h.codec.Encode(ret)
if err != nil {
return err
}
@@ -399,7 +408,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
// First time this key has been used, try creating new value.
if index == 0 {
startTime := time.Now()
response, err := h.Client.Create(key, string(data), ttl)
response, err := h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if IsEtcdNodeExist(err) {
continue
@@ -414,7 +423,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
startTime := time.Now()
// Swap origBody with data, if origBody is the latest etcd data.
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if IsEtcdTestFailed(err) {
// Try again.
@@ -425,11 +434,11 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
}
}
func (h *EtcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, path.Join("/", h.PathPrefix)) {
func (h *etcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, path.Join("/", h.pathPrefix)) {
return key
}
return path.Join("/", h.PathPrefix, key)
return path.Join("/", h.pathPrefix, key)
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
@@ -446,7 +455,7 @@ func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
func (h *etcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
@@ -455,7 +464,7 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
if found {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := h.Copier.Copy(obj.(runtime.Object))
objCopy, err := h.copier.Copy(obj.(runtime.Object))
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
@@ -467,12 +476,12 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
return nil, false
}
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.Copier.Copy(obj)
objCopy, err := h.copier.Copy(obj)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return

View File

@@ -65,6 +65,10 @@ func init() {
)
}
func newEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
}
func TestIsEtcdNotFound(t *testing.T) {
try := func(err error, isNotFound bool) {
if IsEtcdNotFound(err) != isNotFound {
@@ -87,7 +91,7 @@ func getEncodedPod(name string) string {
func TestExtractToList(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
@@ -157,7 +161,7 @@ func TestExtractToList(t *testing.T) {
// TestExtractToListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestExtractToListAcrossDirectories(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
@@ -240,7 +244,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
func TestExtractToListExcludesDirectories(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
@@ -311,7 +315,7 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
func TestExtractObj(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
expect := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
@@ -333,7 +337,7 @@ func TestExtractObj(t *testing.T) {
func TestExtractObjNotFoundErr(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key1 := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key1] = EtcdResponseWithError{
R: &etcd.Response{
@@ -377,7 +381,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
func TestCreateObj(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
err := helper.CreateObj("/some/key", obj, returnedObj, 5)
if err != nil {
@@ -403,7 +407,7 @@ func TestCreateObj(t *testing.T) {
func TestCreateObjNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
err := helper.CreateObj("/some/key", obj, nil, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
@@ -413,7 +417,7 @@ func TestCreateObjNilOutParam(t *testing.T) {
func TestSetObj(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
err := helper.SetObj("/some/key", obj, returnedObj, 5)
if err != nil {
@@ -441,7 +445,7 @@ func TestSetObjFailCAS(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.CasErr = fakeClient.NewError(123)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
err := helper.SetObj("/some/key", obj, nil, 5)
if err == nil {
t.Errorf("Expecting error.")
@@ -452,7 +456,7 @@ func TestSetObjWithVersion(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
@@ -488,8 +492,8 @@ func TestSetObjWithVersion(t *testing.T) {
func TestSetObjWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.Versioner = nil
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
returnedObj := &api.Pod{}
err := helper.SetObj("/some/key", obj, returnedObj, 3)
key := etcdtest.AddPrefix("/some/key")
@@ -516,8 +520,8 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
func TestSetObjNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.Versioner = nil
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
err := helper.SetObj("/some/key", obj, nil, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
@@ -527,7 +531,7 @@ func TestSetObjNilOutParam(t *testing.T) {
func TestGuaranteedUpdate(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
@@ -582,7 +586,7 @@ func TestGuaranteedUpdate(t *testing.T) {
func TestGuaranteedUpdateTTL(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
@@ -683,7 +687,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
func TestGuaranteedUpdateNoChange(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
@@ -715,7 +719,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
@@ -742,7 +746,7 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.ExpectNotFoundGet(key)
@@ -840,7 +844,7 @@ func TestGetEtcdVersion_NotListening(t *testing.T) {
func TestPrefixEtcdKey(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := NewEtcdHelper(fakeClient, testapi.Codec(), prefix)
helper := newEtcdHelper(fakeClient, testapi.Codec(), prefix)
baseKey := "/some/key"

View File

@@ -70,46 +70,23 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
// Errors will be sent down the channel.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// WatchAndTransform begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface. If the transform
// function is provided, the value decoded from etcd will be passed to the function
// prior to being returned.
//
// The transform function can be used to populate data not available to etcd, or to
// change or wrap the serialized etcd object.
//
// startTime := time.Now()
// helper.WatchAndTransform(key, version, func(input runtime.Object) (runtime.Object, error) {
// value := input.(TimeAwareValue)
// value.Since = startTime
// return value, nil
// })
//
// Errors will be sent down the channel.
/*func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w
}*/
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)
@@ -126,7 +103,7 @@ func exceptKey(except string) includeFunc {
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
versioner EtcdVersioner
versioner StorageVersioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
@@ -154,7 +131,7 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner StorageVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,

View File

@@ -218,7 +218,7 @@ func TestWatchEtcdError(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch("/some/key", 4, Everything)
if err != nil {
@@ -248,7 +248,7 @@ func TestWatch(t *testing.T) {
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
if err != nil {
@@ -424,7 +424,7 @@ func TestWatchEtcdState(t *testing.T) {
fakeClient.Data[key] = value
}
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(baseKey, testCase.From, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -497,7 +497,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.Data[prefixedKey] = testCase.Response
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
if err != nil {
@@ -558,7 +558,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
EtcdIndex: 3,
},
}
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 0, Everything)
if err != nil {
@@ -598,7 +598,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
prefixedKey := etcdtest.AddPrefix(key)
fakeClient := NewFakeEtcdClient(t)
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 1, Everything)
if err != nil {
@@ -651,7 +651,7 @@ func TestWatchFromNotFound(t *testing.T) {
ErrorCode: 100,
},
}
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
if err != nil {
@@ -678,7 +678,7 @@ func TestWatchFromOtherError(t *testing.T) {
ErrorCode: 101,
},
}
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
if err != nil {
@@ -710,7 +710,7 @@ func TestWatchFromOtherError(t *testing.T) {
func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
h := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}

View File

@@ -29,7 +29,7 @@ import (
// for objects that have an embedded ObjectMeta or ListMeta field.
type APIObjectVersioner struct{}
// UpdateObject implements EtcdVersioner
// UpdateObject implements StorageVersioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error {
objectMeta, err := api.ObjectMetaFor(obj)
if err != nil {
@@ -46,7 +46,7 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Ti
return nil
}
// UpdateList implements EtcdVersioner
// UpdateList implements StorageVersioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
listMeta, err := api.ListMetaFor(obj)
if err != nil || listMeta == nil {
@@ -60,7 +60,7 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6
return nil
}
// ObjectResourceVersion implements EtcdVersioner
// ObjectResourceVersion implements StorageVersioner
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
meta, err := api.ObjectMetaFor(obj)
if err != nil {
@@ -73,5 +73,5 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
return strconv.ParseUint(version, 10, 64)
}
// APIObjectVersioner implements EtcdVersioner
var _ EtcdVersioner = APIObjectVersioner{}
// APIObjectVersioner implements StorageVersioner
var _ StorageVersioner = APIObjectVersioner{}

View File

@@ -52,9 +52,9 @@ type EtcdClient interface {
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object
// StorageVersioner abstracts setting and retrieving metadata fields from the etcd response onto the object
// or list.
type EtcdVersioner interface {
type StorageVersioner interface {
// UpdateObject sets etcd storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from etcd.
@@ -91,6 +91,14 @@ type StorageUpdateFunc func(input runtime.Object, res ResponseMeta) (output runt
// StorageInterface offers a common interface for object marshaling/unmarshling operations and
// hids all the storage-related operations behind it.
type StorageInterface interface {
// Returns list of servers addresses of the underyling database.
// TODO: This method is used only in a single place. Consider refactoring and getting rid
// of this method from the interface.
Backends() []string
// Returns StorageVersioner associated with this interface.
Versioner() StorageVersioner
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from etcd.