diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 419df3cfce1..afc4c2ca9cb 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -106,6 +106,10 @@ func (fakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) { return api.NodeInfo{}, nil } +func (fakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { + return "", 0, nil, errors.New("Not Implemented") +} + func (fakeKubeletClient) HealthCheck(host string) (probe.Result, error) { return probe.Success, nil } diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index 447377148a2..3698bf2948d 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -17,6 +17,9 @@ limitations under the License. package rest import ( + "net/http" + "net/url" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -142,6 +145,6 @@ type StandardStorage interface { // Redirector know how to return a remote resource's location. type Redirector interface { - // ResourceLocation should return the remote location of the given resource, or an error. - ResourceLocation(ctx api.Context, name string) (remoteLocation string, err error) + // ResourceLocation should return the remote location of the given resource, and an optional transport to use to request it, or an error. + ResourceLocation(ctx api.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error) } diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 45e2191022d..13d439e35e6 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "net/url" "reflect" "strings" "sync" @@ -226,7 +227,7 @@ type SimpleRESTStorage struct { // The id requested, and location to return for ResourceLocation requestedResourceLocationID string - resourceLocation string + resourceLocation *url.URL expectedResourceNamespace string // If non-nil, called inside the WorkFunc when answering update, delete, create. @@ -315,18 +316,23 @@ func (storage *SimpleRESTStorage) Watch(ctx api.Context, label labels.Selector, } // Implement Redirector. -func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) (string, error) { +var _ = rest.Redirector(&SimpleRESTStorage{}) + +// Implement Redirector. +func (storage *SimpleRESTStorage) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { storage.checkContext(ctx) // validate that the namespace context on the request matches the expected input storage.requestedResourceNamespace = api.NamespaceValue(ctx) if storage.expectedResourceNamespace != storage.requestedResourceNamespace { - return "", fmt.Errorf("Expected request namespace %s, but got namespace %s", storage.expectedResourceNamespace, storage.requestedResourceNamespace) + return nil, nil, fmt.Errorf("Expected request namespace %s, but got namespace %s", storage.expectedResourceNamespace, storage.requestedResourceNamespace) } storage.requestedResourceLocationID = id if err := storage.errors["resourceLocation"]; err != nil { - return "", err + return nil, nil, err } - return storage.resourceLocation, nil + // Make a copy so the internal URL never gets mutated + locationCopy := *storage.resourceLocation + return &locationCopy, nil, nil } type LegacyRESTStorage struct { diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go index 86d823d9c4e..6299dbfb386 100644 --- a/pkg/apiserver/proxy.go +++ b/pkg/apiserver/proxy.go @@ -19,6 +19,7 @@ package apiserver import ( "bytes" "compress/gzip" + "crypto/tls" "fmt" "io" "io/ioutil" @@ -140,7 +141,7 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - location, err := redirector.ResourceLocation(ctx, id) + location, transport, err := redirector.ResourceLocation(ctx, id) if err != nil { httplog.LogOf(req, w).Addf("Error getting ResourceLocation: %v", err) status := errToAPIStatus(err) @@ -148,22 +149,31 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { httpCode = status.Code return } - - destURL, err := url.Parse(location) - if err != nil { - status := errToAPIStatus(err) - writeJSON(status.Code, r.codec, status, w) - httpCode = status.Code + if location == nil { + httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id) + notFound(w, req) + httpCode = http.StatusNotFound return } - if destURL.Scheme == "" { - // If no scheme was present in location, url.Parse sometimes mistakes - // hosts for paths. - destURL.Host = location + + // Default to http + if location.Scheme == "" { + location.Scheme = "http" } - destURL.Path = remainder - destURL.RawQuery = req.URL.RawQuery - newReq, err := http.NewRequest(req.Method, destURL.String(), req.Body) + // Add the subpath + if len(remainder) > 0 { + location.Path = singleJoiningSlash(location.Path, remainder) + } + // Start with anything returned from the storage, and add the original request's parameters + values := location.Query() + for k, vs := range req.URL.Query() { + for _, v := range vs { + values.Add(k, v) + } + } + location.RawQuery = values.Encode() + + newReq, err := http.NewRequest(req.Method, location.String(), req.Body) if err != nil { status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) @@ -177,29 +187,34 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // TODO convert this entire proxy to an UpgradeAwareProxy similar to // https://github.com/openshift/origin/blob/master/pkg/util/httpproxy/upgradeawareproxy.go. // That proxy needs to be modified to support multiple backends, not just 1. - if r.tryUpgrade(w, req, newReq, destURL) { + if r.tryUpgrade(w, req, newReq, location, transport) { return } - proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: destURL.Host}) - proxy.Transport = &proxyTransport{ - proxyScheme: req.URL.Scheme, - proxyHost: req.URL.Host, - proxyPathPrepend: requestInfo.URLPath(), + proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: location.Scheme, Host: location.Host}) + if transport == nil { + prepend := path.Join(r.prefix, resource, id) + if len(namespace) > 0 { + prepend = path.Join(r.prefix, "namespaces", namespace, resource, id) + } + transport = &proxyTransport{ + proxyScheme: req.URL.Scheme, + proxyHost: req.URL.Host, + proxyPathPrepend: prepend, + } } + proxy.Transport = transport proxy.FlushInterval = 200 * time.Millisecond proxy.ServeHTTP(w, newReq) } // tryUpgrade returns true if the request was handled. -func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, destURL *url.URL) bool { - connectionHeader := strings.ToLower(req.Header.Get(httpstream.HeaderConnection)) - if !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || len(req.Header.Get(httpstream.HeaderUpgrade)) == 0 { +func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Request, location *url.URL, transport http.RoundTripper) bool { + if !httpstream.IsUpgradeRequest(req) { return false } - //TODO support TLS? Doesn't look like proxyTransport does anything special ... - dialAddr := netutil.CanonicalAddr(destURL) - backendConn, err := net.Dial("tcp", dialAddr) + + backendConn, err := dialURL(location, transport) if err != nil { status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) @@ -246,6 +261,54 @@ func (r *ProxyHandler) tryUpgrade(w http.ResponseWriter, req, newReq *http.Reque return true } +func dialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) { + dialAddr := netutil.CanonicalAddr(url) + + switch url.Scheme { + case "http": + return net.Dial("tcp", dialAddr) + case "https": + // Get the tls config from the transport if we recognize it + var tlsConfig *tls.Config + if transport != nil { + httpTransport, ok := transport.(*http.Transport) + if ok { + tlsConfig = httpTransport.TLSClientConfig + } + } + + // Dial + tlsConn, err := tls.Dial("tcp", dialAddr, tlsConfig) + if err != nil { + return nil, err + } + + // Verify + host, _, _ := net.SplitHostPort(dialAddr) + if err := tlsConn.VerifyHostname(host); err != nil { + tlsConn.Close() + return nil, err + } + + return tlsConn, nil + default: + return nil, fmt.Errorf("Unknown scheme: %s", url.Scheme) + } +} + +// borrowed from net/http/httputil/reverseproxy.go +func singleJoiningSlash(a, b string) string { + aslash := strings.HasSuffix(a, "/") + bslash := strings.HasPrefix(b, "/") + switch { + case aslash && bslash: + return a + b[1:] + case !aslash && !bslash: + return a + "/" + b + } + return a + b +} + type proxyTransport struct { proxyScheme string proxyHost string diff --git a/pkg/apiserver/proxy_test.go b/pkg/apiserver/proxy_test.go index d3b71845da1..8dd6f68ca9c 100644 --- a/pkg/apiserver/proxy_test.go +++ b/pkg/apiserver/proxy_test.go @@ -275,9 +275,10 @@ func TestProxy(t *testing.T) { })) defer proxyServer.Close() + serverURL, _ := url.Parse(proxyServer.URL) simpleStorage := &SimpleRESTStorage{ errors: map[string]error{}, - resourceLocation: proxyServer.URL, + resourceLocation: serverURL, expectedResourceNamespace: item.reqNamespace, } @@ -335,9 +336,10 @@ func TestProxyUpgrade(t *testing.T) { })) defer backendServer.Close() + serverURL, _ := url.Parse(backendServer.URL) simpleStorage := &SimpleRESTStorage{ errors: map[string]error{}, - resourceLocation: backendServer.URL, + resourceLocation: serverURL, expectedResourceNamespace: "myns", } diff --git a/pkg/apiserver/redirect.go b/pkg/apiserver/redirect.go index 3527ffba67c..56f7fa3d606 100644 --- a/pkg/apiserver/redirect.go +++ b/pkg/apiserver/redirect.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "fmt" "net/http" "time" @@ -79,15 +78,26 @@ func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - location, err := redirector.ResourceLocation(ctx, id) + location, _, err := redirector.ResourceLocation(ctx, id) if err != nil { status := errToAPIStatus(err) writeJSON(status.Code, r.codec, status, w) httpCode = status.Code return } + if location == nil { + httplog.LogOf(req, w).Addf("ResourceLocation for %v returned nil", id) + notFound(w, req) + httpCode = http.StatusNotFound + return + } - w.Header().Set("Location", fmt.Sprintf("http://%s", location)) + // Default to http + if location.Scheme == "" { + location.Scheme = "http" + } + + w.Header().Set("Location", location.String()) w.WriteHeader(http.StatusTemporaryRedirect) httpCode = http.StatusTemporaryRedirect } diff --git a/pkg/apiserver/redirect_test.go b/pkg/apiserver/redirect_test.go index 01da7efc727..d8ff29c6a0c 100644 --- a/pkg/apiserver/redirect_test.go +++ b/pkg/apiserver/redirect_test.go @@ -53,7 +53,7 @@ func TestRedirect(t *testing.T) { for _, item := range table { simpleStorage.errors["resourceLocation"] = item.err - simpleStorage.resourceLocation = item.id + simpleStorage.resourceLocation = &url.URL{Host: item.id} resp, err := client.Get(server.URL + "/api/version/redirect/foo/" + item.id) if resp == nil { t.Fatalf("Unexpected nil resp") @@ -104,7 +104,7 @@ func TestRedirectWithNamespaces(t *testing.T) { for _, item := range table { simpleStorage.errors["resourceLocation"] = item.err - simpleStorage.resourceLocation = item.id + simpleStorage.resourceLocation = &url.URL{Host: item.id} resp, err := client.Get(server.URL + "/api/version/redirect/namespaces/other/foo/" + item.id) if resp == nil { t.Fatalf("Unexpected nil resp") diff --git a/pkg/client/kubelet.go b/pkg/client/kubelet.go index 16f433cf223..b7c8aca7d22 100644 --- a/pkg/client/kubelet.go +++ b/pkg/client/kubelet.go @@ -40,6 +40,7 @@ type KubeletClient interface { KubeletHealthChecker PodInfoGetter NodeInfoGetter + ConnectionInfoGetter } // KubeletHealthchecker is an interface for healthchecking kubelets @@ -59,6 +60,10 @@ type NodeInfoGetter interface { GetNodeInfo(host string) (api.NodeInfo, error) } +type ConnectionInfoGetter interface { + GetConnectionInfo(host string) (scheme string, port uint, transport http.RoundTripper, error error) +} + // HTTPKubeletClient is the default implementation of PodInfoGetter and KubeletHealthchecker, accesses the kubelet over HTTP. type HTTPKubeletClient struct { Client *http.Client @@ -92,6 +97,14 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) { }, nil } +func (c *HTTPKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { + scheme := "http" + if c.EnableHttps { + scheme = "https" + } + return scheme, c.Port, c.Client.Transport, nil +} + func (c *HTTPKubeletClient) url(host, path, query string) string { scheme := "http" if c.EnableHttps { @@ -168,3 +181,7 @@ func (c FakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) { func (c FakeKubeletClient) HealthCheck(host string) (probe.Result, error) { return probe.Unknown, errors.New("Not Implemented") } + +func (c FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { + return "", 0, nil, errors.New("Not Implemented") +} diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index e46ca1ad683..20bd2f47383 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "net" + "net/http" "reflect" "sort" "testing" @@ -135,6 +136,10 @@ func (c *FakeKubeletClient) GetNodeInfo(host string) (api.NodeInfo, error) { return api.NodeInfo{}, errors.New("Not Implemented") } +func (c *FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { + return "", 0, nil, errors.New("Not Implemented") +} + func (c *FakeKubeletClient) HealthCheck(host string) (probe.Result, error) { return c.Status, c.Err } diff --git a/pkg/master/master.go b/pkg/master/master.go index 432cb55ce33..0da20fd02fb 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -368,7 +368,7 @@ func (m *Master) init(c *Config) { m.endpointRegistry = registry m.nodeRegistry = registry - nodeStorage := minion.NewStorage(m.nodeRegistry) + nodeStorage := minion.NewStorage(m.nodeRegistry, c.KubeletClient) // TODO: unify the storage -> registry and storage -> client patterns nodeStorageClient := RESTStorageToNodes(nodeStorage) podCache := NewPodCache( diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index 148aa792a62..ff3dd791fcd 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -20,28 +20,32 @@ import ( "errors" "fmt" "net" + "net/http" + "net/url" "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // REST adapts minion into apiserver's RESTStorage model. type REST struct { - registry Registry + registry Registry + connection client.ConnectionInfoGetter } // NewStorage returns a new rest.Storage implementation for minion. -func NewStorage(m Registry) *REST { +func NewStorage(m Registry, connection client.ConnectionInfoGetter) *REST { return &REST{ - registry: m, + registry: m, + connection: connection, } } @@ -138,13 +142,29 @@ func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selec return rs.registry.WatchMinions(ctx, label, field, resourceVersion) } +// Implement Redirector. +var _ = rest.Redirector(&REST{}) + // ResourceLocation returns a URL to which one can send traffic for the specified minion. -func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { +func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { minion, err := rs.registry.GetMinion(ctx, id) if err != nil { - return "", err + return nil, nil, err } host := minion.Name - // TODO: Minion webservers should be secure! - return "http://" + net.JoinHostPort(host, strconv.Itoa(ports.KubeletPort)), nil + + scheme, port, transport, err := rs.connection.GetConnectionInfo(host) + if err != nil { + return nil, nil, err + } + + return &url.URL{ + Scheme: scheme, + Host: net.JoinHostPort( + host, + strconv.FormatUint(uint64(port), 10), + ), + }, + transport, + nil } diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index f49cdb60479..14b23a0a15a 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -17,6 +17,7 @@ limitations under the License. package minion import ( + "net/http" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -27,8 +28,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) +type FakeConnectionInfoGetter struct { +} + +func (FakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) { + return "http", 12345, nil, nil +} + func TestMinionRegistryREST(t *testing.T) { - ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) + ms := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{}) ctx := api.NewContext() if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Node).Name != "foo" { t.Errorf("missing expected object") @@ -88,7 +96,7 @@ func TestMinionRegistryREST(t *testing.T) { } func TestMinionRegistryValidUpdate(t *testing.T) { - storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) + storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{}) ctx := api.NewContext() obj, err := storage.Get(ctx, "foo") if err != nil { @@ -113,7 +121,7 @@ var ( ) func TestMinionRegistryValidatesCreate(t *testing.T) { - storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{})) + storage := NewStorage(registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}), FakeConnectionInfoGetter{}) ctx := api.NewContext() failureCases := map[string]api.Node{ "zero-length Name": { @@ -156,7 +164,7 @@ func contains(nodes *api.NodeList, nodeID string) bool { func TestCreate(t *testing.T) { registry := registrytest.NewMinionRegistry([]string{"foo", "bar"}, api.NodeResources{}) - test := resttest.New(t, NewStorage(registry), registry.SetError).ClusterScope() + test := resttest.New(t, NewStorage(registry, FakeConnectionInfoGetter{}), registry.SetError).ClusterScope() test.TestCreate( // valid &api.Node{ diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 54514217b66..bcbcefc4e53 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -18,6 +18,8 @@ package etcd import ( "fmt" + "net/http" + "net/url" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -75,8 +77,11 @@ func NewStorage(h tools.EtcdHelper) (*REST, *BindingREST, *StatusREST) { return &REST{*store}, &BindingREST{store: store}, &StatusREST{store: &statusStore} } +// Implement Redirector. +var _ = rest.Redirector(&REST{}) + // ResourceLocation returns a pods location from its HostIP -func (r *REST) ResourceLocation(ctx api.Context, name string) (string, error) { +func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.RoundTripper, error) { return pod.ResourceLocation(r, ctx, name) } diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 08010967a72..a425588c355 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -683,13 +683,19 @@ func TestResourceLocation(t *testing.T) { storage = storage.WithPodStatus(cache) redirector := rest.Redirector(storage) - location, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) + location, _, err := redirector.ResourceLocation(api.NewDefaultContext(), tc.query) if err != nil { t.Errorf("Unexpected error: %v", err) } + if location == nil { + t.Errorf("Unexpected nil: %v", location) + } - if location != tc.location { - t.Errorf("Expected %v, but got %v", tc.location, location) + if location.Scheme != "" { + t.Errorf("Expected '%v', but got '%v'", "", location.Scheme) + } + if location.Host != tc.location { + t.Errorf("Expected %v, but got %v", tc.location, location.Host) } } } diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 1ae742d2d83..63f3a21b53a 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -18,6 +18,9 @@ package pod import ( "fmt" + "net" + "net/http" + "net/url" "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -151,12 +154,12 @@ type ResourceGetter interface { } // ResourceLocation returns a URL to which one can send traffic for the specified pod. -func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string, error) { +func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { // Allow ID as "podname" or "podname:port". If port is not specified, // try to use the first defined port on the pod. parts := strings.Split(id, ":") if len(parts) > 2 { - return "", errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id)) + return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid pod request %q", id)) } name := parts[0] port := "" @@ -167,11 +170,11 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string obj, err := getter.Get(ctx, name) if err != nil { - return "", err + return nil, nil, err } pod := obj.(*api.Pod) if pod == nil { - return "", nil + return nil, nil, nil } // Try to figure out a port. @@ -186,9 +189,11 @@ func ResourceLocation(getter ResourceGetter, ctx api.Context, id string) (string // We leave off the scheme ('http://') because we have no idea what sort of server // is listening at this endpoint. - loc := pod.Status.PodIP - if port != "" { - loc += fmt.Sprintf(":%s", port) + loc := &url.URL{} + if port == "" { + loc.Host = pod.Status.PodIP + } else { + loc.Host = net.JoinHostPort(pod.Status.PodIP, port) } - return loc, nil + return loc, nil, nil } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 83b79b6c0f2..deeaea12cfe 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -20,6 +20,8 @@ import ( "fmt" "math/rand" "net" + "net/http" + "net/url" "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -223,19 +225,24 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo return out, false, err } +// Implement Redirector. +var _ = rest.Redirector(&REST{}) + // ResourceLocation returns a URL to which one can send traffic for the specified service. -func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { +func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) { eps, err := rs.registry.GetEndpoints(ctx, id) if err != nil { - return "", err + return nil, nil, err } if len(eps.Endpoints) == 0 { - return "", fmt.Errorf("no endpoints available for %v", id) + return nil, nil, fmt.Errorf("no endpoints available for %v", id) } // We leave off the scheme ('http://') because we have no idea what sort of server // is listening at this endpoint. ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] - return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil + return &url.URL{ + Host: net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), + }, nil, nil } func (rs *REST) getLoadbalancerName(ctx api.Context, service *api.Service) string { diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index c4bda67df3a..346c61faf1e 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -385,11 +385,14 @@ func TestServiceRegistryResourceLocation(t *testing.T) { }, }) redirector := rest.Redirector(storage) - location, err := redirector.ResourceLocation(ctx, "foo") + location, _, err := redirector.ResourceLocation(ctx, "foo") if err != nil { t.Errorf("Unexpected error: %v", err) } - if e, a := "foo:80", location; e != a { + if location == nil { + t.Errorf("Unexpected nil: %v", location) + } + if e, a := "//foo:80", location.String(); e != a { t.Errorf("Expected %v, but got %v", e, a) } if e, a := "foo", registry.GottenID; e != a { @@ -398,7 +401,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { // Test error path registry.Err = fmt.Errorf("fake error") - if _, err = redirector.ResourceLocation(ctx, "foo"); err == nil { + if _, _, err = redirector.ResourceLocation(ctx, "foo"); err == nil { t.Errorf("unexpected nil error") } } diff --git a/pkg/util/httpstream/httpstream.go b/pkg/util/httpstream/httpstream.go index 6568a13a7d9..1a7f53904b4 100644 --- a/pkg/util/httpstream/httpstream.go +++ b/pkg/util/httpstream/httpstream.go @@ -19,6 +19,7 @@ package httpstream import ( "io" "net/http" + "strings" "time" ) @@ -78,3 +79,13 @@ type Stream interface { // Headers returns the headers used to create the stream. Headers() http.Header } + +// IsUpgradeRequest returns true if the given request is a connection upgrade request +func IsUpgradeRequest(req *http.Request) bool { + for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] { + if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) { + return true + } + } + return false +}