Move etcd storage to pkg/storage/etcd

This commit is contained in:
Wojciech Tyczynski
2015-07-30 13:27:18 +02:00
parent 99d6b0e9f4
commit 3cbbe72f9f
43 changed files with 289 additions and 202 deletions

View File

@@ -0,0 +1,78 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// APIObjectVersioner implements versioning and extracting etcd node information
// for objects that have an embedded ObjectMeta or ListMeta field.
type APIObjectVersioner struct{}
// UpdateObject implements Versioner
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error {
objectMeta, err := api.ObjectMetaFor(obj)
if err != nil {
return err
}
if expiration != nil {
objectMeta.DeletionTimestamp = &util.Time{*expiration}
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
objectMeta.ResourceVersion = versionString
return nil
}
// UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
listMeta, err := api.ListMetaFor(obj)
if err != nil || listMeta == nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
listMeta.ResourceVersion = versionString
return nil
}
// ObjectResourceVersion implements Versioner
func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
meta, err := api.ObjectMetaFor(obj)
if err != nil {
return 0, err
}
version := meta.ResourceVersion
if len(version) == 0 {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
// APIObjectVersioner implements Versioner
var _ storage.Versioner = APIObjectVersioner{}

View File

@@ -0,0 +1,50 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestObjectVersioner(t *testing.T) {
v := APIObjectVersioner{}
if ver, err := v.ObjectResourceVersion(&TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "5"}}); err != nil || ver != 5 {
t.Errorf("unexpected version: %d %v", ver, err)
}
if ver, err := v.ObjectResourceVersion(&TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}); err == nil || ver != 0 {
t.Errorf("unexpected version: %d %v", ver, err)
}
obj := &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
if err := v.UpdateObject(obj, nil, 5); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.ResourceVersion != "5" || obj.DeletionTimestamp != nil {
t.Errorf("unexpected resource version: %#v", obj)
}
now := util.Time{time.Now()}
obj = &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
if err := v.UpdateObject(obj, &now.Time, 5); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.ResourceVersion != "5" || *obj.DeletionTimestamp != now {
t.Errorf("unexpected resource version: %#v", obj)
}
}

17
pkg/storage/etcd/doc.go Normal file
View File

@@ -0,0 +1,17 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd

View File

