Improve ResourceLocation API, allow proxy to use authenticated transport

This commit is contained in:
Jordan Liggitt 2015-03-23 14:42:39 -04:00
parent 1dc7bcf53b
commit a75b501821
18 changed files with 247 additions and 72 deletions

View File

@ -106,6 +106,10 @@ func (fakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
return api.NodeInfo{}, nil return api.NodeInfo{}, nil
} }
func (fakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}
func (fakeKubeletClient) HealthCheck(host string) (probe.Result, error) { func (fakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
return probe.Success, nil return probe.Success, nil
} }

View File

@ -17,6 +17,9 @@ limitations under the License.
package rest package rest
import ( import (
"net/http"
"net/url"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
@ -142,6 +145,6 @@ type StandardStorage interface {
// Redirector know how to return a remote resource's location. // Redirector know how to return a remote resource's location.
type Redirector interface { type Redirector interface {
// ResourceLocation should return the remote location of the given resource, or an error. // ResourceLocation should return the remote location of the given resource, and an optional transport to use to request it, or an error.
ResourceLocation(ctx api.Context, name string) (remoteLocation string, err error) ResourceLocation(ctx api.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error)
} }

View File

@ -24,6 +24,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
@ -226,7 +227,7 @@ type SimpleRESTStorage struct {
// The id requested, and location to return for ResourceLocation // The id requested, and location to return for ResourceLocation
requestedResourceLocationID string requestedResourceLocationID string
resourceLocation string resourceLocation *url.URL
expectedResourceNamespace string expectedResourceNamespace string
// If non-nil, called inside the WorkFunc when answering update, delete, create. // If non-nil, called inside the WorkFunc when answering update, delete, create.
@ -315,18 +316,23 @@ func (storage *SimpleRESTStorage) Watch(ctx api.Context, label labels.Selector,
} }
// Implement Redirector. // Implement Redirector.
func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) (string, error) { var _ = rest.Redirector(&SimpleRESTStorage{})
// Implement Redirector.
func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
storage.checkContext(ctx) storage.checkContext(ctx)
// validate that the namespace context on the request matches the expected input // validate that the namespace context on the request matches the expected input
storage.requestedResourceNamespace = api.NamespaceValue(ctx) storage.requestedResourceNamespace = api.NamespaceValue(ctx)
if storage.expectedResourceNamespace != storage.requestedResourceNamespace { if storage.expectedResourceNamespace != storage.requestedResourceNamespace {
return "", fmt.Errorf("Expected request namespace %s, but got namespace %s", storage.expectedResourceNamespace, storage.requestedResourceNamespace) return nil, nil, fmt.Errorf("Expected request namespace %s, but got namespace %s", storage.expectedResourceNamespace, storage.requestedResourceNamespace)
} }
storage.requestedResourceLocationID = id storage.requestedResourceLocationID = id
if err := storage.errors["resourceLocation"]; err != nil { if err := storage.errors["resourceLocation"]; err != nil {
return "", err return nil, nil, err
} }
return storage.resourceLocation, nil // Make a copy so the internal URL never gets mutated
locationCopy := *storage.resourceLocation
return &locationCopy, nil, nil
} }
type LegacyRESTStorage struct { type LegacyRESTStorage struct {

View File

@ -19,6 +19,7 @@ package apiserver
import ( import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"crypto/tls"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -140,7 +141,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
location, err := redirector.ResourceLocation(ctx, id) location, transport, err := redirector.ResourceLocation(ctx, id)
if err != nil { if err != nil {
httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err)
status := errToAPIStatus(err) status := errToAPIStatus(err)
@ -148,22 +149,31 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
httpCode = status.Code httpCode = status.Code
return return
} }
if location == nil {
destURL, err := url.Parse(location) httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id)
if err != nil { notFound(w, req)
status := errToAPIStatus(err) httpCode = http.StatusNotFound
writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code
return return
} }
if destURL.Scheme == "" {
// If no scheme was present in location, url.Parse sometimes mistakes // Default to http
// hosts for paths. if location.Scheme == "" {
destURL.Host = location location.Scheme = "http"
} }
destURL.Path = remainder // Add the subpath
destURL.RawQuery = req.URL.RawQuery if len(remainder) > 0 {
newReq, err := http.NewRequest(req.Method, destURL.String(), req.Body) location.Path = singleJoiningSlash(location.Path, remainder)
}
// Start with anything returned from the storage, and add the original request's parameters
values := location.Query()
for k, vs := range req.URL.Query() {
for _, v := range vs {
values.Add(k, v)
}
}
location.RawQuery = values.Encode()
newReq, err := http.NewRequest(req.Method, location.String(), req.Body)
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
@ -177,29 +187,34 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// TODO convert this entire proxy to an UpgradeAwareProxy similar to // TODO convert this entire proxy to an UpgradeAwareProxy similar to
// https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go. // https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go.
// That proxy needs to be modified to support multiple backends, not just 1. // That proxy needs to be modified to support multiple backends, not just 1.
if r.tryUpgrade(w, req, newReq, destURL) { if r.tryUpgrade(w, req, newReq, location, transport) {
return return
} }
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: destURL.Host}) proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: location.Scheme, Host: location.Host})
proxy.Transport = &proxyTransport{ if transport == nil {
proxyScheme: req.URL.Scheme, prepend := path.Join(r.prefix, resource, id)
proxyHost: req.URL.Host, if len(namespace) > 0 {
proxyPathPrepend: requestInfo.URLPath(), prepend = path.Join(r.prefix, "namespaces", namespace, resource, id)
}
transport = &proxyTransport{
proxyScheme: req.URL.Scheme,
proxyHost: req.URL.Host,
proxyPathPrepend: prepend,
}
} }
proxy.Transport = transport
proxy.FlushInterval = 200 * time.Millisecond proxy.FlushInterval = 200 * time.Millisecond
proxy.ServeHTTP(w, newReq) proxy.ServeHTTP(w, newReq)
} }
// tryUpgrade returns true if the request was handled. // tryUpgrade returns true if the request was handled.
func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, destURL *url.URL) bool { func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper) bool {
connectionHeader := strings.ToLower(req.Header.Get(httpstream.HeaderConnection)) if !httpstream.IsUpgradeRequest(req) {
if !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || len(req.Header.Get(httpstream.HeaderUpgrade)) == 0 {
return false return false
} }
//TODO support TLS? Doesn't look like proxyTransport does anything special ...
dialAddr := netutil.CanonicalAddr(destURL) backendConn, err := dialURL(location, transport)
backendConn, err := net.Dial("tcp", dialAddr)
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
@ -246,6 +261,54 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque
return true return true
} }
func dialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) {
dialAddr := netutil.CanonicalAddr(url)
switch url.Scheme {
case "http":
return net.Dial("tcp", dialAddr)
case "https":
// Get the tls config from the transport if we recognize it
var tlsConfig *tls.Config
if transport != nil {
httpTransport, ok := transport.(*http.Transport)
if ok {
tlsConfig = httpTransport.TLSClientConfig
}
}
// Dial
tlsConn, err := tls.Dial("tcp", dialAddr, tlsConfig)
if err != nil {
return nil, err
}
// Verify
host, _, _ := net.SplitHostPort(dialAddr)
if err := tlsConn.VerifyHostname(host); err != nil {
tlsConn.Close()
return nil, err
}
return tlsConn, nil
default:
return nil, fmt.Errorf("Unknown scheme: %s", url.Scheme)
}
}
// borrowed from net/http/httputil/reverseproxy.go
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
type proxyTransport struct { type proxyTransport struct {
proxyScheme string proxyScheme string
proxyHost string proxyHost string

View File

@ -275,9 +275,10 @@ func TestProxy(t *testing.T) {
})) }))
defer proxyServer.Close() defer proxyServer.Close()
serverURL, _ := url.Parse(proxyServer.URL)
simpleStorage := &SimpleRESTStorage{ simpleStorage := &SimpleRESTStorage{
errors: map[string]error{}, errors: map[string]error{},
resourceLocation: proxyServer.URL, resourceLocation: serverURL,
expectedResourceNamespace: item.reqNamespace, expectedResourceNamespace: item.reqNamespace,
} }
@ -335,9 +336,10 @@ func TestProxyUpgrade(t *testing.T) {
})) }))
defer backendServer.Close() defer backendServer.Close()
serverURL, _ := url.Parse(backendServer.URL)
simpleStorage := &SimpleRESTStorage{ simpleStorage := &SimpleRESTStorage{
errors: map[string]error{}, errors: map[string]error{},
resourceLocation: backendServer.URL, resourceLocation: serverURL,
expectedResourceNamespace: "myns", expectedResourceNamespace: "myns",
} }

