Simplify ResourceWatcher interface to one function.

This commit is contained in:
Daniel Smith 2014-08-06 14:55:37 -07:00
parent 71709ae09e
commit 49cded3800
7 changed files with 34 additions and 79 deletions

View File

@ -64,13 +64,8 @@ type SimpleRESTStorage struct {
updated *Simple updated *Simple
created *Simple created *Simple
// Valid if WatchAll or WatchSingle is called // These are set when Watch is called
fakeWatch *watch.FakeWatcher fakeWatch *watch.FakeWatcher
// Set if WatchSingle is called
requestedID string
// Set if WatchAll is called
requestedLabelSelector labels.Selector requestedLabelSelector labels.Selector
requestedFieldSelector labels.Selector requestedFieldSelector labels.Selector
requestedResourceVersion uint64 requestedResourceVersion uint64
@ -135,22 +130,11 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e
} }
// Implement ResourceWatcher. // Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
storage.requestedLabelSelector = label storage.requestedLabelSelector = label
storage.requestedFieldSelector = field storage.requestedFieldSelector = field
storage.requestedResourceVersion = resourceVersion storage.requestedResourceVersion = resourceVersion
if err := storage.errors["watchAll"]; err != nil { if err := storage.errors["watch"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
return storage.fakeWatch, nil
}
// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) {
storage.requestedID = id
storage.requestedResourceVersion = resourceVersion
if err := storage.errors["watchSingle"]; err != nil {
return nil, err return nil, err
} }
storage.fakeWatch = watch.NewFake() storage.fakeWatch = watch.NewFake()

View File

@ -33,11 +33,13 @@ type RESTStorage interface {
List(labels.Selector) (interface{}, error) List(labels.Selector) (interface{}, error)
// Get finds a resource in the storage by id and returns it. // Get finds a resource in the storage by id and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. // Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(id string) (interface{}, error) Get(id string) (interface{}, error)
// Delete finds a resource in the storage and deletes it. // Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found. // Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Delete(id string) (<-chan interface{}, error) Delete(id string) (<-chan interface{}, error)
Create(interface{}) (<-chan interface{}, error) Create(interface{}) (<-chan interface{}, error)
@ -47,10 +49,9 @@ type RESTStorage interface {
// ResourceWatcher should be implemented by all RESTStorage objects that // ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api. // want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface { type ResourceWatcher interface {
// label selects on labels; field selects on the objects fields. Not all fields // 'label' selects on labels; 'field' selects on the object's fields. Not all fields
// are supported; an error will be returned if you try to select for a field that // are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. // isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) // particular version.
// TODO: Decide if we need to keep WatchSingle? Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchSingle(id string, resourceVersion uint64) (watch.Interface, error)
} }

View File