@@ -0,0 +1,502 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"errors"
"fmt"
"path"
"reflect"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface {
return &etcdHelper{
client: client,
codec: codec,
versioner: APIObjectVersioner{},
copier: api.Scheme,
pathPrefix: prefix,
cache: util.NewCache(maxEtcdCacheEntries),
}
}
// etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct {
client tools.EtcdClient
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
// This depends on etcd's indexes being globally unique across all objects/types. This will
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
// support multi-object transaction that will result in many objects with the same index.
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
// TODO: Measure how much this cache helps after the conversion code is optimized.
cache util.Cache
}
func init() {
metrics.Register()
}
// Implements storage.Interface.
func (h *etcdHelper) Backends() []string {
return h.client.GetCluster()
}
// Implements storage.Interface.
func (h *etcdHelper) Versioner() storage.Versioner {
return h.versioner
}
// Implements storage.Interface.
func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error {
key = h.prefixEtcdKey(key)
data, err := h.codec.Encode(obj)
if err != nil {
return err
}
if h.versioner != nil {
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
}
startTime := time.Now()
response, err := h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
// Implements storage.Interface.
func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.codec.Encode(obj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
create := true
if h.versioner != nil {
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
create = false
startTime := time.Now()
response, err = h.client.CompareAndSwap(key, string(data), ttl, "", version)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
}
}
if create {
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
if err != nil {
return err
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
// Implements storage.Interface.
func (h *etcdHelper) Delete(key string, out runtime.Object) error {
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
startTime := time.Now()
response, err := h.client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// Implements storage.Interface.
func (h *etcdHelper) RecursiveDelete(key string, recursive bool) error {
key = h.prefixEtcdKey(key)
startTime := time.Now()
_, err := h.client.Delete(key, recursive)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
// Implements storage.Interface.
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error {
key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err
}
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl.
func (h *etcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
startTime := time.Now()
response, err := h.client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !IsEtcdNotFound(err) {
return "", nil, nil, err
}
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
return body, node, response, err
}
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
if response != nil {
if prevNode {
node = response.PrevNode
} else {
node = response.Node
}
}
if inErr != nil || node == nil || len(node.Value) == 0 {
if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr)
if err != nil {
return "", nil, err
}
v.Set(reflect.Zero(v.Type()))
return "", nil, nil
} else if inErr != nil {
return "", nil, inErr
}
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
}
body = node.Value
err = h.codec.DecodeInto([]byte(body), objPtr)
if h.versioner != nil {
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
return body, node, err
}
// Implements storage.Interface.
func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("GetToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
if IsEtcdNotFound(err) {
return nil
}
return err
}
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
return err
}
}
return nil
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
for _, node := range nodes {
if node.Dir {
trace.Step("Decoding dir " + node.Key + " START")
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
return err
}
trace.Step("Decoding dir " + node.Key + " END")
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex); found {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
}
}
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
// Implements storage.Interface.
func (h *etcdHelper) List(key string, listObj runtime.Object) error {
trace := util.NewTrace("List " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(key)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
trace.Step("Etcd node listed")
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
}
return nil
}
func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
result, err := h.client.Get(key, true, true)
if err != nil {
index, ok := etcdErrorIndex(err)
if !ok {
index = 0
}
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, err
}
}
return result.Node.Nodes, result.EtcdIndex, nil
}
// Implements storage.Interface.
func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error {
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
key = h.prefixEtcdKey(key)
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
if err != nil {
return err
}
meta := storage.ResponseMeta{}
if node != nil {
meta.TTL = node.TTL
if node.Expiration != nil {
meta.Expiration = node.Expiration
}
meta.ResourceVersion = node.ModifiedIndex
}
// Get the object to be written by calling tryUpdate.
ret, newTTL, err := tryUpdate(obj, meta)
if err != nil {
return err
}
index := uint64(0)
ttl := uint64(0)
if node != nil {
index = node.ModifiedIndex
if node.TTL > 0 {
ttl = uint64(node.TTL)
}
} else if res != nil {
index = res.EtcdIndex
}
if newTTL != nil {
ttl = *newTTL
}
data, err := h.codec.Encode(ret)
if err != nil {
return err
}
// First time this key has been used, try creating new value.
if index == 0 {
startTime := time.Now()
response, err := h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if IsEtcdNodeExist(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return err
}
if string(data) == origBody {
return nil
}
startTime := time.Now()
// Swap origBody with data, if origBody is the latest etcd data.
response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if IsEtcdTestFailed(err) {
// Try again.
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return err
}
}
func (h *etcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, path.Join("/", h.pathPrefix)) {
return key
}
return path.Join("/", h.pathPrefix, key)
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
const maxEtcdCacheEntries int = 50000
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *etcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := h.copier.Copy(obj.(runtime.Object))
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
}
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
metrics.ObserveCacheMiss()
return nil, false
}
func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.copier.Copy(obj)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return
}
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
metrics.ObserveNewEntry()
}
}

View File

