From b911d9265ad4ae4f52c7ffa5ee0506e5f8fe0596 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 29 Jul 2014 17:35:09 -0400 Subject: [PATCH] Split watch --- pkg/apiserver/apiserver.go | 87 ------------------- pkg/apiserver/apiserver_test.go | 115 -------------------------- pkg/apiserver/watch.go | 113 +++++++++++++++++++++++++ pkg/apiserver/watch_test.go | 142 ++++++++++++++++++++++++++++++++ 4 files changed, 255 insertions(+), 202 deletions(-) create mode 100644 pkg/apiserver/watch.go create mode 100644 pkg/apiserver/watch_test.go diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a5986bbe919..763a33de8b6 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -18,7 +18,6 @@ package apiserver import ( "bytes" - "encoding/json" "fmt" "io/ioutil" "net" @@ -632,89 +631,3 @@ func (server *APIServer) handleWatch(w http.ResponseWriter, req *http.Request) { server.notFound(w, req) } - -// WatchServer serves a watch.Interface over a websocket or vanilla HTTP. -type WatchServer struct { - watching watch.Interface -} - -// HandleWS implements a websocket handler. -func (w *WatchServer) HandleWS(ws *websocket.Conn) { - done := make(chan struct{}) - go func() { - var unused interface{} - // Expect this to block until the connection is closed. Client should not - // send anything. - websocket.JSON.Receive(ws, &unused) - close(done) - }() - for { - select { - case <-done: - w.watching.Stop() - return - case event, ok := <-w.watching.ResultChan(): - if !ok { - // End of results. - return - } - err := websocket.JSON.Send(ws, &api.WatchEvent{ - Type: event.Type, - Object: api.APIObject{event.Object}, - }) - if err != nil { - // Client disconnect. - w.watching.Stop() - return - } - } - } -} - -// ServeHTTP serves a series of JSON encoded events via straight HTTP with -// Transfer-Encoding: chunked. -func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - loggedW := httplog.LogOf(w) - w = httplog.Unlogged(w) - - cn, ok := w.(http.CloseNotifier) - if !ok { - loggedW.Addf("unable to get CloseNotifier") - http.NotFound(loggedW, req) - return - } - flusher, ok := w.(http.Flusher) - if !ok { - loggedW.Addf("unable to get Flusher") - http.NotFound(loggedW, req) - return - } - - loggedW.Header().Set("Transfer-Encoding", "chunked") - loggedW.WriteHeader(http.StatusOK) - flusher.Flush() - - encoder := json.NewEncoder(w) - for { - select { - case <-cn.CloseNotify(): - self.watching.Stop() - return - case event, ok := <-self.watching.ResultChan(): - if !ok { - // End of results. - return - } - err := encoder.Encode(&api.WatchEvent{ - Type: event.Type, - Object: api.APIObject{event.Object}, - }) - if err != nil { - // Client disconnect. - self.watching.Stop() - return - } - flusher.Flush() - } - } -} diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index e2d57840868..042421effd5 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -18,7 +18,6 @@ package apiserver import ( "bytes" - "encoding/json" "fmt" "io/ioutil" "log" @@ -31,7 +30,6 @@ import ( "testing" "time" - "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -575,119 +573,6 @@ func TestOpGet(t *testing.T) { t.Errorf("Unexpected response %#v", response) } } - -var watchTestTable = []struct { - t watch.EventType - obj interface{} -}{ - {watch.Added, &Simple{Name: "A Name"}}, - {watch.Modified, &Simple{Name: "Another Name"}}, - {watch.Deleted, &Simple{Name: "Another Name"}}, -} - -func TestWatchWebsocket(t *testing.T) { - simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ - "foo": simpleStorage, - }, "/prefix/version") - server := httptest.NewServer(handler) - - dest, _ := url.Parse(server.URL) - dest.Scheme = "ws" // Required by websocket, though the server never sees it. - dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" - - ws, err := websocket.Dial(dest.String(), "", "http://localhost") - expectNoError(t, err) - - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - - try := func(action watch.EventType, object interface{}) { - // Send - simpleStorage.fakeWatch.Action(action, object) - // Test receive - var got api.WatchEvent - err := websocket.JSON.Receive(ws, &got) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if got.Type != action { - t.Errorf("Unexpected type: %v", got.Type) - } - if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - } - - for _, item := range watchTestTable { - try(item.t, item.obj) - } - simpleStorage.fakeWatch.Stop() - - var got api.WatchEvent - err = websocket.JSON.Receive(ws, &got) - if err == nil { - t.Errorf("Unexpected non-error") - } -} - -func TestWatchHTTP(t *testing.T) { - simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ - "foo": simpleStorage, - }, "/prefix/version") - server := httptest.NewServer(handler) - client := http.Client{} - - dest, _ := url.Parse(server.URL) - dest.Path = "/prefix/version/watch/foo" - dest.RawQuery = "id=myID" - - request, err := http.NewRequest("GET", dest.String(), nil) - expectNoError(t, err) - response, err := client.Do(request) - expectNoError(t, err) - if response.StatusCode != http.StatusOK { - t.Errorf("Unexpected response %#v", response) - } - - if a, e := simpleStorage.requestedID, "myID"; a != e { - t.Fatalf("Expected %v, got %v", e, a) - } - - decoder := json.NewDecoder(response.Body) - - try := func(action watch.EventType, object interface{}) { - // Send - simpleStorage.fakeWatch.Action(action, object) - // Test receive - var got api.WatchEvent - err := decoder.Decode(&got) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if got.Type != action { - t.Errorf("Unexpected type: %v", got.Type) - } - if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) - } - } - - for _, item := range watchTestTable { - try(item.t, item.obj) - } - simpleStorage.fakeWatch.Stop() - - var got api.WatchEvent - err = decoder.Decode(&got) - if err == nil { - t.Errorf("Unexpected non-error") - } -} - func TestMinionTransport(t *testing.T) { content := string(`
kubelet.loggoogle.log
`) transport := &minionTransport{} diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go new file mode 100644 index 00000000000..3218a54380d --- /dev/null +++ b/pkg/apiserver/watch.go @@ -0,0 +1,113 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "encoding/json" + "net/http" + + "code.google.com/p/go.net/websocket" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// WatchServer serves a watch.Interface over a websocket or vanilla HTTP. +type WatchServer struct { + watching watch.Interface +} + +// HandleWS implements a websocket handler. +func (w *WatchServer) HandleWS(ws *websocket.Conn) { + done := make(chan struct{}) + go func() { + var unused interface{} + // Expect this to block until the connection is closed. Client should not + // send anything. + websocket.JSON.Receive(ws, &unused) + close(done) + }() + for { + select { + case <-done: + w.watching.Stop() + return + case event, ok := <-w.watching.ResultChan(): + if !ok { + // End of results. + return + } + err := websocket.JSON.Send(ws, &api.WatchEvent{ + Type: event.Type, + Object: api.APIObject{event.Object}, + }) + if err != nil { + // Client disconnect. + w.watching.Stop() + return + } + } + } +} + +// ServeHTTP serves a series of JSON encoded events via straight HTTP with +// Transfer-Encoding: chunked. +func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + loggedW := httplog.LogOf(w) + w = httplog.Unlogged(w) + + cn, ok := w.(http.CloseNotifier) + if !ok { + loggedW.Addf("unable to get CloseNotifier") + http.NotFound(loggedW, req) + return + } + flusher, ok := w.(http.Flusher) + if !ok { + loggedW.Addf("unable to get Flusher") + http.NotFound(loggedW, req) + return + } + + loggedW.Header().Set("Transfer-Encoding", "chunked") + loggedW.WriteHeader(http.StatusOK) + flusher.Flush() + + encoder := json.NewEncoder(w) + for { + select { + case <-cn.CloseNotify(): + self.watching.Stop() + return + case event, ok := <-self.watching.ResultChan(): + if !ok { + // End of results. + return + } + err := encoder.Encode(&api.WatchEvent{ + Type: event.Type, + Object: api.APIObject{event.Object}, + }) + if err != nil { + // Client disconnect. + self.watching.Stop() + return + } + flusher.Flush() + } + } +} diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go new file mode 100644 index 00000000000..09fdcfa6241 --- /dev/null +++ b/pkg/apiserver/watch_test.go @@ -0,0 +1,142 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + + "code.google.com/p/go.net/websocket" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +var watchTestTable = []struct { + t watch.EventType + obj interface{} +}{ + {watch.Added, &Simple{Name: "A Name"}}, + {watch.Modified, &Simple{Name: "Another Name"}}, + {watch.Deleted, &Simple{Name: "Another Name"}}, +} + +func TestWatchWebsocket(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + + dest, _ := url.Parse(server.URL) + dest.Scheme = "ws" // Required by websocket, though the server never sees it. + dest.Path = "/prefix/version/watch/foo" + dest.RawQuery = "id=myID" + + ws, err := websocket.Dial(dest.String(), "", "http://localhost") + expectNoError(t, err) + + if a, e := simpleStorage.requestedID, "myID"; a != e { + t.Fatalf("Expected %v, got %v", e, a) + } + + try := func(action watch.EventType, object interface{}) { + // Send + simpleStorage.fakeWatch.Action(action, object) + // Test receive + var got api.WatchEvent + err := websocket.JSON.Receive(ws, &got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != action { + t.Errorf("Unexpected type: %v", got.Type) + } + if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + for _, item := range watchTestTable { + try(item.t, item.obj) + } + simpleStorage.fakeWatch.Stop() + + var got api.WatchEvent + err = websocket.JSON.Receive(ws, &got) + if err == nil { + t.Errorf("Unexpected non-error") + } +} + +func TestWatchHTTP(t *testing.T) { + simpleStorage := &SimpleRESTStorage{} + handler := New(map[string]RESTStorage{ + "foo": simpleStorage, + }, "/prefix/version") + server := httptest.NewServer(handler) + client := http.Client{} + + dest, _ := url.Parse(server.URL) + dest.Path = "/prefix/version/watch/foo" + dest.RawQuery = "id=myID" + + request, err := http.NewRequest("GET", dest.String(), nil) + expectNoError(t, err) + response, err := client.Do(request) + expectNoError(t, err) + if response.StatusCode != http.StatusOK { + t.Errorf("Unexpected response %#v", response) + } + + if a, e := simpleStorage.requestedID, "myID"; a != e { + t.Fatalf("Expected %v, got %v", e, a) + } + + decoder := json.NewDecoder(response.Body) + + try := func(action watch.EventType, object interface{}) { + // Send + simpleStorage.fakeWatch.Action(action, object) + // Test receive + var got api.WatchEvent + err := decoder.Decode(&got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != action { + t.Errorf("Unexpected type: %v", got.Type) + } + if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + } + + for _, item := range watchTestTable { + try(item.t, item.obj) + } + simpleStorage.fakeWatch.Stop() + + var got api.WatchEvent + err = decoder.Decode(&got) + if err == nil { + t.Errorf("Unexpected non-error") + } +}