Add test for EtcdHelper.AtomicUpdate concurrent create
This commit is contained in:
@@ -19,6 +19,7 @@ package tools
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -227,7 +228,7 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
|
|
||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1}
|
obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1}
|
||||||
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
|
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
})
|
})
|
||||||
@@ -274,6 +275,57 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
||||||
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
|
fakeClient.TestIndex = true
|
||||||
|
encoding := scheme
|
||||||
|
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
|
||||||
|
|
||||||
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
|
|
||||||
|
const concurrency = 10
|
||||||
|
var wgDone sync.WaitGroup
|
||||||
|
var wgForceCollision sync.WaitGroup
|
||||||
|
wgDone.Add(concurrency)
|
||||||
|
wgForceCollision.Add(concurrency)
|
||||||
|
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
// Increment TestResource.Value by 1
|
||||||
|
go func() {
|
||||||
|
defer wgDone.Done()
|
||||||
|
|
||||||
|
firstCall := true
|
||||||
|
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) {
|
||||||
|
defer func() { firstCall = false }()
|
||||||
|
|
||||||
|
if firstCall {
|
||||||
|
// Force collision by joining all concurrent AtomicUpdate operations here.
|
||||||
|
wgForceCollision.Done()
|
||||||
|
wgForceCollision.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
currValue := in.(*TestResource).Value
|
||||||
|
obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1}
|
||||||
|
return obj, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %#v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wgDone.Wait()
|
||||||
|
|
||||||
|
// Check that stored TestResource has received all updates.
|
||||||
|
body := fakeClient.Data["/some/key"].R.Node.Value
|
||||||
|
stored := &TestResource{}
|
||||||
|
if err := encoding.DecodeInto([]byte(body), stored); err != nil {
|
||||||
|
t.Errorf("Error decoding stored value: %v", body)
|
||||||
|
}
|
||||||
|
if stored.Value != concurrency {
|
||||||
|
t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatchInterpretation_ListAdd(t *testing.T) {
|
func TestWatchInterpretation_ListAdd(t *testing.T) {
|
||||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||||
t.Errorf("unexpected filter call")
|
t.Errorf("unexpected filter call")
|
||||||
|
Reference in New Issue
Block a user