@@ -0,0 +1,893 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"path"
"reflect"
"strconv"
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/coreos/go-etcd/etcd"
"github.com/stretchr/testify/assert"
)
const validEtcdVersion = "etcd 2.0.9"
type TestResource struct {
api.TypeMeta `json:",inline"`
api.ObjectMeta `json:"metadata"`
Value int `json:"value"`
}
func (*TestResource) IsAnAPIObject() {}
var scheme *runtime.Scheme
var codec runtime.Codec
func init() {
scheme = runtime.NewScheme()
scheme.AddKnownTypes("", &TestResource{})
scheme.AddKnownTypes(testapi.Version(), &TestResource{})
codec = runtime.CodecFor(scheme, testapi.Version())
scheme.AddConversionFuncs(
func(in *TestResource, out *TestResource, s conversion.Scope) error {
*out = *in
return nil
},
)
}
func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
}
func TestIsEtcdNotFound(t *testing.T) {
try := func(err error, isNotFound bool) {
if IsEtcdNotFound(err) != isNotFound {
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
}
}
try(tools.EtcdErrorNotFound, true)
try(&etcd.EtcdError{ErrorCode: 101}, false)
try(nil, false)
try(fmt.Errorf("some other kind of error"), false)
}
// Returns an encoded version of api.Pod with the given name.
func getEncodedPod(name string) string {
pod, _ := testapi.Codec().Encode(&api.Pod{
ObjectMeta: api.ObjectMeta{Name: name},
})
return string(pod)
}
func TestList(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
EtcdIndex: 10,
Node: &etcd.Node{
Dir: true,
Nodes: []*etcd.Node{
{
Key: "/foo",
Value: getEncodedPod("foo"),
Dir: false,
ModifiedIndex: 1,
},
{
Key: "/bar",
Value: getEncodedPod("bar"),
Dir: false,
ModifiedIndex: 2,
},
{
Key: "/baz",
Value: getEncodedPod("baz"),
Dir: false,
ModifiedIndex: 3,
},
},
},
},
}
expect := api.PodList{
ListMeta: api.ListMeta{ResourceVersion: "10"},
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
},
}
var got api.PodList
err := helper.List("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := expect, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestListAcrossDirectories(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
EtcdIndex: 10,
Node: &etcd.Node{
Dir: true,
Nodes: []*etcd.Node{
{
Key: "/directory1",
Value: `{"name": "directory1"}`,
Dir: true,
Nodes: []*etcd.Node{
{
Key: "/foo",
Value: getEncodedPod("foo"),
Dir: false,
ModifiedIndex: 1,
},
{
Key: "/baz",
Value: getEncodedPod("baz"),
Dir: false,
ModifiedIndex: 3,
},
},
},
{
Key: "/directory2",
Value: `{"name": "directory2"}`,
Dir: true,
Nodes: []*etcd.Node{
{
Key: "/bar",
Value: getEncodedPod("bar"),
ModifiedIndex: 2,
},
},
},
},
},
},
}
expect := api.PodList{
ListMeta: api.ListMeta{ResourceVersion: "10"},
Items: []api.Pod{
// We expect list to be sorted by directory (e.g. namespace) first, then by name.
{
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
},
}
var got api.PodList
err := helper.List("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := expect, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
func TestListExcludesDirectories(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
EtcdIndex: 10,
Node: &etcd.Node{
Dir: true,
Nodes: []*etcd.Node{
{
Key: "/foo",
Value: getEncodedPod("foo"),
ModifiedIndex: 1,
},
{
Key: "/bar",
Value: getEncodedPod("bar"),
ModifiedIndex: 2,
},
{
Key: "/baz",
Value: getEncodedPod("baz"),
ModifiedIndex: 3,
},
{
Key: "/directory",
Value: `{"name": "directory"}`,
Dir: true,
},
},
},
},
}
expect := api.PodList{
ListMeta: api.ListMeta{ResourceVersion: "10"},
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
{
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
},
}
var got api.PodList
err := helper.List("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := expect, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
func TestGet(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
expect := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
}
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), &expect), 0)
var got api.Pod
err := helper.Get("/some/key", &got, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if !reflect.DeepEqual(got, expect) {
t.Errorf("Wanted %#v, got %#v", expect, got)
}
}
func TestGetNotFoundErr(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key1 := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key1] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
ErrorCode: 100,
},
}
key2 := etcdtest.AddPrefix("/some/key2")
fakeClient.Data[key2] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
}
key3 := etcdtest.AddPrefix("/some/key3")
fakeClient.Data[key3] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: "",
},
},
}
try := func(key string) {
var got api.Pod
err := helper.Get(key, &got, false)
if err == nil {
t.Errorf("%s: wanted error but didn't get one", key)
}
err = helper.Get(key, &got, true)
if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err)
}
}
try("/some/key")
try("/some/key2")
try("/some/key3")
}
func TestCreate(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
err := helper.Create("/some/key", obj, returnedObj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := testapi.Codec().Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
key := etcdtest.AddPrefix("/some/key")
node := fakeClient.Data[key].R.Node
if e, a := string(data), node.Value; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
if e, a := uint64(5), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
if obj.ResourceVersion != returnedObj.ResourceVersion || obj.Name != returnedObj.Name {
t.Errorf("If set was successful but returned object did not have correct resource version")
}
}
func TestCreateNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
err := helper.Create("/some/key", obj, nil, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}
func TestSet(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
err := helper.Set("/some/key", obj, returnedObj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := testapi.Codec().Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := string(data)
key := etcdtest.AddPrefix("/some/key")
got := fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(5), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
if obj.ResourceVersion != returnedObj.ResourceVersion || obj.Name != returnedObj.Name {
t.Errorf("If set was successful but returned object did not have correct resource version")
}
}
func TestSetFailCAS(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.CasErr = fakeClient.NewError(123)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
err := helper.Set("/some/key", obj, nil, 5)
if err == nil {
t.Errorf("Expecting error.")
}
}
func TestSetWithVersion(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}}
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), obj),
ModifiedIndex: 1,
},
},
}
returnedObj := &api.Pod{}
err := helper.Set("/some/key", obj, returnedObj, 7)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
data, err := testapi.Codec().Encode(obj)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
expect := string(data)
got := fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(7), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
if obj.ResourceVersion != returnedObj.ResourceVersion || obj.Name != returnedObj.Name {
t.Errorf("If set was successful but returned object did not have correct resource version")
}
}
func TestSetWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
returnedObj := &api.Pod{}
err := helper.Set("/some/key", obj, returnedObj, 3)
key := etcdtest.AddPrefix("/some/key")
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := testapi.Codec().Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := string(data)
got := fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(3), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
if obj.ResourceVersion != returnedObj.ResourceVersion || obj.Name != returnedObj.Name {
t.Errorf("If set was successful but returned object did not have correct resource version")
}
}
func TestSetNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := tools.NewFakeEtcdClient(t)
helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
err := helper.Set("/some/key", obj, nil, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}
func TestGuaranteedUpdate(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := codec.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := string(data)
got := fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
// Update an existing node.
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
if in.(*TestResource).Value != 1 {
t.Errorf("Callback input was not current set value")
}
return objUpdate, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err = codec.Encode(objUpdate)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect = string(data)
got = fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
}
func TestGuaranteedUpdateTTL(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 0 {
t.Fatalf("unexpected response meta: %#v", res)
}
ttl := uint64(10)
return obj, &ttl, nil
})
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := codec.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := string(data)
got := fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if fakeClient.Data[key].R.Node.TTL != 10 {
t.Errorf("expected TTL set: %d", fakeClient.Data[key].R.Node.TTL)
}
// Update an existing node.
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 10 {
t.Fatalf("unexpected response meta: %#v", res)
}
callbackCalled = true
if in.(*TestResource).Value != 1 {
t.Errorf("Callback input was not current set value")
}
return objUpdate, nil, nil
})
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err = codec.Encode(objUpdate)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect = string(data)
got = fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if fakeClient.Data[key].R.Node.TTL != 10 {
t.Errorf("expected TTL remained set: %d", fakeClient.Data[key].R.Node.TTL)
}
// Update an existing node and change ttl
callbackCalled = false
objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 10 {
t.Fatalf("unexpected response meta: %#v", res)
}
callbackCalled = true
if in.(*TestResource).Value != 2 {
t.Errorf("Callback input was not current set value")
}
newTTL := uint64(20)
return objUpdate, &newTTL, nil
})
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err = codec.Encode(objUpdate)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect = string(data)
got = fakeClient.Data[key].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if fakeClient.Data[key].R.Node.TTL != 20 {
t.Errorf("expected TTL changed: %d", fakeClient.Data[key].R.Node.TTL)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
}
func TestGuaranteedUpdateNoChange(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
// Update an existing node with the same data
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
fakeClient.Err = errors.New("should not be called")
callbackCalled = true
return objUpdate, nil
}))
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
}
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
// Create a new node.
fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
f := storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil
})
ignoreNotFound := false
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
if err == nil {
t.Errorf("Expected error for key not found.")
}
ignoreNotFound = true
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
if err != nil {
t.Errorf("Unexpected error %v.", err)
}
}
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := etcdtest.AddPrefix("/some/key")
fakeClient.ExpectNotFoundGet(key)
const concurrency = 10
var wgDone sync.WaitGroup
var wgForceCollision sync.WaitGroup
wgDone.Add(concurrency)
wgForceCollision.Add(concurrency)
for i := 0; i < concurrency; i++ {
// Increment TestResource.Value by 1
go func() {
defer wgDone.Done()
firstCall := true
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
defer func() { firstCall = false }()
if firstCall {
// Force collision by joining all concurrent GuaranteedUpdate operations here.
wgForceCollision.Done()
wgForceCollision.Wait()
}
currValue := in.(*TestResource).Value
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1}
return obj, nil
}))
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
}()
}
wgDone.Wait()
// Check that stored TestResource has received all updates.
body := fakeClient.Data[key].R.Node.Value
stored := &TestResource{}
if err := codec.DecodeInto([]byte(body), stored); err != nil {
t.Errorf("Error decoding stored value: %v", body)
}
if stored.Value != concurrency {
t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value)
}
}
func TestGetEtcdVersion_ValidVersion(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, validEtcdVersion)
}))
defer testServer.Close()
var version string
var err error
if version, err = GetEtcdVersion(testServer.URL); err != nil {
t.Errorf("Unexpected error: %v", err)
}
assert.Equal(t, validEtcdVersion, version, "Unexpected version")
assert.Nil(t, err)
}
func TestGetEtcdVersion_ErrorStatus(t *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer testServer.Close()
_, err := GetEtcdVersion(testServer.URL)
assert.NotNil(t, err)
}
func TestGetEtcdVersion_NotListening(t *testing.T) {
portIsOpen := func(port int) bool {
conn, err := net.DialTimeout("tcp", "127.0.0.1:"+strconv.Itoa(port), 1*time.Second)
if err == nil {
conn.Close()
return true
}
return false
}
port := rand.Intn((1 << 16) - 1)
for tried := 0; portIsOpen(port); tried++ {
if tried >= 10 {
t.Fatal("Couldn't find a closed TCP port to continue testing")
}
port++
}
_, err := GetEtcdVersion("http://127.0.0.1:" + strconv.Itoa(port))
assert.NotNil(t, err)
}
func TestPrefixEtcdKey(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(fakeClient, testapi.Codec(), prefix)
baseKey := "/some/key"
// Verify prefix is added
keyBefore := baseKey
keyAfter := helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper")
// Verify prefix is not added
keyBefore = path.Join(prefix, baseKey)
keyAfter = helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
}
func TestEtcdHealthCheck(t *testing.T) {
tests := []struct {
data string
expectErr bool
}{
{
data: "{\"health\": \"true\"}",
expectErr: false,
},
{
data: "{\"health\": \"false\"}",
expectErr: true,
},
{
data: "invalid json",
expectErr: true,
},
}
for _, test := range tests {
err := EtcdHealthCheck([]byte(test.data))
if err != nil && !test.expectErr {
t.Errorf("unexpected error: %v", err)
}
if err == nil && test.expectErr {
t.Error("unexpected non-error")
}
}
}

