From d167c11b59fceac66e1901d9c7718b90f09563b8 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 9 Feb 2015 09:47:13 -0500 Subject: [PATCH 1/5] Remove layers of indirection between apiinstaller and resthandler Make the RESTHandler feel more go-restful, set the stage for adding new types of subresource collections. --- pkg/api/errors/errors.go | 17 + pkg/apiserver/api_installer.go | 144 ++++++-- pkg/apiserver/apiserver.go | 59 +-- pkg/apiserver/apiserver_test.go | 210 ++++++++++- pkg/apiserver/errors.go | 16 +- pkg/apiserver/interfaces.go | 10 +- pkg/apiserver/resthandler.go | 581 ++++++++++++++++-------------- pkg/apiserver/resthandler_test.go | 69 ---- pkg/apiserver/watch.go | 18 +- test/integration/auth_test.go | 67 ++-- 10 files changed, 747 insertions(+), 444 deletions(-) delete mode 100644 pkg/apiserver/resthandler_test.go diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index e4345761ab1..6507ed8082c 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -29,6 +29,12 @@ import ( const ( StatusUnprocessableEntity = 422 StatusTooManyRequests = 429 + // HTTP recommendations are for servers to define 5xx error codes + // for scenarios not covered by behavior. In this case, TryAgainLater + // is an indication that a transient server error has occured and the + // client *should* retry, with an optional Retry-After header to specify + // the back off window. + StatusTryAgainLater = 504 ) // StatusError is an error intended for consumption by a REST API server; it can also be @@ -202,6 +208,17 @@ func NewInternalError(err error) error { }} } +// NewTimeoutError returns an error indicating that a timeout occurred before the request +// could be completed. Clients may retry, but the operation may still complete. +func NewTimeoutError(message string) error { + return &StatusError{api.Status{ + Status: api.StatusFailure, + Code: StatusTryAgainLater, + Reason: api.StatusReasonTimeout, + Message: fmt.Sprintf("Timeout: %s", message), + }} +} + // IsNotFound returns true if the specified error was created by NewNotFoundErr. func IsNotFound(err error) bool { return reasonForError(err) == api.StatusReasonNotFound diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index c0b19bb7a00..7ae84677fd0 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -17,20 +17,24 @@ limitations under the License. package apiserver import ( + "fmt" "net/http" + "net/url" + gpath "path" "reflect" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/emicklei/go-restful" ) type APIInstaller struct { - prefix string // Path prefix where API resources are to be registered. - version string // The API version being installed. - restHandler *RESTHandler - mapper meta.RESTMapper + group *APIGroupVersion + prefix string // Path prefix where API resources are to be registered. + version string // The API version being installed. } // Struct capturing information about an action ("GET", "POST", "WATCH", PROXY", etc). @@ -49,16 +53,16 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { // Initialize the custom handlers. watchHandler := (&WatchHandler{ - storage: a.restHandler.storage, - codec: a.restHandler.codec, - canonicalPrefix: a.restHandler.canonicalPrefix, - selfLinker: a.restHandler.selfLinker, - apiRequestInfoResolver: a.restHandler.apiRequestInfoResolver, + storage: a.group.storage, + codec: a.group.codec, + prefix: a.group.prefix, + linker: a.group.linker, + info: a.group.info, }) - redirectHandler := (&RedirectHandler{a.restHandler.storage, a.restHandler.codec, a.restHandler.apiRequestInfoResolver}) - proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.restHandler.storage, a.restHandler.codec, a.restHandler.apiRequestInfoResolver}) + redirectHandler := (&RedirectHandler{a.group.storage, a.group.codec, a.group.info}) + proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.storage, a.group.codec, a.group.info}) - for path, storage := range a.restHandler.storage { + for path, storage := range a.group.storage { if err := a.registerResourceHandlers(path, storage, ws, watchHandler, redirectHandler, proxyHandler); err != nil { errors = append(errors, err) } @@ -78,8 +82,11 @@ func (a *APIInstaller) newWebService() *restful.WebService { } func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage, ws *restful.WebService, watchHandler http.Handler, redirectHandler http.Handler, proxyHandler http.Handler) error { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). - restVerbHandler := restfulStripPrefix(a.prefix, a.restHandler) + codec := a.group.codec + admit := a.group.admit + linker := a.group.linker + resource := path + object := storage.New() // TODO: add scheme to APIInstaller rather than using api.Scheme _, kind, err := api.Scheme.ObjectVersionAndKind(object) @@ -103,28 +110,31 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage versionedList = indirectArbitraryPointer(versionedListPtr) } - mapping, err := a.mapper.RESTMapping(kind, a.version) + mapping, err := a.group.mapper.RESTMapping(kind, a.version) if err != nil { return err } // what verbs are supported by the storage, used to know what verbs we support per path storageVerbs := map[string]bool{} - if _, ok := storage.(RESTCreater); ok { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). + creater, ok := storage.(RESTCreater) + if ok { storageVerbs["RESTCreater"] = true } - if _, ok := storage.(RESTLister); ok { - // Handler for standard REST verbs (GET, PUT, POST and DELETE). + lister, ok := storage.(RESTLister) + if ok { storageVerbs["RESTLister"] = true } - if _, ok := storage.(RESTGetter); ok { + getter, ok := storage.(RESTGetter) + if ok { storageVerbs["RESTGetter"] = true } - if _, ok := storage.(RESTDeleter); ok { + deleter, ok := storage.(RESTDeleter) + if ok { storageVerbs["RESTDeleter"] = true } - if _, ok := storage.(RESTUpdater); ok { + updater, ok := storage.(RESTUpdater) + if ok { storageVerbs["RESTUpdater"] = true } if _, ok := storage.(ResourceWatcher); ok { @@ -134,6 +144,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage storageVerbs["Redirector"] = true } + var namespaceFn ResourceNamespaceFunc + var nameFn ResourceNameFunc + var generateLinkFn linkFunc + var objNameFn ObjectNameFunc + linkFn := func(req *restful.Request, obj runtime.Object) error { + return setSelfLink(obj, req.Request, a.group.linker, generateLinkFn) + } + allowWatchList := storageVerbs["ResourceWatcher"] && storageVerbs["RESTLister"] // watching on lists is allowed only for kinds that support both watch and list. scope := mapping.Scope nameParam := ws.PathParameter("name", "name of the "+kind).DataType("string") @@ -141,6 +159,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage actions := []action{} // Get the list of actions for the given scope. if scope.Name() != meta.RESTScopeNameNamespace { + objNameFn = func(obj runtime.Object) (namespace, name string, err error) { + name, err = linker.Name(obj) + return + } + // Handler for standard REST verbs (GET, PUT, POST and DELETE). actions = appendIf(actions, action{"LIST", path, params}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", path, params}, storageVerbs["RESTCreater"]) @@ -148,6 +171,19 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage itemPath := path + "/{name}" nameParams := append(params, nameParam) + namespaceFn = func(req *restful.Request) (namespace string, err error) { + return + } + nameFn = func(req *restful.Request) (namespace, name string, err error) { + name = req.PathParameter("name") + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, 1) + path = gpath.Join(a.prefix, path) + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -156,18 +192,46 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage actions = appendIf(actions, action{"PROXY", "/proxy/" + itemPath + "/{path:*}", nameParams}, storageVerbs["Redirector"]) actions = appendIf(actions, action{"PROXY", "/proxy/" + itemPath, nameParams}, storageVerbs["Redirector"]) } else { + objNameFn = func(obj runtime.Object) (namespace, name string, err error) { + if name, err = linker.Name(obj); err != nil { + return + } + namespace, err = linker.Namespace(obj) + return + } + // v1beta3 format with namespace in path if scope.ParamPath() { // Handler for standard REST verbs (GET, PUT, POST and DELETE). namespaceParam := ws.PathParameter(scope.ParamName(), scope.ParamDescription()).DataType("string") namespacedPath := scope.ParamName() + "/{" + scope.ParamName() + "}/" + path namespaceParams := []*restful.Parameter{namespaceParam} + namespaceFn = func(req *restful.Request) (namespace string, err error) { + namespace = req.PathParameter(scope.ParamName()) + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return + } + actions = appendIf(actions, action{"LIST", namespacedPath, namespaceParams}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", namespacedPath, namespaceParams}, storageVerbs["RESTCreater"]) actions = appendIf(actions, action{"WATCHLIST", "/watch/" + namespacedPath, namespaceParams}, allowWatchList) itemPath := namespacedPath + "/{name}" nameParams := append(namespaceParams, nameParam) + nameFn = func(req *restful.Request) (namespace, name string, err error) { + namespace, _ = namespaceFn(req) + name = req.PathParameter("name") + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, 1) + path = strings.Replace(path, "{"+scope.ParamName()+"}", namespace, 1) + path = gpath.Join(a.prefix, path) + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -184,12 +248,36 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage // v1beta1/v1beta2 format where namespace was a query parameter namespaceParam := ws.QueryParameter(scope.ParamName(), scope.ParamDescription()).DataType("string") namespaceParams := []*restful.Parameter{namespaceParam} + namespaceFn = func(req *restful.Request) (namespace string, err error) { + namespace = req.QueryParameter(scope.ParamName()) + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return + } + actions = appendIf(actions, action{"LIST", path, namespaceParams}, storageVerbs["RESTLister"]) actions = appendIf(actions, action{"POST", path, namespaceParams}, storageVerbs["RESTCreater"]) actions = appendIf(actions, action{"WATCHLIST", "/watch/" + path, namespaceParams}, allowWatchList) itemPath := path + "/{name}" nameParams := append(namespaceParams, nameParam) + nameFn = func(req *restful.Request) (namespace, name string, err error) { + namespace, _ = namespaceFn(req) + name = req.PathParameter("name") + return + } + generateLinkFn = func(namespace, name string) (path string, query string) { + path = strings.Replace(itemPath, "{name}", name, -1) + path = gpath.Join(a.prefix, path) + if len(namespace) > 0 { + values := make(url.Values) + values.Set(scope.ParamName(), namespace) + query = values.Encode() + } + return + } + actions = appendIf(actions, action{"GET", itemPath, nameParams}, storageVerbs["RESTGetter"]) actions = appendIf(actions, action{"PUT", itemPath, nameParams}, storageVerbs["RESTUpdater"]) actions = appendIf(actions, action{"DELETE", itemPath, nameParams}, storageVerbs["RESTDeleter"]) @@ -220,35 +308,35 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage for _, action := range actions { switch action.Verb { case "GET": // Get a resource. - route := ws.GET(action.Path).To(restVerbHandler). + route := ws.GET(action.Path).To(GetResource(getter, nameFn, linkFn, codec)). Doc("read the specified " + kind). Operation("read" + kind). Writes(versionedObject) addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. - route := ws.GET(action.Path).To(restVerbHandler). + route := ws.GET(action.Path).To(ListResource(lister, namespaceFn, linkFn, codec)). Doc("list objects of kind " + kind). Operation("list" + kind). Writes(versionedList) addParams(route, action.Params) ws.Route(route) case "PUT": // Update a resource. - route := ws.PUT(action.Path).To(restVerbHandler). + route := ws.PUT(action.Path).To(UpdateResource(updater, nameFn, objNameFn, linkFn, codec, resource, admit)). Doc("update the specified " + kind). Operation("update" + kind). Reads(versionedObject) addParams(route, action.Params) ws.Route(route) case "POST": // Create a resource. - route := ws.POST(action.Path).To(restVerbHandler). + route := ws.POST(action.Path).To(CreateResource(creater, namespaceFn, linkFn, codec, resource, admit)). Doc("create a " + kind). Operation("create" + kind). Reads(versionedObject) addParams(route, action.Params) ws.Route(route) case "DELETE": // Delete a resource. - route := ws.DELETE(action.Path).To(restVerbHandler). + route := ws.DELETE(action.Path).To(DeleteResource(deleter, nameFn, linkFn, codec, resource, kind, admit)). Doc("delete a " + kind). Operation("delete" + kind) addParams(route, action.Params) @@ -281,6 +369,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, action.Params) addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, action.Params) addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, action.Params) + default: + return fmt.Errorf("unrecognized action verb: %s", action.Verb) } // Note: update GetAttribs() when adding a custom handler. } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 738639eb257..682634ca034 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -89,9 +89,9 @@ type defaultAPIServer struct { // as RESTful resources at prefix, serialized by codec, and also includes the support // http resources. // Note: This method is used only in tests. -func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, selfLinker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) http.Handler { - prefix := root + "/" + version - group := NewAPIGroupVersion(storage, codec, root, prefix, selfLinker, admissionControl, mapper) +func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, version string, linker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) http.Handler { + prefix := path.Join(root, version) + group := NewAPIGroupVersion(storage, codec, root, prefix, linker, admissionControl, mapper) container := restful.NewContainer() container.Router(restful.CurlyRouter{}) mux := container.ServeMux @@ -102,16 +102,19 @@ func Handle(storage map[string]RESTStorage, codec runtime.Codec, root string, ve return &defaultAPIServer{mux, group} } -// TODO: This is a whole API version right now. Maybe should rename it. -// APIGroupVersion is a http.Handler that exposes multiple RESTStorage objects +// APIGroupVersion is a helper for exposing RESTStorage objects as http.Handlers via go-restful // It handles URLs of the form: // /${storage_key}[/${object_name}] // Where 'storage_key' points to a RESTStorage object stored in storage. -// -// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing. type APIGroupVersion struct { - handler RESTHandler + storage map[string]RESTStorage + codec runtime.Codec + prefix string + linker runtime.SelfLinker + admit admission.Interface mapper meta.RESTMapper + // TODO: put me into a cleaner interface + info *APIRequestInfoResolver } // NewAPIGroupVersion returns an object that will serve a set of REST resources and their @@ -119,18 +122,15 @@ type APIGroupVersion struct { // This is a helper method for registering multiple sets of REST handlers under different // prefixes onto a server. // TODO: add multitype codec serialization -func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, apiRoot, canonicalPrefix string, selfLinker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) *APIGroupVersion { +func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, root, prefix string, linker runtime.SelfLinker, admissionControl admission.Interface, mapper meta.RESTMapper) *APIGroupVersion { return &APIGroupVersion{ - handler: RESTHandler{ - storage: storage, - codec: codec, - canonicalPrefix: canonicalPrefix, - selfLinker: selfLinker, - ops: NewOperations(), - admissionControl: admissionControl, - apiRequestInfoResolver: &APIRequestInfoResolver{util.NewStringSet(apiRoot), latest.RESTMapper}, - }, - mapper: mapper, + storage: storage, + codec: codec, + prefix: prefix, + linker: linker, + admit: admissionControl, + mapper: mapper, + info: &APIRequestInfoResolver{util.NewStringSet(root), latest.RESTMapper}, } } @@ -139,7 +139,12 @@ func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, api // in a slash. A restful WebService is created for the group and version. func (g *APIGroupVersion) InstallREST(container *restful.Container, root string, version string) error { prefix := path.Join(root, version) - ws, registrationErrors := (&APIInstaller{prefix, version, &g.handler, g.mapper}).Install() + installer := &APIInstaller{ + group: g, + prefix: prefix, + version: version, + } + ws, registrationErrors := installer.Install() container.Add(ws) return errors.NewAggregate(registrationErrors) } @@ -186,15 +191,15 @@ func AddApiWebService(container *restful.Container, apiPrefix string, versions [ // TODO: InstallREST should register each version automatically versionHandler := APIVersionHandler(versions[:]...) - getApiVersionsWebService := new(restful.WebService) - getApiVersionsWebService.Path(apiPrefix) - getApiVersionsWebService.Doc("get available api versions") - getApiVersionsWebService.Route(getApiVersionsWebService.GET("/").To(versionHandler). - Doc("get available api versions"). - Operation("getApiVersions"). + ws := new(restful.WebService) + ws.Path(apiPrefix) + ws.Doc("get available API versions") + ws.Route(ws.GET("/").To(versionHandler). + Doc("get available API versions"). + Operation("getAPIVersions"). Produces(restful.MIME_JSON). Consumes(restful.MIME_JSON)) - container.Add(getApiVersionsWebService) + container.Add(ws) } // handleVersion writes the server's version information. diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 2adca83fbce..f00e66076a7 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -489,7 +489,9 @@ func TestGet(t *testing.T) { } selfLinker := &setTestSelfLinker{ t: t, - expectedSet: "/prefix/version/simple/id", + expectedSet: "/prefix/version/simple/id?namespace=default", + name: "id", + namespace: "default", } storage["simple"] = &simpleStorage handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) @@ -497,6 +499,12 @@ func TestGet(t *testing.T) { defer server.Close() resp, err := http.Get(server.URL + "/prefix/version/simple/id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } var itemOut Simple body, err := extractBody(resp, &itemOut) if err != nil { @@ -511,6 +519,81 @@ func TestGet(t *testing.T) { } } +func TestGetAlternateSelfLink(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/prefix/version/simple/id?namespace=test", + name: "id", + namespace: "test", + } + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, legacyNamespaceMapper) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/prefix/version/simple/id?namespace=test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} + +func TestGetNamespaceSelfLink(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + } + selfLinker := &setTestSelfLinker{ + t: t, + expectedSet: "/prefix/version/namespaces/foo/simple/id", + name: "id", + namespace: "foo", + } + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, namespaceMapper) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/prefix/version/namespaces/foo/simple/id") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + if !selfLinker.called { + t.Errorf("Never set self link") + } +} func TestGetMissing(t *testing.T) { storage := map[string]RESTStorage{} simpleStorage := SimpleRESTStorage{ @@ -542,11 +625,13 @@ func TestDelete(t *testing.T) { client := http.Client{} request, err := http.NewRequest("DELETE", server.URL+"/prefix/version/simple/"+ID, nil) - _, err = client.Do(request) + res, err := client.Do(request) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) + } + if res.StatusCode != http.StatusOK { + t.Errorf("unexpected response: %#v", res) } - if simpleStorage.deleted != ID { t.Errorf("Unexpected delete: %s, expected %s", simpleStorage.deleted, ID) } @@ -602,13 +687,19 @@ func TestUpdate(t *testing.T) { storage["simple"] = &simpleStorage selfLinker := &setTestSelfLinker{ t: t, - expectedSet: "/prefix/version/simple/" + ID, + expectedSet: "/prefix/version/simple/" + ID + "?namespace=default", + name: ID, + namespace: api.NamespaceDefault, } handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) server := httptest.NewServer(handler) defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: "", // update should allow the client to send an empty namespace + }, Other: "bar", } body, err := codec.Encode(item) @@ -637,15 +728,15 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { simpleStorage := SimpleRESTStorage{} ID := "id" storage["simple"] = &simpleStorage - selfLinker := &setTestSelfLinker{ - t: t, - expectedSet: "/prefix/version/simple/" + ID, - } handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny(), mapper) server := httptest.NewServer(handler) defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: api.NamespaceDefault, + }, Other: "bar", } body, err := codec.Encode(item) @@ -665,6 +756,100 @@ func TestUpdateInvokesAdmissionControl(t *testing.T) { } } +func TestUpdateRequiresMatchingName(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny(), mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusBadRequest { + t.Errorf("Unexpected response %#v", response) + } +} + +func TestUpdateAllowsMissingNamespace(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + }, + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected response %#v", response) + } +} + +func TestUpdatePreventsMismatchedNamespace(t *testing.T) { + storage := map[string]RESTStorage{} + simpleStorage := SimpleRESTStorage{} + ID := "id" + storage["simple"] = &simpleStorage + handler := Handle(storage, codec, "/prefix", testVersion, selfLinker, admissionControl, mapper) + server := httptest.NewServer(handler) + defer server.Close() + + item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: "other", + }, + Other: "bar", + } + body, err := codec.Encode(item) + if err != nil { + // The following cases will fail, so die now + t.Fatalf("unexpected error: %v", err) + } + + client := http.Client{} + request, err := http.NewRequest("PUT", server.URL+"/prefix/version/simple/"+ID, bytes.NewReader(body)) + response, err := client.Do(request) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusBadRequest { + t.Errorf("Unexpected response %#v", response) + } +} + func TestUpdateMissing(t *testing.T) { storage := map[string]RESTStorage{} ID := "id" @@ -677,6 +862,10 @@ func TestUpdateMissing(t *testing.T) { defer server.Close() item := &Simple{ + ObjectMeta: api.ObjectMeta{ + Name: ID, + Namespace: api.NamespaceDefault, + }, Other: "bar", } body, err := codec.Encode(item) @@ -690,7 +879,6 @@ func TestUpdateMissing(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if response.StatusCode != http.StatusNotFound { t.Errorf("Unexpected response %#v", response) } @@ -961,7 +1149,7 @@ func TestCreateTimeout(t *testing.T) { simple := &Simple{Other: "foo"} data, _ := codec.Encode(simple) - itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, http.StatusAccepted) + itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, apierrs.StatusTryAgainLater) if itemOut.Status != api.StatusFailure || itemOut.Reason != api.StatusReasonTimeout { t.Errorf("Unexpected status %#v", itemOut) } diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index 91b7e564147..a05d7afffb7 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -35,7 +35,21 @@ func errToAPIStatus(err error) *api.Status { switch t := err.(type) { case statusError: status := t.Status() - status.Status = api.StatusFailure + if len(status.Status) == 0 { + } + switch status.Status { + case api.StatusSuccess: + if status.Code == 0 { + status.Code = http.StatusOK + } + case "": + status.Status = api.StatusFailure + fallthrough + case api.StatusFailure: + if status.Code == 0 { + status.Code = http.StatusInternalServerError + } + } //TODO: check for invalid responses return &status default: diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 94f58a6747e..4bf2329e00f 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -56,14 +56,22 @@ type RESTDeleter interface { } type RESTCreater interface { + // New returns an empty object that can be used with Create after request data has been put into it. + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object + // Create creates a new version of a resource. Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) } type RESTUpdater interface { + // New returns an empty object that can be used with Update after request data has been put into it. + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object + // Update finds a resource in the storage and updates it. Some implementations // may allow updates creates the object - they should set the Created flag of - // the returned RESTResultto true. In the event of an asynchronous error returned + // the returned RESTResult to true. In the event of an asynchronous error returned // via an api.Status object, the Created flag is ignored. Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) } diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index e8cb3a35e92..11e725aa19e 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -17,8 +17,8 @@ limitations under the License. package apiserver import ( + "fmt" "net/http" - "path" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" @@ -27,73 +27,322 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/golang/glog" + "github.com/emicklei/go-restful" ) -// RESTHandler implements HTTP verbs on a set of RESTful resources identified by name. -type RESTHandler struct { - storage map[string]RESTStorage - codec runtime.Codec - canonicalPrefix string - selfLinker runtime.SelfLinker - ops *Operations - admissionControl admission.Interface - apiRequestInfoResolver *APIRequestInfoResolver +// ResourceNameFunc returns a name (and optional namespace) given a request - if no name is present +// an error must be returned. +type ResourceNameFunc func(req *restful.Request) (namespace, name string, err error) + +// ObjectNameFunc returns the name (and optional namespace) of an object +type ObjectNameFunc func(obj runtime.Object) (namespace, name string, err error) + +// ResourceNamespaceFunc returns the namespace associated with the given request - if no namespace +// is present an error must be returned. +type ResourceNamespaceFunc func(req *restful.Request) (namespace string, err error) + +// LinkResourceFunc updates the provided object with a SelfLink that is appropriate for the current +// request. +type LinkResourceFunc func(req *restful.Request, obj runtime.Object) error + +// GetResource returns a function that handles retrieving a single resource from a RESTStorage object. +func GetResource(r RESTGetter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + item, err := r.Get(ctx, name) + if err != nil { + errorJSON(err, codec, w) + return + } + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + writeJSON(http.StatusOK, codec, item, w) + } } -// ServeHTTP handles requests to all RESTStorage objects. -func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - var verb string - var apiResource string - var httpCode int - reqStart := time.Now() - defer func() { monitor("rest", verb, apiResource, httpCode, reqStart) }() +// ListResource returns a function that handles retrieving a list of resources from a RESTStorage object. +func ListResource(r RESTLister, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter - requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) - if err != nil { - glog.Errorf("Unable to handle request %s %s %v", requestInfo.Namespace, requestInfo.Kind, err) - notFound(w, req) - httpCode = http.StatusNotFound - return + namespace, err := namespaceFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + label, err := labels.ParseSelector(req.Request.URL.Query().Get("labels")) + if err != nil { + errorJSON(err, codec, w) + return + } + field, err := labels.ParseSelector(req.Request.URL.Query().Get("fields")) + if err != nil { + errorJSON(err, codec, w) + return + } + + item, err := r.List(ctx, label, field) + if err != nil { + errorJSON(err, codec, w) + return + } + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + writeJSON(http.StatusOK, codec, item, w) } - verb = requestInfo.Verb - - storage, ok := h.storage[requestInfo.Resource] - if !ok { - notFound(w, req) - httpCode = http.StatusNotFound - return - } - apiResource = requestInfo.Resource - - httpCode = h.handleRESTStorage(requestInfo.Parts, req, w, storage, requestInfo.Namespace, requestInfo.Resource) } -// Sets the SelfLink field of the object. -func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error { - newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path) - newURL.RawQuery = "" - newURL.Fragment = "" - namespace, err := h.selfLinker.Namespace(obj) +// CreateResource returns a function that will handle a resource creation. +func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, err := namespaceFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + body, err := readBody(req.Request) + if err != nil { + errorJSON(err, codec, w) + return + } + + obj := r.New() + if err := codec.DecodeInto(body, obj); err != nil { + errorJSON(err, codec, w) + return + } + + err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "CREATE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + out, err := r.Create(ctx, obj) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(out, timeout, codec) + if err != nil { + errorJSON(err, codec, w) + return + } + + item := result.Object + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + + status := http.StatusOK + if result.Created { + status = http.StatusCreated + } + writeJSON(status, codec, item, w) + } +} + +// UpdateResource returns a function that will handle a resource update +func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + body, err := readBody(req.Request) + if err != nil { + errorJSON(err, codec, w) + return + } + + obj := r.New() + if err := codec.DecodeInto(body, obj); err != nil { + errorJSON(err, codec, w) + return + } + + objNamespace, objName, err := objNameFunc(obj) + if err != nil { + errorJSON(err, codec, w) + return + } + if objName != name { + errorJSON(errors.NewBadRequest("the name of the object does not match the name on the URL"), codec, w) + return + } + if len(namespace) > 0 { + if len(objNamespace) > 0 && objNamespace != namespace { + errorJSON(errors.NewBadRequest("the namespace of the object does not match the namespace on the request"), codec, w) + return + } + } + + err = admit.Admit(admission.NewAttributesRecord(obj, namespace, resource, "UPDATE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + out, err := r.Update(ctx, obj) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(out, timeout, codec) + if err != nil { + errorJSON(err, codec, w) + return + } + + item := result.Object + if err := linkFn(req, item); err != nil { + errorJSON(err, codec, w) + return + } + + status := http.StatusOK + if result.Created { + status = http.StatusCreated + } + writeJSON(status, codec, item, w) + } +} + +// DeleteResource returns a function that will handle a resource deletion +func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceFunc, codec runtime.Codec, resource, kind string, admit admission.Interface) restful.RouteFunction { + return func(req *restful.Request, res *restful.Response) { + w := res.ResponseWriter + + // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer) + timeout := parseTimeout(req.Request.URL.Query().Get("timeout")) + + namespace, name, err := nameFn(req) + if err != nil { + notFound(w, req.Request) + return + } + ctx := api.NewContext() + if len(namespace) > 0 { + ctx = api.WithNamespace(ctx, namespace) + } + + err = admit.Admit(admission.NewAttributesRecord(nil, namespace, resource, "DELETE")) + if err != nil { + errorJSON(err, codec, w) + return + } + + out, err := r.Delete(ctx, name) + if err != nil { + errorJSON(err, codec, w) + return + } + + result, err := finishRequest(out, timeout, codec) + if err != nil { + errorJSON(err, codec, w) + return + } + + // if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid + // object with the response. + item := result.Object + if item == nil { + item = &api.Status{ + Status: api.StatusSuccess, + Code: http.StatusOK, + Details: &api.StatusDetails{ + ID: name, + Kind: kind, + }, + } + } + writeJSON(http.StatusOK, codec, item, w) + } +} + +// finishRequest waits for the result channel to close or clear, and writes the appropriate response. +// Any api.Status object returned is considered an "error", which interrupts the normal response flow. +func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) { + select { + case result, ok := <-ch: + if !ok { + // likely programming error + return nil, fmt.Errorf("operation channel closed without returning result") + } + if status, ok := result.Object.(*api.Status); ok { + return nil, errors.FromObject(status) + } + return &result, nil + case <-time.After(timeout): + return nil, errors.NewTimeoutError("request did not complete within allowed duration") + } +} + +type linkFunc func(namespace, name string) (path string, query string) + +// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request +// plus the path and query generated by the provided linkFunc +func setSelfLink(obj runtime.Object, req *http.Request, linker runtime.SelfLinker, fn linkFunc) error { + namespace, err := linker.Namespace(obj) if err != nil { return err } - - // we need to add namespace as a query param, if its not in the resource path - if len(namespace) > 0 { - parts := splitPath(req.URL.Path) - if parts[0] != "ns" { - query := newURL.Query() - query.Set("namespace", namespace) - newURL.RawQuery = query.Encode() - } - } - - err = h.selfLinker.SetSelfLink(obj, newURL.String()) + name, err := linker.Name(obj) if err != nil { return err } + path, query := fn(namespace, name) + + newURL := *req.URL + newURL.Path = path + newURL.RawQuery = query + newURL.Fragment = "" + + if err := linker.SetSelfLink(obj, newURL.String()); err != nil { + return err + } if !runtime.IsListType(obj) { return nil } @@ -104,231 +353,9 @@ func (h *RESTHandler) setSelfLink(obj runtime.Object, req *http.Request) error { return err } for i := range items { - if err := h.setSelfLinkAddName(items[i], req); err != nil { + if err := setSelfLink(items[i], req, linker, fn); err != nil { return err } } return runtime.SetList(obj, items) } - -// Like setSelfLink, but appends the object's name. -func (h *RESTHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.selfLinker.Name(obj) - if err != nil { - return err - } - namespace, err := h.selfLinker.Namespace(obj) - if err != nil { - return err - } - newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) - newURL.RawQuery = "" - newURL.Fragment = "" - // we need to add namespace as a query param, if its not in the resource path - if len(namespace) > 0 { - parts := splitPath(req.URL.Path) - if parts[0] != "ns" { - query := newURL.Query() - query.Set("namespace", namespace) - newURL.RawQuery = query.Encode() - } - } - return h.selfLinker.SetSelfLink(obj, newURL.String()) -} - -// curry adapts either of the self link setting functions into a function appropriate for operation's hook. -func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(RESTResult) { - return func(obj RESTResult) { - if err := f(obj.Object, req); err != nil { - glog.Errorf("unable to set self link for %#v: %v", obj, err) - } - } -} - -// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then -// on path length, according to the following table: -// Method Path Action -// GET /foo list -// GET /foo/bar get 'bar' -// POST /foo create -// PUT /foo/bar update 'bar' -// DELETE /foo/bar delete 'bar' -// Responds with a 404 if the method/pattern doesn't match one of these entries. -// The s accepts several query parameters: -// timeout= Timeout for synchronous requests -// labels= Used for filtering list operations -// Returns the HTTP status code written to the response. -func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage, namespace, kind string) int { - ctx := api.WithNamespace(api.NewContext(), namespace) - // TODO: Document the timeout query parameter. - timeout := parseTimeout(req.URL.Query().Get("timeout")) - switch req.Method { - case "GET": - switch len(parts) { - case 1: - label, err := labels.ParseSelector(req.URL.Query().Get("labels")) - if err != nil { - return errorJSON(err, h.codec, w) - } - field, err := labels.ParseSelector(req.URL.Query().Get("fields")) - if err != nil { - return errorJSON(err, h.codec, w) - } - lister, ok := storage.(RESTLister) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "list"), h.codec, w) - } - list, err := lister.List(ctx, label, field) - if err != nil { - return errorJSON(err, h.codec, w) - } - if err := h.setSelfLink(list, req); err != nil { - return errorJSON(err, h.codec, w) - } - writeJSON(http.StatusOK, h.codec, list, w) - case 2: - getter, ok := storage.(RESTGetter) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "get"), h.codec, w) - } - item, err := getter.Get(ctx, parts[1]) - if err != nil { - return errorJSON(err, h.codec, w) - } - if err := h.setSelfLink(item, req); err != nil { - return errorJSON(err, h.codec, w) - } - writeJSON(http.StatusOK, h.codec, item, w) - default: - notFound(w, req) - return http.StatusNotFound - } - - case "POST": - if len(parts) != 1 { - notFound(w, req) - return http.StatusNotFound - } - creater, ok := storage.(RESTCreater) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) - } - - body, err := readBody(req) - if err != nil { - return errorJSON(err, h.codec, w) - } - obj := storage.New() - err = h.codec.DecodeInto(body, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - - // invoke admission control - err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "CREATE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := creater.Create(ctx, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, curry(h.setSelfLinkAddName, req)) - return h.finishReq(op, req, w) - - case "DELETE": - if len(parts) != 2 { - notFound(w, req) - return http.StatusNotFound - } - deleter, ok := storage.(RESTDeleter) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "delete"), h.codec, w) - } - - // invoke admission control - err := h.admissionControl.Admit(admission.NewAttributesRecord(nil, namespace, parts[0], "DELETE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := deleter.Delete(ctx, parts[1]) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, nil) - return h.finishReq(op, req, w) - - case "PUT": - if len(parts) != 2 { - notFound(w, req) - return http.StatusNotFound - } - updater, ok := storage.(RESTUpdater) - if !ok { - return errorJSON(errors.NewMethodNotSupported(kind, "create"), h.codec, w) - } - - body, err := readBody(req) - if err != nil { - return errorJSON(err, h.codec, w) - } - obj := storage.New() - err = h.codec.DecodeInto(body, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - - // invoke admission control - err = h.admissionControl.Admit(admission.NewAttributesRecord(obj, namespace, parts[0], "UPDATE")) - if err != nil { - return errorJSON(err, h.codec, w) - } - - out, err := updater.Update(ctx, obj) - if err != nil { - return errorJSON(err, h.codec, w) - } - op := h.createOperation(out, timeout, curry(h.setSelfLink, req)) - return h.finishReq(op, req, w) - - default: - notFound(w, req) - return http.StatusNotFound - } - return http.StatusOK -} - -// createOperation creates an operation to process a channel response. -func (h *RESTHandler) createOperation(out <-chan RESTResult, timeout time.Duration, onReceive func(RESTResult)) *Operation { - op := h.ops.NewOperation(out, onReceive) - op.WaitFor(timeout) - return op -} - -// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an -// Operation to receive the result and returning its ID down the writer. -// Returns the HTTP status code written to the response. -func (h *RESTHandler) finishReq(op *Operation, req *http.Request, w http.ResponseWriter) int { - result, complete := op.StatusOrResult() - obj := result.Object - var status int - if complete { - status = http.StatusOK - if result.Created { - status = http.StatusCreated - } - switch stat := obj.(type) { - case *api.Status: - if stat.Code != 0 { - status = stat.Code - } - } - } else { - status = http.StatusAccepted - } - writeJSON(status, h.codec, obj, w) - return status -} diff --git a/pkg/apiserver/resthandler_test.go b/pkg/apiserver/resthandler_test.go deleted file mode 100644 index 2f471beda19..00000000000 --- a/pkg/apiserver/resthandler_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" -) - -func TestFinishReq(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Object: &api.Status{Code: http.StatusNotFound}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - status := &api.Status{} - if err := json.Unmarshal([]byte(resp.Body.String()), status); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusNotFound || status.Code != http.StatusNotFound { - t.Errorf("unexpected status: %#v", status) - } -} - -func TestFinishReqUnwrap(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - obj := &api.Pod{} - if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusCreated || obj.Name != "foo" { - t.Errorf("unexpected object: %#v", obj) - } -} - -func TestFinishReqUnwrapStatus(t *testing.T) { - handler := &RESTHandler{codec: api.Codec} - op := &Operation{finished: &time.Time{}, result: RESTResult{Created: true, Object: &api.Status{Code: http.StatusNotFound}}} - resp := httptest.NewRecorder() - handler.finishReq(op, nil, resp) - obj := &api.Status{} - if err := json.Unmarshal([]byte(resp.Body.String()), obj); err != nil { - t.Fatalf("unexpected error: %v", err) - } - if resp.Code != http.StatusNotFound || obj.Code != http.StatusNotFound { - t.Errorf("unexpected object: %#v", obj) - } -} diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 9f6578be991..ae3178504d0 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -37,24 +37,24 @@ import ( ) type WatchHandler struct { - storage map[string]RESTStorage - codec runtime.Codec - canonicalPrefix string - selfLinker runtime.SelfLinker - apiRequestInfoResolver *APIRequestInfoResolver + storage map[string]RESTStorage + codec runtime.Codec + prefix string + linker runtime.SelfLinker + info *APIRequestInfoResolver } // setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type. func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.selfLinker.Name(obj) + name, err := h.linker.Name(obj) if err != nil { return err } newURL := *req.URL - newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) + newURL.Path = path.Join(h.prefix, req.URL.Path, name) newURL.RawQuery = "" newURL.Fragment = "" - return h.selfLinker.SetSelfLink(obj, newURL.String()) + return h.linker.SetSelfLink(obj, newURL.String()) } func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string, err error) { @@ -96,7 +96,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - requestInfo, err := h.apiRequestInfoResolver.GetAPIRequestInfo(req) + requestInfo, err := h.info.GetAPIRequestInfo(req) if err != nil { notFound(w, req) httpCode = http.StatusNotFound diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 397c2c4b020..63b66e51a08 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -78,6 +78,20 @@ var aPod string = ` }%s } ` +var aPodInBar string = ` +{ + "kind": "Pod", + "apiVersion": "v1beta1", + "id": "a", + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "a", + "containers": [{ "name": "foo", "image": "bar/foo", }] + } + }%s +} +` var aRC string = ` { "kind": "ReplicationController", @@ -126,7 +140,6 @@ var aEvent string = ` { "kind": "Event", "apiVersion": "v1beta1", - "namespace": "default", "id": "a", "involvedObject": { "kind": "Minion", @@ -316,14 +329,16 @@ func TestAuthModeAlwaysAllow(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := "default" + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -483,14 +498,16 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := "default" + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) @@ -705,24 +722,25 @@ func TestNamespaceAuthorization(t *testing.T) { requests := []struct { verb string URL string + namespace string body string statusCodes map[int]bool // allowed status codes. }{ - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", aPod, code200}, - {"GET", "/api/v1beta1/pods?namespace=foo", "", code200}, - {"GET", "/api/v1beta1/pods/a?namespace=foo", "", code200}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "", code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", "foo", aPod, code200}, + {"GET", "/api/v1beta1/pods?namespace=foo", "foo", "", code200}, + {"GET", "/api/v1beta1/pods/a?namespace=foo", "foo", "", code200}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "foo", "", code200}, - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", aPod, code403}, - {"GET", "/api/v1beta1/pods?namespace=bar", "", code403}, - {"GET", "/api/v1beta1/pods/a?namespace=bar", "", code403}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "", code403}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=bar", "bar", aPod, code403}, + {"GET", "/api/v1beta1/pods?namespace=bar", "bar", "", code403}, + {"GET", "/api/v1beta1/pods/a?namespace=bar", "bar", "", code403}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=bar", "bar", "", code403}, - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code403}, - {"GET", "/api/v1beta1/pods", "", code403}, - {"GET", "/api/v1beta1/pods/a", "", code403}, - {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", code403}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, "", aPod, code403}, + {"GET", "/api/v1beta1/pods", "", "", code403}, + {"GET", "/api/v1beta1/pods/a", "", "", code403}, + {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag, "", "", code403}, } for _, r := range requests { @@ -730,14 +748,19 @@ func TestNamespaceAuthorization(t *testing.T) { t.Logf("case %v", r) var bodyStr string if r.body != "" { - bodyStr = fmt.Sprintf(r.body, "") + sub := "" if r.verb == "PUT" && r.body != "" { // For update operations, insert previous resource version if resVersion := previousResourceVersion[getPreviousResourceVersionKey(r.URL, "")]; resVersion != 0 { - resourceVersionJson := fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) - bodyStr = fmt.Sprintf(r.body, resourceVersionJson) + sub += fmt.Sprintf(",\r\n\"resourceVersion\": %v", resVersion) } + namespace := r.namespace + if len(namespace) == 0 { + namespace = "default" + } + sub += fmt.Sprintf(",\r\n\"namespace\": %v", namespace) } + bodyStr = fmt.Sprintf(r.body, sub) } bodyBytes := bytes.NewReader([]byte(bodyStr)) req, err := http.NewRequest(r.verb, s.URL+r.URL, bodyBytes) From 79cb93002e7fbc9026e957c39e6e589d332bed2a Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 10 Feb 2015 09:26:26 -0500 Subject: [PATCH 2/5] Remove asynchronous channel on RESTStorage interfaces --- pkg/api/rest/resttest/resttest.go | 20 +++---- pkg/apiserver/apiserver_test.go | 44 +++++++-------- pkg/apiserver/async.go | 25 -------- pkg/apiserver/interfaces.go | 13 +++-- pkg/apiserver/resthandler.go | 94 ++++++++++++++++--------------- pkg/master/publish.go | 26 ++------- 6 files changed, 88 insertions(+), 134 deletions(-) diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index fa3fb7de03e..a5bceff4fd6 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -85,12 +85,12 @@ func (t *Tester) TestCreateResetsUserData(valid runtime.Object) { objectMeta.UID = "bad-uid" objectMeta.CreationTimestamp = now - channel, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) + obj, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if obj := <-channel; obj.Object == nil { - t.Fatalf("Unexpected object from channel: %#v", obj) + if obj == nil { + t.Fatalf("Unexpected object from result: %#v", obj) } if objectMeta.UID == "bad-uid" || objectMeta.CreationTimestamp == now { t.Errorf("ObjectMeta did not reset basic fields: %#v", objectMeta) @@ -111,12 +111,12 @@ func (t *Tester) TestCreateHasMetadata(valid runtime.Object) { context = api.NewContext() } - channel, err := t.storage.(apiserver.RESTCreater).Create(context, valid) + obj, err := t.storage.(apiserver.RESTCreater).Create(context, valid) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if obj := <-channel; obj.Object == nil { - t.Fatalf("Unexpected object from channel: %#v", obj) + if obj == nil { + t.Fatalf("Unexpected object from result: %#v", obj) } if !api.HasObjectMetaSystemFieldValues(objectMeta) { t.Errorf("storage did not populate object meta field values") @@ -148,12 +148,8 @@ func (t *Tester) TestCreateGeneratesNameReturnsTryAgain(valid runtime.Object) { objectMeta.GenerateName = "test-" t.withStorageError(errors.NewAlreadyExists("kind", "thing"), func() { - ch, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - res := <-ch - if err := errors.FromObject(res.Object); err == nil || !errors.IsTryAgainLater(err) { + _, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid) + if err == nil || !errors.IsTryAgainLater(err) { t.Fatalf("Unexpected error: %v", err) } }) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index f00e66076a7..55be37e4f30 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -180,17 +180,17 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"] } -func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (runtime.Object, error) { storage.deleted = id if err := storage.errors["delete"]; err != nil { return nil, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}}) - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + var obj runtime.Object = &api.Status{Status: api.StatusSuccess} + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}}) + } + return obj, err } func (storage *SimpleRESTStorage) New() runtime.Object { @@ -201,30 +201,28 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object { return &SimpleList{} } -func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { storage.created = obj.(*Simple) if err := storage.errors["create"]; err != nil { return nil, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(obj) - } - return obj, nil - }), nil + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(obj) + } + return obj, err } -func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) { +func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { storage.updated = obj.(*Simple) if err := storage.errors["update"]; err != nil { - return nil, err + return nil, false, err } - return MakeAsync(func() (runtime.Object, error) { - if storage.injectedFunction != nil { - return storage.injectedFunction(obj) - } - return obj, nil - }), nil + var err error + if storage.injectedFunction != nil { + obj, err = storage.injectedFunction(obj) + } + return obj, false, err } // Implement ResourceWatcher. @@ -994,7 +992,7 @@ func TestCreate(t *testing.T) { if !reflect.DeepEqual(&itemOut, simple) { t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body)) } - if response.StatusCode != http.StatusOK { + if response.StatusCode != http.StatusCreated { t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response) } if !selfLinker.called { diff --git a/pkg/apiserver/async.go b/pkg/apiserver/async.go index a96a151be34..ca1d0aa32cf 100644 --- a/pkg/apiserver/async.go +++ b/pkg/apiserver/async.go @@ -45,28 +45,3 @@ func MakeAsync(fn WorkFunc) <-chan RESTResult { }() return channel } - -// WorkFunc is used to perform any time consuming work for an api call, after -// the input has been validated. Pass one of these to MakeAsync to create an -// appropriate return value for the Update, Delete, and Create methods. -type WorkResultFunc func() (result RESTResult, err error) - -// MakeAsync takes a function and executes it, delivering the result in the way required -// by RESTStorage's Update, Delete, and Create methods. -func MakeAsyncResult(fn WorkResultFunc) <-chan RESTResult { - channel := make(chan RESTResult) - go func() { - defer util.HandleCrash() - obj, err := fn() - if err != nil { - channel <- RESTResult{Object: errToAPIStatus(err)} - } else { - channel <- obj - } - // 'close' is used to signal that no further values will - // be written to the channel. Not strictly necessary, but - // also won't hurt. - close(channel) - }() - return channel -} diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 4bf2329e00f..233bd12236e 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -52,7 +52,9 @@ type RESTDeleter interface { // Delete finds a resource in the storage and deletes it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Delete(ctx api.Context, id string) (<-chan RESTResult, error) + // Delete *may* return the object that was deleted, or a status object indicating additional + // information about deletion. + Delete(ctx api.Context, id string) (runtime.Object, error) } type RESTCreater interface { @@ -61,7 +63,7 @@ type RESTCreater interface { New() runtime.Object // Create creates a new version of a resource. - Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) + Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) } type RESTUpdater interface { @@ -70,10 +72,9 @@ type RESTUpdater interface { New() runtime.Object // Update finds a resource in the storage and updates it. Some implementations - // may allow updates creates the object - they should set the Created flag of - // the returned RESTResult to true. In the event of an asynchronous error returned - // via an api.Status object, the Created flag is ignored. - Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) + // may allow updates creates the object - they should set the created boolean + // to true. + Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) } // RESTResult indicates the result of a REST transformation. diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 11e725aa19e..b09be207367 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "fmt" "net/http" "time" @@ -145,29 +144,20 @@ func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn Lin return } - out, err := r.Create(ctx, obj) + result, err := finishRequest(timeout, func() (runtime.Object, error) { + return r.Create(ctx, obj) + }) if err != nil { errorJSON(err, codec, w) return } - result, err := finishRequest(out, timeout, codec) - if err != nil { + if err := linkFn(req, result); err != nil { errorJSON(err, codec, w) return } - item := result.Object - if err := linkFn(req, item); err != nil { - errorJSON(err, codec, w) - return - } - - status := http.StatusOK - if result.Created { - status = http.StatusCreated - } - writeJSON(status, codec, item, w) + writeJSON(http.StatusCreated, codec, result, w) } } @@ -223,29 +213,27 @@ func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNa return } - out, err := r.Update(ctx, obj) + wasCreated := false + result, err := finishRequest(timeout, func() (runtime.Object, error) { + obj, created, err := r.Update(ctx, obj) + wasCreated = created + return obj, err + }) if err != nil { errorJSON(err, codec, w) return } - result, err := finishRequest(out, timeout, codec) - if err != nil { - errorJSON(err, codec, w) - return - } - - item := result.Object - if err := linkFn(req, item); err != nil { + if err := linkFn(req, result); err != nil { errorJSON(err, codec, w) return } status := http.StatusOK - if result.Created { + if wasCreated { status = http.StatusCreated } - writeJSON(status, codec, item, w) + writeJSON(status, codec, result, w) } } @@ -273,13 +261,9 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF return } - out, err := r.Delete(ctx, name) - if err != nil { - errorJSON(err, codec, w) - return - } - - result, err := finishRequest(out, timeout, codec) + result, err := finishRequest(timeout, func() (runtime.Object, error) { + return r.Delete(ctx, name) + }) if err != nil { errorJSON(err, codec, w) return @@ -287,9 +271,8 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF // if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid // object with the response. - item := result.Object - if item == nil { - item = &api.Status{ + if result == nil { + result = &api.Status{ Status: api.StatusSuccess, Code: http.StatusOK, Details: &api.StatusDetails{ @@ -297,24 +280,43 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF Kind: kind, }, } + } else { + // when a non-status response is returned, set the self link + if _, ok := result.(*api.Status); !ok { + if err := linkFn(req, result); err != nil { + errorJSON(err, codec, w) + return + } + } } - writeJSON(http.StatusOK, codec, item, w) + writeJSON(http.StatusOK, codec, result, w) } } -// finishRequest waits for the result channel to close or clear, and writes the appropriate response. +// resultFunc is a function that returns a rest result and can be run in a goroutine +type resultFunc func() (runtime.Object, error) + +// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response. // Any api.Status object returned is considered an "error", which interrupts the normal response flow. -func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) { - select { - case result, ok := <-ch: - if !ok { - // likely programming error - return nil, fmt.Errorf("operation channel closed without returning result") +func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) { + ch := make(chan runtime.Object) + errCh := make(chan error) + go func() { + if result, err := fn(); err != nil { + errCh <- err + } else { + ch <- result } - if status, ok := result.Object.(*api.Status); ok { + }() + + select { + case result = <-ch: + if status, ok := result.(*api.Status); ok { return nil, errors.FromObject(status) } - return &result, nil + return result, nil + case err = <-errCh: + return nil, err case <-time.After(timeout): return nil, errors.NewTimeoutError("request did not complete within allowed duration") } diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 47b9803f478..ced152954b9 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -17,7 +17,6 @@ limitations under the License. package master import ( - "fmt" "net" "strconv" "time" @@ -92,15 +91,8 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error { Namespace: "", }, } - c, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace) - if err != nil { - return err - } - resp := <-c - if _, ok := resp.Object.(*api.Service); ok { - return nil - } - return fmt.Errorf("unexpected response %#v", resp) + _, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace) + return err } // createMasterServiceIfNeeded will create the specified service if it @@ -126,18 +118,8 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I SessionAffinity: api.AffinityTypeNone, }, } - // Kids, don't do this at home: this is a hack. There's no good way to call the business - // logic which lives in the REST object from here. - c, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc) - if err != nil { - return err - } - resp := <-c - if _, ok := resp.Object.(*api.Service); ok { - // If all worked, we get back an *api.Service object. - return nil - } - return fmt.Errorf("unexpected response: %#v", resp.Object) + _, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc) + return err } // ensureEndpointsContain sets the endpoints for the given service. Also removes From 26f08b78076f679022bd2eedfca67316dc0269bf Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 10 Feb 2015 12:49:56 -0500 Subject: [PATCH 3/5] RESTStorage should not need to know about async behavior Also make sure all POST operations return 201 by default. Removes the remainder of the asych logic in RESTStorage and leaves it up to the API server to expose that behavior. --- pkg/registry/binding/rest.go | 14 ++- pkg/registry/binding/rest_test.go | 22 ++-- pkg/registry/controller/rest.go | 41 ++++--- pkg/registry/controller/rest_test.go | 25 ++--- pkg/registry/endpoint/rest.go | 32 +++--- pkg/registry/event/rest.go | 42 ++++---- pkg/registry/event/rest_test.go | 18 ++-- pkg/registry/limitrange/rest.go | 43 ++++---- pkg/registry/minion/rest.go | 41 +++---- pkg/registry/minion/rest_test.go | 16 ++- pkg/registry/namespace/rest.go | 42 +++----- pkg/registry/namespace/rest_test.go | 18 ++-- pkg/registry/pod/rest.go | 48 ++++----- pkg/registry/pod/rest_test.go | 53 ++++----- pkg/registry/resourcequota/rest.go | 43 ++++---- pkg/registry/resourcequotausage/rest.go | 13 +-- pkg/registry/service/rest.go | 136 +++++++++++------------- pkg/registry/service/rest_test.go | 107 ++++++++----------- test/integration/auth_test.go | 21 ++-- 19 files changed, 333 insertions(+), 442 deletions(-) diff --git a/pkg/registry/binding/rest.go b/pkg/registry/binding/rest.go index 52907852c93..42d7970870e 100644 --- a/pkg/registry/binding/rest.go +++ b/pkg/registry/binding/rest.go @@ -18,9 +18,9 @@ package binding import ( "fmt" + "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) @@ -44,15 +44,13 @@ func (*REST) New() runtime.Object { } // Create attempts to make the assignment indicated by the binding it recieves. -func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (b *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { binding, ok := obj.(*api.Binding) if !ok { return nil, fmt.Errorf("incorrect type: %#v", obj) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := b.registry.ApplyBinding(ctx, binding); err != nil { - return nil, err - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + if err := b.registry.ApplyBinding(ctx, binding); err != nil { + return nil, err + } + return &api.Status{Status: api.StatusSuccess, Code: http.StatusCreated}, nil } diff --git a/pkg/registry/binding/rest_test.go b/pkg/registry/binding/rest_test.go index 134d3df5481..5ddac234105 100644 --- a/pkg/registry/binding/rest_test.go +++ b/pkg/registry/binding/rest_test.go @@ -71,22 +71,20 @@ func TestRESTPost(t *testing.T) { } ctx := api.NewContext() b := NewREST(mockRegistry) - resultChan, err := b.Create(ctx, item.b) - if err != nil { + result, err := b.Create(ctx, item.b) + if err != nil && item.err == nil { t.Errorf("Unexpected error %v", err) continue } - var expect *api.Status - if item.err == nil { - expect = &api.Status{Status: api.StatusSuccess} - } else { - expect = &api.Status{ - Status: api.StatusFailure, - Code: http.StatusInternalServerError, - Message: item.err.Error(), - } + if err == nil && item.err != nil { + t.Errorf("Unexpected error %v", err) + continue } - if e, a := expect, (<-resultChan).Object; !reflect.DeepEqual(e, a) { + var expect interface{} + if item.err == nil { + expect = &api.Status{Status: api.StatusSuccess, Code: http.StatusCreated} + } + if e, a := expect, result; !reflect.DeepEqual(e, a) { t.Errorf("%v: expected %#v, got %#v", i, e, a) } } diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index 94c13664ef1..4ea56e49618 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -50,7 +50,7 @@ func NewREST(registry Registry, podLister PodLister) *REST { } // Create registers the given ReplicationController. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -60,20 +60,16 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreateController(ctx, controller); err != nil { - err = rest.CheckGeneratedNameError(rest.ReplicationControllers, err, controller) - return apiserver.RESTResult{}, err - } - return rs.registry.GetController(ctx, controller.Name) - }), nil + if err := rs.registry.CreateController(ctx, controller); err != nil { + err = rest.CheckGeneratedNameError(rest.ReplicationControllers, err, controller) + return apiserver.RESTResult{}, err + } + return rs.registry.GetController(ctx, controller.Name) } // Delete asynchronously deletes the ReplicationController specified by its id. -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id) - }), nil +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(ctx, id) } // Get obtains the ReplicationController specified by its id. @@ -117,24 +113,23 @@ func (*REST) NewList() runtime.Object { // Update replaces a given ReplicationController instance with an existing // instance in storage.registry. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { controller, ok := obj.(*api.ReplicationController) if !ok { - return nil, fmt.Errorf("not a replication controller: %#v", obj) + return nil, false, fmt.Errorf("not a replication controller: %#v", obj) } if !api.ValidNamespace(ctx, &controller.ObjectMeta) { - return nil, errors.NewConflict("controller", controller.Namespace, fmt.Errorf("Controller.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("controller", controller.Namespace, fmt.Errorf("Controller.Namespace does not match the provided context")) } if errs := validation.ValidateReplicationController(controller); len(errs) > 0 { - return nil, errors.NewInvalid("replicationController", controller.Name, errs) + return nil, false, errors.NewInvalid("replicationController", controller.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateController(ctx, controller) - if err != nil { - return nil, err - } - return rs.registry.GetController(ctx, controller.Name) - }), nil + err := rs.registry.UpdateController(ctx, controller) + if err != nil { + return nil, false, err + } + out, err := rs.registry.GetController(ctx, controller.Name) + return out, false, err } // Watch returns ReplicationController events via a watch.Interface. diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 577b1061415..9616c654210 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -22,7 +22,6 @@ import ( "io/ioutil" "strings" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -268,23 +267,17 @@ func TestCreateController(t *testing.T) { }, } ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, controller) + obj, err := storage.Create(ctx, controller) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if err != nil { - t.Errorf("unexpected error: %v", err) + if obj == nil { + t.Errorf("unexpected object") } if !api.HasObjectMetaSystemFieldValues(&controller.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - select { - case <-channel: - // expected case - case <-time.After(time.Millisecond * 100): - t.Error("Unexpected timeout from async channel") - } } // TODO: remove, covered by TestCreate @@ -338,9 +331,9 @@ func TestControllerStorageValidatesUpdate(t *testing.T) { } ctx := api.NewDefaultContext() for _, failureCase := range failureCases { - c, err := storage.Update(ctx, &failureCase) - if c != nil { - t.Errorf("Expected nil channel") + c, created, err := storage.Update(ctx, &failureCase) + if c != nil || created { + t.Errorf("Expected nil object and not created") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) @@ -441,9 +434,9 @@ func TestUpdateControllerWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Update(ctx, controller) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, created, err := storage.Update(ctx, controller) + if obj != nil || created { + t.Error("Expected a nil object, but we got a value or created was true") } if err == nil { t.Errorf("Expected an error, but we didn't get one") diff --git a/pkg/registry/endpoint/rest.go b/pkg/registry/endpoint/rest.go index 40a7831fc23..147a9df248c 100644 --- a/pkg/registry/endpoint/rest.go +++ b/pkg/registry/endpoint/rest.go @@ -21,7 +21,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -59,7 +58,7 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer } // Create satisfies the RESTStorage interface. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { endpoints, ok := obj.(*api.Endpoints) if !ok { return nil, fmt.Errorf("not an endpoints: %#v", obj) @@ -72,28 +71,25 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &endpoints.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateEndpoints(ctx, endpoints) - if err != nil { - return nil, err - } - return rs.registry.GetEndpoints(ctx, endpoints.Name) - }), nil + err := rs.registry.UpdateEndpoints(ctx, endpoints) + if err != nil { + return nil, err + } + return rs.registry.GetEndpoints(ctx, endpoints.Name) } // Update satisfies the RESTStorage interface. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { endpoints, ok := obj.(*api.Endpoints) if !ok { - return nil, fmt.Errorf("not an endpoints: %#v", obj) + return nil, false, fmt.Errorf("not an endpoints: %#v", obj) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateEndpoints(ctx, endpoints) - if err != nil { - return nil, err - } - return rs.registry.GetEndpoints(ctx, endpoints.Name) - }), nil + err := rs.registry.UpdateEndpoints(ctx, endpoints) + if err != nil { + return nil, false, err + } + out, err := rs.registry.GetEndpoints(ctx, endpoints.Name) + return out, false, err } // New implements the RESTStorage interface. diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go index 216c356d032..c9307eb86d8 100644 --- a/pkg/registry/event/rest.go +++ b/pkg/registry/event/rest.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -42,7 +41,7 @@ func NewREST(registry generic.Registry) *REST { } } -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { event, ok := obj.(*api.Event) if !ok { return nil, fmt.Errorf("invalid object type") @@ -57,41 +56,38 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Create(ctx, event.Name, event) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, event.Name) - }), nil + err := rs.registry.Create(ctx, event.Name, event) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, event.Name) } // Update replaces an existing Event instance in storage.registry, with the given instance. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { event, ok := obj.(*api.Event) if !ok { - return nil, fmt.Errorf("not an event object: %#v", obj) + return nil, false, fmt.Errorf("not an event object: %#v", obj) } if api.NamespaceValue(ctx) != "" { if !api.ValidNamespace(ctx, &event.ObjectMeta) { - return nil, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context")) + return nil, false, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context")) } } if errs := validation.ValidateEvent(event); len(errs) > 0 { - return nil, errors.NewInvalid("event", event.Name, errs) + return nil, false, errors.NewInvalid("event", event.Name, errs) } api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, event.Name, event) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, event.Name) - }), nil + err := rs.registry.Update(ctx, event.Name, event) + if err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, event.Name) + return out, false, err } -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, id) if err != nil { return nil, err @@ -100,9 +96,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, if !ok { return nil, fmt.Errorf("invalid object type") } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 922603dd9b0..cbcce2fbe11 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -89,7 +89,7 @@ func TestRESTCreate(t *testing.T) { if !api.HasObjectMetaSystemFieldValues(&item.event.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - if e, a := item.event, (<-c).Object; !reflect.DeepEqual(e, a) { + if e, a := item.event, c; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } // Ensure we implement the interface @@ -100,11 +100,10 @@ func TestRESTCreate(t *testing.T) { func TestRESTUpdate(t *testing.T) { _, rest := NewTestREST() eventA := testEvent("foo") - c, err := rest.Create(api.NewDefaultContext(), eventA) + _, err := rest.Create(api.NewDefaultContext(), eventA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewDefaultContext(), eventA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -113,11 +112,10 @@ func TestRESTUpdate(t *testing.T) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } eventB := testEvent("bar") - u, err := rest.Update(api.NewDefaultContext(), eventB) + _, _, err = rest.Update(api.NewDefaultContext(), eventB) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-u got2, err := rest.Get(api.NewDefaultContext(), eventB.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -131,16 +129,15 @@ func TestRESTUpdate(t *testing.T) { func TestRESTDelete(t *testing.T) { _, rest := NewTestREST() eventA := testEvent("foo") - c, err := rest.Create(api.NewDefaultContext(), eventA) + _, err := rest.Create(api.NewDefaultContext(), eventA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c - c, err = rest.Delete(api.NewDefaultContext(), eventA.Name) + c, err := rest.Delete(api.NewDefaultContext(), eventA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) } - if stat := (<-c).Object.(*api.Status); stat.Status != api.StatusSuccess { + if stat := c.(*api.Status); stat.Status != api.StatusSuccess { t.Errorf("unexpected status: %v", stat) } } @@ -148,11 +145,10 @@ func TestRESTDelete(t *testing.T) { func TestRESTGet(t *testing.T) { _, rest := NewTestREST() eventA := testEvent("foo") - c, err := rest.Create(api.NewDefaultContext(), eventA) + _, err := rest.Create(api.NewDefaultContext(), eventA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewDefaultContext(), eventA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) diff --git a/pkg/registry/limitrange/rest.go b/pkg/registry/limitrange/rest.go index f5eaaa1a71d..878dfc46ebd 100644 --- a/pkg/registry/limitrange/rest.go +++ b/pkg/registry/limitrange/rest.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,7 +43,7 @@ func NewREST(registry generic.Registry) *REST { } // Create a LimitRange object -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { limitRange, ok := obj.(*api.LimitRange) if !ok { return nil, fmt.Errorf("invalid object type") @@ -63,29 +62,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &limitRange.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Create(ctx, limitRange.Name, limitRange) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, limitRange.Name) - }), nil + err := rs.registry.Create(ctx, limitRange.Name, limitRange) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, limitRange.Name) } // Update updates a LimitRange object. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { limitRange, ok := obj.(*api.LimitRange) if !ok { - return nil, fmt.Errorf("invalid object type") + return nil, false, fmt.Errorf("invalid object type") } if !api.ValidNamespace(ctx, &limitRange.ObjectMeta) { - return nil, errors.NewConflict("limitRange", limitRange.Namespace, fmt.Errorf("LimitRange.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("limitRange", limitRange.Namespace, fmt.Errorf("LimitRange.Namespace does not match the provided context")) } oldObj, err := rs.registry.Get(ctx, limitRange.Name) if err != nil { - return nil, err + return nil, false, err } editLimitRange := oldObj.(*api.LimitRange) @@ -97,20 +94,18 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE editLimitRange.Spec = limitRange.Spec if errs := validation.ValidateLimitRange(editLimitRange); len(errs) > 0 { - return nil, errors.NewInvalid("limitRange", editLimitRange.Name, errs) + return nil, false, errors.NewInvalid("limitRange", editLimitRange.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, editLimitRange.Name, editLimitRange) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, editLimitRange.Name) - }), nil + if err := rs.registry.Update(ctx, editLimitRange.Name, editLimitRange); err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, editLimitRange.Name) + return out, false, err } // Delete deletes the LimitRange with the specified name -func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, name) if err != nil { return nil, err @@ -119,9 +114,7 @@ func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResul if !ok { return nil, fmt.Errorf("invalid object type") } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) } // Get gets a LimitRange with the specified name diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 5c0ab637383..cd6db1ef475 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -26,7 +26,6 @@ import ( kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -49,7 +48,7 @@ var ErrDoesNotExist = errors.New("The requested resource does not exist.") var ErrNotHealty = errors.New("The requested minion is not healthy.") // Create satisfies the RESTStorage interface. -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { minion, ok := obj.(*api.Node) if !ok { return nil, fmt.Errorf("not a minion: %#v", obj) @@ -59,17 +58,15 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreateMinion(ctx, minion); err != nil { - err = rest.CheckGeneratedNameError(rest.Nodes, err, minion) - return nil, err - } - return minion, nil - }), nil + if err := rs.registry.CreateMinion(ctx, minion); err != nil { + err = rest.CheckGeneratedNameError(rest.Nodes, err, minion) + return nil, err + } + return minion, nil } // Delete satisfies the RESTStorage interface. -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { minion, err := rs.registry.GetMinion(ctx, id) if minion == nil { return nil, ErrDoesNotExist @@ -77,9 +74,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, if err != nil { return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteMinion(ctx, id) } // Get satisfies the RESTStorage interface. @@ -108,10 +103,10 @@ func (*REST) NewList() runtime.Object { } // Update satisfies the RESTStorage interface. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { minion, ok := obj.(*api.Node) if !ok { - return nil, fmt.Errorf("not a minion: %#v", obj) + return nil, false, fmt.Errorf("not a minion: %#v", obj) } // This is hacky, but minions don't really have a namespace, but kubectl currently automatically // stuffs one in there. Fix it here temporarily until we fix kubectl @@ -123,7 +118,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE oldMinion, err := rs.registry.GetMinion(ctx, minion.Name) if err != nil { - return nil, err + return nil, false, err } // This is hacky, but minion HostIP has been moved from spec to status since v1beta2. When updating @@ -134,16 +129,14 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } if errs := validation.ValidateMinionUpdate(oldMinion, minion); len(errs) > 0 { - return nil, kerrors.NewInvalid("minion", minion.Name, errs) + return nil, false, kerrors.NewInvalid("minion", minion.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.UpdateMinion(ctx, minion) - if err != nil { - return nil, err - } - return rs.registry.GetMinion(ctx, minion.Name) - }), nil + if err := rs.registry.UpdateMinion(ctx, minion); err != nil { + return nil, false, err + } + out, err := rs.registry.GetMinion(ctx, minion.Name) + return out, false, err } // Watch returns Minions events via a watch.Interface. diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index 0e3e2c87b83..fe53157e31a 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -39,27 +39,25 @@ func TestMinionRegistryREST(t *testing.T) { t.Errorf("has unexpected error: %v", err) } - c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "baz"}}) + obj, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "baz"}}) if err != nil { t.Fatalf("insert failed: %v", err) } - obj := <-c - if !api.HasObjectMetaSystemFieldValues(&obj.Object.(*api.Node).ObjectMeta) { + if !api.HasObjectMetaSystemFieldValues(&obj.(*api.Node).ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - if m, ok := obj.Object.(*api.Node); !ok || m.Name != "baz" { + if m, ok := obj.(*api.Node); !ok || m.Name != "baz" { t.Errorf("insert return value was weird: %#v", obj) } if obj, err := ms.Get(ctx, "baz"); err != nil || obj.(*api.Node).Name != "baz" { t.Errorf("insert didn't actually insert") } - c, err = ms.Delete(ctx, "bar") + obj, err = ms.Delete(ctx, "bar") if err != nil { t.Fatalf("delete failed") } - obj = <-c - if s, ok := obj.Object.(*api.Status); !ok || s.Status != api.StatusSuccess { + if s, ok := obj.(*api.Status); !ok || s.Status != api.StatusSuccess { t.Errorf("delete return value was weird: %#v", obj) } if _, err := ms.Get(ctx, "bar"); !errors.IsNotFound(err) { @@ -103,7 +101,7 @@ func TestMinionRegistryValidUpdate(t *testing.T) { "foo": "bar", "baz": "home", } - if _, err = storage.Update(ctx, minion); err != nil { + if _, _, err = storage.Update(ctx, minion); err != nil { t.Errorf("Unexpected error: %v", err) } } @@ -136,7 +134,7 @@ func TestMinionRegistryValidatesCreate(t *testing.T) { for _, failureCase := range failureCases { c, err := storage.Create(ctx, &failureCase) if c != nil { - t.Errorf("Expected nil channel") + t.Errorf("Expected nil object") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) diff --git a/pkg/registry/namespace/rest.go b/pkg/registry/namespace/rest.go index eeaad26519d..21a6a45bf61 100644 --- a/pkg/registry/namespace/rest.go +++ b/pkg/registry/namespace/rest.go @@ -23,7 +23,6 @@ import ( kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,48 +43,44 @@ func NewREST(registry generic.Registry) *REST { } // Create creates a Namespace object -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { namespace := obj.(*api.Namespace) if err := rest.BeforeCreate(rest.Namespaces, ctx, obj); err != nil { return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.Create(ctx, namespace.Name, namespace); err != nil { - err = rest.CheckGeneratedNameError(rest.Namespaces, err, namespace) - return nil, err - } - return rs.registry.Get(ctx, namespace.Name) - }), nil + if err := rs.registry.Create(ctx, namespace.Name, namespace); err != nil { + err = rest.CheckGeneratedNameError(rest.Namespaces, err, namespace) + return nil, err + } + return rs.registry.Get(ctx, namespace.Name) } // Update updates a Namespace object. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { namespace, ok := obj.(*api.Namespace) if !ok { - return nil, fmt.Errorf("not a namespace: %#v", obj) + return nil, false, fmt.Errorf("not a namespace: %#v", obj) } oldObj, err := rs.registry.Get(ctx, namespace.Name) if err != nil { - return nil, err + return nil, false, err } oldNamespace := oldObj.(*api.Namespace) if errs := validation.ValidateNamespaceUpdate(oldNamespace, namespace); len(errs) > 0 { - return nil, kerrors.NewInvalid("namespace", namespace.Name, errs) + return nil, false, kerrors.NewInvalid("namespace", namespace.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, oldNamespace.Name, oldNamespace) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, oldNamespace.Name) - }), nil + if err := rs.registry.Update(ctx, oldNamespace.Name, oldNamespace); err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, oldNamespace.Name) + return out, false, err } // Delete deletes the Namespace with the specified name -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, id) if err != nil { return nil, err @@ -94,10 +89,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, if !ok { return nil, fmt.Errorf("invalid object type") } - - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { diff --git a/pkg/registry/namespace/rest_test.go b/pkg/registry/namespace/rest_test.go index e053e4dd215..9a694984489 100644 --- a/pkg/registry/namespace/rest_test.go +++ b/pkg/registry/namespace/rest_test.go @@ -78,7 +78,7 @@ func TestRESTCreate(t *testing.T) { if !api.HasObjectMetaSystemFieldValues(&item.namespace.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } - if e, a := item.namespace, (<-c).Object; !reflect.DeepEqual(e, a) { + if e, a := item.namespace, c; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } // Ensure we implement the interface @@ -89,11 +89,10 @@ func TestRESTCreate(t *testing.T) { func TestRESTUpdate(t *testing.T) { _, rest := NewTestREST() namespaceA := testNamespace("foo") - c, err := rest.Create(api.NewDefaultContext(), namespaceA) + _, err := rest.Create(api.NewDefaultContext(), namespaceA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewDefaultContext(), namespaceA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -102,11 +101,10 @@ func TestRESTUpdate(t *testing.T) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } namespaceB := testNamespace("foo") - u, err := rest.Update(api.NewDefaultContext(), namespaceB) + _, _, err = rest.Update(api.NewDefaultContext(), namespaceB) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-u got2, err := rest.Get(api.NewDefaultContext(), namespaceB.Name) if err != nil { t.Fatalf("Unexpected error %v", err) @@ -120,16 +118,15 @@ func TestRESTUpdate(t *testing.T) { func TestRESTDelete(t *testing.T) { _, rest := NewTestREST() namespaceA := testNamespace("foo") - c, err := rest.Create(api.NewContext(), namespaceA) + _, err := rest.Create(api.NewContext(), namespaceA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c - c, err = rest.Delete(api.NewContext(), namespaceA.Name) + c, err := rest.Delete(api.NewContext(), namespaceA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) } - if stat := (<-c).Object.(*api.Status); stat.Status != api.StatusSuccess { + if stat := c.(*api.Status); stat.Status != api.StatusSuccess { t.Errorf("unexpected status: %v", stat) } } @@ -137,11 +134,10 @@ func TestRESTDelete(t *testing.T) { func TestRESTGet(t *testing.T) { _, rest := NewTestREST() namespaceA := testNamespace("foo") - c, err := rest.Create(api.NewContext(), namespaceA) + _, err := rest.Create(api.NewContext(), namespaceA) if err != nil { t.Fatalf("Unexpected error %v", err) } - <-c got, err := rest.Get(api.NewContext(), namespaceA.Name) if err != nil { t.Fatalf("Unexpected error %v", err) diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index b27290b1fdb..4e7c995ae69 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -25,7 +25,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -55,32 +54,28 @@ func NewREST(config *RESTConfig) *REST { } } -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { pod := obj.(*api.Pod) if err := rest.BeforeCreate(rest.Pods, ctx, obj); err != nil { return nil, err } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.CreatePod(ctx, pod); err != nil { - err = rest.CheckGeneratedNameError(rest.Pods, err, pod) - return nil, err - } - return rs.registry.GetPod(ctx, pod.Name) - }), nil + if err := rs.registry.CreatePod(ctx, pod); err != nil { + err = rest.CheckGeneratedNameError(rest.Pods, err, pod) + return nil, err + } + return rs.registry.GetPod(ctx, pod.Name) } -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { - return apiserver.MakeAsync(func() (runtime.Object, error) { - namespace, found := api.NamespaceFrom(ctx) - if !found { - return &api.Status{Status: api.StatusFailure}, nil - } - rs.podCache.ClearPodStatus(namespace, id) +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { + namespace, found := api.NamespaceFrom(ctx) + if !found { + return &api.Status{Status: api.StatusFailure}, nil + } + rs.podCache.ClearPodStatus(namespace, id) - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -167,20 +162,19 @@ func (*REST) NewList() runtime.Object { return &api.PodList{} } -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { pod := obj.(*api.Pod) if !api.ValidNamespace(ctx, &pod.ObjectMeta) { - return nil, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("pod", pod.Namespace, fmt.Errorf("Pod.Namespace does not match the provided context")) } if errs := validation.ValidatePod(pod); len(errs) > 0 { - return nil, errors.NewInvalid("pod", pod.Name, errs) + return nil, false, errors.NewInvalid("pod", pod.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := rs.registry.UpdatePod(ctx, pod); err != nil { - return nil, err - } - return rs.registry.GetPod(ctx, pod.Name) - }), nil + if err := rs.registry.UpdatePod(ctx, pod); err != nil { + return nil, false, err + } + out, err := rs.registry.GetPod(ctx, pod.Name) + return out, false, err } // ResourceLocation returns a URL to which one can send traffic for the specified pod. diff --git a/pkg/registry/pod/rest_test.go b/pkg/registry/pod/rest_test.go index e0b2c1c9031..ff133675499 100644 --- a/pkg/registry/pod/rest_test.go +++ b/pkg/registry/pod/rest_test.go @@ -21,7 +21,6 @@ import ( "reflect" "strings" "testing" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -31,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -55,9 +55,8 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) { f.clearedName = name } -func expectApiStatusError(t *testing.T, ch <-chan apiserver.RESTResult, msg string) { - out := <-ch - status, ok := out.Object.(*api.Status) +func expectApiStatusError(t *testing.T, out runtime.Object, msg string) { + status, ok := out.(*api.Status) if !ok { t.Errorf("Expected an api.Status object, was %#v", out) return @@ -67,9 +66,8 @@ func expectApiStatusError(t *testing.T, ch <-chan apiserver.RESTResult, msg stri } } -func expectPod(t *testing.T, ch <-chan apiserver.RESTResult) (*api.Pod, bool) { - out := <-ch - pod, ok := out.Object.(*api.Pod) +func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) { + pod, ok := out.(*api.Pod) if !ok || pod == nil { t.Errorf("Expected an api.Pod object, was %#v", out) return nil, false @@ -94,11 +92,10 @@ func TestCreatePodRegistryError(t *testing.T) { }, } ctx := api.NewDefaultContext() - ch, err := storage.Create(ctx, pod) - if err != nil { + _, err := storage.Create(ctx, pod) + if err != podRegistry.Err { t.Fatalf("unexpected error: %v", err) } - expectApiStatusError(t, ch, podRegistry.Err.Error()) } func TestCreatePodSetsIds(t *testing.T) { @@ -118,11 +115,10 @@ func TestCreatePodSetsIds(t *testing.T) { }, } ctx := api.NewDefaultContext() - ch, err := storage.Create(ctx, pod) - if err != nil { + _, err := storage.Create(ctx, pod) + if err != podRegistry.Err { t.Fatalf("unexpected error: %v", err) } - expectApiStatusError(t, ch, podRegistry.Err.Error()) if len(podRegistry.Pod.Name) == 0 { t.Errorf("Expected pod ID to be set, Got %#v", pod) @@ -149,11 +145,10 @@ func TestCreatePodSetsUID(t *testing.T) { }, } ctx := api.NewDefaultContext() - ch, err := storage.Create(ctx, pod) - if err != nil { + _, err := storage.Create(ctx, pod) + if err != podRegistry.Err { t.Fatalf("unexpected error: %v", err) } - expectApiStatusError(t, ch, podRegistry.Err.Error()) if len(podRegistry.Pod.UID) == 0 { t.Errorf("Expected pod UID to be set, Got %#v", pod) @@ -471,15 +466,12 @@ func TestCreatePod(t *testing.T) { } pod.Name = "foo" ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, pod) + obj, err := storage.Create(ctx, pod) if err != nil { t.Fatalf("unexpected error: %v", err) } - select { - case <-channel: - // Do nothing, this is expected. - case <-time.After(time.Millisecond * 100): - t.Error("Unexpected timeout on async channel") + if obj == nil { + t.Fatalf("unexpected object: %#v", obj) } if !api.HasObjectMetaSystemFieldValues(&podRegistry.Pod.ObjectMeta) { t.Errorf("Expected ObjectMeta field values were populated") @@ -520,9 +512,9 @@ func TestUpdatePodWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Update(ctx, pod) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, created, err := storage.Update(ctx, pod) + if obj != nil || created { + t.Error("Expected a nil channel, but we got a value or created") } if err == nil { t.Errorf("Expected an error, but we didn't get one") @@ -648,19 +640,12 @@ func TestDeletePod(t *testing.T) { podCache: fakeCache, } ctx := api.NewDefaultContext() - channel, err := storage.Delete(ctx, "foo") + result, err := storage.Delete(ctx, "foo") if err != nil { t.Fatalf("unexpected error: %v", err) } - var result apiserver.RESTResult - select { - case result = <-channel: - // Do nothing, this is expected. - case <-time.After(time.Millisecond * 100): - t.Error("Unexpected timeout on async channel") - } if fakeCache.clearedNamespace != "default" || fakeCache.clearedName != "foo" { - t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result.Object) + t.Errorf("Unexpeceted cache delete: %s %s %#v", fakeCache.clearedName, fakeCache.clearedNamespace, result) } } diff --git a/pkg/registry/resourcequota/rest.go b/pkg/registry/resourcequota/rest.go index 3f22be18826..e577e0cc516 100644 --- a/pkg/registry/resourcequota/rest.go +++ b/pkg/registry/resourcequota/rest.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -44,7 +43,7 @@ func NewREST(registry generic.Registry) *REST { } // Create a ResourceQuota object -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { resourceQuota, ok := obj.(*api.ResourceQuota) if !ok { return nil, fmt.Errorf("invalid object type") @@ -66,29 +65,27 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } api.FillObjectMetaSystemFields(ctx, &resourceQuota.ObjectMeta) - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Create(ctx, resourceQuota.Name, resourceQuota) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, resourceQuota.Name) - }), nil + err := rs.registry.Create(ctx, resourceQuota.Name, resourceQuota) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, resourceQuota.Name) } // Update updates a ResourceQuota object. -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { resourceQuota, ok := obj.(*api.ResourceQuota) if !ok { - return nil, fmt.Errorf("invalid object type") + return nil, false, fmt.Errorf("invalid object type") } if !api.ValidNamespace(ctx, &resourceQuota.ObjectMeta) { - return nil, errors.NewConflict("resourceQuota", resourceQuota.Namespace, fmt.Errorf("ResourceQuota.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("resourceQuota", resourceQuota.Namespace, fmt.Errorf("ResourceQuota.Namespace does not match the provided context")) } oldObj, err := rs.registry.Get(ctx, resourceQuota.Name) if err != nil { - return nil, err + return nil, false, err } editResourceQuota := oldObj.(*api.ResourceQuota) @@ -100,20 +97,18 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE editResourceQuota.Spec = resourceQuota.Spec if errs := validation.ValidateResourceQuota(editResourceQuota); len(errs) > 0 { - return nil, errors.NewInvalid("resourceQuota", editResourceQuota.Name, errs) + return nil, false, errors.NewInvalid("resourceQuota", editResourceQuota.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - err := rs.registry.Update(ctx, editResourceQuota.Name, editResourceQuota) - if err != nil { - return nil, err - } - return rs.registry.Get(ctx, editResourceQuota.Name) - }), nil + if err := rs.registry.Update(ctx, editResourceQuota.Name, editResourceQuota); err != nil { + return nil, false, err + } + out, err := rs.registry.Get(ctx, editResourceQuota.Name) + return out, false, err } // Delete deletes the ResourceQuota with the specified name -func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, name string) (runtime.Object, error) { obj, err := rs.registry.Get(ctx, name) if err != nil { return nil, err @@ -122,9 +117,7 @@ func (rs *REST) Delete(ctx api.Context, name string) (<-chan apiserver.RESTResul if !ok { return nil, fmt.Errorf("invalid object type") } - return apiserver.MakeAsync(func() (runtime.Object, error) { - return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) - }), nil + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, name) } // Get gets a ResourceQuota with the specified name diff --git a/pkg/registry/resourcequotausage/rest.go b/pkg/registry/resourcequotausage/rest.go index cfd63cf4c7a..b2d9a51454e 100644 --- a/pkg/registry/resourcequotausage/rest.go +++ b/pkg/registry/resourcequotausage/rest.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) @@ -42,15 +41,13 @@ func (*REST) New() runtime.Object { } // Create takes the incoming ResourceQuotaUsage and applies the latest status atomically to a ResourceQuota -func (b *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (b *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { resourceQuotaUsage, ok := obj.(*api.ResourceQuotaUsage) if !ok { return nil, fmt.Errorf("incorrect type: %#v", obj) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - if err := b.registry.ApplyStatus(ctx, resourceQuotaUsage); err != nil { - return nil, err - } - return &api.Status{Status: api.StatusSuccess}, nil - }), nil + if err := b.registry.ApplyStatus(ctx, resourceQuotaUsage); err != nil { + return nil, err + } + return &api.Status{Status: api.StatusSuccess}, nil } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 638cf592c84..a2906179219 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -25,7 +25,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" @@ -80,7 +79,7 @@ func reloadIPsFromStorage(ipa *ipAllocator, registry Registry) { } } -func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { service := obj.(*api.Service) if err := rest.BeforeCreate(rest.Services, ctx, obj); err != nil { @@ -102,61 +101,59 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE } } - return apiserver.MakeAsync(func() (runtime.Object, error) { - // TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers - // correctly no matter what http operations happen. - if service.Spec.CreateExternalLoadBalancer { - if rs.cloud == nil { - return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") - } - if service.Spec.Protocol != api.ProtocolTCP { - // TODO: Support UDP here too. - return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") - } - balancer, ok := rs.cloud.TCPLoadBalancer() - if !ok { - return nil, fmt.Errorf("the cloud provider does not support external TCP load balancers.") - } - zones, ok := rs.cloud.Zones() - if !ok { - return nil, fmt.Errorf("the cloud provider does not support zone enumeration.") - } - hosts, err := rs.machines.ListMinions(ctx) - if err != nil { - return nil, err - } - zone, err := zones.GetZone() - if err != nil { - return nil, err - } - // TODO: We should be able to rely on valid input, and not do defaulting here. - var affinityType api.AffinityType = service.Spec.SessionAffinity - if affinityType == "" { - affinityType = api.AffinityTypeNone - } - if len(service.Spec.PublicIPs) > 0 { - for _, publicIP := range service.Spec.PublicIPs { - _, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType) - if err != nil { - // TODO: have to roll-back any successful calls. - return nil, err - } - } - } else { - ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType) - if err != nil { - return nil, err - } - service.Spec.PublicIPs = []string{ip.String()} - } + // TODO: Move this to post-creation rectification loop, so that we make/remove external load balancers + // correctly no matter what http operations happen. + if service.Spec.CreateExternalLoadBalancer { + if rs.cloud == nil { + return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") } - - if err := rs.registry.CreateService(ctx, service); err != nil { - err = rest.CheckGeneratedNameError(rest.Services, err, service) + if service.Spec.Protocol != api.ProtocolTCP { + // TODO: Support UDP here too. + return nil, fmt.Errorf("external load balancers for non TCP services are not currently supported.") + } + balancer, ok := rs.cloud.TCPLoadBalancer() + if !ok { + return nil, fmt.Errorf("the cloud provider does not support external TCP load balancers.") + } + zones, ok := rs.cloud.Zones() + if !ok { + return nil, fmt.Errorf("the cloud provider does not support zone enumeration.") + } + hosts, err := rs.machines.ListMinions(ctx) + if err != nil { return nil, err } - return rs.registry.GetService(ctx, service.Name) - }), nil + zone, err := zones.GetZone() + if err != nil { + return nil, err + } + // TODO: We should be able to rely on valid input, and not do defaulting here. + var affinityType api.AffinityType = service.Spec.SessionAffinity + if affinityType == "" { + affinityType = api.AffinityTypeNone + } + if len(service.Spec.PublicIPs) > 0 { + for _, publicIP := range service.Spec.PublicIPs { + _, err = balancer.CreateTCPLoadBalancer(service.Name, zone.Region, net.ParseIP(publicIP), service.Spec.Port, hostsFromMinionList(hosts), affinityType) + if err != nil { + // TODO: have to roll-back any successful calls. + return nil, err + } + } + } else { + ip, err := balancer.CreateTCPLoadBalancer(service.Name, zone.Region, nil, service.Spec.Port, hostsFromMinionList(hosts), affinityType) + if err != nil { + return nil, err + } + service.Spec.PublicIPs = []string{ip.String()} + } + } + + if err := rs.registry.CreateService(ctx, service); err != nil { + err = rest.CheckGeneratedNameError(rest.Services, err, service) + return nil, err + } + return rs.registry.GetService(ctx, service.Name) } func hostsFromMinionList(list *api.NodeList) []string { @@ -167,16 +164,14 @@ func hostsFromMinionList(list *api.NodeList) []string { return result } -func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { service, err := rs.registry.GetService(ctx, id) if err != nil { return nil, err } rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP)) - return apiserver.MakeAsync(func() (runtime.Object, error) { - rs.deleteExternalLoadBalancer(service) - return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) - }), nil + rs.deleteExternalLoadBalancer(service) + return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -217,30 +212,29 @@ func (*REST) NewList() runtime.Object { return &api.Service{} } -func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) { +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { service := obj.(*api.Service) if !api.ValidNamespace(ctx, &service.ObjectMeta) { - return nil, errors.NewConflict("service", service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context")) + return nil, false, errors.NewConflict("service", service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context")) } oldService, err := rs.registry.GetService(ctx, service.Name) if err != nil { - return nil, err + return nil, false, err } // Copy over non-user fields // TODO: make this a merge function if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 { - return nil, errors.NewInvalid("service", service.Name, errs) + return nil, false, errors.NewInvalid("service", service.Name, errs) } - return apiserver.MakeAsync(func() (runtime.Object, error) { - // TODO: check to see if external load balancer status changed - err = rs.registry.UpdateService(ctx, service) - if err != nil { - return nil, err - } - return rs.registry.GetService(ctx, service.Name) - }), nil + // TODO: check to see if external load balancer status changed + err = rs.registry.UpdateService(ctx, service) + if err != nil { + return nil, false, err + } + out, err := rs.registry.GetService(ctx, service.Name) + return out, false, err } // ResourceLocation returns a URL to which one can send traffic for the specified service. diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 8d228844734..60a3096015f 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -56,9 +56,8 @@ func TestServiceRegistryCreate(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := storage.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := storage.Create(ctx, svc) + created_service := created_svc.(*api.Service) if !api.HasObjectMetaSystemFieldValues(&created_service.ObjectMeta) { t.Errorf("storage did not populate object meta field values") } @@ -109,7 +108,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) { for _, failureCase := range failureCases { c, err := storage.Create(ctx, &failureCase) if c != nil { - t.Errorf("Expected nil channel") + t.Errorf("Expected nil object") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) @@ -129,7 +128,7 @@ func TestServiceRegistryUpdate(t *testing.T) { }, }) storage := NewREST(registry, nil, nil, makeIPNet(t)) - c, err := storage.Update(ctx, &api.Service{ + updated_svc, created, err := storage.Update(ctx, &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ Port: 6502, @@ -141,11 +140,13 @@ func TestServiceRegistryUpdate(t *testing.T) { if err != nil { t.Fatalf("Expected no error: %v", err) } - if c == nil { - t.Errorf("Expected non-nil channel") + if updated_svc == nil { + t.Errorf("Expected non-nil object") } - updated_svc := <-c - updated_service := updated_svc.Object.(*api.Service) + if created { + t.Errorf("expected not created") + } + updated_service := updated_svc.(*api.Service) if updated_service.Name != "foo" { t.Errorf("Expected foo, but got %v", updated_service.Name) } @@ -186,9 +187,9 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { }, } for _, failureCase := range failureCases { - c, err := storage.Update(ctx, &failureCase) - if c != nil { - t.Errorf("Expected nil channel") + c, created, err := storage.Update(ctx, &failureCase) + if c != nil || created { + t.Errorf("Expected nil object or created false") } if !errors.IsInvalid(err) { t.Errorf("Expected to get an invalid resource error, got %v", err) @@ -212,8 +213,7 @@ func TestServiceRegistryExternalService(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }, } - c, _ := storage.Create(ctx, svc) - <-c + storage.Create(ctx, svc) if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -244,8 +244,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := storage.Create(ctx, svc) - <-c + storage.Create(ctx, svc) if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -269,8 +268,7 @@ func TestServiceRegistryDelete(t *testing.T) { }, } registry.CreateService(ctx, svc) - c, _ := storage.Delete(ctx, svc.Name) - <-c + storage.Delete(ctx, svc.Name) if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -295,8 +293,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { }, } registry.CreateService(ctx, svc) - c, _ := storage.Delete(ctx, svc.Name) - <-c + storage.Delete(ctx, svc.Name) if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -413,9 +410,8 @@ func TestServiceRegistryIPAllocation(t *testing.T) { }, } ctx := api.NewDefaultContext() - c1, _ := rest.Create(ctx, svc1) - created_svc1 := <-c1 - created_service_1 := created_svc1.Object.(*api.Service) + created_svc1, _ := rest.Create(ctx, svc1) + created_service_1 := created_svc1.(*api.Service) if created_service_1.Name != "foo" { t.Errorf("Expected foo, but got %v", created_service_1.Name) } @@ -432,9 +428,8 @@ func TestServiceRegistryIPAllocation(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }} ctx = api.NewDefaultContext() - c2, _ := rest.Create(ctx, svc2) - created_svc2 := <-c2 - created_service_2 := created_svc2.Object.(*api.Service) + created_svc2, _ := rest.Create(ctx, svc2) + created_service_2 := created_svc2.(*api.Service) if created_service_2.Name != "bar" { t.Errorf("Expected bar, but got %v", created_service_2.Name) } @@ -453,9 +448,8 @@ func TestServiceRegistryIPAllocation(t *testing.T) { }, } ctx = api.NewDefaultContext() - c3, _ := rest.Create(ctx, svc3) - created_svc3 := <-c3 - created_service_3 := created_svc3.Object.(*api.Service) + created_svc3, _ := rest.Create(ctx, svc3) + created_service_3 := created_svc3.(*api.Service) if created_service_3.Spec.PortalIP != "1.2.3.93" { // specific IP t.Errorf("Unexpected PortalIP: %s", created_service_3.Spec.PortalIP) } @@ -478,9 +472,8 @@ func TestServiceRegistryIPReallocation(t *testing.T) { }, } ctx := api.NewDefaultContext() - c1, _ := rest.Create(ctx, svc1) - created_svc1 := <-c1 - created_service_1 := created_svc1.Object.(*api.Service) + created_svc1, _ := rest.Create(ctx, svc1) + created_service_1 := created_svc1.(*api.Service) if created_service_1.Name != "foo" { t.Errorf("Expected foo, but got %v", created_service_1.Name) } @@ -488,8 +481,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { t.Errorf("Unexpected PortalIP: %s", created_service_1.Spec.PortalIP) } - c, _ := rest.Delete(ctx, created_service_1.Name) - <-c + rest.Delete(ctx, created_service_1.Name) svc2 := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "bar"}, @@ -501,9 +493,8 @@ func TestServiceRegistryIPReallocation(t *testing.T) { }, } ctx = api.NewDefaultContext() - c2, _ := rest.Create(ctx, svc2) - created_svc2 := <-c2 - created_service_2 := created_svc2.Object.(*api.Service) + created_svc2, _ := rest.Create(ctx, svc2) + created_service_2 := created_svc2.(*api.Service) if created_service_2.Name != "bar" { t.Errorf("Expected bar, but got %v", created_service_2.Name) } @@ -529,9 +520,8 @@ func TestServiceRegistryIPUpdate(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := rest.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := rest.Create(ctx, svc) + created_service := created_svc.(*api.Service) if created_service.Spec.Port != 6502 { t.Errorf("Expected port 6502, but got %v", created_service.Spec.Port) } @@ -543,9 +533,8 @@ func TestServiceRegistryIPUpdate(t *testing.T) { *update = *created_service update.Spec.Port = 6503 - c, _ = rest.Update(ctx, update) - updated_svc := <-c - updated_service := updated_svc.Object.(*api.Service) + updated_svc, _, _ := rest.Update(ctx, update) + updated_service := updated_svc.(*api.Service) if updated_service.Spec.Port != 6503 { t.Errorf("Expected port 6503, but got %v", updated_service.Spec.Port) } @@ -554,7 +543,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { update.Spec.Port = 6503 update.Spec.PortalIP = "1.2.3.76" // error - _, err := rest.Update(ctx, update) + _, _, err := rest.Update(ctx, update) if err == nil || !errors.IsInvalid(err) { t.Error("Unexpected error type: %v", err) } @@ -578,9 +567,8 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := rest.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := rest.Create(ctx, svc) + created_service := created_svc.(*api.Service) if created_service.Spec.Port != 6502 { t.Errorf("Expected port 6502, but got %v", created_service.Spec.Port) } @@ -591,7 +579,7 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { update := new(api.Service) *update = *created_service - _, err := rest.Update(ctx, update) + _, _, err := rest.Update(ctx, update) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -614,8 +602,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { }, } ctx := api.NewDefaultContext() - c, _ := rest1.Create(ctx, svc) - <-c + rest1.Create(ctx, svc) svc = &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -625,8 +612,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }, } - c, _ = rest1.Create(ctx, svc) - <-c + rest1.Create(ctx, svc) // This will reload from storage, finding the previous 2 rest2 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) @@ -641,9 +627,8 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { SessionAffinity: api.AffinityTypeNone, }, } - c, _ = rest2.Create(ctx, svc) - created_svc := <-c - created_service := created_svc.Object.(*api.Service) + created_svc, _ := rest2.Create(ctx, svc) + created_service := created_svc.(*api.Service) if created_service.Spec.PortalIP != "1.2.3.3" { t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) } @@ -657,9 +642,9 @@ func TestCreateServiceWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Create(ctx, service) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, err := storage.Create(ctx, service) + if obj != nil { + t.Error("Expected a nil object, but we got a value") } if err == nil { t.Errorf("Expected an error, but we didn't get one") @@ -675,9 +660,9 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { } ctx := api.NewDefaultContext() - channel, err := storage.Update(ctx, service) - if channel != nil { - t.Error("Expected a nil channel, but we got a value") + obj, created, err := storage.Update(ctx, service) + if obj != nil || created { + t.Error("Expected a nil object, but we got a value or created was true") } if err == nil { t.Errorf("Expected an error, but we didn't get one") diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 63b66e51a08..4f761589bd9 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -175,6 +175,7 @@ var timeoutFlag = "?timeout=60s" // Requests to try. Each one should be forbidden or not forbidden // depending on the authentication and authorization setup of the master. var code200 = map[int]bool{200: true} +var code201 = map[int]bool{201: true} var code400 = map[int]bool{400: true} var code403 = map[int]bool{403: true} var code404 = map[int]bool{404: true} @@ -197,7 +198,7 @@ func getTestRequests() []struct { }{ // Normal methods on pods {"GET", "/api/v1beta1/pods", "", code200}, - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code201}, {"PUT", "/api/v1beta1/pods/a" + timeoutFlag, aPod, code200}, {"GET", "/api/v1beta1/pods", "", code200}, {"GET", "/api/v1beta1/pods/a", "", code200}, @@ -217,7 +218,7 @@ func getTestRequests() []struct { // Normal methods on services {"GET", "/api/v1beta1/services", "", code200}, - {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200}, + {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code201}, {"PUT", "/api/v1beta1/services/a" + timeoutFlag, aService, code200}, {"GET", "/api/v1beta1/services", "", code200}, {"GET", "/api/v1beta1/services/a", "", code200}, @@ -225,7 +226,7 @@ func getTestRequests() []struct { // Normal methods on replicationControllers {"GET", "/api/v1beta1/replicationControllers", "", code200}, - {"POST", "/api/v1beta1/replicationControllers" + timeoutFlag, aRC, code200}, + {"POST", "/api/v1beta1/replicationControllers" + timeoutFlag, aRC, code201}, {"PUT", "/api/v1beta1/replicationControllers/a" + timeoutFlag, aRC, code200}, {"GET", "/api/v1beta1/replicationControllers", "", code200}, {"GET", "/api/v1beta1/replicationControllers/a", "", code200}, @@ -233,7 +234,7 @@ func getTestRequests() []struct { // Normal methods on endpoints {"GET", "/api/v1beta1/endpoints", "", code200}, - {"POST", "/api/v1beta1/endpoints" + timeoutFlag, aEndpoints, code200}, + {"POST", "/api/v1beta1/endpoints" + timeoutFlag, aEndpoints, code201}, {"PUT", "/api/v1beta1/endpoints/a" + timeoutFlag, aEndpoints, code200}, {"GET", "/api/v1beta1/endpoints", "", code200}, {"GET", "/api/v1beta1/endpoints/a", "", code200}, @@ -241,7 +242,7 @@ func getTestRequests() []struct { // Normal methods on minions {"GET", "/api/v1beta1/minions", "", code200}, - {"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code200}, + {"POST", "/api/v1beta1/minions" + timeoutFlag, aMinion, code201}, {"PUT", "/api/v1beta1/minions/a" + timeoutFlag, aMinion, code409}, // See #2115 about why 409 {"GET", "/api/v1beta1/minions", "", code200}, {"GET", "/api/v1beta1/minions/a", "", code200}, @@ -249,7 +250,7 @@ func getTestRequests() []struct { // Normal methods on events {"GET", "/api/v1beta1/events", "", code200}, - {"POST", "/api/v1beta1/events" + timeoutFlag, aEvent, code200}, + {"POST", "/api/v1beta1/events" + timeoutFlag, aEvent, code201}, {"PUT", "/api/v1beta1/events/a" + timeoutFlag, aEvent, code200}, {"GET", "/api/v1beta1/events", "", code200}, {"GET", "/api/v1beta1/events", "", code200}, @@ -258,8 +259,8 @@ func getTestRequests() []struct { // Normal methods on bindings {"GET", "/api/v1beta1/bindings", "", code405}, // Bindings are write-only - {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code200}, // Need a pod to bind or you get a 404 - {"POST", "/api/v1beta1/bindings" + timeoutFlag, aBinding, code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag, aPod, code201}, // Need a pod to bind or you get a 404 + {"POST", "/api/v1beta1/bindings" + timeoutFlag, aBinding, code201}, {"PUT", "/api/v1beta1/bindings/a" + timeoutFlag, aBinding, code404}, {"GET", "/api/v1beta1/bindings", "", code405}, {"GET", "/api/v1beta1/bindings/a", "", code404}, // No bindings instances @@ -727,7 +728,7 @@ func TestNamespaceAuthorization(t *testing.T) { statusCodes map[int]bool // allowed status codes. }{ - {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", "foo", aPod, code200}, + {"POST", "/api/v1beta1/pods" + timeoutFlag + "&namespace=foo", "foo", aPod, code201}, {"GET", "/api/v1beta1/pods?namespace=foo", "foo", "", code200}, {"GET", "/api/v1beta1/pods/a?namespace=foo", "foo", "", code200}, {"DELETE", "/api/v1beta1/pods/a" + timeoutFlag + "&namespace=foo", "foo", "", code200}, @@ -838,7 +839,7 @@ func TestKindAuthorization(t *testing.T) { body string statusCodes map[int]bool // allowed status codes. }{ - {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code200}, + {"POST", "/api/v1beta1/services" + timeoutFlag, aService, code201}, {"GET", "/api/v1beta1/services", "", code200}, {"GET", "/api/v1beta1/services/a", "", code200}, {"DELETE", "/api/v1beta1/services/a" + timeoutFlag, "", code200}, From 8f6e3607a941c323ddb44bb01f4da18c1b4ae279 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 11 Feb 2015 17:07:23 -0500 Subject: [PATCH 4/5] Add monitoring as a filter --- pkg/apiserver/api_installer.go | 20 +++++++++++++++----- pkg/apiserver/apiserver.go | 9 +++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 7ae84677fd0..466387761f1 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -306,9 +306,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage // test/integration/auth_test.go is currently the most comprehensive status code test for _, action := range actions { + m := monitorFilter(action.Verb, resource) switch action.Verb { case "GET": // Get a resource. route := ws.GET(action.Path).To(GetResource(getter, nameFn, linkFn, codec)). + Filter(m). Doc("read the specified " + kind). Operation("read" + kind). Writes(versionedObject) @@ -316,6 +318,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "LIST": // List all resources of a kind. route := ws.GET(action.Path).To(ListResource(lister, namespaceFn, linkFn, codec)). + Filter(m). Doc("list objects of kind " + kind). Operation("list" + kind). Writes(versionedList) @@ -323,6 +326,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "PUT": // Update a resource. route := ws.PUT(action.Path).To(UpdateResource(updater, nameFn, objNameFn, linkFn, codec, resource, admit)). + Filter(m). Doc("update the specified " + kind). Operation("update" + kind). Reads(versionedObject) @@ -330,6 +334,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "POST": // Create a resource. route := ws.POST(action.Path).To(CreateResource(creater, namespaceFn, linkFn, codec, resource, admit)). + Filter(m). Doc("create a " + kind). Operation("create" + kind). Reads(versionedObject) @@ -337,12 +342,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "DELETE": // Delete a resource. route := ws.DELETE(action.Path).To(DeleteResource(deleter, nameFn, linkFn, codec, resource, kind, admit)). + Filter(m). Doc("delete a " + kind). Operation("delete" + kind) addParams(route, action.Params) ws.Route(route) case "WATCH": // Watch a resource. route := ws.GET(action.Path).To(restfulStripPrefix(a.prefix+"/watch", watchHandler)). + Filter(m). Doc("watch a particular " + kind). Operation("watch" + kind). Writes(versionedObject) @@ -350,6 +357,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "WATCHLIST": // Watch all resources of a kind. route := ws.GET(action.Path).To(restfulStripPrefix(a.prefix+"/watch", watchHandler)). + Filter(m). Doc("watch a list of " + kind). Operation("watch" + kind + "list"). Writes(versionedList) @@ -357,6 +365,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "REDIRECT": // Get the redirect URL for a resource. route := ws.GET(action.Path).To(restfulStripPrefix(a.prefix+"/redirect", redirectHandler)). + Filter(m). Doc("redirect GET request to " + kind). Operation("redirect" + kind). Produces("*/*"). @@ -365,10 +374,10 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage ws.Route(route) case "PROXY": // Proxy requests to a resource. // Accept all methods as per https://github.com/GoogleCloudPlatform/kubernetes/issues/3996 - addProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, kind, action.Params) - addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, action.Params) - addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, action.Params) - addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, action.Params) + addProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) + addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, kind, resource, action.Params) default: return fmt.Errorf("unrecognized action verb: %s", action.Verb) } @@ -396,8 +405,9 @@ func restfulStripPrefix(prefix string, handler http.Handler) restful.RouteFuncti } } -func addProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, kind string, params []*restful.Parameter) { +func addProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, kind, resource string, params []*restful.Parameter) { proxyRoute := ws.Method(method).Path(path).To(restfulStripPrefix(prefix+"/proxy", proxyHandler)). + Filter(monitorFilter("PROXY", resource)). Doc("proxy " + method + " requests to " + kind). Operation("proxy" + method + kind). Produces("*/*"). diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 682634ca034..68db292b60a 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -73,6 +73,15 @@ func monitor(handler, verb, resource string, httpCode int, reqStart time.Time) { requestLatencies.WithLabelValues(handler, verb).Observe(float64((time.Since(reqStart)) / time.Microsecond)) } +// monitorFilter creates a filter that reports the metrics for a given resource and action. +func monitorFilter(action, resource string) restful.FilterFunction { + return func(req *restful.Request, res *restful.Response, chain *restful.FilterChain) { + reqStart := time.Now() + chain.ProcessFilter(req, res) + monitor("rest", action, resource, res.StatusCode(), reqStart) + } +} + // mux is an object that can register http handlers. type Mux interface { Handle(pattern string, handler http.Handler) From e6fdac13651922db320a2f7276a294cc4eb6e5ac Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 12 Feb 2015 09:45:13 -0500 Subject: [PATCH 5/5] Return an error when names are empty --- pkg/apiserver/api_installer.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 466387761f1..fc3c3b91574 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -44,6 +44,9 @@ type action struct { Params []*restful.Parameter // List of parameters associated with the action. } +// errEmptyName is returned when API requests do not fill the name section of the path. +var errEmptyName = fmt.Errorf("name must be provided") + // Installs handlers for API resources. func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { errors = make([]error, 0) @@ -161,6 +164,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage if scope.Name() != meta.RESTScopeNameNamespace { objNameFn = func(obj runtime.Object) (namespace, name string, err error) { name, err = linker.Name(obj) + if len(name) == 0 { + err = errEmptyName + } return } @@ -176,6 +182,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage } nameFn = func(req *restful.Request) (namespace, name string, err error) { name = req.PathParameter("name") + if len(name) == 0 { + err = errEmptyName + } return } generateLinkFn = func(namespace, name string) (path string, query string) { @@ -223,6 +232,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage nameFn = func(req *restful.Request) (namespace, name string, err error) { namespace, _ = namespaceFn(req) name = req.PathParameter("name") + if len(name) == 0 { + err = errEmptyName + } return } generateLinkFn = func(namespace, name string) (path string, query string) { @@ -265,6 +277,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage RESTStorage nameFn = func(req *restful.Request) (namespace, name string, err error) { namespace, _ = namespaceFn(req) name = req.PathParameter("name") + if len(name) == 0 { + err = errEmptyName + } return } generateLinkFn = func(namespace, name string) (path string, query string) {