Merge pull request #33718 from justinsb/arbitrary_names_2b

Automatic merge from submit-queue

Use nodeutil.GetHostIP consistently when talking to nodes

Most of our communications from apiserver -> nodes used
    nodutil.GetNodeHostIP, but a few places didn't - and this meant that the
    node name needed to be resolvable _and_ we needed to populate valid IP
    addresses.

```release-note
The apiserver now uses addresses reported by the kubelet in the Node object's status for apiserver->kubelet communications, rather than the name of the Node object. The address type used defaults to `InternalIP`, `ExternalIP`, and `LegacyHostIP` address types, in that order.
```
This commit is contained in:
Kubernetes Submit Queue 2016-10-10 11:00:26 -07:00 committed by GitHub
commit ff20b172ef
5 changed files with 40 additions and 33 deletions

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport" "k8s.io/kubernetes/pkg/client/transport"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
) )
@ -51,11 +52,11 @@ type KubeletClientConfig struct {
// KubeletClient is an interface for all kubelet functionality // KubeletClient is an interface for all kubelet functionality
type KubeletClient interface { type KubeletClient interface {
GetRawConnectionInfo(ctx api.Context, nodeName string) (scheme string, port uint, transport http.RoundTripper, err error) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, port uint, transport http.RoundTripper, err error)
} }
type ConnectionInfoGetter interface { type ConnectionInfoGetter interface {
GetConnectionInfo(ctx api.Context, nodeName string) (scheme string, port uint, transport http.RoundTripper, err error) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (scheme string, host string, port uint, transport http.RoundTripper, err error)
} }
// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP. // HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
@ -98,8 +99,8 @@ func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error)
} }
// In default HTTPKubeletClient ctx is unused. // In default HTTPKubeletClient ctx is unused.
func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) { func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
if errs := validation.ValidateNodeName(nodeName, false); len(errs) != 0 { if errs := validation.ValidateNodeName(string(nodeName), false); len(errs) != 0 {
return "", 0, nil, fmt.Errorf("invalid node name: %s", strings.Join(errs, ";")) return "", 0, nil, fmt.Errorf("invalid node name: %s", strings.Join(errs, ";"))
} }
scheme := "http" scheme := "http"
@ -114,7 +115,7 @@ func (c *HTTPKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName strin
// no kubelets. // no kubelets.
type FakeKubeletClient struct{} type FakeKubeletClient struct{}
func (c FakeKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) { func (c FakeKubeletClient) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
return "", 0, nil, errors.New("Not Implemented") return "", 0, nil, errors.New("Not Implemented")
} }

View File

@ -30,6 +30,8 @@ import (
"k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
nodeutil "k8s.io/kubernetes/pkg/util/node"
) )
// NodeStorage includes storage for nodes and all sub resources // NodeStorage includes storage for nodes and all sub resources
@ -126,30 +128,31 @@ func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Roun
var _ = client.ConnectionInfoGetter(&REST{}) var _ = client.ConnectionInfoGetter(&REST{})
func (r *REST) getKubeletPort(ctx api.Context, nodeName string) (int, error) { func (r *REST) GetConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, string, uint, http.RoundTripper, error) {
// We probably shouldn't care about context when looking for Node object. scheme, port, transport, err := r.connection.GetRawConnectionInfo(ctx, nodeName)
obj, err := r.Get(ctx, nodeName)
if err != nil { if err != nil {
return 0, err return "", "", 0, nil, err
}
// We probably shouldn't care about context when looking for Node object.
obj, err := r.Get(ctx, string(nodeName))
if err != nil {
return "", "", 0, nil, err
} }
node, ok := obj.(*api.Node) node, ok := obj.(*api.Node)
if !ok { if !ok {
return 0, fmt.Errorf("Unexpected object type: %#v", node) return "", "", 0, nil, fmt.Errorf("Unexpected object type: %#v", node)
} }
return int(node.Status.DaemonEndpoints.KubeletEndpoint.Port), nil
}
func (c *REST) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) { hostIP, err := nodeutil.GetNodeHostIP(node)
scheme, port, transport, err := c.connection.GetRawConnectionInfo(ctx, nodeName)
if err != nil { if err != nil {
return "", 0, nil, err return "", "", 0, nil, err
}
daemonPort, err := c.getKubeletPort(ctx, nodeName)
if err != nil {
return "", 0, nil, err
} }
host := hostIP.String()
daemonPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if daemonPort > 0 { if daemonPort > 0 {
return scheme, uint(daemonPort), transport, nil return scheme, host, uint(daemonPort), transport, nil
} }
return scheme, port, transport, nil return scheme, host, port, transport, nil
} }

View File

@ -28,12 +28,13 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/types"
) )
type fakeConnectionInfoGetter struct { type fakeConnectionInfoGetter struct {
} }
func (fakeConnectionInfoGetter) GetRawConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) { func (fakeConnectionInfoGetter) GetRawConnectionInfo(ctx api.Context, nodeName types.NodeName) (string, uint, http.RoundTripper, error) {
return "http", 12345, nil, nil return "http", 12345, nil, nil
} }

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
pkgstorage "k8s.io/kubernetes/pkg/storage" pkgstorage "k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
@ -189,13 +190,13 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
// We check if we want to get a default Kubelet's transport. It happens if either: // We check if we want to get a default Kubelet's transport. It happens if either:
// - no port is specified in request (Kubelet's port is default), // - no port is specified in request (Kubelet's port is default),
// - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint, // - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint,
// - there's no information in the API about DaemonEnpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config) // - there's no information in the API about DaemonEndpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
if kubeletPort == 0 { if kubeletPort == 0 {
kubeletPort = ports.KubeletPort kubeletPort = ports.KubeletPort
} }
if portReq == "" || strconv.Itoa(int(kubeletPort)) == portReq { if portReq == "" || strconv.Itoa(int(kubeletPort)) == portReq {
scheme, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, node.Name) scheme, host, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, types.NodeName(node.Name))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
) )
@ -299,12 +300,12 @@ func LogLocation(
return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name)) return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name))
} }
} }
nodeHost := pod.Spec.NodeName nodeName := types.NodeName(pod.Spec.NodeName)
if len(nodeHost) == 0 { if len(nodeName) == 0 {
// If pod has not been assigned a host, return an empty location // If pod has not been assigned a host, return an empty location
return nil, nil, nil return nil, nil, nil
} }
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost) nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -450,12 +451,12 @@ func streamLocation(
return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name)) return nil, nil, errors.NewBadRequest(fmt.Sprintf("container %s is not valid for pod %s", container, name))
} }
} }
nodeHost := pod.Spec.NodeName nodeName := types.NodeName(pod.Spec.NodeName)
if len(nodeHost) == 0 { if len(nodeName) == 0 {
// If pod has not been assigned a host, return an empty location // If pod has not been assigned a host, return an empty location
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name)) return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
} }
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost) nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -484,12 +485,12 @@ func PortForwardLocation(
return nil, nil, err return nil, nil, err
} }
nodeHost := pod.Spec.NodeName nodeName := types.NodeName(pod.Spec.NodeName)
if len(nodeHost) == 0 { if len(nodeName) == 0 {
// If pod has not been assigned a host, return an empty location // If pod has not been assigned a host, return an empty location
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name)) return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
} }
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost) nodeScheme, nodeHost, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeName)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }