Revert "Use atomic create in EtcdHelper.AtomicUpdate"

This commit is contained in:
Daniel Smith 2014-08-05 11:34:00 -07:00
parent 51872d65c9
commit 73b64aa490
3 changed files with 16 additions and 98 deletions

View File

@ -98,11 +98,6 @@ func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
} }
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. // IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool { func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err return etcd.ErrWatchStoppedByUser == err
@ -258,20 +253,15 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
return err return err
} }
// First time this key has been used, just set.
if index == 0 {
return h.SetObj(key, ret)
}
data, err := h.Encoding.Encode(ret) data, err := h.Encoding.Encode(ret)
if err != nil { if err != nil {
return err return err
} }
// First time this key has been used, try creating new value.
if index == 0 {
_, err = h.Client.Create(key, string(data), 0)
if IsEtcdNodeExist(err) {
continue
}
return err
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
if IsEtcdTestFailed(err) { if IsEtcdTestFailed(err) {
continue continue

View File

@ -19,7 +19,6 @@ package tools
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sync"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -227,7 +226,7 @@ func TestAtomicUpdate(t *testing.T) {
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet("/some/key") fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1}
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
return obj, nil return obj, nil
}) })
@ -243,6 +242,7 @@ func TestAtomicUpdate(t *testing.T) {
if expect != got { if expect != got {
t.Errorf("Wanted %v, got %v", expect, got) t.Errorf("Wanted %v, got %v", expect, got)
} }
return
// Update an existing node. // Update an existing node.
callbackCalled := false callbackCalled := false
@ -274,57 +274,6 @@ func TestAtomicUpdate(t *testing.T) {
} }
} }
func TestAtomicUpdate_CreateCollision(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
encoding := scheme
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
fakeClient.ExpectNotFoundGet("/some/key")
const concurrency = 10
var wgDone sync.WaitGroup
var wgForceCollision sync.WaitGroup
wgDone.Add(concurrency)
wgForceCollision.Add(concurrency)
for i := 0; i < concurrency; i++ {
// Increment TestResource.Value by 1
go func() {
defer wgDone.Done()
firstCall := true
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
defer func() { firstCall = false }()
if firstCall {
// Force collision by joining all concurrent AtomicUpdate operations here.
wgForceCollision.Done()
wgForceCollision.Wait()
}
currValue := in.(*TestResource).Value
obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1}
return obj, nil
})
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}()
}
wgDone.Wait()
// Check that stored TestResource has received all updates.
body := fakeClient.Data["/some/key"].R.Node.Value
stored := &TestResource{}
if err := encoding.DecodeInto([]byte(body), stored); err != nil {
t.Errorf("Error decoding stored value: %v", body)
}
if stored.Value != concurrency {
t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value)
}
}
func TestWatchInterpretation_ListAdd(t *testing.T) { func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool { w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")

View File

@ -19,7 +19,6 @@ package tools
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -41,12 +40,11 @@ type FakeEtcdClient struct {
Data map[string]EtcdResponseWithError Data map[string]EtcdResponseWithError
DeletedKeys []string DeletedKeys []string
expectNotFoundGetSet map[string]struct{} expectNotFoundGetSet map[string]struct{}
sync.Mutex Err error
Err error t TestLogger
t TestLogger Ix int
Ix int TestIndex bool
TestIndex bool ChangeIndex uint64
ChangeIndex uint64
// Will become valid after Watch is called; tester may write to it. Tester may // Will become valid after Watch is called; tester may write to it. Tester may
// also read from it to verify that it's closed after injecting an error. // also read from it to verify that it's closed after injecting an error.
@ -91,17 +89,11 @@ func (f *FakeEtcdClient) generateIndex() uint64 {
} }
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
f.Ix = f.Ix + 1 f.Ix = f.Ix + 1
return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
} }
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
result := f.Data[key] result := f.Data[key]
if result.R == nil { if result.R == nil {
if _, ok := f.expectNotFoundGetSet[key]; !ok { if _, ok := f.expectNotFoundGetSet[key]; !ok {
@ -118,7 +110,7 @@ func (f *FakeEtcdClient) nodeExists(key string) bool {
return ok && result.R != nil && result.R.Node != nil return ok && result.R != nil && result.R.Node != nil
} }
func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) { func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
if f.Err != nil { if f.Err != nil {
return nil, f.Err return nil, f.Err
} }
@ -154,13 +146,6 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
return result.R, nil return result.R, nil
} }
func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
return f.setLocked(key, value, ttl)
}
func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) { func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
if f.Err != nil { if f.Err != nil {
return nil, f.Err return nil, f.Err
@ -175,9 +160,6 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
return nil, errors.New("Either prevValue or prevIndex must be specified.") return nil, errors.New("Either prevValue or prevIndex must be specified.")
} }
f.Mutex.Lock()
defer f.Mutex.Unlock()
if !f.nodeExists(key) { if !f.nodeExists(key) {
return nil, EtcdErrorNotFound return nil, EtcdErrorNotFound
} }
@ -192,18 +174,15 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue
return nil, EtcdErrorTestFailed return nil, EtcdErrorTestFailed
} }
return f.setLocked(key, value, ttl) return f.Set(key, value, ttl)
} }
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
if f.nodeExists(key) { if f.nodeExists(key) {
return nil, EtcdErrorNodeExist return nil, EtcdErrorNodeExist
} }
return f.setLocked(key, value, ttl) return f.Set(key, value, ttl)
} }
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {