add a knob to enable quorum read
This commit is contained in:
@@ -30,6 +30,7 @@ type ServerRunOptions struct {
|
||||
BindAddress net.IP
|
||||
CertDirectory string
|
||||
ClientCAFile string
|
||||
EtcdQuorumRead bool
|
||||
InsecureBindAddress net.IP
|
||||
InsecurePort int
|
||||
LongRunningRequestRE string
|
||||
|
||||
@@ -66,9 +66,9 @@ func setUp(t *testing.T) (Master, *etcdtesting.EtcdTestServer, Config, *assert.A
|
||||
storageVersions := make(map[string]string)
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
storageDestinations.AddAPIGroup(
|
||||
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()))
|
||||
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false))
|
||||
storageDestinations.AddAPIGroup(
|
||||
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix()))
|
||||
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false))
|
||||
|
||||
config.StorageDestinations = storageDestinations
|
||||
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
|
||||
@@ -348,7 +348,7 @@ func initThirdParty(t *testing.T, version string) (*Master, *etcdtesting.EtcdTes
|
||||
},
|
||||
}
|
||||
master.HandlerContainer = restful.NewContainer()
|
||||
master.thirdPartyStorage = etcdstorage.NewEtcdStorage(etcdserver.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix())
|
||||
master.thirdPartyStorage = etcdstorage.NewEtcdStorage(etcdserver.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false)
|
||||
|
||||
if !assert.NoError(master.InstallThirdPartyResource(api)) {
|
||||
t.FailNow()
|
||||
|
||||
@@ -88,7 +88,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
|
||||
func NewTestGenericEtcdRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Etcd) {
|
||||
podPrefix := "/pods"
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false)
|
||||
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
|
||||
|
||||
return server, &Etcd{
|
||||
|
||||
@@ -37,7 +37,7 @@ import (
|
||||
|
||||
func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) {
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix())
|
||||
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false)
|
||||
return storage, server
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ import (
|
||||
|
||||
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix)
|
||||
storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix, false)
|
||||
return server, storage
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ type EtcdConfig struct {
|
||||
ServerList []string
|
||||
Codec runtime.Codec
|
||||
Prefix string
|
||||
Quorum bool
|
||||
}
|
||||
|
||||
// implements storage.Config
|
||||
@@ -72,12 +73,12 @@ func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil
|
||||
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil
|
||||
}
|
||||
|
||||
// Creates a new storage interface from the client
|
||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
|
||||
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface {
|
||||
return &etcdHelper{
|
||||
etcdclient: client,
|
||||
client: etcd.NewKeysAPI(client),
|
||||
@@ -85,6 +86,7 @@ func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) stor
|
||||
versioner: APIObjectVersioner{},
|
||||
copier: api.Scheme,
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
quorum: quorum,
|
||||
cache: util.NewCache(maxEtcdCacheEntries),
|
||||
}
|
||||
}
|
||||
@@ -99,6 +101,8 @@ type etcdHelper struct {
|
||||
versioner storage.Versioner
|
||||
// prefix for all etcd keys
|
||||
pathPrefix string
|
||||
// if true, perform quorum read
|
||||
quorum bool
|
||||
|
||||
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
|
||||
// to resourceVersion.
|
||||
@@ -269,7 +273,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
|
||||
return nil, err
|
||||
}
|
||||
key = h.prefixEtcdKey(key)
|
||||
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
|
||||
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
|
||||
go w.etcdWatch(ctx, h.client, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
@@ -284,7 +288,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
||||
return nil, err
|
||||
}
|
||||
key = h.prefixEtcdKey(key)
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
||||
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
||||
go w.etcdWatch(ctx, h.client, key, watchRV)
|
||||
return w, nil
|
||||
}
|
||||
@@ -306,7 +310,12 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
|
||||
glog.Errorf("Context is nil")
|
||||
}
|
||||
startTime := time.Now()
|
||||
response, err := h.client.Get(ctx, key, nil)
|
||||
|
||||
opts := &etcd.GetOptions{
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
|
||||
response, err := h.client.Get(ctx, key, opts)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
||||
|
||||
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
||||
@@ -365,7 +374,12 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
|
||||
key = h.prefixEtcdKey(key)
|
||||
startTime := time.Now()
|
||||
trace.Step("About to read etcd node")
|
||||
response, err := h.client.Get(ctx, key, nil)
|
||||
|
||||
opts := &etcd.GetOptions{
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
response, err := h.client.Get(ctx, key, opts)
|
||||
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
trace.Step("Etcd node read")
|
||||
if err != nil {
|
||||
@@ -473,6 +487,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
|
||||
opts := etcd.GetOptions{
|
||||
Recursive: true,
|
||||
Sort: true,
|
||||
Quorum: h.quorum,
|
||||
}
|
||||
result, err := h.client.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
|
||||
@@ -63,7 +63,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) {
|
||||
}
|
||||
|
||||
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
|
||||
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
|
||||
return *NewEtcdStorage(client, codec, prefix, false).(*etcdHelper)
|
||||
}
|
||||
|
||||
// Returns an encoded version of api.Pod with the given name.
|
||||
|
||||
@@ -82,6 +82,7 @@ type etcdWatcher struct {
|
||||
transform TransformFunc
|
||||
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
quorum bool // If we enable quorum, shoule be true
|
||||
include includeFunc
|
||||
filter storage.FilterFunc
|
||||
|
||||
@@ -109,12 +110,13 @@ const watchWaitDuration = 100 * time.Millisecond
|
||||
|
||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||
func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
|
||||
func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
transform: transform,
|
||||
list: list,
|
||||
quorum: quorum,
|
||||
include: include,
|
||||
filter: filter,
|
||||
// Buffer this channel, so that the etcd client is not forced
|
||||
@@ -171,7 +173,7 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
|
||||
// Stop() is called in the meantime (which in tests can cause etcd termination and
|
||||
// strange behavior here).
|
||||
if resourceVersion == 0 {
|
||||
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming)
|
||||
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
|
||||
if err != nil {
|
||||
w.etcdError <- err
|
||||
return true
|
||||
@@ -203,10 +205,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
|
||||
}
|
||||
|
||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
opts := etcd.GetOptions{
|
||||
Recursive: recursive,
|
||||
Sort: false,
|
||||
Quorum: quorum,
|
||||
}
|
||||
resp, err := client.Get(ctx, key, &opts)
|
||||
if err != nil {
|
||||
|
||||
@@ -131,7 +131,7 @@ func TestWatchInterpretations(t *testing.T) {
|
||||
|
||||
for name, item := range table {
|
||||
for _, action := range item.actions {
|
||||
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
|
||||
emitCalled := false
|
||||
w.emit = func(event watch.Event) {
|
||||
emitCalled = true
|
||||
@@ -170,7 +170,7 @@ func TestWatchInterpretations(t *testing.T) {
|
||||
|
||||
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
||||
_, codec := testScheme(t)
|
||||
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w.emit = func(e watch.Event) {
|
||||
t.Errorf("Unexpected emit: %v", e)
|
||||
}
|
||||
@@ -185,7 +185,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
||||
_, codec := testScheme(t)
|
||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||
for _, action := range actions {
|
||||
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w.emit = func(e watch.Event) {
|
||||
t.Errorf("Unexpected emit: %v", e)
|
||||
}
|
||||
@@ -200,7 +200,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
||||
_, codec := testScheme(t)
|
||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||
for _, action := range actions {
|
||||
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
|
||||
w.emit = func(e watch.Event) {
|
||||
t.Errorf("Unexpected emit: %v", e)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user