View File

@ -17,7 +17,6 @@ limitations under the License.
package apiserver package apiserver
import ( import (
"fmt"
"net/http" "net/http"
"time" "time"
@ -79,15 +78,26 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
location, err := redirector.ResourceLocation(ctx, id) location, _, err := redirector.ResourceLocation(ctx, id)
if err != nil { if err != nil {
status := errToAPIStatus(err) status := errToAPIStatus(err)
writeJSON(status.Code, r.codec, status, w) writeJSON(status.Code, r.codec, status, w)
httpCode = status.Code httpCode = status.Code
return return
} }
if location == nil {
httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id)
notFound(w, req)
httpCode = http.StatusNotFound
return
}
w.Header().Set("Location", fmt.Sprintf("http://%s", location)) // Default to http
if location.Scheme == "" {
location.Scheme = "http"
}
w.Header().Set("Location", location.String())
w.WriteHeader(http.StatusTemporaryRedirect) w.WriteHeader(http.StatusTemporaryRedirect)
httpCode = http.StatusTemporaryRedirect httpCode = http.StatusTemporaryRedirect
} }

View File

@ -53,7 +53,7 @@ func TestRedirect(t *testing.T) {
for _, item := range table { for _, item := range table {
simpleStorage.errors["resourceLocation"] = item.err simpleStorage.errors["resourceLocation"] = item.err
simpleStorage.resourceLocation = item.id simpleStorage.resourceLocation = &url.URL{Host: item.id}
resp, err := client.Get(server.URL + "/api/version/redirect/foo/" + item.id) resp, err := client.Get(server.URL + "/api/version/redirect/foo/" + item.id)
if resp == nil { if resp == nil {
t.Fatalf("Unexpected nil resp") t.Fatalf("Unexpected nil resp")
@ -104,7 +104,7 @@ func TestRedirectWithNamespaces(t *testing.T) {
for _, item := range table { for _, item := range table {
simpleStorage.errors["resourceLocation"] = item.err simpleStorage.errors["resourceLocation"] = item.err
simpleStorage.resourceLocation = item.id simpleStorage.resourceLocation = &url.URL{Host: item.id}
resp, err := client.Get(server.URL + "/api/version/redirect/namespaces/other/foo/" + item.id) resp, err := client.Get(server.URL + "/api/version/redirect/namespaces/other/foo/" + item.id)
if resp == nil { if resp == nil {
t.Fatalf("Unexpected nil resp") t.Fatalf("Unexpected nil resp")

View File

@ -40,6 +40,7 @@ type KubeletClient interface {
KubeletHealthChecker KubeletHealthChecker
PodInfoGetter PodInfoGetter
NodeInfoGetter NodeInfoGetter
ConnectionInfoGetter
} }
// KubeletHealthchecker is an interface for healthchecking kubelets // KubeletHealthchecker is an interface for healthchecking kubelets
@ -59,6 +60,10 @@ type NodeInfoGetter interface {
GetNodeInfo(host string) (api.NodeInfo, error) GetNodeInfo(host string) (api.NodeInfo, error)
} }
type ConnectionInfoGetter interface {
GetConnectionInfo(host string) (scheme string, port uint, transport http.RoundTripper, error error)
}
// HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP. // HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP.
type HTTPKubeletClient struct { type HTTPKubeletClient struct {
Client *http.Client Client *http.Client
@ -92,6 +97,14 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
}, nil }, nil
} }
func (c *HTTPKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
scheme := "http"
if c.EnableHttps {
scheme = "https"
}
return scheme, c.Port, c.Client.Transport, nil
}
func (c *HTTPKubeletClient) url(host, path, query string) string { func (c *HTTPKubeletClient) url(host, path, query string) string {
scheme := "http" scheme := "http"
if c.EnableHttps { if c.EnableHttps {
@ -168,3 +181,7 @@ func (c FakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
func (c FakeKubeletClient) HealthCheck(host string) (probe.Result, error) { func (c FakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
return probe.Unknown, errors.New("Not Implemented") return probe.Unknown, errors.New("Not Implemented")
} }
func (c FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}

View File

@ -20,6 +20,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/http"
"reflect" "reflect"
"sort" "sort"
"testing" "testing"
@ -135,6 +136,10 @@ func (c *FakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) {
return api.NodeInfo{}, errors.New("Not Implemented") return api.NodeInfo{}, errors.New("Not Implemented")
} }
func (c *FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented")
}
func (c *FakeKubeletClient) HealthCheck(host string) (probe.Result, error) { func (c *FakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
return c.Status, c.Err return c.Status, c.Err
} }

View File

@ -368,7 +368,7 @@ func (m *Master) init(c *Config) {
m.endpointRegistry = registry m.endpointRegistry = registry
m.nodeRegistry = registry m.nodeRegistry = registry
nodeStorage := minion.NewStorage(m.nodeRegistry) nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient)
// TODO: unify the storage -> registry and storage -> client patterns // TODO: unify the storage -> registry and storage -> client patterns
nodeStorageClient := RESTStorageToNodes(nodeStorage) nodeStorageClient := RESTStorageToNodes(nodeStorage)
podCache := NewPodCache( podCache := NewPodCache(

View File

@ -20,28 +20,32 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/http"
"net/url"
"strconv" "strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// REST adapts minion into apiserver's RESTStorage model. // REST adapts minion into apiserver's RESTStorage model.
type REST struct { type REST struct {
registry Registry registry Registry
connection client.ConnectionInfoGetter
} }
// NewStorage returns a new rest.Storage implementation for minion. // NewStorage returns a new rest.Storage implementation for minion.
func NewStorage(m Registry) *REST { func NewStorage(m Registry, connection client.ConnectionInfoGetter) *REST {
return &REST{ return &REST{
registry: m, registry: m,
connection: connection,
} }
} }
@ -138,13 +142,29 @@ func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selec
return rs.registry.WatchMinions(ctx, label, field, resourceVersion) return rs.registry.WatchMinions(ctx, label, field, resourceVersion)
} }
// Implement Redirector.
var _ = rest.Redirector(&REST{})
// ResourceLocation returns a URL to which one can send traffic for the specified minion. // ResourceLocation returns a URL to which one can send traffic for the specified minion.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
minion, err := rs.registry.GetMinion(ctx, id) minion, err := rs.registry.GetMinion(ctx, id)
if err != nil { if err != nil {
return "", err return nil, nil, err
} }
host := minion.Name host := minion.Name
// TODO: Minion webservers should be secure!
return "http://" + net.JoinHostPort(host, strconv.Itoa(ports.KubeletPort)), nil scheme, port, transport, err := rs.connection.GetConnectionInfo(host)
if err != nil {
return nil, nil, err
}
return &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(
host,
strconv.FormatUint(uint64(port), 10),
),
},
transport,
nil
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package minion package minion
import ( import (
"net/http"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -27,8 +28,15 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
) )
type FakeConnectionInfoGetter struct {
}
func (FakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
return "http", 12345, nil, nil
}
func TestMinionRegistryREST(t *testing.T) { func TestMinionRegistryREST(t *testing.T) {
ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
ctx := api.NewContext() ctx := api.NewContext()
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" { if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" {
t.Errorf("missing expected object") t.Errorf("missing expected object")
@ -88,7 +96,7 @@ func TestMinionRegistryREST(t *testing.T) {
} }
func TestMinionRegistryValidUpdate(t *testing.T) { func TestMinionRegistryValidUpdate(t *testing.T) {
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
ctx := api.NewContext() ctx := api.NewContext()
obj, err := storage.Get(ctx, "foo") obj, err := storage.Get(ctx, "foo")
if err != nil { if err != nil {
@ -113,7 +121,7 @@ var (
) )
func TestMinionRegistryValidatesCreate(t *testing.T) { func TestMinionRegistryValidatesCreate(t *testing.T) {
storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{})
ctx := api.NewContext() ctx := api.NewContext()
failureCases := map[string]api.Node{ failureCases := map[string]api.Node{
"zero-length Name": { "zero-length Name": {
@ -156,7 +164,7 @@ func contains(nodes *api.NodeList, nodeID string) bool {
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
registry := registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}) registry := registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})
test := resttest.New(t, NewStorage(registry), registry.SetError).ClusterScope() test := resttest.New(t, NewStorage(registry, FakeConnectionInfoGetter{}), registry.SetError).ClusterScope()
test.TestCreate( test.TestCreate(
// valid // valid
&api.Node{ &api.Node{

View File

@ -18,6 +18,8 @@ package etcd
import ( import (
"fmt" "fmt"
"net/http"
"net/url"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -75,8 +77,11 @@ func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) {
return &REST{*store}, &BindingREST{store: store}, &StatusREST{store: &statusStore} return &REST{*store}, &BindingREST{store: store}, &StatusREST{store: &statusStore}
} }
// Implement Redirector.
var _ = rest.Redirector(&REST{})
// ResourceLocation returns a pods location from its HostIP // ResourceLocation returns a pods location from its HostIP
func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) { func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.RoundTripper, error) {
return pod.ResourceLocation(r, ctx, name) return pod.ResourceLocation(r, ctx, name)
} }

View File

@ -683,13 +683,19 @@ func TestResourceLocation(t *testing.T) {
storage = storage.WithPodStatus(cache) storage = storage.WithPodStatus(cache)
redirector := rest.Redirector(storage) redirector := rest.Redirector(storage)
location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
if location == nil {
t.Errorf("Unexpected nil: %v", location)
}
if location != tc.location { if location.Scheme != "" {
t.Errorf("Expected %v, but got %v", tc.location, location) t.Errorf("Expected '%v', but got '%v'", "", location.Scheme)
}
if location.Host != tc.location {
t.Errorf("Expected %v, but got %v", tc.location, location.Host)
} }
} }
} }

View File

@ -18,6 +18,9 @@ package pod
import ( import (
"fmt" "fmt"
"net"
"net/http"
"net/url"
"strings" "strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -151,12 +154,12 @@ type ResourceGetter interface {
} }
// ResourceLocation returns a URL to which one can send traffic for the specified pod. // ResourceLocation returns a URL to which one can send traffic for the specified pod.
func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string, error) { func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
// Allow ID as "podname" or "podname:port". If port is not specified, // Allow ID as "podname" or "podname:port". If port is not specified,
// try to use the first defined port on the pod. // try to use the first defined port on the pod.
parts := strings.Split(id, ":") parts := strings.Split(id, ":")
if len(parts) > 2 { if len(parts) > 2 {
return "", errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id)) return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id))
} }
name := parts[0] name := parts[0]
port := "" port := ""
@ -167,11 +170,11 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string
obj, err := getter.Get(ctx, name) obj, err := getter.Get(ctx, name)
if err != nil { if err != nil {
return "", err return nil, nil, err
} }
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
if pod == nil { if pod == nil {
return "", nil return nil, nil, nil
} }
// Try to figure out a port. // Try to figure out a port.
@ -186,9 +189,11 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string
// We leave off the scheme ('http://') because we have no idea what sort of server // We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint. // is listening at this endpoint.
loc := pod.Status.PodIP loc := &url.URL{}
if port != "" { if port == "" {
loc += fmt.Sprintf(":%s", port) loc.Host = pod.Status.PodIP
} else {
loc.Host = net.JoinHostPort(pod.Status.PodIP, port)
} }
return loc, nil return loc, nil, nil
} }

View File

@ -20,6 +20,8 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"net/http"
"net/url"
"strconv" "strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -223,19 +225,24 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
return out, false, err return out, false, err
} }
// Implement Redirector.
var _ = rest.Redirector(&REST{})
// ResourceLocation returns a URL to which one can send traffic for the specified service. // ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
eps, err := rs.registry.GetEndpoints(ctx, id) eps, err := rs.registry.GetEndpoints(ctx, id)
if err != nil { if err != nil {
return "", err return nil, nil, err
} }
if len(eps.Endpoints) == 0 { if len(eps.Endpoints) == 0 {
return "", fmt.Errorf("no endpoints available for %v", id) return nil, nil, fmt.Errorf("no endpoints available for %v", id)
} }
// We leave off the scheme ('http://') because we have no idea what sort of server // We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint. // is listening at this endpoint.
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil return &url.URL{
Host: net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)),
}, nil, nil
} }
func (rs *REST) getLoadbalancerName(ctx api.Context, service *api.Service) string { func (rs *REST) getLoadbalancerName(ctx api.Context, service *api.Service) string {

View File

@ -385,11 +385,14 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
}, },
}) })
redirector := rest.Redirector(storage) redirector := rest.Redirector(storage)
location, err := redirector.ResourceLocation(ctx, "foo") location, _, err := redirector.ResourceLocation(ctx, "foo")
if err != nil { if err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
if e, a := "foo:80", location; e != a { if location == nil {
t.Errorf("Unexpected nil: %v", location)
}
if e, a := "//foo:80", location.String(); e != a {
t.Errorf("Expected %v, but got %v", e, a) t.Errorf("Expected %v, but got %v", e, a)
} }
if e, a := "foo", registry.GottenID; e != a { if e, a := "foo", registry.GottenID; e != a {
@ -398,7 +401,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
// Test error path // Test error path
registry.Err = fmt.Errorf("fake error") registry.Err = fmt.Errorf("fake error")
if _, err = redirector.ResourceLocation(ctx, "foo"); err == nil { if _, _, err = redirector.ResourceLocation(ctx, "foo"); err == nil {
t.Errorf("unexpected nil error") t.Errorf("unexpected nil error")
} }
} }

View File

@ -19,6 +19,7 @@ package httpstream
import ( import (
"io" "io"
"net/http" "net/http"
"strings"
"time" "time"
) )
@ -78,3 +79,13 @@ type Stream interface {
// Headers returns the headers used to create the stream. // Headers returns the headers used to create the stream.
Headers() http.Header Headers() http.Header
} }
// IsUpgradeRequest returns true if the given request is a connection upgrade request
func IsUpgradeRequest(req *http.Request) bool {
for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) {
return true
}
}
return false
}