Merge pull request #6497 from csrwng/pod_log_subresource

Pod log subresource
This commit is contained in:
Clayton Coleman
2015-04-07 09:29:12 -04:00
26 changed files with 733 additions and 76 deletions

View File

@@ -58,6 +58,7 @@ func init() {
&PersistentVolumeClaimList{},
&DeleteOptions{},
&ListOptions{},
&PodLogOptions{},
)
// Legacy names are supported
Scheme.AddKnownTypeWithName("", "Minion", &Node{})
@@ -97,3 +98,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {}
func (*PersistentVolumeClaimList) IsAnAPIObject() {}
func (*DeleteOptions) IsAnAPIObject() {}
func (*ListOptions) IsAnAPIObject() {}
func (*PodLogOptions) IsAnAPIObject() {}

View File

@@ -28,7 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// RESTStorage is a generic interface for RESTful storage services.
// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
@@ -54,6 +54,21 @@ type Getter interface {
Get(ctx api.Context, name string) (runtime.Object, error)
}
// GetterWithOptions is an object that retrieve a named RESTful resource and takes
// additional options on the get request
type GetterWithOptions interface {
// Get finds a resource in the storage by name and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
// The options object passed to it is of the same type returned by the NewGetOptions
// method.
Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error)
// NewGetOptions returns an empty options object that will be used to pass
// options to the Get method.
NewGetOptions() runtime.Object
}
// Deleter is an object that can delete a named RESTful resource.
type Deleter interface {
// Delete finds a resource in the storage and deletes it.
@@ -119,6 +134,7 @@ type CreaterUpdater interface {
// CreaterUpdater must satisfy the Updater interface.
var _ Updater = CreaterUpdater(nil)
// Patcher is a storage object that supports both get and update.
type Patcher interface {
Getter
Updater
@@ -153,11 +169,12 @@ type Redirector interface {
// ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server
// instead of decoded directly.
type ResourceStreamer interface {
// InputStream should return an io.Reader if the provided object supports streaming. The desired
// InputStream should return an io.ReadCloser if the provided object supports streaming. The desired
// api version and a accept header (may be empty) are passed to the call. If no error occurs,
// the caller may return a content type string with the reader that indicates the type of the
// stream.
InputStream(apiVersion, acceptHeader string) (io.ReadCloser, string, error)
// the caller may return a flag indicating whether the result should be flushed as writes occur
// and a content type string that indicates the type of the stream.
// If a null stream is returned, a StatusNoContent response wil be generated.
InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error)
}
// StorageMetadata is an optional interface that callers can implement to provide additional

View File

@@ -1249,6 +1249,17 @@ type ListOptions struct {
ResourceVersion string
}
// PodLogOptions is the query options for a Pod's logs REST call
type PodLogOptions struct {
TypeMeta
// Container for which to return logs
Container string
// If true, follow the logs for the pod
Follow bool
}
// Status is a return value for calls that don't return other objects.
// TODO: this could go in apiserver, but I'm including it here so clients needn't
// import both.

View File

@@ -65,6 +65,7 @@ func init() {
&PersistentVolumeClaimList{},
&DeleteOptions{},
&ListOptions{},
&PodLogOptions{},
)
// Future names are supported
api.Scheme.AddKnownTypeWithName("v1beta1", "Node", &Minion{})
@@ -104,3 +105,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {}
func (*PersistentVolumeClaimList) IsAnAPIObject() {}
func (*DeleteOptions) IsAnAPIObject() {}
func (*ListOptions) IsAnAPIObject() {}
func (*PodLogOptions) IsAnAPIObject() {}

View File

@@ -1104,6 +1104,17 @@ type ListOptions struct {
ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"`
}
// PodLogOptions is the query options for a Pod's logs REST call
type PodLogOptions struct {
TypeMeta `json:",inline"`
// Container for which to return logs
Container string `json:"container,omitempty" description:"the container for which to stream logs; defaults to only container if there is one container in the pod"`
// If true, follow the logs for the pod
Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"`
}
// Status is a return value for calls that don't return other objects.
// TODO: this could go in apiserver, but I'm including it here so clients needn't
// import both.

View File

@@ -65,6 +65,7 @@ func init() {
&PersistentVolumeClaimList{},
&DeleteOptions{},
&ListOptions{},
&PodLogOptions{},
)
// Future names are supported
api.Scheme.AddKnownTypeWithName("v1beta2", "Node", &Minion{})
@@ -104,3 +105,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {}
func (*PersistentVolumeClaimList) IsAnAPIObject() {}
func (*DeleteOptions) IsAnAPIObject() {}
func (*ListOptions) IsAnAPIObject() {}
func (*PodLogOptions) IsAnAPIObject() {}

View File

@@ -1118,6 +1118,17 @@ type ListOptions struct {
ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"`
}
// PodLogOptions is the query options for a Pod's logs REST call
type PodLogOptions struct {
TypeMeta `json:",inline"`
// Container for which to return logs
Container string `json:"container,omitempty" description:"the container for which to stream logs; defaults to only container if there is one container in the pod"`
// If true, follow the logs for the pod
Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"`
}
// Status is a return value for calls that don't return other objects.
// TODO: this could go in apiserver, but I'm including it here so clients needn't
// import both.

View File

@@ -59,6 +59,7 @@ func init() {
&PersistentVolumeClaimList{},
&DeleteOptions{},
&ListOptions{},
&PodLogOptions{},
)
// Legacy names are supported
api.Scheme.AddKnownTypeWithName("v1beta3", "Minion", &Node{})
@@ -98,3 +99,4 @@ func (*PersistentVolumeClaim) IsAnAPIObject() {}
func (*PersistentVolumeClaimList) IsAnAPIObject() {}
func (*DeleteOptions) IsAnAPIObject() {}
func (*ListOptions) IsAnAPIObject() {}
func (*PodLogOptions) IsAnAPIObject() {}

View File

@@ -1236,6 +1236,17 @@ type ListOptions struct {
ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"`
}
// PodLogOptions is the query options for a Pod's logs REST call
type PodLogOptions struct {
TypeMeta `json:",inline"`
// Container for which to return logs
Container string `json:"container,omitempty" description:"the container for which to stream logs; defaults to only container if there is one container in the pod"`
// If true, follow the logs for the pod
Follow bool `json:"follow,omitempty" description:"follow the log stream of the pod; defaults to false"`
}
// Status is a return value for calls that don't return other objects.
type Status struct {
TypeMeta `json:",inline"`

View File

@@ -130,6 +130,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
creater, isCreater := storage.(rest.Creater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
deleter, isDeleter := storage.(rest.Deleter)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
updater, isUpdater := storage.(rest.Updater)
@@ -170,6 +171,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
gracefulDeleter = rest.GracefulDeleteAdapter{deleter}
}
var getOptions runtime.Object
var getOptionsKind string
if isGetterWithOptions {
getOptions = getterWithOptions.NewGetOptions()
_, getOptionsKind, err = a.group.Typer.ObjectVersionAndKind(getOptions)
if err != nil {
return err
}
isGetter = true
}
var ctxFn ContextFunc
ctxFn = func(req *restful.Request) api.Context {
if ctx, ok := context.Get(req.Request); ok {
@@ -316,12 +328,23 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
m := monitorFilter(action.Verb, resource)
switch action.Verb {
case "GET": // Get a resource.
route := ws.GET(action.Path).To(GetResource(getter, reqScope)).
var handler restful.RouteFunction
if isGetterWithOptions {
handler = GetResourceWithOptions(getterWithOptions, reqScope, getOptionsKind)
} else {
handler = GetResource(getter, reqScope)
}
route := ws.GET(action.Path).To(handler).
Filter(m).
Doc("read the specified " + kind).
Operation("read" + kind).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
Writes(versionedObject)
if isGetterWithOptions {
if err := addObjectParams(ws, route, getOptions); err != nil {
return err
}
}
addParams(route, action.Params)
ws.Route(route)
case "LIST": // List all resources of a kind.

View File

@@ -36,6 +36,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/emicklei/go-restful"
@@ -204,18 +205,27 @@ func APIVersionHandler(versions ...string) restful.RouteFunction {
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func write(statusCode int, apiVersion string, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) {
if stream, ok := object.(rest.ResourceStreamer); ok {
out, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept"))
out, flush, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept"))
if err != nil {
errorJSONFatal(err, codec, w)
return
}
if out == nil {
// No output provided - return StatusNoContent
w.WriteHeader(http.StatusNoContent)
return
}
defer out.Close()
if len(contentType) == 0 {
contentType = "application/octet-stream"
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(statusCode)
io.Copy(w, out)
writer := w.(io.Writer)
if flush {
writer = flushwriter.Wrap(w)
}
io.Copy(writer, out)
return
}
writeJSON(statusCode, codec, object, w)

View File

@@ -114,13 +114,17 @@ func init() {
// api.Status is returned in errors
// "internal" version
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{})
api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}, &api.Status{}, &api.ListOptions{}, &SimpleGetOptions{})
// "version" version
// TODO: Use versioned api objects?
api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{})
api.Scheme.AddKnownTypes(testVersion, &Simple{}, &SimpleList{}, &v1beta1.Status{}, &SimpleGetOptions{})
// "version2" version
// TODO: Use versioned api objects?
api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{})
api.Scheme.AddKnownTypes(testVersion2, &Simple{}, &SimpleList{}, &v1beta3.Status{}, &SimpleGetOptions{})
// Register SimpleGetOptions with the server versions to convert query params to it
api.Scheme.AddKnownTypes("v1beta1", &SimpleGetOptions{})
api.Scheme.AddKnownTypes("v1beta3", &SimpleGetOptions{})
nsMapper := newMapper()
legacyNsMapper := newMapper()
@@ -231,6 +235,14 @@ type Simple struct {
func (*Simple) IsAnAPIObject() {}
type SimpleGetOptions struct {
api.TypeMeta `json:",inline"`
Param1 string `json:"param1"`
Param2 string `json:"param2"`
}
func (*SimpleGetOptions) IsAnAPIObject() {}
type SimpleList struct {
api.TypeMeta `json:",inline"`
api.ListMeta `json:"metadata,inline"`
@@ -254,6 +266,21 @@ func TestSimpleSetupRight(t *testing.T) {
}
}
func TestSimpleOptionsSetupRight(t *testing.T) {
s := &SimpleGetOptions{}
wire, err := codec.Encode(s)
if err != nil {
t.Fatal(err)
}
s2, err := codec.Decode(wire)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(s, s2) {
t.Fatalf("encode/decode broken:\n%#v\n%#v\n", s, s2)
}
}
type SimpleRESTStorage struct {
errors map[string]error
list []Simple
@@ -314,10 +341,10 @@ func (s *SimpleStream) Close() error {
func (s *SimpleStream) IsAnAPIObject() {}
func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, string, error) {
func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) {
s.version = version
s.accept = accept
return s, s.contentType, s.err
return s, false, s.contentType, s.err
}
func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) {
@@ -432,6 +459,23 @@ func (m *MetadataRESTStorage) ProducesMIMETypes(method string) []string {
return m.types
}
type GetWithOptionsRESTStorage struct {
*SimpleRESTStorage
optionsReceived runtime.Object
}
func (r *GetWithOptionsRESTStorage) Get(ctx api.Context, name string, options runtime.Object) (runtime.Object, error) {
if _, ok := options.(*SimpleGetOptions); !ok {
return nil, fmt.Errorf("Unexpected options object: %#v", options)
}
r.optionsReceived = options
return r.SimpleRESTStorage.Get(ctx, name)
}
func (r *GetWithOptionsRESTStorage) NewGetOptions() runtime.Object {
return &SimpleGetOptions{}
}
func extractBody(response *http.Response, object runtime.Object) (string, error) {
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
@@ -878,6 +922,47 @@ func TestGetBinary(t *testing.T) {
}
}
func TestGetWithOptions(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := GetWithOptionsRESTStorage{
SimpleRESTStorage: &SimpleRESTStorage{
item: Simple{
Other: "foo",
},
},
}
storage["simple"] = &simpleStorage
handler := handle(storage)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL + "/api/version/simple/id?param1=test1&param2=test2")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected response: %#v", resp)
}
var itemOut Simple
body, err := extractBody(resp, &itemOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if itemOut.Name != simpleStorage.item.Name {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
}
opts, ok := simpleStorage.optionsReceived.(*SimpleGetOptions)
if !ok {
t.Errorf("Unexpected options object received: %#v", simpleStorage.optionsReceived)
return
}
if opts.Param1 != "test1" || opts.Param2 != "test2" {
t.Errorf("Did not receive expected options: %#v", opts)
}
}
func TestGetAlternateSelfLink(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{

View File

@@ -74,8 +74,13 @@ type RequestScope struct {
ServerAPIVersion string
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
// getterFunc performs a get request with the given context and object name. The request
// may be used to deserialize an options object to pass to the getter.
type getterFunc func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error)
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope RequestScope, getter getterFunc) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter
namespace, name, err := scope.Namer.Name(req)
@@ -86,7 +91,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
ctx := scope.ContextFunc(req)
ctx = api.WithNamespace(ctx, namespace)
result, err := r.Get(ctx, name)
result, err := getter(ctx, name, req)
if err != nil {
errorJSON(err, scope.Codec, w)
return
@@ -99,6 +104,26 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
}
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
return getResourceHandler(scope,
func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
return r.Get(ctx, name)
})
}
// GetResourceWithOptions returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResourceWithOptions(r rest.GetterWithOptions, scope RequestScope, getOptionsKind string) restful.RouteFunction {
return getResourceHandler(scope,
func(ctx api.Context, name string, req *restful.Request) (runtime.Object, error) {
opts, err := queryToObject(req.Request.URL.Query(), scope, getOptionsKind)
if err != nil {
return nil, err
}
return r.Get(ctx, name, opts)
})
}
// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {

View File

@@ -18,9 +18,7 @@ package kubelet
import (
"fmt"
"io"
"net"
"net/http"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -95,21 +93,3 @@ func (h *httpActionHandler) Run(podFullName string, uid types.UID, container *ap
_, err := h.client.Get(url)
return err
}
// FlushWriter provides wrapper for responseWriter with HTTP streaming capabilities
type FlushWriter struct {
flusher http.Flusher
writer io.Writer
}
// Write is a FlushWriter implementation of the io.Writer that sends any buffered data to the client.
func (fw *FlushWriter) Write(p []byte) (n int, err error) {
n, err = fw.writer.Write(p)
if err != nil {
return
}
if fw.flusher != nil {
fw.flusher.Flush()
}
return
}

View File

@@ -38,6 +38,7 @@ import (
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/flushwriter"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
"github.com/golang/glog"
@@ -262,15 +263,14 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
return
}
fw := FlushWriter{writer: w}
if flusher, ok := fw.writer.(http.Flusher); ok {
fw.flusher = flusher
} else {
s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", fw))
if _, ok := w.(http.Flusher); !ok {
s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", w))
return
}
fw := flushwriter.Wrap(w)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, fw, fw)
if err != nil {
s.error(w, err)
return

View File

@@ -357,8 +357,8 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
// init initializes master.
func (m *Master) init(c *Config) {
podStorage, bindingStorage, podStatusStorage := podetcd.NewStorage(c.EtcdHelper)
podRegistry := pod.NewRegistry(podStorage)
podStorage := podetcd.NewStorage(c.EtcdHelper, c.KubeletClient)
podRegistry := pod.NewRegistry(podStorage.Pod)
eventRegistry := event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds()))
limitRangeRegistry := limitrange.NewEtcdRegistry(c.EtcdHelper)
@@ -385,10 +385,11 @@ func (m *Master) init(c *Config) {
// TODO: Factor out the core API registration
m.storage = map[string]rest.Storage{
"pods": podStorage,
"pods/status": podStatusStorage,
"pods/binding": bindingStorage,
"bindings": bindingStorage,
"pods": podStorage.Pod,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"replicationControllers": controllerStorage,
"services": service.NewStorage(m.serviceRegistry, c.Cloud, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName),

View File

@@ -44,9 +44,9 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
helper := tools.NewEtcdHelper(client, latest.Codec)
podStorage, _, _ := podetcd.NewStorage(helper)
podStorage := podetcd.NewStorage(helper, nil)
endpointStorage := endpointetcd.NewStorage(helper)
registry := NewRegistry(helper, pod.NewRegistry(podStorage), endpoint.NewRegistry(endpointStorage))
registry := NewRegistry(helper, pod.NewRegistry(podStorage.Pod), endpoint.NewRegistry(endpointStorage))
return registry
}

View File

@@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package rest has generic implementations of resources used for
// REST responses
package rest

View File

@@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"io"
"net/http"
"net/url"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
)
// LocationStreamer is a resource that streams the contents of a particular
// location URL
type LocationStreamer struct {
Location *url.URL
Transport http.RoundTripper
ContentType string
Flush bool
}
// a LocationStreamer must implement a rest.ResourceStreamer
var _ rest.ResourceStreamer = &LocationStreamer{}
// IsAnAPIObject marks this object as a runtime.Object
func (*LocationStreamer) IsAnAPIObject() {}
// InputStream returns a stream with the contents of the URL location. If no location is provided,
// a null stream is returned.
func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) {
if s.Location == nil {
// If no location was provided, return a null stream
return nil, false, "", nil
}
transport := s.Transport
if transport == nil {
transport = http.DefaultTransport
}
client := &http.Client{Transport: transport}
resp, err := client.Get(s.Location.String())
if err != nil {
return
}
contentType = s.ContentType
if len(contentType) == 0 {
contentType = resp.Header.Get("Content-Type")
if len(contentType) > 0 {
contentType = strings.TrimSpace(strings.SplitN(contentType, ";", 2)[0])
}
}
flush = s.Flush
stream = resp.Body
return
}

View File

@@ -0,0 +1,118 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
)
func TestInputStreamReader(t *testing.T) {
resultString := "Test output"
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(resultString))
}))
defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Errorf("Error parsing server URL: %v", err)
return
}
streamer := &LocationStreamer{
Location: u,
}
readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err)
return
}
defer readCloser.Close()
result, err := ioutil.ReadAll(readCloser)
if string(result) != resultString {
t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), resultString)
}
}
func TestInputStreamNullLocation(t *testing.T) {
streamer := &LocationStreamer{
Location: nil,
}
readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream with null location: %v", err)
}
if readCloser != nil {
t.Errorf("Expected stream to be nil. Got: %#v", readCloser)
}
}
type testTransport struct {
body string
err error
}
func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {
r := bufio.NewReader(bytes.NewBufferString(tt.body))
return http.ReadResponse(r, req)
}
func fakeTransport(mime, message string) http.RoundTripper {
content := fmt.Sprintf("HTTP/1.1 200 OK\nContent-Type: %s\n\n%s", mime, message)
return &testTransport{body: content}
}
func TestInputStreamContentType(t *testing.T) {
location, _ := url.Parse("http://www.example.com")
streamer := &LocationStreamer{
Location: location,
Transport: fakeTransport("application/json", "hello world"),
}
readCloser, _, contentType, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err)
return
}
defer readCloser.Close()
if contentType != "application/json" {
t.Errorf("Unexpected content type. Got: %s. Expected: application/json", contentType)
}
}
func TestInputStreamTransport(t *testing.T) {
message := "hello world"
location, _ := url.Parse("http://www.example.com")
streamer := &LocationStreamer{
Location: location,
Transport: fakeTransport("text/plain", message),
}
readCloser, _, _, err := streamer.InputStream("v1beta1", "text/plain")
if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err)
return
}
defer readCloser.Close()
result, err := ioutil.ReadAll(readCloser)
if string(result) != message {
t.Errorf("Stream content does not match. Got: %s. Expected: %s.", string(result), message)
}
}