View File

@@ -0,0 +1,122 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
goetcd "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
// IsEtcdNotFound returns true iff err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, tools.EtcdErrorCodeNotFound)
}
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, tools.EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, tools.EtcdErrorCodeTestFailed)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return goetcd.ErrWatchStoppedByUser == err
}
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*goetcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}
// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*goetcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
}
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
func startEtcd() (*exec.Cmd, error) {
cmd := exec.Command("etcd")
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
func NewEtcdClientStartServerIfNecessary(server string) (tools.EtcdClient, error) {
_, err := GetEtcdVersion(server)
if err != nil {
glog.Infof("Failed to find etcd, attempting to start.")
_, err := startEtcd()
if err != nil {
return nil, err
}
}
servers := []string{server}
return goetcd.NewClient(servers), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:health`
}
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
}

View File

@@ -0,0 +1,377 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
// Etcd watch event actions
const (
EtcdCreate = "create"
EtcdGet = "get"
EtcdSet = "set"
EtcdCAS = "compareAndSwap"
EtcdDelete = "delete"
)
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)
// includeFunc returns true if the given key should be considered part of a watch
type includeFunc func(key string) bool
// exceptKey is an includeFunc that returns false when the provided key matches the watched key
func exceptKey(except string) includeFunc {
return func(key string) bool {
return key != except
}
}
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
versioner storage.Versioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
include includeFunc
filter storage.FilterFunc
etcdIncoming chan *etcd.Response
etcdError chan error
etcdStop chan bool
etcdCallEnded chan struct{}
outgoing chan watch.Event
userStop chan struct{}
stopped bool
stopLock sync.Mutex
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
cache etcdCache
}
// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
include: include,
filter: filter,
// Buffer this channel, so that the etcd client is not forced
// to context switch with every object it gets, and so that a
// long time spent decoding an object won't block the *next*
// object. Basically, we see a lot of "401 window exceeded"
// errors from etcd, and that's due to the client not streaming
// results but rather getting them one at a time. So we really
// want to never block the etcd client, if possible. The 100 is
// mostly arbitrary--we know it goes as high as 50, though.
// There's a V(2) log message that prints the length so we can
// monitor how much of this buffer is actually used.
etcdIncoming: make(chan *etcd.Response, 100),
etcdError: make(chan error, 1),
etcdStop: make(chan bool),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
cache: cache,
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client tools.EtcdClient, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdError)
if resourceVersion == 0 {
latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if err != nil {
w.etcdError <- err
return
}
resourceVersion = latest + 1
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != nil && err != etcd.ErrWatchStoppedByUser {
w.etcdError <- err
}
}
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client tools.EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)
return resourceVersion, err
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index
}
return resourceVersion, nil
}
resourceVersion = resp.EtcdIndex
convertRecursiveResponse(resp.Node, resp, incoming)
return
}
// convertRecursiveResponse turns a recursive get response from etcd into individual response objects
// by copying the original response. This emulates the behavior of a recursive watch.
func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
if node.Dir {
for i := range node.Nodes {
convertRecursiveResponse(node.Nodes[i], response, incoming)
}
return
}
copied := *response
copied.Action = "get"
copied.Node = node
incoming <- &copied
}
var (
watchChannelHWM util.HighWaterMark
)
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
defer util.HandleCrash()
for {
select {
case err := <-w.etcdError:
if err != nil {
w.emit(watch.Event{
watch.Error,
&api.Status{
Status: api.StatusFailure,
Message: err.Error(),
},
})
}
return
case <-w.userStop:
w.etcdStop <- true
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
}
w.sendResult(res)
}
// If !ok, don't return here-- must wait for etcdError channel
// to give an error or be closed.
}
}
}
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex); found {
return obj, nil
}
obj, err := w.encoding.Decode([]byte(node.Value))
if err != nil {
return nil, err
}
// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)
}
}
// perform any necessary transformation
if w.transform != nil {
obj, err = w.transform(obj)
if err != nil {
glog.Errorf("failure to transform api object %#v: %v", obj, err)
return nil, err
}
}
if node.ModifiedIndex != 0 {
w.cache.addToCache(node.ModifiedIndex, obj)
}
return obj, nil
}
func (w *etcdWatcher) sendAdd(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
obj, err := w.decodeObject(res.Node)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
return
}
action := watch.Added
if res.Node.ModifiedIndex != res.Node.CreatedIndex {
action = watch.Modified
}
w.emit(watch.Event{
Type: action,
Object: obj,
})
}
func (w *etcdWatcher) sendModify(res *etcd.Response) {
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
if w.include != nil && !w.include(res.Node.Key) {
return
}
curObj, err := w.decodeObject(res.Node)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.Node.Value), res, res.Node)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := w.filter(curObj)
oldObjPasses := false
var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" {
// Ignore problems reading the old object.
if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
oldObjPasses = w.filter(oldObj)
}
}
// Some changes to an object may cause it to start or stop matching a filter.
// We need to report those as adds/deletes. So we have to check both the previous
// and current value of the object.
switch {
case curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Modified,
Object: curObj,
})
case curObjPasses && !oldObjPasses:
w.emit(watch.Event{
Type: watch.Added,
Object: curObj,
})
case !curObjPasses && oldObjPasses:
w.emit(watch.Event{
Type: watch.Deleted,
Object: oldObj,
})
}
// Do nothing if neither new nor old object passed the filter.
}
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
return
}
if w.include != nil && !w.include(res.PrevNode.Key) {
return
}
node := *res.PrevNode
if res.Node != nil {
// Note that this sends the *old* object with the etcd index for the time at
// which it gets deleted. This will allow users to restart the watch at the right
// index.
node.ModifiedIndex = res.Node.ModifiedIndex
}
obj, err := w.decodeObject(&node)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(res.PrevNode.Value), res, res.PrevNode)
// TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter(obj) {
return
}
w.emit(watch.Event{
Type: watch.Deleted,
Object: obj,
})
}
func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case EtcdCreate, EtcdGet:
w.sendAdd(res)
case EtcdSet, EtcdCAS:
w.sendModify(res)
case EtcdDelete:
w.sendDelete(res)
default:
glog.Errorf("unknown action: %v", res.Action)
}
}
// ResultChan implements watch.Interface.
func (w *etcdWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
}
// Stop implements watch.Interface.
func (w *etcdWatcher) Stop() {
w.stopLock.Lock()
defer w.stopLock.Unlock()
// Prevent double channel closes.
if !w.stopped {
w.stopped = true
close(w.userStop)
}
}

View File

@@ -0,0 +1,735 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"fmt"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
var versioner = APIObjectVersioner{}
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{}
func (f *fakeEtcdCache) getFromCache(index uint64) (runtime.Object, bool) {
return nil, false
}
func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
}
var _ etcdCache = &fakeEtcdCache{}
func TestWatchInterpretations(t *testing.T) {
codec := latest.Codec
// Declare some pods to make the test cases compact.
podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
podBaz := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz"}}
firstLetterIsB := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
}
// All of these test cases will be run with the firstLetterIsB FilterFunc.
table := map[string]struct {
actions []string // Run this test item for every action here.
prevNodeValue string
nodeValue string
expectEmit bool
expectType watch.EventType
expectObject runtime.Object
}{
"create": {
actions: []string{"create", "get"},
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"create but filter blocks": {
actions: []string{"create", "get"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
"delete": {
actions: []string{"delete"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar,
},
"delete but filter blocks": {
actions: []string{"delete"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
"modify appears to create 1": {
actions: []string{"set", "compareAndSwap"},
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to create 2": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podFoo),
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to delete": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar, // Should return last state that passed the filter!
},
"modify modifies": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podBaz),
expectEmit: true,
expectType: watch.Modified,
expectObject: podBaz,
},
"modify ignores": {
actions: []string{"set", "compareAndSwap"},
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
}
for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
if !item.expectEmit {
return
}
if e, a := item.expectType, event.Type; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
if e, a := item.expectObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
}
var n, pn *etcd.Node
if item.nodeValue != "" {
n = &etcd.Node{Value: item.nodeValue}
}
if item.prevNodeValue != "" {
pn = &etcd.Node{Value: item.prevNodeValue}
}
w.sendResult(&etcd.Response{
Action: action,
Node: n,
PrevNode: pn,
})
if e, a := item.expectEmit, emitCalled; e != a {
t.Errorf("'%v - %v': expected %v, got %v", name, action, e, a)
}
w.Stop()
}
}
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: "update",
})
w.Stop()
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
})
w.Stop()
}
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
w.sendResult(&etcd.Response{
Action: action,
Node: &etcd.Node{
Value: "foobar",
},
})
w.sendResult(&etcd.Response{
Action: action,
PrevNode: &etcd.Node{
Value: "foobar",
},
})
w.Stop()
}
}
func TestWatchEtcdError(t *testing.T) {
codec := latest.Codec
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet("/some/key")
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch("/some/key", 4, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
got := <-watching.ResultChan()
if got.Type != watch.Error {
t.Fatalf("Unexpected non-error")
}
status, ok := got.Object.(*api.Status)
if !ok {
t.Fatalf("Unexpected non-error object type")
}
if status.Message != "immediate error" {
t.Errorf("Unexpected wrong error")
}
if status.Status != api.StatusFailure {
t.Errorf("Unexpected wrong error status")
}
}
func TestWatch(t *testing.T) {
codec := latest.Codec
fakeClient := tools.NewFakeEtcdClient(t)
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.ExpectNotFoundGet(prefixedKey)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// when server returns not found, the watch index starts at the next value (1)
if fakeClient.WatchIndex != 1 {
t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
}
// Test normal case
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
podBytes, _ := codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(podBytes),
},
}
event := <-watching.ResultChan()
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
// Test error case
fakeClient.WatchInjectError <- fmt.Errorf("Injected error")
if errEvent, ok := <-watching.ResultChan(); !ok {
t.Errorf("no error result?")
} else {
if e, a := watch.Error, errEvent.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := "Injected error", errEvent.Object.(*api.Status).Message; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}
func emptySubsets() []api.EndpointSubset {
return []api.EndpointSubset{}
}
func makeSubsets(ip string, port int) []api.EndpointSubset {
return []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip}},
Ports: []api.EndpointPort{{Port: port}},
}}
}
func TestWatchEtcdState(t *testing.T) {
codec := latest.Codec
baseKey := "/somekey/foo"
prefixedKey := etcdtest.AddPrefix(baseKey)
type T struct {
Type watch.EventType
Endpoints []api.EndpointSubset
}
testCases := map[string]struct {
Initial map[string]tools.EtcdResponseWithError
Responses []*etcd.Response
From uint64
Expected []*T
}{
"from not found": {
Initial: map[string]tools.EtcdResponseWithError{},
Responses: []*etcd.Response{
{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
},
},
},
From: 1,
Expected: []*T{
{watch.Added, nil},
},
},
"from version 1": {
Responses: []*etcd.Response{
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: makeSubsets("127.0.0.1", 9000),
})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
From: 1,
Expected: []*T{
{watch.Modified, makeSubsets("127.0.0.1", 9000)},
},
},
"from initial state": {
Initial: map[string]tools.EtcdResponseWithError{
prefixedKey: {
R: &etcd.Response{
Action: "get",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
EtcdIndex: 1,
},
},
},
Responses: []*etcd.Response{
nil,
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: makeSubsets("127.0.0.1", 9000),
})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, makeSubsets("127.0.0.1", 9000)},
},
},
}
for k, testCase := range testCases {
fakeClient := tools.NewFakeEtcdClient(t)
for key, value := range testCase.Initial {
fakeClient.Data[key] = value
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(baseKey, testCase.From, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
t.Logf("Testing %v", k)
for i := range testCase.Responses {
if testCase.Responses[i] != nil {
fakeClient.WatchResponse <- testCase.Responses[i]
}
event := <-watching.ResultChan()
if e, a := testCase.Expected[i].Type, event.Type; e != a {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Subsets; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
}
watching.Stop()
}
}
func TestWatchFromZeroIndex(t *testing.T) {
codec := latest.Codec
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
testCases := map[string]struct {
Response tools.EtcdResponseWithError
ExpectedVersion string
ExpectedType watch.EventType
}{
"get value created": {
tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
Action: "get",
EtcdIndex: 2,
},
},
"1",
watch.Added,
},
"get value modified": {
tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
Action: "get",
EtcdIndex: 3,
},
},
"2",
watch.Modified,
},
}
for k, testCase := range testCases {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.Data[prefixedKey] = testCase.Response
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
}
// the existing node is detected and the index set
event := <-watching.ResultChan()
if e, a := testCase.ExpectedType, event.Type; e != a {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("%s: expected a pod, got %#v", k, event.Object)
}
if actualPod.ResourceVersion != testCase.ExpectedVersion {
t.Errorf("%s: expected pod with resource version %v, Got %#v", k, testCase.ExpectedVersion, actualPod)
}
pod.ResourceVersion = testCase.ExpectedVersion
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", k, e, a)
}
watching.Stop()
}
}
func TestWatchListFromZeroIndex(t *testing.T) {
codec := latest.Codec
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Dir: true,
Nodes: etcd.Nodes{
&etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 2,
ModifiedIndex: 2,
Nodes: etcd.Nodes{},
},
},
},
Action: "get",
EtcdIndex: 3,
},
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// the existing node is detected and the index set
event, open := <-watching.ResultChan()
if !open {
t.Fatalf("unexpected channel close")
}
for i := 0; i < 2; i++ {
if e, a := watch.Added, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
actualPod, ok := event.Object.(*api.Pod)
if !ok {
t.Fatalf("expected a pod, got %#v", event.Object)
}
if actualPod.ResourceVersion != "1" {
t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod)
}
pod.ResourceVersion = "1"
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
}
func TestWatchListIgnoresRootKey(t *testing.T) {
codec := latest.Codec
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient := tools.NewFakeEtcdClient(t)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// This is the root directory of the watch, which happens to have a value encoded
fakeClient.WatchResponse <- &etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Key: prefixedKey,
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
}
// Delete of the parent directory of a key is an event that a list watch would receive,
// but will have no value so the decode will fail.
fakeClient.WatchResponse <- &etcd.Response{
Action: "delete",
PrevNode: &etcd.Node{
Key: prefixedKey,
Value: "",
CreatedIndex: 1,
ModifiedIndex: 1,
},
}
close(fakeClient.WatchStop)
// the existing node is detected and the index set
_, open := <-watching.ResultChan()
if open {
t.Fatalf("unexpected channel open")
}
watching.Stop()
}
func TestWatchFromNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 100,
},
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 3 {
t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
}
watching.Stop()
}
func TestWatchFromOtherError(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
Index: 2,
ErrorCode: 101,
},
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
errEvent := <-watching.ResultChan()
if e, a := watch.Error, errEvent.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := "101: () [2]", errEvent.Object.(*api.Status).Message; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
select {
case _, ok := <-watching.ResultChan():
if ok {
t.Fatalf("expected result channel to be closed")
}
case <-time.After(1 * time.Second):
t.Fatalf("watch should have closed channel: %#v", watching)
}
if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 {
t.Fatalf("Watch should not have been invoked: %#v", fakeClient)
}
}
func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
key := "/some/key"
prefixedKey := etcdtest.AddPrefix(key)
fakeClient.ExpectNotFoundGet(prefixedKey)
// Test purposeful shutdown
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
watching.Stop()
// Did everything shut down?
if _, open := <-fakeClient.WatchResponse; open {
t.Errorf("A stop did not cause a graceful shutdown")
}
if _, open := <-watching.ResultChan(); open {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}