Adds ability to define a prefix for etcd paths
The API server can be supplied (via a command line flag) with a custom prefix that is prepended to etcd resources paths. Refs: #3476
This commit is contained in:
@@ -23,7 +23,9 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
@@ -38,15 +40,18 @@ type EtcdHelper struct {
|
||||
Codec runtime.Codec
|
||||
// optional, no atomic operations can be performed without this interface
|
||||
Versioner EtcdVersioner
|
||||
// prefix for all etcd keys
|
||||
PathPrefix string
|
||||
}
|
||||
|
||||
// NewEtcdHelper creates a helper that works against objects that use the internal
|
||||
// Kubernetes API objects.
|
||||
func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec) EtcdHelper {
|
||||
func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHelper {
|
||||
return EtcdHelper{
|
||||
Client: client,
|
||||
Codec: codec,
|
||||
Versioner: APIObjectVersioner{},
|
||||
Client: client,
|
||||
Codec: codec,
|
||||
Versioner: APIObjectVersioner{},
|
||||
PathPrefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +141,7 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = h.PrefixEtcdKey(key)
|
||||
nodes, index, err := h.listEtcdNode(key)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -158,7 +164,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key = h.PrefixEtcdKey(key)
|
||||
response, err := h.Client.Get(key, false, false)
|
||||
if err != nil {
|
||||
if IsEtcdNotFound(err) {
|
||||
@@ -185,6 +191,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
|
||||
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
||||
// empty responses and nil response nodes exactly like a not found error.
|
||||
func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
||||
return err
|
||||
}
|
||||
@@ -233,6 +240,7 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
||||
// and 0 means forever. If no error is returned and out is not nil, out will be set to the read value
|
||||
// from etcd.
|
||||
func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
data, err := h.Codec.Encode(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -242,6 +250,7 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
|
||||
return errors.New("resourceVersion may not be set on objects to be created")
|
||||
}
|
||||
}
|
||||
|
||||
response, err := h.Client.Create(key, string(data), ttl)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -257,15 +266,18 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
|
||||
|
||||
// Delete removes the specified key.
|
||||
func (h *EtcdHelper) Delete(key string, recursive bool) error {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
_, err := h.Client.Delete(key, recursive)
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteObj removes the specified key and returns the value that existed at that spot.
|
||||
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
if _, err := conversion.EnforcePtr(out); err != nil {
|
||||
panic("unable to convert output object to pointer")
|
||||
}
|
||||
|
||||
response, err := h.Client.Delete(key, false)
|
||||
if !IsEtcdNotFound(err) {
|
||||
// if the object that existed prior to the delete is returned by etcd, update out.
|
||||
@@ -285,6 +297,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key = h.PrefixEtcdKey(key)
|
||||
|
||||
create := true
|
||||
if h.Versioner != nil {
|
||||
@@ -346,6 +359,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
||||
// Panic is appropriate, because this is a programming error.
|
||||
panic("need ptr to type")
|
||||
}
|
||||
key = h.PrefixEtcdKey(key)
|
||||
for {
|
||||
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
||||
origBody, index, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
|
||||
@@ -386,6 +400,13 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
||||
}
|
||||
}
|
||||
|
||||
func (h *EtcdHelper) PrefixEtcdKey(key string) string {
|
||||
if strings.HasPrefix(key, path.Join("/", h.PathPrefix)) {
|
||||
return key
|
||||
}
|
||||
return path.Join("/", h.PathPrefix, key)
|
||||
}
|
||||
|
||||
// GetEtcdVersion performs a version check against the provided Etcd server, returning a triplet
|
||||
// of the release version, internal version, and error (if any).
|
||||
func GetEtcdVersion(host string) (releaseVersion, internalVersion string, err error) {
|
||||
|
@@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -29,6 +30,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@@ -79,7 +81,9 @@ func getEncodedPod(name string) string {
|
||||
|
||||
func TestExtractToList(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
fakeClient.Data[key] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
EtcdIndex: 10,
|
||||
Node: &etcd.Node{
|
||||
@@ -135,7 +139,6 @@ func TestExtractToList(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@@ -148,7 +151,9 @@ func TestExtractToList(t *testing.T) {
|
||||
// TestExtractToListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
|
||||
func TestExtractToListAcrossDirectories(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
fakeClient.Data[key] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
EtcdIndex: 10,
|
||||
Node: &etcd.Node{
|
||||
@@ -218,7 +223,6 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@@ -230,7 +234,9 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
||||
|
||||
func TestExtractToListExcludesDirectories(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
fakeClient.Data[key] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
EtcdIndex: 10,
|
||||
Node: &etcd.Node{
|
||||
@@ -288,7 +294,6 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
err := helper.ExtractToList("/some/key", &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
@@ -300,6 +305,8 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
|
||||
|
||||
func TestExtractObj(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
expect := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Spec: api.PodSpec{
|
||||
@@ -307,8 +314,7 @@ func TestExtractObj(t *testing.T) {
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
},
|
||||
}
|
||||
fakeClient.Set("/some/key", runtime.EncodeOrDie(testapi.Codec(), &expect), 0)
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), &expect), 0)
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj("/some/key", &got, false)
|
||||
if err != nil {
|
||||
@@ -321,7 +327,9 @@ func TestExtractObj(t *testing.T) {
|
||||
|
||||
func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
key1 := etcdtest.AddPrefix("/some/key")
|
||||
fakeClient.Data[key1] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
@@ -329,19 +337,20 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
ErrorCode: 100,
|
||||
},
|
||||
}
|
||||
fakeClient.Data["/some/key2"] = EtcdResponseWithError{
|
||||
key2 := etcdtest.AddPrefix("/some/key2")
|
||||
fakeClient.Data[key2] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
}
|
||||
fakeClient.Data["/some/key3"] = EtcdResponseWithError{
|
||||
key3 := etcdtest.AddPrefix("/some/key3")
|
||||
fakeClient.Data[key3] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Value: "",
|
||||
},
|
||||
},
|
||||
}
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
try := func(key string) {
|
||||
var got api.Pod
|
||||
err := helper.ExtractObj(key, &got, false)
|
||||
@@ -362,7 +371,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
||||
func TestCreateObj(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.CreateObj("/some/key", obj, returnedObj, 5)
|
||||
if err != nil {
|
||||
@@ -372,7 +381,8 @@ func TestCreateObj(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
node := fakeClient.Data["/some/key"].R.Node
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
node := fakeClient.Data[key].R.Node
|
||||
if e, a := string(data), node.Value; e != a {
|
||||
t.Errorf("Wanted %v, got %v", e, a)
|
||||
}
|
||||
@@ -387,7 +397,7 @@ func TestCreateObj(t *testing.T) {
|
||||
func TestCreateObjNilOutParam(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
err := helper.CreateObj("/some/key", obj, nil, 5)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
@@ -397,7 +407,7 @@ func TestCreateObjNilOutParam(t *testing.T) {
|
||||
func TestSetObj(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 5)
|
||||
if err != nil {
|
||||
@@ -408,7 +418,8 @@ func TestSetObj(t *testing.T) {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
expect := string(data)
|
||||
got := fakeClient.Data["/some/key"].R.Node.Value
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
got := fakeClient.Data[key].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
@@ -424,7 +435,7 @@ func TestSetObjFailCAS(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.CasErr = fakeClient.NewError(123)
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
err := helper.SetObj("/some/key", obj, nil, 5)
|
||||
if err == nil {
|
||||
t.Errorf("Expecting error.")
|
||||
@@ -435,7 +446,9 @@ func TestSetObjWithVersion(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
fakeClient.Data[key] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Value: runtime.EncodeOrDie(testapi.Codec(), obj),
|
||||
@@ -444,7 +457,6 @@ func TestSetObjWithVersion(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec())
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 7)
|
||||
if err != nil {
|
||||
@@ -455,7 +467,7 @@ func TestSetObjWithVersion(t *testing.T) {
|
||||
t.Fatalf("Unexpected error %#v", err)
|
||||
}
|
||||
expect := string(data)
|
||||
got := fakeClient.Data["/some/key"].R.Node.Value
|
||||
got := fakeClient.Data[key].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
@@ -470,9 +482,10 @@ func TestSetObjWithVersion(t *testing.T) {
|
||||
func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), nil}
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()}
|
||||
returnedObj := &api.Pod{}
|
||||
err := helper.SetObj("/some/key", obj, returnedObj, 3)
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
@@ -481,7 +494,7 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
expect := string(data)
|
||||
got := fakeClient.Data["/some/key"].R.Node.Value
|
||||
got := fakeClient.Data[key].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
@@ -496,7 +509,7 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
||||
func TestSetObjNilOutParam(t *testing.T) {
|
||||
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), nil}
|
||||
helper := EtcdHelper{fakeClient, testapi.Codec(), nil, etcdtest.PathPrefix()}
|
||||
err := helper.SetObj("/some/key", obj, nil, 3)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
@@ -506,10 +519,11 @@ func TestSetObjNilOutParam(t *testing.T) {
|
||||
func TestGuaranteedUpdate(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
fakeClient.ExpectNotFoundGet(key)
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
return obj, 0, nil
|
||||
@@ -522,7 +536,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
expect := string(data)
|
||||
got := fakeClient.Data["/some/key"].R.Node.Value
|
||||
got := fakeClient.Data[key].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
@@ -547,7 +561,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
t.Errorf("Unexpected error %#v", err)
|
||||
}
|
||||
expect = string(data)
|
||||
got = fakeClient.Data["/some/key"].R.Node.Value
|
||||
got = fakeClient.Data[key].R.Node.Value
|
||||
if expect != got {
|
||||
t.Errorf("Wanted %v, got %v", expect, got)
|
||||
}
|
||||
@@ -560,10 +574,11 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
func TestGuaranteedUpdateNoChange(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
fakeClient.ExpectNotFoundGet(key)
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
return obj, 0, nil
|
||||
@@ -591,10 +606,11 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
|
||||
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
|
||||
// Create a new node.
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
fakeClient.ExpectNotFoundGet(key)
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
|
||||
|
||||
f := func(in runtime.Object) (runtime.Object, uint64, error) {
|
||||
@@ -617,9 +633,10 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
|
||||
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
helper := NewEtcdHelper(fakeClient, codec)
|
||||
helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
|
||||
fakeClient.ExpectNotFoundGet("/some/key")
|
||||
fakeClient.ExpectNotFoundGet(key)
|
||||
|
||||
const concurrency = 10
|
||||
var wgDone sync.WaitGroup
|
||||
@@ -654,7 +671,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
|
||||
wgDone.Wait()
|
||||
|
||||
// Check that stored TestResource has received all updates.
|
||||
body := fakeClient.Data["/some/key"].R.Node.Value
|
||||
body := fakeClient.Data[key].R.Node.Value
|
||||
stored := &TestResource{}
|
||||
if err := codec.DecodeInto([]byte(body), stored); err != nil {
|
||||
t.Errorf("Error decoding stored value: %v", body)
|
||||
@@ -708,3 +725,23 @@ func TestGetEtcdVersion_ErrorStatus(t *testing.T) {
|
||||
_, _, err = GetEtcdVersion(testServer.URL)
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestPrefixEtcdKey(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
prefix := path.Join("/", etcdtest.PathPrefix())
|
||||
helper := NewEtcdHelper(fakeClient, testapi.Codec(), prefix)
|
||||
|
||||
baseKey := "/some/key"
|
||||
|
||||
// Verify prefix is added
|
||||
keyBefore := baseKey
|
||||
keyAfter := helper.PrefixEtcdKey(keyBefore)
|
||||
|
||||
assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper")
|
||||
|
||||
// Verify prefix is not added
|
||||
keyBefore = path.Join(prefix, baseKey)
|
||||
keyAfter = helper.PrefixEtcdKey(keyBefore)
|
||||
|
||||
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
|
||||
}
|
||||
|
@@ -71,6 +71,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
||||
// watch.Interface. resourceVersion may be used to specify what version to begin
|
||||
// watching (e.g., for reconnecting without missing any updates).
|
||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w, nil
|
||||
@@ -80,6 +81,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
|
||||
// API objects and sent down the returned watch.Interface.
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w, nil
|
||||
@@ -102,6 +104,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
|
||||
//
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||
key = h.PrefixEtcdKey(key)
|
||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w
|
||||
|
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
@@ -205,7 +206,7 @@ func TestWatchEtcdError(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
|
||||
watching, err := h.Watch("/some/key", 4, Everything)
|
||||
if err != nil {
|
||||
@@ -232,10 +233,12 @@ func TestWatchEtcdError(t *testing.T) {
|
||||
func TestWatch(t *testing.T) {
|
||||
codec := latest.Codec
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0, Everything)
|
||||
watching, err := h.Watch(key, 0, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -300,6 +303,8 @@ func makeSubsets(ip string, port int) []api.EndpointSubset {
|
||||
|
||||
func TestWatchEtcdState(t *testing.T) {
|
||||
codec := latest.Codec
|
||||
baseKey := "/somekey/foo"
|
||||
prefixedKey := etcdtest.AddPrefix(baseKey)
|
||||
type T struct {
|
||||
Type watch.EventType
|
||||
Endpoints []api.EndpointSubset
|
||||
@@ -357,7 +362,7 @@ func TestWatchEtcdState(t *testing.T) {
|
||||
},
|
||||
"from initial state": {
|
||||
Initial: map[string]EtcdResponseWithError{
|
||||
"/somekey/foo": {
|
||||
prefixedKey: {
|
||||
R: &etcd.Response{
|
||||
Action: "get",
|
||||
Node: &etcd.Node{
|
||||
@@ -406,8 +411,9 @@ func TestWatchEtcdState(t *testing.T) {
|
||||
for key, value := range testCase.Initial {
|
||||
fakeClient.Data[key] = value
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
watching, err := h.Watch("/somekey/foo", testCase.From, Everything)
|
||||
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
watching, err := h.Watch(baseKey, testCase.From, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -476,10 +482,12 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||
|
||||
for k, testCase := range testCases {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = testCase.Response
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[prefixedKey] = testCase.Response
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0, Everything)
|
||||
watching, err := h.Watch(key, 0, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -512,9 +520,10 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||
func TestWatchListFromZeroIndex(t *testing.T) {
|
||||
codec := latest.Codec
|
||||
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
fakeClient.Data[prefixedKey] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Dir: true,
|
||||
@@ -537,9 +546,9 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
||||
EtcdIndex: 3,
|
||||
},
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
|
||||
watching, err := h.WatchList("/some/key", 0, Everything)
|
||||
watching, err := h.WatchList(key, 0, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -573,11 +582,13 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
||||
func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||
codec := latest.Codec
|
||||
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
|
||||
watching, err := h.WatchList("/some/key", 1, Everything)
|
||||
watching, err := h.WatchList(key, 1, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -587,7 +598,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "delete",
|
||||
PrevNode: &etcd.Node{
|
||||
Key: "/some/key",
|
||||
Key: prefixedKey,
|
||||
Value: runtime.EncodeOrDie(codec, pod),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
@@ -598,7 +609,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "delete",
|
||||
PrevNode: &etcd.Node{
|
||||
Key: "/some/key",
|
||||
Key: prefixedKey,
|
||||
Value: "",
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
@@ -617,7 +628,9 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
||||
|
||||
func TestWatchFromNotFound(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[prefixedKey] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
@@ -626,13 +639,12 @@ func TestWatchFromNotFound(t *testing.T) {
|
||||
ErrorCode: 100,
|
||||
},
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0, Everything)
|
||||
watching, err := h.Watch(key, 0, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
if fakeClient.WatchIndex != 3 {
|
||||
t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
|
||||
@@ -643,7 +655,9 @@ func TestWatchFromNotFound(t *testing.T) {
|
||||
|
||||
func TestWatchFromOtherError(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/some/key"] = EtcdResponseWithError{
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[prefixedKey] = EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
@@ -652,9 +666,8 @@ func TestWatchFromOtherError(t *testing.T) {
|
||||
ErrorCode: 101,
|
||||
},
|
||||
}
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
|
||||
watching, err := h.Watch("/some/key", 0, Everything)
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
watching, err := h.Watch(key, 0, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
@@ -683,11 +696,13 @@ func TestWatchFromOtherError(t *testing.T) {
|
||||
|
||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||
fakeClient := NewFakeEtcdClient(t)
|
||||
h := EtcdHelper{fakeClient, codec, versioner}
|
||||
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
|
||||
h := EtcdHelper{fakeClient, codec, versioner, etcdtest.PathPrefix()}
|
||||
key := "/some/key"
|
||||
prefixedKey := etcdtest.AddPrefix(key)
|
||||
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
|
||||
|
||||
// Test purposeful shutdown
|
||||
watching, err := h.Watch("/some/key", 0, Everything)
|
||||
watching, err := h.Watch(key, 0, Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
17
pkg/tools/etcdtest/doc.go
Normal file
17
pkg/tools/etcdtest/doc.go
Normal file
@@ -0,0 +1,17 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcdtest
|
32
pkg/tools/etcdtest/etcdtest.go
Normal file
32
pkg/tools/etcdtest/etcdtest.go
Normal file
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcdtest
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
|
||||
func PathPrefix() string {
|
||||
return path.Join("/", os.Getenv("ETCD_PREFIX"))
|
||||
}
|
||||
|
||||
// Adds the ETCD_PREFIX to the provided key
|
||||
func AddPrefix(in string) string {
|
||||
return path.Join(PathPrefix(), in)
|
||||
}
|
Reference in New Issue
Block a user