View File

@@ -25,23 +25,33 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
genericrest "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
)
// rest implements a RESTStorage for pods against etcd
// PodStorage includes storage for pods and all sub resources
type PodStorage struct {
Pod *REST
Binding *BindingREST
Status *StatusREST
Log *LogREST
}
// REST implements a RESTStorage for pods against etcd
type REST struct {
etcdgeneric.Etcd
}
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) {
func NewStorage(h tools.EtcdHelper, k client.ConnectionInfoGetter) PodStorage {
prefix := "/registry/pods"
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
@@ -74,7 +84,12 @@ func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) {
statusStore.UpdateStrategy = pod.StatusStrategy
return &REST{*store}, &BindingREST{store: store}, &StatusREST{store: &statusStore}
return PodStorage{
Pod: &REST{*store},
Binding: &BindingREST{store: store},
Status: &StatusREST{store: &statusStore},
Log: &LogREST{store: store, kubeletConn: k},
}
}
// Implement Redirector.
@@ -90,6 +105,7 @@ type BindingREST struct {
store *etcdgeneric.Etcd
}
// New creates a new binding resource
func (r *BindingREST) New() runtime.Object {
return &api.Binding{}
}
@@ -165,6 +181,7 @@ type StatusREST struct {
store *etcdgeneric.Etcd
}
// New creates a new pod resource
func (r *StatusREST) New() runtime.Object {
return &api.Pod{}
}
@@ -173,3 +190,37 @@ func (r *StatusREST) New() runtime.Object {
func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
return r.store.Update(ctx, obj)
}
// LogREST implements the log endpoint for a Pod
type LogREST struct {
store *etcdgeneric.Etcd
kubeletConn client.ConnectionInfoGetter
}
// New creates a new Pod log options object
func (r *LogREST) New() runtime.Object {
return &api.PodLogOptions{}
}
// Get retrieves a runtime.Object that will stream the contents of the pod log
func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtime.Object, error) {
logOpts, ok := opts.(*api.PodLogOptions)
if !ok {
return nil, fmt.Errorf("Invalid options object: %#v", opts)
}
location, transport, err := pod.LogLocation(r.store, r.kubeletConn, ctx, name, logOpts)
if err != nil {
return nil, err
}
return &genericrest.LocationStreamer{
Location: location,
Transport: transport,
ContentType: "text/plain",
Flush: logOpts.Follow,
}, nil
}
// NewGetOptions creates a new options object
func (r *LogREST) NewGetOptions() runtime.Object {
return &api.PodLogOptions{}
}

