Address remaining comments from #756
Rename Encoding to Codec, Versioning to ResourceVersioner. Add GoDoc. Add Delete(key, recursive) to EtcdHelper
This commit is contained in:
parent
91b31c5552
commit
67dbd15929
@ -25,19 +25,23 @@ import (
|
|||||||
"gopkg.in/v1/yaml"
|
"gopkg.in/v1/yaml"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EncodingInterface interface {
|
// codec defines methods for serializing and deserializing API
|
||||||
|
// objects
|
||||||
|
type codec interface {
|
||||||
Encode(obj interface{}) (data []byte, err error)
|
Encode(obj interface{}) (data []byte, err error)
|
||||||
Decode(data []byte) (interface{}, error)
|
Decode(data []byte) (interface{}, error)
|
||||||
DecodeInto(data []byte, obj interface{}) error
|
DecodeInto(data []byte, obj interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type VersioningInterface interface {
|
// resourceVersioner provides methods for setting and retrieving
|
||||||
|
// the resource version from an API object
|
||||||
|
type resourceVersioner interface {
|
||||||
SetResourceVersion(obj interface{}, version uint64) error
|
SetResourceVersion(obj interface{}, version uint64) error
|
||||||
ResourceVersion(obj interface{}) (uint64, error)
|
ResourceVersion(obj interface{}) (uint64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var Encoding EncodingInterface
|
var Codec codec
|
||||||
var Versioning VersioningInterface
|
var ResourceVersioner resourceVersioner
|
||||||
|
|
||||||
var conversionScheme *conversion.Scheme
|
var conversionScheme *conversion.Scheme
|
||||||
|
|
||||||
@ -101,8 +105,8 @@ func init() {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
Encoding = conversionScheme
|
Codec = conversionScheme
|
||||||
Versioning = JSONBaseVersioning{}
|
ResourceVersioner = NewJSONBaseResourceVersioner()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddKnownTypes registers the types of the arguments to the marshaller of the package api.
|
// AddKnownTypes registers the types of the arguments to the marshaller of the package api.
|
||||||
|
@ -21,10 +21,15 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
)
|
)
|
||||||
|
|
||||||
// versionedJSONBase allows access to the version state of a JSONBase object
|
// NewJSONBaseVersioner returns a resourceVersioner that can set or retrieve
|
||||||
type JSONBaseVersioning struct{}
|
// ResourceVersion on objects derived from JSONBase.
|
||||||
|
func NewJSONBaseResourceVersioner() resourceVersioner {
|
||||||
|
return &jsonBaseResourceVersioner{}
|
||||||
|
}
|
||||||
|
|
||||||
func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) {
|
type jsonBaseResourceVersioner struct{}
|
||||||
|
|
||||||
|
func (v jsonBaseResourceVersioner) ResourceVersion(obj interface{}) (uint64, error) {
|
||||||
json, err := FindJSONBaseRO(obj)
|
json, err := FindJSONBaseRO(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -32,7 +37,7 @@ func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) {
|
|||||||
return json.ResourceVersion, nil
|
return json.ResourceVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v JSONBaseVersioning) SetResourceVersion(obj interface{}, version uint64) error {
|
func (v jsonBaseResourceVersioner) SetResourceVersion(obj interface{}, version uint64) error {
|
||||||
json, err := FindJSONBase(obj)
|
json, err := FindJSONBase(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -67,7 +67,7 @@ func TestGenericJSONBase(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestVersioningOfAPI(t *testing.T) {
|
func TestResourceVersionerOfAPI(t *testing.T) {
|
||||||
type T struct {
|
type T struct {
|
||||||
Object interface{}
|
Object interface{}
|
||||||
Expected uint64
|
Expected uint64
|
||||||
@ -77,7 +77,7 @@ func TestVersioningOfAPI(t *testing.T) {
|
|||||||
"api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
|
"api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
|
||||||
"pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
|
"pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1},
|
||||||
}
|
}
|
||||||
versioning := JSONBaseVersioning{}
|
versioning := NewJSONBaseResourceVersioner()
|
||||||
for key, testCase := range testCases {
|
for key, testCase := range testCases {
|
||||||
actual, err := versioning.ResourceVersion(testCase.Object)
|
actual, err := versioning.ResourceVersion(testCase.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -32,7 +32,6 @@ import (
|
|||||||
|
|
||||||
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
|
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
|
||||||
type EtcdRegistry struct {
|
type EtcdRegistry struct {
|
||||||
client tools.EtcdClient
|
|
||||||
helper tools.EtcdHelper
|
helper tools.EtcdHelper
|
||||||
machines MinionRegistry
|
machines MinionRegistry
|
||||||
manifestFactory ManifestFactory
|
manifestFactory ManifestFactory
|
||||||
@ -44,8 +43,7 @@ type EtcdRegistry struct {
|
|||||||
// 'scheduler' is the scheduling algorithm to use.
|
// 'scheduler' is the scheduling algorithm to use.
|
||||||
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
|
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
|
||||||
registry := &EtcdRegistry{
|
registry := &EtcdRegistry{
|
||||||
client: client,
|
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
|
||||||
helper: tools.EtcdHelper{client, api.Encoding, api.Versioning},
|
|
||||||
machines: machines,
|
machines: machines,
|
||||||
}
|
}
|
||||||
registry.manifestFactory = &BasicManifestFactory{
|
registry.manifestFactory = &BasicManifestFactory{
|
||||||
@ -118,7 +116,7 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
|||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't strand stuff.
|
// Don't strand stuff.
|
||||||
_, err2 := registry.client.Delete(podKey, false)
|
err2 := registry.helper.Delete(podKey, false)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
|
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
|
||||||
}
|
}
|
||||||
@ -143,7 +141,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
|||||||
// First delete the pod, so a scheduler doesn't notice it getting removed from the
|
// First delete the pod, so a scheduler doesn't notice it getting removed from the
|
||||||
// machine and attempt to put it somewhere.
|
// machine and attempt to put it somewhere.
|
||||||
podKey := makePodKey(machine, podID)
|
podKey := makePodKey(machine, podID)
|
||||||
_, err := registry.client.Delete(podKey, true)
|
err := registry.helper.Delete(podKey, true)
|
||||||
if tools.IsEtcdNotFound(err) {
|
if tools.IsEtcdNotFound(err) {
|
||||||
return apiserver.NewNotFoundErr("pod", podID)
|
return apiserver.NewNotFoundErr("pod", podID)
|
||||||
}
|
}
|
||||||
@ -247,7 +245,7 @@ func (registry *EtcdRegistry) UpdateController(controller api.ReplicationControl
|
|||||||
// DeleteController deletes a ReplicationController specified by its ID.
|
// DeleteController deletes a ReplicationController specified by its ID.
|
||||||
func (registry *EtcdRegistry) DeleteController(controllerID string) error {
|
func (registry *EtcdRegistry) DeleteController(controllerID string) error {
|
||||||
key := makeControllerKey(controllerID)
|
key := makeControllerKey(controllerID)
|
||||||
_, err := registry.client.Delete(key, false)
|
err := registry.helper.Delete(key, false)
|
||||||
if tools.IsEtcdNotFound(err) {
|
if tools.IsEtcdNotFound(err) {
|
||||||
return apiserver.NewNotFoundErr("replicationController", controllerID)
|
return apiserver.NewNotFoundErr("replicationController", controllerID)
|
||||||
}
|
}
|
||||||
@ -295,7 +293,7 @@ func makeServiceEndpointsKey(name string) string {
|
|||||||
// DeleteService deletes a Service specified by its name.
|
// DeleteService deletes a Service specified by its name.
|
||||||
func (registry *EtcdRegistry) DeleteService(name string) error {
|
func (registry *EtcdRegistry) DeleteService(name string) error {
|
||||||
key := makeServiceKey(name)
|
key := makeServiceKey(name)
|
||||||
_, err := registry.client.Delete(key, true)
|
err := registry.helper.Delete(key, true)
|
||||||
if tools.IsEtcdNotFound(err) {
|
if tools.IsEtcdNotFound(err) {
|
||||||
return apiserver.NewNotFoundErr("service", name)
|
return apiserver.NewNotFoundErr("service", name)
|
||||||
}
|
}
|
||||||
@ -303,7 +301,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
key = makeServiceEndpointsKey(name)
|
key = makeServiceEndpointsKey(name)
|
||||||
_, err = registry.client.Delete(key, true)
|
err = registry.helper.Delete(key, true)
|
||||||
if !tools.IsEtcdNotFound(err) {
|
if !tools.IsEtcdNotFound(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -42,13 +42,15 @@ var (
|
|||||||
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired}
|
||||||
)
|
)
|
||||||
|
|
||||||
type Encoding interface {
|
// Codec provides methods for transforming Etcd values into objects and back
|
||||||
|
type Codec interface {
|
||||||
Encode(obj interface{}) (data []byte, err error)
|
Encode(obj interface{}) (data []byte, err error)
|
||||||
Decode(data []byte) (interface{}, error)
|
Decode(data []byte) (interface{}, error)
|
||||||
DecodeInto(data []byte, obj interface{}) error
|
DecodeInto(data []byte, obj interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Versioning interface {
|
// ResourceVersioner provides methods for managing object modification tracking
|
||||||
|
type ResourceVersioner interface {
|
||||||
SetResourceVersion(obj interface{}, version uint64) error
|
SetResourceVersion(obj interface{}, version uint64) error
|
||||||
ResourceVersion(obj interface{}) (uint64, error)
|
ResourceVersion(obj interface{}) (uint64, error)
|
||||||
}
|
}
|
||||||
@ -71,16 +73,17 @@ type EtcdGetSet interface {
|
|||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||||
type EtcdHelper struct {
|
type EtcdHelper struct {
|
||||||
Client EtcdGetSet
|
Client EtcdGetSet
|
||||||
Encoding Encoding
|
Codec Codec
|
||||||
// optional
|
// optional, no atomic operations can be performed without this interface
|
||||||
Versioning Versioning
|
ResourceVersioner ResourceVersioner
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsEtcdNotFound returns true iff err is an etcd not found error.
|
// IsEtcdNotFound returns true iff err is an etcd not found error.
|
||||||
@ -136,7 +139,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
|
|||||||
v := pv.Elem()
|
v := pv.Elem()
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
obj := reflect.New(v.Type().Elem())
|
obj := reflect.New(v.Type().Elem())
|
||||||
err = h.Encoding.DecodeInto([]byte(node.Value), obj.Interface())
|
err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -145,7 +148,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return
|
||||||
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
||||||
// empty responses and nil response nodes exactly like a not found error.
|
// empty responses and nil response nodes exactly like a not found error.
|
||||||
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
|
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
|
||||||
@ -170,21 +173,22 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
|
|||||||
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
|
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
|
||||||
}
|
}
|
||||||
body = response.Node.Value
|
body = response.Node.Value
|
||||||
err = h.Encoding.DecodeInto([]byte(body), objPtr)
|
err = h.Codec.DecodeInto([]byte(body), objPtr)
|
||||||
if h.Versioning != nil {
|
if h.ResourceVersioner != nil {
|
||||||
_ = h.Versioning.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
|
_ = h.ResourceVersioner.SetResourceVersion(objPtr, response.Node.ModifiedIndex)
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
}
|
}
|
||||||
return body, response.Node.ModifiedIndex, err
|
return body, response.Node.ModifiedIndex, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create adds a new object at a key unless it already exists
|
||||||
func (h *EtcdHelper) CreateObj(key string, obj interface{}) error {
|
func (h *EtcdHelper) CreateObj(key string, obj interface{}) error {
|
||||||
data, err := h.Encoding.Encode(obj)
|
data, err := h.Codec.Encode(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if h.Versioning != nil {
|
if h.ResourceVersioner != nil {
|
||||||
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 {
|
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
|
||||||
return errors.New("resourceVersion may not be set on objects to be created")
|
return errors.New("resourceVersion may not be set on objects to be created")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -193,15 +197,21 @@ func (h *EtcdHelper) CreateObj(key string, obj interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete removes the specified key
|
||||||
|
func (h *EtcdHelper) Delete(key string, recursive bool) error {
|
||||||
|
_, err := h.Client.Delete(key, recursive)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// SetObj marshals obj via json, and stores under key. Will do an
|
// SetObj marshals obj via json, and stores under key. Will do an
|
||||||
// atomic update if obj's ResourceVersion field is set.
|
// atomic update if obj's ResourceVersion field is set.
|
||||||
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
|
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
|
||||||
data, err := h.Encoding.Encode(obj)
|
data, err := h.Codec.Encode(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if h.Versioning != nil {
|
if h.ResourceVersioner != nil {
|
||||||
if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 {
|
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
|
||||||
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
|
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
|
||||||
return err // err is shadowed!
|
return err // err is shadowed!
|
||||||
}
|
}
|
||||||
@ -253,7 +263,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := h.Encoding.Encode(ret)
|
data, err := h.Codec.Encode(ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -288,7 +298,7 @@ func Everything(interface{}) bool {
|
|||||||
// API objects, and any items passing 'filter' are sent down the returned
|
// API objects, and any items passing 'filter' are sent down the returned
|
||||||
// watch.Interface.
|
// watch.Interface.
|
||||||
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
|
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
|
||||||
w := newEtcdWatcher(true, filter, h.Encoding)
|
w := newEtcdWatcher(true, filter, h.Codec)
|
||||||
go w.etcdWatch(h.Client, key)
|
go w.etcdWatch(h.Client, key)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -296,14 +306,14 @@ func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface,
|
|||||||
// Watch begins watching the specified key. Events are decoded into
|
// Watch begins watching the specified key. Events are decoded into
|
||||||
// API objects and sent down the returned watch.Interface.
|
// API objects and sent down the returned watch.Interface.
|
||||||
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
|
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
|
||||||
w := newEtcdWatcher(false, nil, h.Encoding)
|
w := newEtcdWatcher(false, nil, h.Codec)
|
||||||
go w.etcdWatch(h.Client, key)
|
go w.etcdWatch(h.Client, key)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||||
type etcdWatcher struct {
|
type etcdWatcher struct {
|
||||||
encoding Encoding
|
encoding Codec
|
||||||
|
|
||||||
list bool // If we're doing a recursive watch, should be true.
|
list bool // If we're doing a recursive watch, should be true.
|
||||||
filter FilterFunc
|
filter FilterFunc
|
||||||
@ -322,7 +332,7 @@ type etcdWatcher struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
|
// Returns a new etcdWatcher; if list is true, watch sub-nodes.
|
||||||
func newEtcdWatcher(list bool, filter FilterFunc, encoding Encoding) *etcdWatcher {
|
func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
list: list,
|
list: list,
|
||||||
|
@ -40,8 +40,8 @@ type TestResource struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var scheme *conversion.Scheme
|
var scheme *conversion.Scheme
|
||||||
var encoding = api.Encoding
|
var codec = api.Codec
|
||||||
var versioning = api.Versioning
|
var versioner = api.ResourceVersioner
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
scheme = conversion.NewScheme()
|
scheme = conversion.NewScheme()
|
||||||
@ -87,7 +87,7 @@ func TestExtractList(t *testing.T) {
|
|||||||
{JSONBase: api.JSONBase{ID: "baz"}},
|
{JSONBase: api.JSONBase{ID: "baz"}},
|
||||||
}
|
}
|
||||||
var got []api.Pod
|
var got []api.Pod
|
||||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||||
err := helper.ExtractList("/some/key", &got)
|
err := helper.ExtractList("/some/key", &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
@ -101,7 +101,7 @@ func TestExtractObj(t *testing.T) {
|
|||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
fakeClient.Set("/some/key", util.MakeJSONString(expect), 0)
|
fakeClient.Set("/some/key", util.MakeJSONString(expect), 0)
|
||||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||||
var got api.Pod
|
var got api.Pod
|
||||||
err := helper.ExtractObj("/some/key", &got, false)
|
err := helper.ExtractObj("/some/key", &got, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -134,7 +134,7 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||||
try := func(key string) {
|
try := func(key string) {
|
||||||
var got api.Pod
|
var got api.Pod
|
||||||
err := helper.ExtractObj(key, &got, false)
|
err := helper.ExtractObj(key, &got, false)
|
||||||
@ -155,12 +155,12 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||||||
func TestSetObj(t *testing.T) {
|
func TestSetObj(t *testing.T) {
|
||||||
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||||
err := helper.SetObj("/some/key", obj)
|
err := helper.SetObj("/some/key", obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
data, err := encoding.Encode(obj)
|
data, err := codec.Encode(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -184,12 +184,12 @@ func TestSetObjWithVersion(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
helper := EtcdHelper{fakeClient, encoding, versioning}
|
helper := EtcdHelper{fakeClient, codec, versioner}
|
||||||
err := helper.SetObj("/some/key", obj)
|
err := helper.SetObj("/some/key", obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %#v", err)
|
t.Fatalf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
data, err := encoding.Encode(obj)
|
data, err := codec.Encode(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %#v", err)
|
t.Fatalf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -200,15 +200,15 @@ func TestSetObjWithVersion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSetObjWithoutVersioning(t *testing.T) {
|
func TestSetObjWithoutResourceVersioner(t *testing.T) {
|
||||||
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
helper := EtcdHelper{fakeClient, encoding, nil}
|
helper := EtcdHelper{fakeClient, codec, nil}
|
||||||
err := helper.SetObj("/some/key", obj)
|
err := helper.SetObj("/some/key", obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
data, err := encoding.Encode(obj)
|
data, err := codec.Encode(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -222,8 +222,8 @@ func TestSetObjWithoutVersioning(t *testing.T) {
|
|||||||
func TestAtomicUpdate(t *testing.T) {
|
func TestAtomicUpdate(t *testing.T) {
|
||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
encoding := scheme
|
codec := scheme
|
||||||
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
|
helper := EtcdHelper{fakeClient, codec, api.NewJSONBaseResourceVersioner()}
|
||||||
|
|
||||||
// Create a new node.
|
// Create a new node.
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
@ -234,7 +234,7 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
data, err := encoding.Encode(obj)
|
data, err := codec.Encode(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -259,7 +259,7 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
data, err = encoding.Encode(objUpdate)
|
data, err = codec.Encode(objUpdate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -277,8 +277,8 @@ func TestAtomicUpdate(t *testing.T) {
|
|||||||
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
fakeClient.TestIndex = true
|
fakeClient.TestIndex = true
|
||||||
encoding := scheme
|
codec := scheme
|
||||||
helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}}
|
helper := EtcdHelper{fakeClient, codec, api.NewJSONBaseResourceVersioner()}
|
||||||
|
|
||||||
fakeClient.ExpectNotFoundGet("/some/key")
|
fakeClient.ExpectNotFoundGet("/some/key")
|
||||||
|
|
||||||
@ -317,7 +317,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
|
|||||||
// Check that stored TestResource has received all updates.
|
// Check that stored TestResource has received all updates.
|
||||||
body := fakeClient.Data["/some/key"].R.Node.Value
|
body := fakeClient.Data["/some/key"].R.Node.Value
|
||||||
stored := &TestResource{}
|
stored := &TestResource{}
|
||||||
if err := encoding.DecodeInto([]byte(body), stored); err != nil {
|
if err := codec.DecodeInto([]byte(body), stored); err != nil {
|
||||||
t.Errorf("Error decoding stored value: %v", body)
|
t.Errorf("Error decoding stored value: %v", body)
|
||||||
}
|
}
|
||||||
if stored.Value != concurrency {
|
if stored.Value != concurrency {
|
||||||
@ -329,9 +329,9 @@ func TestWatchInterpretation_ListAdd(t *testing.T) {
|
|||||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||||
t.Errorf("unexpected filter call")
|
t.Errorf("unexpected filter call")
|
||||||
return true
|
return true
|
||||||
}, encoding)
|
}, codec)
|
||||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
podBytes, _ := encoding.Encode(pod)
|
podBytes, _ := codec.Encode(pod)
|
||||||
|
|
||||||
go w.sendResult(&etcd.Response{
|
go w.sendResult(&etcd.Response{
|
||||||
Action: "set",
|
Action: "set",
|
||||||
@ -353,9 +353,9 @@ func TestWatchInterpretation_Delete(t *testing.T) {
|
|||||||
w := newEtcdWatcher(true, func(interface{}) bool {
|
w := newEtcdWatcher(true, func(interface{}) bool {
|
||||||
t.Errorf("unexpected filter call")
|
t.Errorf("unexpected filter call")
|
||||||
return true
|
return true
|
||||||
}, encoding)
|
}, codec)
|
||||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
podBytes, _ := encoding.Encode(pod)
|
podBytes, _ := codec.Encode(pod)
|
||||||
|
|
||||||
go w.sendResult(&etcd.Response{
|
go w.sendResult(&etcd.Response{
|
||||||
Action: "delete",
|
Action: "delete",
|
||||||
@ -377,7 +377,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
|||||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||||
t.Errorf("unexpected filter call")
|
t.Errorf("unexpected filter call")
|
||||||
return true
|
return true
|
||||||
}, encoding)
|
}, codec)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -391,7 +391,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
|||||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||||
t.Errorf("unexpected filter call")
|
t.Errorf("unexpected filter call")
|
||||||
return true
|
return true
|
||||||
}, encoding)
|
}, codec)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -404,7 +404,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
|||||||
w := newEtcdWatcher(false, func(interface{}) bool {
|
w := newEtcdWatcher(false, func(interface{}) bool {
|
||||||
t.Errorf("unexpected filter call")
|
t.Errorf("unexpected filter call")
|
||||||
return true
|
return true
|
||||||
}, encoding)
|
}, codec)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -418,7 +418,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
h := EtcdHelper{fakeClient, encoding, versioning}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
watching, err := h.Watch("/some/key")
|
watching, err := h.Watch("/some/key")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -429,7 +429,7 @@ func TestWatch(t *testing.T) {
|
|||||||
|
|
||||||
// Test normal case
|
// Test normal case
|
||||||
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||||
podBytes, _ := encoding.Encode(pod)
|
podBytes, _ := codec.Encode(pod)
|
||||||
fakeClient.WatchResponse <- &etcd.Response{
|
fakeClient.WatchResponse <- &etcd.Response{
|
||||||
Action: "set",
|
Action: "set",
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
@ -459,7 +459,7 @@ func TestWatch(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatchPurposefulShutdown(t *testing.T) {
|
func TestWatchPurposefulShutdown(t *testing.T) {
|
||||||
fakeClient := MakeFakeEtcdClient(t)
|
fakeClient := MakeFakeEtcdClient(t)
|
||||||
h := EtcdHelper{fakeClient, encoding, versioning}
|
h := EtcdHelper{fakeClient, codec, versioner}
|
||||||
|
|
||||||
// Test purposeful shutdown
|
// Test purposeful shutdown
|
||||||
watching, err := h.Watch("/some/key")
|
watching, err := h.Watch("/some/key")
|
||||||
|
Loading…
Reference in New Issue
Block a user