Move StorageInterface to pkg/storage.

This commit is contained in:
Wojciech Tyczynski
2015-07-30 09:27:38 +02:00
parent 769230e735
commit d17985f1ad
43 changed files with 284 additions and 223 deletions

View File

@@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@@ -35,7 +36,7 @@ import (
"github.com/golang/glog"
)
func NewEtcdStorage(client EtcdClient, codec runtime.Codec, prefix string) StorageInterface {
func NewEtcdStorage(client EtcdClient, codec runtime.Codec, prefix string) storage.StorageInterface {
return &etcdHelper{
client: client,
codec: codec,
@@ -52,7 +53,7 @@ type etcdHelper struct {
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
versioner StorageVersioner
versioner storage.StorageVersioner
// prefix for all etcd keys
pathPrefix string
@@ -76,7 +77,7 @@ func (h *etcdHelper) Backends() []string {
}
// Implements StorageInterface.
func (h *etcdHelper) Versioner() StorageVersioner {
func (h *etcdHelper) Versioner() storage.StorageVersioner {
return h.versioner
}
@@ -178,7 +179,7 @@ func (h *etcdHelper) RecursiveDelete(key string, recursive bool) error {
}
// Implements StorageInterface.
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
@@ -186,7 +187,7 @@ func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
}
// Implements StorageInterface.
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
@@ -364,18 +365,18 @@ func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
return result.Node.Nodes, result.EtcdIndex, nil
}
type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error)
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc
func SimpleUpdate(fn SimpleEtcdUpdateFunc) StorageUpdateFunc {
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
// SimpleUpdateFunc converts SimpleUpdateFunc into StorageUpdateFunc
func SimpleUpdate(fn SimpleUpdateFunc) storage.StorageUpdateFunc {
return func(input runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
out, err := fn(input)
return out, nil, err
}
}
// Implements StorageInterface.
func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error {
func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.StorageUpdateFunc) error {
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
@@ -388,7 +389,7 @@ func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
if err != nil {
return err
}
meta := ResponseMeta{}
meta := storage.ResponseMeta{}
if node != nil {
meta.TTL = node.TTL
if node.Expiration != nil {

View File

@@ -34,6 +34,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/coreos/go-etcd/etcd"
"github.com/stretchr/testify/assert"
@@ -592,7 +593,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
// Create a new node.
fakeClient.ExpectNotFoundGet(key)
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 0 {
t.Fatalf("unexpected response meta: %#v", res)
}
@@ -618,7 +619,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
// Update an existing node.
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 10 {
t.Fatalf("unexpected response meta: %#v", res)
}
@@ -650,7 +651,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) {
// Update an existing node and change ttl
callbackCalled = false
objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3}
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
if res.TTL != 10 {
t.Fatalf("unexpected response meta: %#v", res)
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@@ -74,4 +75,4 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
}
// APIObjectVersioner implements StorageVersioner
var _ StorageVersioner = APIObjectVersioner{}
var _ storage.StorageVersioner = APIObjectVersioner{}

View File

@@ -24,6 +24,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@@ -41,15 +42,6 @@ const (
EtcdDelete = "delete"
)
// FilterFunc is a predicate which takes an API object and returns true
// iff the object should remain in the set.
type FilterFunc func(obj runtime.Object) bool
// Everything is a FilterFunc which accepts all objects.
func Everything(runtime.Object) bool {
return true
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
@@ -82,12 +74,12 @@ func exceptKey(except string) includeFunc {
// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
versioner StorageVersioner
versioner storage.StorageVersioner
transform TransformFunc
list bool // If we're doing a recursive watch, should be true.
include includeFunc
filter FilterFunc
filter storage.FilterFunc
etcdIncoming chan *etcd.Response
etcdError chan error
@@ -110,7 +102,7 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner StorageVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.StorageVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,

View File

@@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/storage"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
@@ -165,7 +166,7 @@ func TestWatchInterpretations(t *testing.T) {
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -179,7 +180,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -193,7 +194,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -220,7 +221,7 @@ func TestWatchEtcdError(t *testing.T) {
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch("/some/key", 4, Everything)
watching, err := h.Watch("/some/key", 4, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -250,7 +251,7 @@ func TestWatch(t *testing.T) {
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -425,7 +426,7 @@ func TestWatchEtcdState(t *testing.T) {
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(baseKey, testCase.From, Everything)
watching, err := h.Watch(baseKey, testCase.From, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -499,7 +500,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
fakeClient.Data[prefixedKey] = testCase.Response
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -560,7 +561,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 0, Everything)
watching, err := h.WatchList(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -600,7 +601,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.WatchList(key, 1, Everything)
watching, err := h.WatchList(key, 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -653,7 +654,7 @@ func TestWatchFromNotFound(t *testing.T) {
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -680,7 +681,7 @@ func TestWatchFromOtherError(t *testing.T) {
}
h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix())
watching, err := h.Watch(key, 0, Everything)
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -716,7 +717,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{}
// Test purposeful shutdown
watching, err := h.Watch(key, 0, Everything)
watching, err := h.Watch(key, 0, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@@ -17,11 +17,6 @@ limitations under the License.
package tools
import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
@@ -51,119 +46,3 @@ type EtcdClient interface {
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
// StorageVersioner abstracts setting and retrieving metadata fields from the etcd response onto the object
// or list.
type StorageVersioner interface {
// UpdateObject sets etcd storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from etcd.
UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error
// UpdateList sets the resource version into an API list object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from etcd.
UpdateList(obj runtime.Object, resourceVersion uint64) error
// ObjectResourceVersion returns the resource version (for persistence) of the specified object.
// Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error)
}
// ResponseMeta contains information about the etcd metadata that is associated with
// an object. It abstracts the actual underlying objects to prevent coupling with etcd
// and to improve testability.
type ResponseMeta struct {
// TTL is the time to live of the node that contained the returned object. It may be
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
// This can be nil if there is no expiration time set for the node.
Expiration *time.Time
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// Pass an StorageUpdateFunc to StorageInterface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.
type StorageUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
// StorageInterface offers a common interface for object marshaling/unmarshling operations and
// hids all the storage-related operations behind it.
type StorageInterface interface {
// Returns list of servers addresses of the underyling database.
// TODO: This method is used only in a single place. Consider refactoring and getting rid
// of this method from the interface.
Backends() []string
// Returns StorageVersioner associated with this interface.
Versioner() StorageVersioner
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from etcd.
Create(key string, obj, out runtime.Object, ttl uint64) error
// Set marshals obj via json and stores in etcd under key. Will do an atomic update
// if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever).
// If no error is returned and out is not nil, out will be set to the read value from etcd.
Set(key string, obj, out runtime.Object, ttl uint64) error
// Delete removes the specified key and returns the value that existed at that spot.
Delete(key string, out runtime.Object) error
// RecursiveDelete removes the specified key.
// TODO: Get rid of this method and use Delete() instead.
RecursiveDelete(key string, recursive bool) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error.
Get(key string, objPtr runtime.Object, ignoreNotFound bool) error
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
GetToList(key string, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
List(key string, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is etcd index conflict.
// Note that object passed to tryUpdate may change acress incovations of tryUpdate() if
// other writers are simultanously updateing it, to tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
//
// Exmaple:
//
// s := /* implementation of StorageInterface */
// err := s.GuaranteedUpdate(
// "myKey", &MyType{}, true,
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each incovation of the user defined function, "input" is reset to
// // etcd's current contents for "myKey".
// curr := input.(*MyType) // Guaranteed to succeed.
//
// // Make the modification
// curr.Counter++
//
// // Return the modified object - return an error to stop iterating. Return
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// }
// })
GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error
}