diff --git a/pkg/api/register.go b/pkg/api/register.go index 64d9ee529e0..067ee837f20 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -58,6 +58,7 @@ func init() { &PersistentVolumeClaimList{}, &DeleteOptions{}, &ListOptions{}, + &PodLogOptions{}, ) // Legacy names are supported Scheme.AddKnownTypeWithName("", "Minion", &Node{}) @@ -97,3 +98,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {} func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} +func (*PodLogOptions) IsAnAPIObject() {} diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index 14e434c7749..65ee1c81b25 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -28,7 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// RESTStorage is a generic interface for RESTful storage services. +// Storage is a generic interface for RESTful storage services. // Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected // that objects may implement any of the below interfaces. type Storage interface { @@ -54,6 +54,21 @@ type Getter interface { Get(ctx api.Context, name string) (runtime.Object, error) } +// GetterWithOptions is an object that retrieve a named RESTful resource and takes +// additional options on the get request +type GetterWithOptions interface { + // Get finds a resource in the storage by name and returns it. + // Although it can return an arbitrary error value, IsNotFound(err) is true for the + // returned error value err when the specified resource is not found. + // The options object passed to it is of the same type returned by the NewGetOptions + // method. + Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error) + + // NewGetOptions returns an empty options object that will be used to pass + // options to the Get method. + NewGetOptions() runtime.Object +} + // Deleter is an object that can delete a named RESTful resource. type Deleter interface { // Delete finds a resource in the storage and deletes it. @@ -119,6 +134,7 @@ type CreaterUpdater interface { // CreaterUpdater must satisfy the Updater interface. var _ Updater = CreaterUpdater(nil) +// Patcher is a storage object that supports both get and update. type Patcher interface { Getter Updater @@ -153,11 +169,12 @@ type Redirector interface { // ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server // instead of decoded directly. type ResourceStreamer interface { - // InputStream should return an io.Reader if the provided object supports streaming. The desired + // InputStream should return an io.ReadCloser if the provided object supports streaming. The desired // api version and a accept header (may be empty) are passed to the call. If no error occurs, - // the caller may return a content type string with the reader that indicates the type of the - // stream. - InputStream(apiVersion, acceptHeader string) (io.ReadCloser, string, error) + // the caller may return a flag indicating whether the result should be flushed as writes occur + // and a content type string that indicates the type of the stream. + // If a null stream is returned, a StatusNoContent response wil be generated. + InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) } // StorageMetadata is an optional interface that callers can implement to provide additional diff --git a/pkg/api/types.go b/pkg/api/types.go index c2b603ec247..6202b83a6e9 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1249,6 +1249,17 @@ type ListOptions struct { ResourceVersion string } +// PodLogOptions is the query options for a Pod's logs REST call +type PodLogOptions struct { + TypeMeta + + // Container for which to return logs + Container string + + // If true, follow the logs for the pod + Follow bool +} + // Status is a return value for calls that don't return other objects. // TODO: this could go in apiserver, but I'm including it here so clients needn't // import both. diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index a0b9fb38876..42d8be4dca3 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -65,6 +65,7 @@ func init() { &PersistentVolumeClaimList{}, &DeleteOptions{}, &ListOptions{}, + &PodLogOptions{}, ) // Future names are supported api.Scheme.AddKnownTypeWithName("v1beta1", "Node", &Minion{}) @@ -104,3 +105,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {} func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} +func (*PodLogOptions) IsAnAPIObject() {} diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index e6d4fd33174..ed7d8559d18 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -1104,6 +1104,17 @@ type ListOptions struct { ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"` } +// PodLogOptions is the query options for a Pod's logs REST call +type PodLogOptions struct { + TypeMeta `json:",inline"` + + // Container for which to return logs + Container string `json:"container,omitempty" description:"the container for which to stream logs; defaults to only container if there is one container in the pod"` + + // If true, follow the logs for the pod + Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"` +} + // Status is a return value for calls that don't return other objects. // TODO: this could go in apiserver, but I'm including it here so clients needn't // import both. diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index ad69988caff..ccd8f064772 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -65,6 +65,7 @@ func init() { &PersistentVolumeClaimList{}, &DeleteOptions{}, &ListOptions{}, + &PodLogOptions{}, ) // Future names are supported api.Scheme.AddKnownTypeWithName("v1beta2", "Node", &Minion{}) @@ -104,3 +105,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {} func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} +func (*PodLogOptions) IsAnAPIObject() {} diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 1cb0522e750..8a9be79b454 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -1118,6 +1118,17 @@ type ListOptions struct { ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"` } +// PodLogOptions is the query options for a Pod's logs REST call +type PodLogOptions struct { + TypeMeta `json:",inline"` + + // Container for which to return logs + Container string `json:"container,omitempty" description:"the container for which to stream logs; defaults to only container if there is one container in the pod"` + + // If true, follow the logs for the pod + Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"` +} + // Status is a return value for calls that don't return other objects. // TODO: this could go in apiserver, but I'm including it here so clients needn't // import both. diff --git a/pkg/api/v1beta3/register.go b/pkg/api/v1beta3/register.go index 0d9c7a7a994..9622b4d983a 100644 --- a/pkg/api/v1beta3/register.go +++ b/pkg/api/v1beta3/register.go @@ -59,6 +59,7 @@ func init() { &PersistentVolumeClaimList{}, &DeleteOptions{}, &ListOptions{}, + &PodLogOptions{}, ) // Legacy names are supported api.Scheme.AddKnownTypeWithName("v1beta3", "Minion", &Node{}) @@ -98,3 +99,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {} func (*PersistentVolumeClaimList) IsAnAPIObject() {} func (*DeleteOptions) IsAnAPIObject() {} func (*ListOptions) IsAnAPIObject() {} +func (*PodLogOptions) IsAnAPIObject() {} diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 23070a36477..904f54b9335 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -1236,6 +1236,17 @@ type ListOptions struct { ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"` } +// PodLogOptions is the query options for a Pod's logs REST call +type PodLogOptions struct { + TypeMeta `json:",inline"` + + // Container for which to return logs + Container string `json:"container,omitempty" description:"the container for which to stream logs; defaults to only container if there is one container in the pod"` + + // If true, follow the logs for the pod + Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"` +} + // Status is a return value for calls that don't return other objects. type Status struct { TypeMeta `json:",inline"` diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 25d73fbb3b7..92c861d5624 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -130,6 +130,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag creater, isCreater := storage.(rest.Creater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) + getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) deleter, isDeleter := storage.(rest.Deleter) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) updater, isUpdater := storage.(rest.Updater) @@ -170,6 +171,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag gracefulDeleter = rest.GracefulDeleteAdapter{deleter} } + var getOptions runtime.Object + var getOptionsKind string + if isGetterWithOptions { + getOptions = getterWithOptions.NewGetOptions() + _, getOptionsKind, err = a.group.Typer.ObjectVersionAndKind(getOptions) + if err != nil { + return err + } + isGetter = true + } + var ctxFn ContextFunc ctxFn = func(req *restful.Request) api.Context { if ctx, ok := context.Get(req.Request); ok { @@ -316,12 +328,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag m := monitorFilter(action.Verb, resource) switch action.Verb { case "GET": // Get a resource. - route := ws.GET(action.Path).To(GetResource(getter, reqScope)). + var handler restful.RouteFunction + if isGetterWithOptions { + handler = GetResourceWithOptions(getterWithOptions, reqScope, getOptionsKind) + } else { + handler = GetResource(getter, reqScope) + } + route := ws.GET(action.Path).To(handler). Filter(m). Doc("read the specified " + kind). Operation("read" + kind). Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...). Writes(versionedObject) + if isGetterWithOptions { + if err := addObjectParams(ws, route, getOptions); err != nil { + return err + } + } addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a3d0b56976d..bcfc1551cfd 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -36,6 +36,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/emicklei/go-restful" @@ -204,18 +205,27 @@ func APIVersionHandler(versions ...string) restful.RouteFunction { // be "application/octet-stream". All other objects are sent to standard JSON serialization. func write(statusCode int, apiVersion string, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) { if stream, ok := object.(rest.ResourceStreamer); ok { - out, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept")) + out, flush, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept")) if err != nil { errorJSONFatal(err, codec, w) return } + if out == nil { + // No output provided - return StatusNoContent + w.WriteHeader(http.StatusNoContent) + return + } defer out.Close() if len(contentType) == 0 { contentType = "application/octet-stream" } w.Header().Set("Content-Type", contentType) w.WriteHeader(statusCode) - io.Copy(w, out) + writer := w.(io.Writer) + if flush { + writer = flushwriter.Wrap(w) + } + io.Copy(writer, out) return } writeJSON(statusCode, codec, object, w) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 5631b4df79d..ecc3a6af0b5 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -114,13 +114,17 @@ func init() { // api.Status is returned in errors // "internal" version - api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{}) + api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{}, &SimpleGetOptions{}) // "version" version // TODO: Use versioned api objects? - api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{}) + api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{}, &SimpleGetOptions{}) // "version2" version // TODO: Use versioned api objects? - api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{}) + api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{}, &SimpleGetOptions{}) + + // Register SimpleGetOptions with the server versions to convert query params to it + api.Scheme.AddKnownTypes("v1beta1", &SimpleGetOptions{}) + api.Scheme.AddKnownTypes("v1beta3", &SimpleGetOptions{}) nsMapper := newMapper() legacyNsMapper := newMapper() @@ -231,6 +235,14 @@ type Simple struct { func (*Simple) IsAnAPIObject() {} +type SimpleGetOptions struct { + api.TypeMeta `json:",inline"` + Param1 string `json:"param1"` + Param2 string `json:"param2"` +} + +func (*SimpleGetOptions) IsAnAPIObject() {} + type SimpleList struct { api.TypeMeta `json:",inline"` api.ListMeta `json:"metadata,inline"` @@ -254,6 +266,21 @@ func TestSimpleSetupRight(t *testing.T) { } } +func TestSimpleOptionsSetupRight(t *testing.T) { + s := &SimpleGetOptions{} + wire, err := codec.Encode(s) + if err != nil { + t.Fatal(err) + } + s2, err := codec.Decode(wire) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(s, s2) { + t.Fatalf("encode/decode broken:\n%#v\n%#v\n", s, s2) + } +} + type SimpleRESTStorage struct { errors map[string]error list []Simple @@ -314,10 +341,10 @@ func (s *SimpleStream) Close() error { func (s *SimpleStream) IsAnAPIObject() {} -func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, string, error) { +func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) { s.version = version s.accept = accept - return s, s.contentType, s.err + return s, false, s.contentType, s.err } func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -432,6 +459,23 @@ func (m *MetadataRESTStorage) ProducesMIMETypes(method string) []string { return m.types } +type GetWithOptionsRESTStorage struct { + *SimpleRESTStorage + optionsReceived runtime.Object +} + +func (r *GetWithOptionsRESTStorage) Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error) { + if _, ok := options.(*SimpleGetOptions); !ok { + return nil, fmt.Errorf("Unexpected options object: %#v", options) + } + r.optionsReceived = options + return r.SimpleRESTStorage.Get(ctx, name) +} + +func (r *GetWithOptionsRESTStorage) NewGetOptions() runtime.Object { + return &SimpleGetOptions{} +} + func extractBody(response *http.Response, object runtime.Object) (string, error) { defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) @@ -878,6 +922,47 @@ func TestGetBinary(t *testing.T) { } } +func TestGetWithOptions(t *testing.T) { + storage := map[string]rest.Storage{} + simpleStorage := GetWithOptionsRESTStorage{ + SimpleRESTStorage: &SimpleRESTStorage{ + item: Simple{ + Other: "foo", + }, + }, + } + storage["simple"] = &simpleStorage + handler := handle(storage) + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/api/version/simple/id?param1=test1¶m2=test2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected response: %#v", resp) + } + var itemOut Simple + body, err := extractBody(resp, &itemOut) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if itemOut.Name != simpleStorage.item.Name { + t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body)) + } + + opts, ok := simpleStorage.optionsReceived.(*SimpleGetOptions) + if !ok { + t.Errorf("Unexpected options object received: %#v", simpleStorage.optionsReceived) + return + } + if opts.Param1 != "test1" || opts.Param2 != "test2" { + t.Errorf("Did not receive expected options: %#v", opts) + } +} + func TestGetAlternateSelfLink(t *testing.T) { storage := map[string]rest.Storage{} simpleStorage := SimpleRESTStorage{ diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 3e6b211a6fc..bd3d351c6f6 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -74,8 +74,13 @@ type RequestScope struct { ServerAPIVersion string } -// GetResource returns a function that handles retrieving a single resource from a rest.Storage object. -func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { +// getterFunc performs a get request with the given context and object name. The request +// may be used to deserialize an options object to pass to the getter. +type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) + +// getResourceHandler is an HTTP handler function for get requests. It delegates to the +// passed-in getterFunc to perform the actual get. +func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { w := res.ResponseWriter namespace, name, err := scope.Namer.Name(req) @@ -86,7 +91,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { ctx := scope.ContextFunc(req) ctx = api.WithNamespace(ctx, namespace) - result, err := r.Get(ctx, name) + result, err := getter(ctx, name, req) if err != nil { errorJSON(err, scope.Codec, w) return @@ -99,6 +104,26 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { } } +// GetResource returns a function that handles retrieving a single resource from a rest.Storage object. +func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { + return getResourceHandler(scope, + func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { + return r.Get(ctx, name) + }) +} + +// GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object. +func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string) restful.RouteFunction { + return getResourceHandler(scope, + func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) { + opts, err := queryToObject(req.Request.URL.Query(), scope, getOptionsKind) + if err != nil { + return nil, err + } + return r.Get(ctx, name, opts) + }) +} + // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go index 2e8181b3c08..0f37709e70a 100644 --- a/pkg/kubelet/handlers.go +++ b/pkg/kubelet/handlers.go @@ -18,9 +18,7 @@ package kubelet import ( "fmt" - "io" "net" - "net/http" "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -95,21 +93,3 @@ func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *ap _, err := h.client.Get(url) return err } - -// FlushWriter provides wrapper for responseWriter with HTTP streaming capabilities -type FlushWriter struct { - flusher http.Flusher - writer io.Writer -} - -// Write is a FlushWriter implementation of the io.Writer that sends any buffered data to the client. -func (fw *FlushWriter) Write(p []byte) (n int, err error) { - n, err = fw.writer.Write(p) - if err != nil { - return - } - if fw.flusher != nil { - fw.flusher.Flush() - } - return -} diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 3eca3ce1e20..a63a40c2873 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -38,6 +38,7 @@ import ( kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy" "github.com/golang/glog" @@ -262,15 +263,14 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { return } - fw := FlushWriter{writer: w} - if flusher, ok := fw.writer.(http.Flusher); ok { - fw.flusher = flusher - } else { - s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", fw)) + if _, ok := w.(http.Flusher); !ok { + s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", w)) + return } + fw := flushwriter.Wrap(w) w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) - err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, &fw, &fw) + err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, fw, fw) if err != nil { s.error(w, err) return diff --git a/pkg/master/master.go b/pkg/master/master.go index 9a92bf48d82..a306c18f3ed 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -357,8 +357,8 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) // init initializes master. func (m *Master) init(c *Config) { - podStorage, bindingStorage, podStatusStorage := podetcd.NewStorage(c.EtcdHelper) - podRegistry := pod.NewRegistry(podStorage) + podStorage := podetcd.NewStorage(c.EtcdHelper, c.KubeletClient) + podRegistry := pod.NewRegistry(podStorage.Pod) eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())) limitRangeRegistry := limitrange.NewEtcdRegistry(c.EtcdHelper) @@ -385,10 +385,11 @@ func (m *Master) init(c *Config) { // TODO: Factor out the core API registration m.storage = map[string]rest.Storage{ - "pods": podStorage, - "pods/status": podStatusStorage, - "pods/binding": bindingStorage, - "bindings": bindingStorage, + "pods": podStorage.Pod, + "pods/status": podStorage.Status, + "pods/log": podStorage.Log, + "pods/binding": podStorage.Binding, + "bindings": podStorage.Binding, "replicationControllers": controllerStorage, "services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 1e51b3d5f6c..2039d963503 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -44,9 +44,9 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { helper := tools.NewEtcdHelper(client, latest.Codec) - podStorage, _, _ := podetcd.NewStorage(helper) + podStorage := podetcd.NewStorage(helper, nil) endpointStorage := endpointetcd.NewStorage(helper) - registry := NewRegistry(helper, pod.NewRegistry(podStorage), endpoint.NewRegistry(endpointStorage)) + registry := NewRegistry(helper, pod.NewRegistry(podStorage.Pod), endpoint.NewRegistry(endpointStorage)) return registry } diff --git a/pkg/registry/generic/rest/doc.go b/pkg/registry/generic/rest/doc.go new file mode 100644 index 00000000000..9bf2b7c78b7 --- /dev/null +++ b/pkg/registry/generic/rest/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package rest has generic implementations of resources used for +// REST responses +package rest diff --git a/pkg/registry/generic/rest/streamer.go b/pkg/registry/generic/rest/streamer.go new file mode 100644 index 00000000000..9cebbb95092 --- /dev/null +++ b/pkg/registry/generic/rest/streamer.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "io" + "net/http" + "net/url" + "strings" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" +) + +// LocationStreamer is a resource that streams the contents of a particular +// location URL +type LocationStreamer struct { + Location *url.URL + Transport http.RoundTripper + ContentType string + Flush bool +} + +// a LocationStreamer must implement a rest.ResourceStreamer +var _ rest.ResourceStreamer = &LocationStreamer{} + +// IsAnAPIObject marks this object as a runtime.Object +func (*LocationStreamer) IsAnAPIObject() {} + +// InputStream returns a stream with the contents of the URL location. If no location is provided, +// a null stream is returned. +func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { + if s.Location == nil { + // If no location was provided, return a null stream + return nil, false, "", nil + } + transport := s.Transport + if transport == nil { + transport = http.DefaultTransport + } + client := &http.Client{Transport: transport} + resp, err := client.Get(s.Location.String()) + if err != nil { + return + } + contentType = s.ContentType + if len(contentType) == 0 { + contentType = resp.Header.Get("Content-Type") + if len(contentType) > 0 { + contentType = strings.TrimSpace(strings.SplitN(contentType, ";", 2)[0]) + } + } + flush = s.Flush + stream = resp.Body + return +} diff --git a/pkg/registry/generic/rest/streamer_test.go b/pkg/registry/generic/rest/streamer_test.go new file mode 100644 index 00000000000..53d2d9aaaa3 --- /dev/null +++ b/pkg/registry/generic/rest/streamer_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestInputStreamReader(t *testing.T) { + resultString := "Test output" + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(resultString)) + })) + defer s.Close() + u, err := url.Parse(s.URL) + if err != nil { + t.Errorf("Error parsing server URL: %v", err) + return + } + streamer := &LocationStreamer{ + Location: u, + } + readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream: %v", err) + return + } + defer readCloser.Close() + result, err := ioutil.ReadAll(readCloser) + if string(result) != resultString { + t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), resultString) + } +} + +func TestInputStreamNullLocation(t *testing.T) { + streamer := &LocationStreamer{ + Location: nil, + } + readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream with null location: %v", err) + } + if readCloser != nil { + t.Errorf("Expected stream to be nil. Got: %#v", readCloser) + } +} + +type testTransport struct { + body string + err error +} + +func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { + r := bufio.NewReader(bytes.NewBufferString(tt.body)) + return http.ReadResponse(r, req) +} + +func fakeTransport(mime, message string) http.RoundTripper { + content := fmt.Sprintf("HTTP/1.1 200 OK\nContent-Type: %s\n\n%s", mime, message) + return &testTransport{body: content} +} + +func TestInputStreamContentType(t *testing.T) { + location, _ := url.Parse("http://www.example.com") + streamer := &LocationStreamer{ + Location: location, + Transport: fakeTransport("application/json", "hello world"), + } + readCloser, _, contentType, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream: %v", err) + return + } + defer readCloser.Close() + if contentType != "application/json" { + t.Errorf("Unexpected content type. Got: %s. Expected: application/json", contentType) + } +} + +func TestInputStreamTransport(t *testing.T) { + message := "hello world" + location, _ := url.Parse("http://www.example.com") + streamer := &LocationStreamer{ + Location: location, + Transport: fakeTransport("text/plain", message), + } + readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain") + if err != nil { + t.Errorf("Unexpected error when getting stream: %v", err) + return + } + defer readCloser.Close() + result, err := ioutil.ReadAll(readCloser) + if string(result) != message { + t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), message) + } +} diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index f35eaf8128e..008d27a8b9e 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -25,23 +25,33 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + genericrest "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" ) -// rest implements a RESTStorage for pods against etcd +// PodStorage includes storage for pods and all sub resources +type PodStorage struct { + Pod *REST + Binding *BindingREST + Status *StatusREST + Log *LogREST +} + +// REST implements a RESTStorage for pods against etcd type REST struct { etcdgeneric.Etcd } // NewStorage returns a RESTStorage object that will work against pods. -func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) { +func NewStorage(h tools.EtcdHelper, k client.ConnectionInfoGetter) PodStorage { prefix := "/registry/pods" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, @@ -74,7 +84,12 @@ func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) { statusStore.UpdateStrategy = pod.StatusStrategy - return &REST{*store}, &BindingREST{store: store}, &StatusREST{store: &statusStore} + return PodStorage{ + Pod: &REST{*store}, + Binding: &BindingREST{store: store}, + Status: &StatusREST{store: &statusStore}, + Log: &LogREST{store: store, kubeletConn: k}, + } } // Implement Redirector. @@ -90,6 +105,7 @@ type BindingREST struct { store *etcdgeneric.Etcd } +// New creates a new binding resource func (r *BindingREST) New() runtime.Object { return &api.Binding{} } @@ -165,6 +181,7 @@ type StatusREST struct { store *etcdgeneric.Etcd } +// New creates a new pod resource func (r *StatusREST) New() runtime.Object { return &api.Pod{} } @@ -173,3 +190,37 @@ func (r *StatusREST) New() runtime.Object { func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) { return r.store.Update(ctx, obj) } + +// LogREST implements the log endpoint for a Pod +type LogREST struct { + store *etcdgeneric.Etcd + kubeletConn client.ConnectionInfoGetter +} + +// New creates a new Pod log options object +func (r *LogREST) New() runtime.Object { + return &api.PodLogOptions{} +} + +// Get retrieves a runtime.Object that will stream the contents of the pod log +func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtime.Object, error) { + logOpts, ok := opts.(*api.PodLogOptions) + if !ok { + return nil, fmt.Errorf("Invalid options object: %#v", opts) + } + location, transport, err := pod.LogLocation(r.store, r.kubeletConn, ctx, name, logOpts) + if err != nil { + return nil, err + } + return &genericrest.LocationStreamer{ + Location: location, + Transport: transport, + ContentType: "text/plain", + Flush: logOpts.Follow, + }, nil +} + +// NewGetOptions creates a new options object +func (r *LogREST) NewGetOptions() runtime.Object { + return &api.PodLogOptions{} +} diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 2b414572568..d682ddda0f0 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -47,8 +47,8 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.EtcdHelper) { fakeEtcdClient, h := newHelper(t) - storage, bindingStorage, statusStorage := NewStorage(h) - return storage, bindingStorage, statusStorage, fakeEtcdClient, h + storage := NewStorage(h, nil) + return storage.Pod, storage.Binding, storage.Status, fakeEtcdClient, h } func validNewPod() *api.Pod { @@ -89,7 +89,7 @@ func TestStorage(t *testing.T) { func TestCreate(t *testing.T) { fakeEtcdClient, helper := newHelper(t) - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod test := resttest.New(t, storage, fakeEtcdClient.SetError) pod := validNewPod() pod.ObjectMeta = api.ObjectMeta{} @@ -107,7 +107,7 @@ func TestCreate(t *testing.T) { func TestDelete(t *testing.T) { fakeEtcdClient, helper := newHelper(t) - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod test := resttest.New(t, storage, fakeEtcdClient.SetError) createFn := func() runtime.Object { @@ -143,7 +143,7 @@ func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) { func TestCreateRegistryError(t *testing.T) { fakeEtcdClient, helper := newHelper(t) fakeEtcdClient.Err = fmt.Errorf("test error") - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pod := validNewPod() _, err := storage.Create(api.NewDefaultContext(), pod) @@ -154,7 +154,7 @@ func TestCreateRegistryError(t *testing.T) { func TestCreateSetsFields(t *testing.T) { fakeEtcdClient, helper := newHelper(t) - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pod := validNewPod() _, err := storage.Create(api.NewDefaultContext(), pod) if err != fakeEtcdClient.Err { @@ -176,7 +176,7 @@ func TestCreateSetsFields(t *testing.T) { func TestListError(t *testing.T) { fakeEtcdClient, helper := newHelper(t) fakeEtcdClient.Err = fmt.Errorf("test error") - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything()) if err != fakeEtcdClient.Err { t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err) @@ -194,7 +194,7 @@ func TestListEmptyPodList(t *testing.T) { E: fakeEtcdClient.NewError(tools.EtcdErrorCodeNotFound), } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pods, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything()) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -231,7 +231,7 @@ func TestListPodList(t *testing.T) { }, }, } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything()) pods := podsObj.(*api.PodList) @@ -280,7 +280,7 @@ func TestListPodListSelection(t *testing.T) { }, }, } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod ctx := api.NewDefaultContext() @@ -345,7 +345,7 @@ func TestListPodListSelection(t *testing.T) { } func TestPodDecode(t *testing.T) { - storage, _, _ := NewStorage(tools.EtcdHelper{}) + storage := NewStorage(tools.EtcdHelper{}, nil).Pod expected := validNewPod() body, err := latest.Codec.Encode(expected) if err != nil { @@ -375,7 +375,7 @@ func TestGet(t *testing.T) { }, }, } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo") pod := obj.(*api.Pod) @@ -392,7 +392,7 @@ func TestGet(t *testing.T) { func TestPodStorageValidatesCreate(t *testing.T) { fakeEtcdClient, helper := newHelper(t) fakeEtcdClient.Err = fmt.Errorf("test error") - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pod := validNewPod() pod.Labels = map[string]string{ @@ -410,7 +410,7 @@ func TestPodStorageValidatesCreate(t *testing.T) { // TODO: remove, this is covered by RESTTest.TestCreate func TestCreatePod(t *testing.T) { _, helper := newHelper(t) - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pod := validNewPod() obj, err := storage.Create(api.NewDefaultContext(), pod) @@ -432,7 +432,7 @@ func TestCreatePod(t *testing.T) { // TODO: remove, this is covered by RESTTest.TestCreate func TestCreateWithConflictingNamespace(t *testing.T) { _, helper := newHelper(t) - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pod := validNewPod() pod.Namespace = "not-default" @@ -461,7 +461,7 @@ func TestUpdateWithConflictingNamespace(t *testing.T) { }, }, } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod pod := validChangedPod() pod.Namespace = "not-default" @@ -578,7 +578,7 @@ func TestResourceLocation(t *testing.T) { }, }, } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod redirector := rest.Redirector(storage) location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) @@ -616,7 +616,7 @@ func TestDeletePod(t *testing.T) { }, }, } - storage, _, _ := NewStorage(helper) + storage := NewStorage(helper, nil).Pod _, err := storage.Delete(api.NewDefaultContext(), "foo", nil) if err != nil { diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 9c1867d853e..04aab008267 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" @@ -133,6 +134,18 @@ type ResourceGetter interface { Get(api.Context, string) (runtime.Object, error) } +func getPod(getter ResourceGetter, ctx api.Context, name string) (*api.Pod, error) { + obj, err := getter.Get(ctx, name) + if err != nil { + return nil, err + } + pod := obj.(*api.Pod) + if pod == nil { + return nil, fmt.Errorf("Unexpected object type: %#v", pod) + } + return pod, nil +} + // ResourceLocation returns a URL to which one can send traffic for the specified pod. func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { // Allow ID as "podname" or "podname:port". If port is not specified, @@ -148,14 +161,10 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U port = parts[1] } - obj, err := getter.Get(ctx, name) + pod, err := getPod(getter, ctx, name) if err != nil { return nil, nil, err } - pod := obj.(*api.Pod) - if pod == nil { - return nil, nil, nil - } // Try to figure out a port. if port == "" { @@ -177,3 +186,43 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U } return loc, nil, nil } + +// LogLocation returns a the log URL for a pod container. If opts.Container is blank +// and only one container is present in the pod, that container is used. +func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) { + + pod, err := getPod(getter, ctx, name) + if err != nil { + return nil, nil, err + } + + // Try to figure out a container + container := opts.Container + if container == "" { + if len(pod.Spec.Containers) == 1 { + container = pod.Spec.Containers[0].Name + } else { + return nil, nil, fmt.Errorf("a container name must be specified for pod %s", name) + } + } + nodeHost := pod.Status.HostIP + if len(nodeHost) == 0 { + // If pod has not been assigned a host, return an empty location + return nil, nil, nil + } + nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost) + if err != nil { + return nil, nil, err + } + params := url.Values{} + if opts.Follow { + params.Add("follow", "true") + } + loc := &url.URL{ + Scheme: nodeScheme, + Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, name, container), + RawQuery: params.Encode(), + } + return loc, nodeTransport, nil +} diff --git a/pkg/util/flushwriter/doc.go b/pkg/util/flushwriter/doc.go new file mode 100644 index 00000000000..5a81e510732 --- /dev/null +++ b/pkg/util/flushwriter/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package flushwriter implements a wrapper for a writer that flushes on every +// write if that writer implements the io.Flusher interface +package flushwriter diff --git a/pkg/util/flushwriter/writer.go b/pkg/util/flushwriter/writer.go new file mode 100644 index 00000000000..01dffb44a37 --- /dev/null +++ b/pkg/util/flushwriter/writer.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flushwriter + +import ( + "io" + "net/http" +) + +// Wrap wraps an io.Writer into a writer that flushes after every write if +// the writer implements the Flusher interface. +func Wrap(w io.Writer) io.Writer { + fw := &flushWriter{ + writer: w, + } + if flusher, ok := w.(http.Flusher); ok { + fw.flusher = flusher + } + return fw +} + +// flushWriter provides wrapper for responseWriter with HTTP streaming capabilities +type flushWriter struct { + flusher http.Flusher + writer io.Writer +} + +// Write is a FlushWriter implementation of the io.Writer that sends any buffered +// data to the client. +func (fw *flushWriter) Write(p []byte) (n int, err error) { + n, err = fw.writer.Write(p) + if err != nil { + return + } + if fw.flusher != nil { + fw.flusher.Flush() + } + return +} diff --git a/pkg/util/flushwriter/writer_test.go b/pkg/util/flushwriter/writer_test.go new file mode 100644 index 00000000000..61ba9795416 --- /dev/null +++ b/pkg/util/flushwriter/writer_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flushwriter + +import ( + "fmt" + "testing" +) + +type writerWithFlush struct { + writeCount, flushCount int + err error +} + +func (w *writerWithFlush) Flush() { + w.flushCount++ +} + +func (w *writerWithFlush) Write(p []byte) (n int, err error) { + w.writeCount++ + return len(p), w.err +} + +type writerWithNoFlush struct { + writeCount int +} + +func (w *writerWithNoFlush) Write(p []byte) (n int, err error) { + w.writeCount++ + return len(p), nil +} + +func TestWriteWithFlush(t *testing.T) { + w := &writerWithFlush{} + fw := Wrap(w) + for i := 0; i < 10; i++ { + _, err := fw.Write([]byte("Test write")) + if err != nil { + t.Errorf("Unexpected error while writing with flush writer: %v", err) + } + } + if w.flushCount != 10 { + t.Errorf("Flush not called the expected number of times. Actual: %d", w.flushCount) + } + if w.writeCount != 10 { + t.Errorf("Write not called the expected number of times. Actual: %d", w.writeCount) + } +} + +func TestWriteWithoutFlush(t *testing.T) { + w := &writerWithNoFlush{} + fw := Wrap(w) + for i := 0; i < 10; i++ { + _, err := fw.Write([]byte("Test write")) + if err != nil { + t.Errorf("Unexpected error while writing with flush writer: %v", err) + } + } + if w.writeCount != 10 { + t.Errorf("Write not called the expected number of times. Actual: %d", w.writeCount) + } +} + +func TestWriteError(t *testing.T) { + e := fmt.Errorf("Error") + w := &writerWithFlush{err: e} + fw := Wrap(w) + _, err := fw.Write([]byte("Test write")) + if err != e { + t.Errorf("Did not get expected error. Got: %#v", err) + } +}