View File

@@ -47,8 +47,8 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient, h := newHelper(t)
storage, bindingStorage, statusStorage := NewStorage(h)
return storage, bindingStorage, statusStorage, fakeEtcdClient, h
storage := NewStorage(h, nil)
return storage.Pod, storage.Binding, storage.Status, fakeEtcdClient, h
}
func validNewPod() *api.Pod {
@@ -89,7 +89,7 @@ func TestStorage(t *testing.T) {
func TestCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
test := resttest.New(t, storage, fakeEtcdClient.SetError)
pod := validNewPod()
pod.ObjectMeta = api.ObjectMeta{}
@@ -107,7 +107,7 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
test := resttest.New(t, storage, fakeEtcdClient.SetError)
createFn := func() runtime.Object {
@@ -143,7 +143,7 @@ func expectPod(t *testing.T, out runtime.Object) (*api.Pod, bool) {
func TestCreateRegistryError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
@@ -154,7 +154,7 @@ func TestCreateRegistryError(t *testing.T) {
func TestCreateSetsFields(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
if err != fakeEtcdClient.Err {
@@ -176,7 +176,7 @@ func TestCreateSetsFields(t *testing.T) {
func TestListError(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pods, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
if err != fakeEtcdClient.Err {
t.Fatalf("Expected %#v, Got %#v", fakeEtcdClient.Err, err)
@@ -194,7 +194,7 @@ func TestListEmptyPodList(t *testing.T) {
E: fakeEtcdClient.NewError(tools.EtcdErrorCodeNotFound),
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pods, err := storage.List(api.NewContext(), labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -231,7 +231,7 @@ func TestListPodList(t *testing.T) {
},
},
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
podsObj, err := storage.List(api.NewDefaultContext(), labels.Everything(), fields.Everything())
pods := podsObj.(*api.PodList)
@@ -280,7 +280,7 @@ func TestListPodListSelection(t *testing.T) {
},
},
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
ctx := api.NewDefaultContext()
@@ -345,7 +345,7 @@ func TestListPodListSelection(t *testing.T) {
}
func TestPodDecode(t *testing.T) {
storage, _, _ := NewStorage(tools.EtcdHelper{})
storage := NewStorage(tools.EtcdHelper{}, nil).Pod
expected := validNewPod()
body, err := latest.Codec.Encode(expected)
if err != nil {
@@ -375,7 +375,7 @@ func TestGet(t *testing.T) {
},
},
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
obj, err := storage.Get(api.WithNamespace(api.NewContext(), "test"), "foo")
pod := obj.(*api.Pod)
@@ -392,7 +392,7 @@ func TestGet(t *testing.T) {
func TestPodStorageValidatesCreate(t *testing.T) {
fakeEtcdClient, helper := newHelper(t)
fakeEtcdClient.Err = fmt.Errorf("test error")
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
pod.Labels = map[string]string{
@@ -410,7 +410,7 @@ func TestPodStorageValidatesCreate(t *testing.T) {
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreatePod(t *testing.T) {
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
obj, err := storage.Create(api.NewDefaultContext(), pod)
@@ -432,7 +432,7 @@ func TestCreatePod(t *testing.T) {
// TODO: remove, this is covered by RESTTest.TestCreate
func TestCreateWithConflictingNamespace(t *testing.T) {
_, helper := newHelper(t)
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pod := validNewPod()
pod.Namespace = "not-default"
@@ -461,7 +461,7 @@ func TestUpdateWithConflictingNamespace(t *testing.T) {
},
},
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
pod := validChangedPod()
pod.Namespace = "not-default"
@@ -578,7 +578,7 @@ func TestResourceLocation(t *testing.T) {
},
},
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
redirector := rest.Redirector(storage)
location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
@@ -616,7 +616,7 @@ func TestDeletePod(t *testing.T) {
},
},
}
storage, _, _ := NewStorage(helper)
storage := NewStorage(helper, nil).Pod
_, err := storage.Delete(api.NewDefaultContext(), "foo", nil)
if err != nil {

View File

@@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
@@ -133,6 +134,18 @@ type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}
func getPod(getter ResourceGetter, ctx api.Context, name string) (*api.Pod, error) {
obj, err := getter.Get(ctx, name)
if err != nil {
return nil, err
}
pod := obj.(*api.Pod)
if pod == nil {
return nil, fmt.Errorf("Unexpected object type: %#v", pod)
}
return pod, nil
}
// ResourceLocation returns a URL to which one can send traffic for the specified pod.
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
// Allow ID as "podname" or "podname:port". If port is not specified,
@@ -148,14 +161,10 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U
port = parts[1]
}
obj, err := getter.Get(ctx, name)
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
}
pod := obj.(*api.Pod)
if pod == nil {
return nil, nil, nil
}
// Try to figure out a port.
if port == "" {
@@ -177,3 +186,43 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.U
}
return loc, nil, nil
}
// LogLocation returns a the log URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) {
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
}
// Try to figure out a container
container := opts.Container
if container == "" {
if len(pod.Spec.Containers) == 1 {
container = pod.Spec.Containers[0].Name
} else {
return nil, nil, fmt.Errorf("a container name must be specified for pod %s", name)
}
}
nodeHost := pod.Status.HostIP
if len(nodeHost) == 0 {
// If pod has not been assigned a host, return an empty location
return nil, nil, nil
}
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost)
if err != nil {
return nil, nil, err
}
params := url.Values{}
if opts.Follow {
params.Add("follow", "true")
}
loc := &url.URL{
Scheme: nodeScheme,
Host: fmt.Sprintf("%s:%d", nodeHost, nodePort),
Path: fmt.Sprintf("/containerLogs/%s/%s/%s", pod.Namespace, name, container),
RawQuery: params.Encode(),
}
return loc, nodeTransport, nil
}

View File

@@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package flushwriter implements a wrapper for a writer that flushes on every
// write if that writer implements the io.Flusher interface
package flushwriter

View File

@@ -0,0 +1,53 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flushwriter
import (
"io"
"net/http"
)
// Wrap wraps an io.Writer into a writer that flushes after every write if
// the writer implements the Flusher interface.
func Wrap(w io.Writer) io.Writer {
fw := &flushWriter{
writer: w,
}
if flusher, ok := w.(http.Flusher); ok {
fw.flusher = flusher
}
return fw
}
// flushWriter provides wrapper for responseWriter with HTTP streaming capabilities
type flushWriter struct {
flusher http.Flusher
writer io.Writer
}
// Write is a FlushWriter implementation of the io.Writer that sends any buffered
// data to the client.
func (fw *flushWriter) Write(p []byte) (n int, err error) {
n, err = fw.writer.Write(p)
if err != nil {
return
}
if fw.flusher != nil {
fw.flusher.Flush()
}
return
}

View File

@@ -0,0 +1,86 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flushwriter
import (
"fmt"
"testing"
)
type writerWithFlush struct {
writeCount, flushCount int
err error
}
func (w *writerWithFlush) Flush() {
w.flushCount++
}
func (w *writerWithFlush) Write(p []byte) (n int, err error) {
w.writeCount++
return len(p), w.err
}
type writerWithNoFlush struct {
writeCount int
}
func (w *writerWithNoFlush) Write(p []byte) (n int, err error) {
w.writeCount++
return len(p), nil
}
func TestWriteWithFlush(t *testing.T) {
w := &writerWithFlush{}
fw := Wrap(w)
for i := 0; i < 10; i++ {
_, err := fw.Write([]byte("Test write"))
if err != nil {
t.Errorf("Unexpected error while writing with flush writer: %v", err)
}
}
if w.flushCount != 10 {
t.Errorf("Flush not called the expected number of times. Actual: %d", w.flushCount)
}
if w.writeCount != 10 {
t.Errorf("Write not called the expected number of times. Actual: %d", w.writeCount)
}
}
func TestWriteWithoutFlush(t *testing.T) {
w := &writerWithNoFlush{}
fw := Wrap(w)
for i := 0; i < 10; i++ {
_, err := fw.Write([]byte("Test write"))
if err != nil {
t.Errorf("Unexpected error while writing with flush writer: %v", err)
}
}
if w.writeCount != 10 {
t.Errorf("Write not called the expected number of times. Actual: %d", w.writeCount)
}
}
func TestWriteError(t *testing.T) {
e := fmt.Errorf("Error")
w := &writerWithFlush{err: e}
fw := Wrap(w)
_, err := fw.Write([]byte("Test write"))
if err != e {
t.Errorf("Did not get expected error. Got: %#v", err)
}
}