Add a utility for doing master election via etcd.

This commit is contained in:
Brendan Burns
2014-08-04 14:42:51 -07:00
parent 24b5d58268
commit a17acd30ee
6 changed files with 346 additions and 5 deletions

View File

@@ -91,16 +91,16 @@ func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNotFound)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
}
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err

View File

@@ -27,6 +27,8 @@ import (
type EtcdResponseWithError struct {
R *etcd.Response
E error
// if N is non-null, it will be assigned into the map after this response is used for an operation
N *EtcdResponseWithError
}
// TestLogger is a type passed to Test functions to support formatted test logs.
@@ -92,6 +94,15 @@ func (f *FakeEtcdClient) generateIndex() uint64 {
return f.ChangeIndex
}
// Requires that f.Mutex be held.
func (f *FakeEtcdClient) updateResponse(key string) {
resp, found := f.Data[key]
if !found || resp.N == nil {
return
}
f.Data[key] = *resp.N
}
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
@@ -103,6 +114,7 @@ func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response,
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
defer f.updateResponse(key)
result := f.Data[key]
if result.R == nil {
@@ -161,6 +173,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
defer f.updateResponse(key)
return f.setLocked(key, value, ttl)
}
@@ -182,6 +195,7 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
f.Mutex.Lock()
defer f.Mutex.Unlock()
defer f.updateResponse(key)
if !f.nodeExists(key) {
f.t.Logf("c&s: node doesn't exist")
@@ -206,6 +220,7 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
defer f.updateResponse(key)
if f.nodeExists(key) {
return nil, EtcdErrorNodeExist