@ -33,8 +33,7 @@ type WatchHandler struct {
storage map[string]RESTStorage storage map[string]RESTStorage
} }
func (s *APIServer) getWatchParams(query url.Values) (id string, label, field labels.Selector, resourceVersion uint64) { func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) {
id = query.Get("id")
if s, err := labels.ParseSelector(query.Get("labels")); err != nil { if s, err := labels.ParseSelector(query.Get("labels")); err != nil {
label = labels.Everything() label = labels.Everything()
} else { } else {
@ -48,7 +47,7 @@ func (s *APIServer) getWatchParams(query url.Values) (id string, label, field la
if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil { if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil {
resourceVersion = rv resourceVersion = rv
} }
return id, label, field, resourceVersion return label, field, resourceVersion
} }
// handleWatch processes a watch request // handleWatch processes a watch request
@ -62,14 +61,8 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
notFound(w, req) notFound(w, req)
} }
if watcher, ok := storage.(ResourceWatcher); ok { if watcher, ok := storage.(ResourceWatcher); ok {
var watching watch.Interface label, field, resourceVersion := getWatchParams(req.URL.Query())
var err error watching, err := watcher.Watch(label, field, resourceVersion)
id, label, field, resourceVersion := s.getWatchParams(req.URL.Query())
if id != "" {
watching, err = watcher.WatchSingle(id, resourceVersion)
} else {
watching, err = watcher.WatchAll(label, field, resourceVersion)
}
if err != nil { if err != nil {
internalError(err, w) internalError(err, w)
return return

View File

@ -40,6 +40,7 @@ var watchTestTable = []struct {
func TestWatchWebsocket(t *testing.T) { func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
handler := New(map[string]RESTStorage{ handler := New(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
@ -48,17 +49,13 @@ func TestWatchWebsocket(t *testing.T) {
dest, _ := url.Parse(server.URL) dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it. dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo" dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID" dest.RawQuery = ""
ws, err := websocket.Dial(dest.String(), "", "http://localhost") ws, err := websocket.Dial(dest.String(), "", "http://localhost")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
try := func(action watch.EventType, object interface{}) { try := func(action watch.EventType, object interface{}) {
// Send // Send
simpleStorage.fakeWatch.Action(action, object) simpleStorage.fakeWatch.Action(action, object)
@ -98,7 +95,7 @@ func TestWatchHTTP(t *testing.T) {
dest, _ := url.Parse(server.URL) dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo" dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID" dest.RawQuery = ""
request, err := http.NewRequest("GET", dest.String(), nil) request, err := http.NewRequest("GET", dest.String(), nil)
if err != nil { if err != nil {
@ -114,10 +111,6 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected response %#v", response) t.Errorf("Unexpected response %#v", response)
} }
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
decoder := json.NewDecoder(response.Body) decoder := json.NewDecoder(response.Body)
try := func(action watch.EventType, object interface{}) { try := func(action watch.EventType, object interface{}) {
@ -164,26 +157,27 @@ func TestWatchParamParsing(t *testing.T) {
resourceVersion uint64 resourceVersion uint64
labelSelector string labelSelector string
fieldSelector string fieldSelector string
id string
}{ }{
{ {
rawQuery: "id=myID&resourceVersion=1234", rawQuery: "resourceVersion=1234",
resourceVersion: 1234, resourceVersion: 1234,
labelSelector: "", labelSelector: "",
fieldSelector: "", fieldSelector: "",
id: "myID",
}, { }, {
rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo",
resourceVersion: 314159, resourceVersion: 314159,
labelSelector: "name=foo", labelSelector: "name=foo",
fieldSelector: "Host=", fieldSelector: "Host=",
id: "", }, {
rawQuery: "fields=ID%3dfoo&resourceVersion=1492",
resourceVersion: 1492,
labelSelector: "",
fieldSelector: "ID=foo",
}, { }, {
rawQuery: "", rawQuery: "",
resourceVersion: 0, resourceVersion: 0,
labelSelector: "", labelSelector: "",
fieldSelector: "", fieldSelector: "",
id: "",
}, },
} }
@ -191,7 +185,6 @@ func TestWatchParamParsing(t *testing.T) {
simpleStorage.requestedLabelSelector = nil simpleStorage.requestedLabelSelector = nil
simpleStorage.requestedFieldSelector = nil simpleStorage.requestedFieldSelector = nil
simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases
simpleStorage.requestedID = ""
dest.RawQuery = item.rawQuery dest.RawQuery = item.rawQuery
resp, err := http.Get(dest.String()) resp, err := http.Get(dest.String())
if err != nil { if err != nil {
@ -199,19 +192,14 @@ func TestWatchParamParsing(t *testing.T) {
continue continue
} }
resp.Body.Close() resp.Body.Close()
if e, a := item.id, simpleStorage.requestedID; e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a { if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
} }
if simpleStorage.requestedID == "" { if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a {
if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a { t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a) }
} if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a {
if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a { t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
} }
} }
} }

View File

@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -37,9 +36,6 @@ type ReplicationManager struct {
// To allow injection of syncReplicationController for testing. // To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error syncHandler func(controllerSpec api.ReplicationController) error
// To allow injection of watch creation.
watchMaker func() (watch.Interface, error)
} }
// PodControlInterface is an interface that knows how to add or delete pods // PodControlInterface is an interface that knows how to add or delete pods

View File

@ -17,7 +17,6 @@ limitations under the License.
package registry package registry
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
@ -138,12 +137,6 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication
// WatchAll returns ReplicationController events via a watch.Interface, implementing // WatchAll returns ReplicationController events via a watch.Interface, implementing
// apiserver.ResourceWatcher. // apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return storage.registry.WatchControllers(label, field, resourceVersion) return storage.registry.WatchControllers(label, field, resourceVersion)
} }
// WatchSingle returns events for a single ReplicationController via a watch.Interface,
// implementing apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}

View File

@ -329,9 +329,9 @@ func TestWatchInterpretation_ListCreate(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool { w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call") t.Errorf("unexpected filter call")
return true return true
}, encoding) }, codec)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := encoding.Encode(pod) podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{ go w.sendResult(&etcd.Response{
Action: "create", Action: "create",