diff --git a/DESIGN.md b/DESIGN.md index b3c081a49bc..1bbd50ff26e 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -96,9 +96,9 @@ There are 4 ways that a container manifest can be provided to the Kubelet: ### Kubernetes Proxy -Each node also runs a simple network proxy. This reflects `services` as defined in the Kubernetes API on each node and can do simple TCP stream forwarding or round robin TCP forwarding across a set of backends. +Each node also runs a simple network proxy. This reflects `services` (see [here](docs/services.md) for more details) as defined in the Kubernetes API on each node and can do simple TCP and UDP stream forwarding (round robin) across a set of backends. -Service endpoints are currently found through [Docker-links-compatible](https://docs.docker.com/userguide/dockerlinks/) environment variables specifying ports opened by the service proxy. Currently the user must select a unique port to expose the service on on the proxy, as well as the container's port to target. +Service endpoints are currently found through environment variables (both [Docker-links-compatible](https://docs.docker.com/userguide/dockerlinks/) and Kubernetes {FOO}_SERVICE_HOST and {FOO}_SERVICE_PORT variables are supported). These variables resolve to ports managed by the service proxy. ## The Kubernetes Control Plane diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index 09595df99f9..f822acbd4c0 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -31,3 +31,4 @@ MINION_IP_RANGES=($(eval echo "10.244.{1..${NUM_MINIONS}}.0/24")) MINION_SCOPES="compute-rw" # Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default. POLL_SLEEP_INTERVAL=3 +PORTAL_NET="10.0.0.0/16" diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 3c9a12c88e3..a5cc2389798 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -31,3 +31,4 @@ MINION_IP_RANGES=($(eval echo "10.245.{1..${NUM_MINIONS}}.0/24")) MINION_SCOPES="" # Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default. POLL_SLEEP_INTERVAL=3 +PORTAL_NET="10.0.0.0/16" diff --git a/cluster/gce/templates/create-dynamic-salt-files.sh b/cluster/gce/templates/create-dynamic-salt-files.sh index 9255fbf0c24..afea82b88c8 100644 --- a/cluster/gce/templates/create-dynamic-salt-files.sh +++ b/cluster/gce/templates/create-dynamic-salt-files.sh @@ -21,6 +21,7 @@ mkdir -p /srv/salt-overlay/pillar cat </srv/salt-overlay/pillar/cluster-params.sls node_instance_prefix: $NODE_INSTANCE_PREFIX +portal_net: $PORTAL_NET EOF mkdir -p /srv/salt-overlay/salt/nginx diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh index 1631820be10..1dda16160e4 100755 --- a/cluster/gce/util.sh +++ b/cluster/gce/util.sh @@ -263,6 +263,7 @@ function kube-up { echo "readonly SERVER_BINARY_TAR_URL='${SERVER_BINARY_TAR_URL}'" echo "readonly SALT_TAR_URL='${SALT_TAR_URL}'" echo "readonly MASTER_HTPASSWD='${htpasswd}'" + echo "readonly PORTAL_NET='${PORTAL_NET}'" grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/create-dynamic-salt-files.sh" grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/download-release.sh" grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/salt-master.sh" @@ -530,3 +531,15 @@ function test-teardown { "${MINION_TAG}-${INSTANCE_PREFIX}-http-alt" || true > /dev/null "${KUBE_ROOT}/cluster/kube-down.sh" > /dev/null } + +# SSH to a node by name ($1) and run a command ($2). 
+function ssh-to-node { + local node="$1" + local cmd="$2" + gcutil --log_level=WARNING ssh --ssh_arg "-o LogLevel=quiet" "${node}" "${cmd}" +} + +# Restart the kube-proxy on a node ($1) +function restart-kube-proxy { + ssh-to-node "$1" "sudo /etc/init.d/kube-proxy restart" +} diff --git a/cluster/saltbase/salt/apiserver/default b/cluster/saltbase/salt/apiserver/default index d84085fe878..f3a902bbda1 100644 --- a/cluster/saltbase/salt/apiserver/default +++ b/cluster/saltbase/salt/apiserver/default @@ -55,5 +55,8 @@ {%- set minion_regexp = "" %} {% endif %} {% endif %} +{% if pillar['portal_net'] is defined %} + {% set portal_net = "-portal_net=" + pillar['portal_net'] %} +{% endif %} -DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}}" +DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}" diff --git a/cluster/saltbase/salt/kube-proxy/initd b/cluster/saltbase/salt/kube-proxy/initd index c1d7cdfc254..9e3e94d7b2c 100644 --- a/cluster/saltbase/salt/kube-proxy/initd +++ b/cluster/saltbase/salt/kube-proxy/initd @@ -22,7 +22,7 @@ DAEMON_ARGS="" DAEMON_LOG_FILE=/var/log/$NAME.log PIDFILE=/var/run/$NAME.pid SCRIPTNAME=/etc/init.d/$NAME -DAEMON_USER=kube-proxy +DAEMON_USER=root # Exit if the package is not installed [ -x "$DAEMON" ] || exit 0 diff --git a/cluster/vagrant/config-default.sh b/cluster/vagrant/config-default.sh index bd9b21378ad..6e8a96c8722 100755 --- a/cluster/vagrant/config-default.sh +++ b/cluster/vagrant/config-default.sh @@ -31,9 +31,13 @@ MINION_TAG="${INSTANCE_PREFIX}-minion" # IP LOCATIONS FOR INTERACTING WITH THE MINIONS MINION_IP_BASE="10.245.2." +declare -A VAGRANT_MINION_NAMES_BY_IP + for (( i=0; i <${NUM_MINIONS}; i++)) do KUBE_MINION_IP_ADDRESSES[$i]="${MINION_IP_BASE}$[$i+2]" MINION_IP[$i]="${MINION_IP_BASE}$[$i+2]" MINION_NAMES[$i]="${MINION_IP[$i]}" VAGRANT_MINION_NAMES[$i]="minion-$[$i+1]" + + VAGRANT_MINION_NAMES_BY_IP["${MINION_IP[$i]}"]="${VAGRANT_MINION_NAMES[$i]}" done diff --git a/cluster/vagrant/provision-config.sh b/cluster/vagrant/provision-config.sh index 98e986e6237..db470a4095a 100755 --- a/cluster/vagrant/provision-config.sh +++ b/cluster/vagrant/provision-config.sh @@ -33,3 +33,5 @@ MASTER_PASSWD=vagrant # Location to hold temp files for provision process KUBE_TEMP=/var/kube-temp + +PORTAL_NET=10.0.0.0/16 diff --git a/cluster/vagrant/provision-master.sh b/cluster/vagrant/provision-master.sh index fc093c6fae4..f0946d0906d 100755 --- a/cluster/vagrant/provision-master.sh +++ b/cluster/vagrant/provision-master.sh @@ -75,6 +75,11 @@ grains: - kubernetes-master EOF +mkdir -p /srv/salt-overlay/pillar +cat </srv/salt-overlay/pillar/cluster-params.sls + portal_net: $PORTAL_NET +EOF + # Configure the salt-master # Auto accept all keys from minions that try to join mkdir -p /etc/salt/master.d diff --git a/cluster/vagrant/util.sh b/cluster/vagrant/util.sh index a704d06140e..6e23d6ccf60 100644 --- a/cluster/vagrant/util.sh +++ b/cluster/vagrant/util.sh @@ -143,3 +143,16 @@ function get-password { export KUBE_PASSWORD=vagrant echo "Using credentials: $KUBE_USER:$KUBE_PASSWORD" } + +# SSH to a node by name ($1) and run a command ($2). 
+function ssh-to-node { + local node="$1" + local cmd="$2" + local machine="${VAGRANT_MINION_NAMES_BY_IP[${node}]}" + vagrant ssh "${machine}" -c "${cmd}" | grep -v "Connection to.*closed" +} + +# Restart the kube-proxy on a node ($1) +function restart-kube-proxy { + ssh-to-node "$1" "sudo systemctl restart kube-proxy" +} diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index a96d39cb987..0aa28abe708 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -64,6 +64,7 @@ var ( machineList util.StringList corsAllowedOriginList util.StringList allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") + portalNet util.IPNet // TODO: make this a list // TODO: Discover these by pinging the host machines, and rip out these flags. nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") @@ -75,6 +76,7 @@ func init() { flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.") flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") + flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.") } func verifyMinionFlags() { @@ -89,6 +91,13 @@ func verifyMinionFlags() { } } +// TODO: Longer term we should read this from some config store, rather than a flag. 
+func verifyPortalFlags() { + if portalNet.IP == nil { + glog.Fatal("No -portal_net specified") + } +} + func initCloudProvider(name string, configFilePath string) cloudprovider.Interface { var config *os.File @@ -141,6 +150,7 @@ func main() { verflag.PrintAndExitIfRequested() verifyMinionFlags() + verifyPortalFlags() if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) { glog.Fatalf("specify either -etcd_servers or -etcd_config") @@ -172,6 +182,7 @@ func main() { glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) } + n := net.IPNet(portalNet) m := master.New(&master.Config{ Client: client, Cloud: cloud, @@ -188,6 +199,7 @@ func main() { resources.Memory: util.NewIntOrStringFromInt(*nodeMemory), }, }, + PortalNet: &n, }) mux := http.NewServeMux() diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 6e56dcf5ded..c24384cc278 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -123,11 +123,16 @@ func startComponents(manifestURL string) (apiServerURL string) { } // Master + _, portalNet, err := net.ParseCIDR("10.0.0.0/24") + if err != nil { + glog.Fatalf("Unable to parse CIDR: %v", err) + } m := master.New(&master.Config{ Client: cl, EtcdHelper: helper, Minions: machineList, PodInfoGetter: fakePodInfoGetter{}, + PortalNet: portalNet, }) mux := http.NewServeMux() apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(mux, "/api/v1beta1") @@ -349,18 +354,33 @@ func runServiceTest(client *client.Client) { if err := wait.Poll(time.Second, time.Second*20, podExists(client, ctx, pod.ID)); err != nil { glog.Fatalf("FAILED: pod never started running %v", err) } - svc := api.Service{ + svc1 := api.Service{ TypeMeta: api.TypeMeta{ID: "service1"}, Selector: map[string]string{ "name": "thisisalonglabel", }, Port: 8080, } - _, err = client.CreateService(ctx, &svc) + _, err = client.CreateService(ctx, &svc1) if err != nil { - glog.Fatalf("Failed to create service: %v, %v", svc, err) + glog.Fatalf("Failed to create service: %v, %v", svc1, err) } - if err := wait.Poll(time.Second, time.Second*10, endpointsSet(client, ctx, svc.ID, 1)); err != nil { + if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, ctx, svc1.ID, 1)); err != nil { + glog.Fatalf("FAILED: unexpected endpoints: %v", err) + } + // A second service with the same port. 
+ svc2 := api.Service{ + TypeMeta: api.TypeMeta{ID: "service2"}, + Selector: map[string]string{ + "name": "thisisalonglabel", + }, + Port: 8080, + } + _, err = client.CreateService(ctx, &svc2) + if err != nil { + glog.Fatalf("Failed to create service: %v, %v", svc2, err) + } + if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, ctx, svc2.ID, 1)); err != nil { glog.Fatalf("FAILED: unexpected endpoints: %v", err) } glog.Info("Service test passed.") diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 4945cf5c08e..5731262e805 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -25,6 +25,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" @@ -40,7 +42,7 @@ var ( func init() { client.BindClientConfigFlags(flag.CommandLine, clientConfig) flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config") - flag.Var(&bindAddress, "bind_address", "The address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") + flag.Var(&bindAddress, "bind_address", "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") } func main() { @@ -97,12 +99,12 @@ func main() { } loadBalancer := proxy.NewLoadBalancerRR() - proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress)) + proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress), iptables.New(exec.New())) // Wire proxier to handle changes to services serviceConfig.RegisterHandler(proxier) // And wire loadBalancer to handle changes to endpoints to services endpointsConfig.RegisterHandler(loadBalancer) // Just loop forever for now... - select {} + proxier.SyncLoop() } diff --git a/docs/getting-started-guides/locally.md b/docs/getting-started-guides/locally.md index 397165cb855..09ba5cf2744 100644 --- a/docs/getting-started-guides/locally.md +++ b/docs/getting-started-guides/locally.md @@ -8,32 +8,30 @@ Not running Linux? Consider running Linux in a local virtual machine with [Vagra #### Docker -At least [Docker](https://docs.docker.com/installation/#installation) 1.0.0+. Ensure the Docker daemon is running and can be contacted by the user you plan to run as (try `docker ps`). +At least [Docker](https://docs.docker.com/installation/#installation) 1.0.0+. Ensure the Docker daemon is running and can be contacted (try `docker ps`). Some of the kubernetes components need to run as root, which normally works fine with docker. #### etcd You need an [etcd](https://github.com/coreos/etcd) somewhere in your path. Get the [latest release](https://github.com/coreos/etcd/releases/) and place it in `/usr/bin`. - ### Starting the cluster In a separate tab of your terminal, run: ``` cd kubernetes -hack/local-up-cluster.sh +sudo hack/local-up-cluster.sh ``` -This will build and start a lightweight local cluster, consisting of a master and a single minion. Type Control-C to shut it down. +This will build and start a lightweight local cluster, consisting of a master +and a single minion. Type Control-C to shut it down. You can use the cluster/kubecfg.sh script to interact with the local cluster. 
+You must set the KUBERNETES_MASTER environment variable to let other programs +know how to reach your master. ``` -cd kubernetes -modify cluster/kube-env.sh: - KUBERNETES_PROVIDER="local" - -cluster/kubecfg.sh => interact with the local cluster +export KUBERNETES_MASTER=http://localhost:8080 ``` ### Running a container @@ -41,6 +39,7 @@ cluster/kubecfg.sh => interact with the local cluster Your cluster is running, and you want to start running containers! You can now use any of the cluster/kubecfg.sh commands to interact with your local setup. + ``` cluster/kubecfg.sh list /pods cluster/kubecfg.sh list /services @@ -66,6 +65,17 @@ Congratulations! ### Troubleshooting +#### I can't reach service IPs on the network. + +Some firewall software that uses iptables may not interact well with +kubernetes. If you're having trouble around networking, try disabling any +firewall or other iptables-using systems, first. + +By default the IP range for service portals is 10.0.*.* - depending on your +docker installation, this may conflict with IPs for containers. If you find +containers running with IPs in this range, edit hack/local-cluster-up.sh and +change the portal_net flag to something else. + #### I cannot create a replication controller with replica size greater than 1! What gives? You are running a single minion setup. This has the limitation of only supporting a single replica of a given pod. If you are interested in running with larger replica sizes, we encourage you to try the local vagrant setup or one of the cloud providers. diff --git a/docs/labels.md b/docs/labels.md index 31798835808..e1da0e0af55 100644 --- a/docs/labels.md +++ b/docs/labels.md @@ -40,7 +40,7 @@ key1 exists LIST and WATCH operations may specify label selectors to filter the sets of objects returned using a query parameter: `?labels=key1%3Dvalue1,key2%3Dvalue2,...`. We may extend such filtering to DELETE operations in the future. Kubernetes also currently supports two objects that use label selectors to keep track of their members, `service`s and `replicationController`s: -- `service`: A service is a configuration unit for the proxies that run on every worker node. It is named and points to one or more pods. +- `service`: A [service](services.md) is a configuration unit for the proxies that run on every worker node. It is named and points to one or more pods. - `replicationController`: A [replication controller](replication-controller.md) ensures that a specified number of pod "replicas" are running at any one time. If there are too many, it'll kill some. If there are too few, it'll start more. The set of pods that a `service` targets is defined with a label selector. Similarly, the population of pods that a `replicationController` is monitoring is also defined with a label selector. diff --git a/docs/networking.md b/docs/networking.md index c88b32a3887..37278da82a3 100644 --- a/docs/networking.md +++ b/docs/networking.md @@ -82,7 +82,7 @@ We want to be able to assign IP addresses externally from Docker ([Docker issue In addition to enabling self-registration with 3rd-party discovery mechanisms, we'd like to setup DDNS automatically ([Issue #146](https://github.com/GoogleCloudPlatform/kubernetes/issues/146)). hostname, $HOSTNAME, etc. should return a name for the pod ([Issue #298](https://github.com/GoogleCloudPlatform/kubernetes/issues/298)), and gethostbyname should be able to resolve names of other pods. 
Probably we need to set up a DNS resolver to do the latter ([Docker issue #2267](https://github.com/dotcloud/docker/issues/2267)), so that we don't need to keep /etc/hosts files up to date dynamically.
-Service endpoints are currently found through [Docker-links-compatible](https://docs.docker.com/userguide/dockerlinks/) environment variables specifying ports opened by the service proxy. We don't actually use [the Docker ambassador pattern](https://docs.docker.com/articles/ambassador_pattern_linking/) to link containers because we don't require applications to identify all clients at configuration time. Regardless, we're considering moving away from the current approach to an approach more akin to our approach for individual pods: allocate an IP address per service and automatically register the service in DDNS -- L3 load balancing, essentially. Using a flat service namespace doesn't scale and environment variables don't permit dynamic updates, which complicates service deployment by imposing implicit ordering constraints.
+[Service](https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/services.md) endpoints are currently found through environment variables. Both [Docker-links-compatible](https://docs.docker.com/userguide/dockerlinks/) variables and Kubernetes-specific variables ({NAME}_SERVICE_HOST and {NAME}_SERVICE_PORT) are supported, and resolve to ports opened by the service proxy. We don't actually use [the Docker ambassador pattern](https://docs.docker.com/articles/ambassador_pattern_linking/) to link containers because we don't require applications to identify all clients at configuration time, yet. While services today are managed by the service proxy, this is an implementation detail that applications should not rely on. Clients should instead use the [service portal IP](https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/services.md) (which the above environment variables will resolve to). However, a flat service namespace doesn't scale and environment variables don't permit dynamic updates, which complicates service deployment by imposing implicit ordering constraints. We intend to register each service portal IP in DNS, and for that to become the preferred resolution protocol.
We'd also like to accommodate other load-balancing solutions (e.g., HAProxy), non-load-balanced services ([Issue #260](https://github.com/GoogleCloudPlatform/kubernetes/issues/260)), and other types of groups (worker pools, etc.). Providing the ability to Watch a label selector applied to pod addresses would enable efficient monitoring of group membership, which could be directly consumed or synced with a discovery mechanism. Event hooks ([Issue #140](https://github.com/GoogleCloudPlatform/kubernetes/issues/140)) for join/leave events would probably make this even easier.
diff --git a/docs/services.md b/docs/services.md
new file mode 100644
index 00000000000..44281500f3d
--- /dev/null
+++ b/docs/services.md
@@ -0,0 +1,151 @@
+# Services in Kubernetes
+
+## Overview
+
+Kubernetes [`Pods`](pods.md) are ephemeral. They can come and go over time, especially when
+driven by things like [ReplicationControllers](replication-controller.md).
+While each `pod` gets its own IP address, those IP addresses cannot be relied
+upon to be stable over time. This leads to a problem: if some set of `pods`
+(let's call them backends) provides functionality to other `pods` (let's call
+them frontends) inside the Kubernetes cluster, how do those frontends find the
+backends?
+
+Enter `services`.
+
+A Kubernetes `service` is an abstraction which defines a logical set of `pods` and
+a policy by which to access them - sometimes called a micro-service. The goal
+of `services` is to provide a bridge for non-Kubernetes-native applications to
+access backends without the need to write code that is specific to Kubernetes.
+A `service` offers clients an IP and port pair which, when accessed, redirects
+to the appropriate backends. The set of `pods` targeted is determined by a label
+selector.
+
+As an example, consider an image-processing backend which is running with 3 live
+replicas. Those replicas are fungible - frontends do not care which backend
+they use. While the actual `pods` that comprise the set may change, the
+frontend client(s) do not need to know that. The `service` abstraction
+enables this decoupling.
+
+## Defining a service
+
+A `service` in Kubernetes is a REST object, similar to a `pod`. Like a `pod`, a
+`service` definition can be POSTed to the apiserver to create a new instance.
+For example, suppose you have a set of `pods` that each expose port 9376 and
+carry a label "app=MyApp".
+
+```json
+{
+  "id": "myapp",
+  "selector": {
+    "app": "MyApp"
+  },
+  "containerPort": 9376,
+  "protocol": "TCP",
+  "port": 8765
+}
+```
+
+This specification will create a new `service` named "myapp" which resolves to
+TCP port 9376 on any `pod` with the "app=MyApp" label. To access this
+`service`, a client can simply connect to $MYAPP_SERVICE_HOST on port
+$MYAPP_SERVICE_PORT.
+
+## How do they work?
+
+Each node in a Kubernetes cluster runs a `service proxy`. This application
+watches the Kubernetes master for the addition and removal of `service`
+objects and `endpoints` (pods that satisfy a service's label selector), and
+maintains a mapping of `service` to list of `endpoints`. It opens a port on the
+local node for each `service` and forwards traffic to backends (ostensibly
+according to a policy, but the only policy supported for now is round-robin).
+
+When a `pod` is scheduled, the master adds a set of environment variables for
+each active `service`. We support both
+[Docker-links-compatible](https://docs.docker.com/userguide/dockerlinks/)
+variables and simpler {SVCNAME}_SERVICE_HOST and {SVCNAME}_SERVICE_PORT
+variables. This does imply an ordering requirement - any `service` that a `pod`
+wants to access must be created before the `pod` itself, or else the environment
+variables will not be populated. This restriction will be removed once DNS for
+`services` is supported.
+
+A `service`, through its label selector, can resolve to 0 or more `endpoints`.
+Over the life of a `service`, the set of `pods` which comprise that `service` can
+grow, shrink, or turn over completely. Clients will only see issues if they are
+actively using a backend when that backend is removed from the `service` (and even
+then, open connections will persist for some protocols).
+
+![Services overview diagram](services_overview.png)
+
+## The gory details
+
+The previous information should be sufficient for many people who just want to
+use `services`. However, there is a lot going on behind the scenes that may be
+worth understanding.
+
+### Avoiding collisions
+
+One of the primary philosophies of Kubernetes is that users should not be
+exposed to situations that could cause their actions to fail through no fault
+of their own. In this situation, we are looking at network ports - users
+should not have to choose a port number if that choice might collide with
+another user.
That is an isolation failure.
+
+In order to allow users to choose a port number for their `services`, we must
+ensure that no two `services` can collide. We do that by allocating each
+`service` its own IP address.
+
+### IPs and Portals
+
+Unlike `pod` IP addresses, which actually route to a fixed destination,
+`service` IPs are not actually answered by a single host. Instead, we use
+`iptables` (packet processing logic in Linux) to define "virtual" IP addresses
+which are transparently redirected as needed. We call the tuple of the
+`service` IP and the `service` port the `portal`. When clients connect to the
+`portal`, their traffic is automatically transported to an appropriate
+endpoint. The environment variables for `services` are actually populated in
+terms of the portal IP and port. We will be adding DNS support for
+`services`, too.
+
+As an example, consider the image processing application described above.
+When the backend `service` is created, the Kubernetes master assigns a portal
+IP address, for example 10.0.0.1. Assuming the `service` port is 1234, the
+portal is 10.0.0.1:1234. The master stores that information, which is then
+observed by all of the `service proxy` instances in the cluster. When a proxy
+sees a new portal, it opens a new random port, establishes an iptables redirect
+from the portal to this new port, and starts accepting connections on it.
+
+When a client connects to `MYAPP_SERVICE_HOST` on the portal port (whether
+they know the port statically or look it up as MYAPP_SERVICE_PORT), the
+iptables rule kicks in, and redirects the packets to the `service proxy`'s own
+port. The `service proxy` chooses a backend, and starts proxying traffic from
+the client to the backend.
+
+The net result is that users can choose any `service` port they want without
+risk of collision. Clients can simply connect to an IP and port, without
+being aware of which `pods` they are accessing.
+
+![Services detailed diagram](services_detail.png)
+
+## Shortcomings
+
+Part of the `service` specification is a `createExternalLoadBalancer` flag,
+which tells the master to make an external load balancer that points to the
+service. In order to do this today, the service proxy must answer on a known
+(i.e. not random) port. In this case, the service port is promoted to the
+proxy port. This means that it is still possible for users to collide with
+each other's services or with other pods. We expect most `services` will not
+set this flag, mitigating the exposure.
+
+We expect that using iptables for portals will work at small scale, but will
+not scale to large clusters with thousands of services. See [the original
+design proposal for
+portals](https://github.com/GoogleCloudPlatform/kubernetes/issues/1107) for
+more details.
+
+## Future work
+
+In the future we envision that the proxy policy can become more nuanced than
+simple round robin balancing, for example master-elected or sharded. We also
+envision that some `services` will have "real" load balancers, in which case the
+portal will simply transport the packets there.
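To make the client-side contract above concrete, here is a minimal Go sketch (illustrative only, not part of this patch) of how a frontend `pod` could reach the "myapp" `service` from the earlier example. The MYAPP_SERVICE_HOST/MYAPP_SERVICE_PORT names follow from the service id "myapp"; HTTP is just one possible protocol - any TCP client can dial the portal the same way.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
)

func main() {
	// Injected by the master for every service that existed when this pod was
	// scheduled; the prefix matches the service id "myapp" from the example.
	host := os.Getenv("MYAPP_SERVICE_HOST") // portal IP, e.g. 10.0.0.1
	port := os.Getenv("MYAPP_SERVICE_PORT") // portal port, e.g. 8765
	if host == "" || port == "" {
		fmt.Fprintln(os.Stderr, "myapp env vars missing; was the service created before this pod?")
		os.Exit(1)
	}

	// Dial the portal; iptables plus the local service proxy forward the
	// connection to one of the backend pods on containerPort 9376.
	resp, err := http.Get("http://" + net.JoinHostPort(host, port))
	if err != nil {
		fmt.Fprintln(os.Stderr, "request failed:", err)
		os.Exit(1)
	}
	defer resp.Body.Close()
	io.Copy(os.Stdout, resp.Body)
}
```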
diff --git a/docs/services_detail.png b/docs/services_detail.png
new file mode 100644
index 00000000000..3fc28a0bc7d
Binary files /dev/null and b/docs/services_detail.png differ
diff --git a/docs/services_detail.svg b/docs/services_detail.svg
new file mode 100644
index 00000000000..1c1feee1bb4
--- /dev/null
+++ b/docs/services_detail.svg
@@ -0,0 +1,538 @@
[SVG markup elided: "Services detailed" diagram. A Client Pod connects to 10.0.0.1:1234; iptables redirects to a (random) proxy port on the Service Proxy, which forwards to one of three Backend Pods (labels: app=MyApp, port: 9376); the apiserver updates service specs and installs portal rules.]
diff --git a/docs/services_overview.png b/docs/services_overview.png
new file mode 100644
index 00000000000..ddbed70894a
Binary files /dev/null and b/docs/services_overview.png differ
diff --git a/docs/services_overview.svg b/docs/services_overview.svg
new file mode 100644
index 00000000000..a3b9dadbde1
--- /dev/null
+++ b/docs/services_overview.svg
@@ -0,0 +1,417 @@
[SVG markup elided: "Services overview" diagram. A Client Pod reaches the Service Proxy, which forwards to one of three Backend Pods (labels: app=MyApp, port: 9376); the apiserver feeds the Service Proxy.]
diff --git a/examples/guestbook-go/README.md b/examples/guestbook-go/README.md
index 2e83713608d..63226331a53 100644
--- a/examples/guestbook-go/README.md
+++ b/examples/guestbook-go/README.md
@@ -85,7 +85,7 @@ redis-slave-controller gurpartap/redis name=redis,role=slave 2
 The redis slave configures itself by looking for the Kubernetes service environment variables in the container environment. In particular, the redis slave is started with the following command:
 ```shell
-redis-server --slaveof $SERVICE_HOST $REDIS_MASTER_SERVICE_PORT
+redis-server --slaveof $REDIS_MASTER_SERVICE_HOST $REDIS_MASTER_SERVICE_PORT
 ```
 Once that's up you can list the pods in the cluster, to verify that the master and slaves are running:
diff --git a/examples/guestbook-go/_src/main.go b/examples/guestbook-go/_src/main.go
index 78eaf8e06e7..45936fb26de 100644
--- a/examples/guestbook-go/_src/main.go
+++ b/examples/guestbook-go/_src/main.go
@@ -71,7 +71,7 @@ func HandleError(result interface{}, err error) (r interface{}) {
 }
 func main() {
-	pool = simpleredis.NewConnectionPoolHost(os.Getenv("SERVICE_HOST") + ":" + os.Getenv("REDIS_MASTER_SERVICE_PORT"))
+	pool = simpleredis.NewConnectionPoolHost(os.Getenv("REDIS_MASTER_SERVICE_HOST") + ":" + os.Getenv("REDIS_MASTER_SERVICE_PORT"))
 	defer pool.Close()
 	r := mux.NewRouter()
diff --git a/hack/e2e-suite/services.sh b/hack/e2e-suite/services.sh
new file mode 100755
index 00000000000..5ee499bd9db
--- /dev/null
+++ b/hack/e2e-suite/services.sh
@@ -0,0 +1,343 @@
+#!/bin/bash
+
+# Copyright 2014 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Verifies that services and portals work. + +set -o errexit +set -o nounset +set -o pipefail + +KUBE_ROOT=$(dirname "${BASH_SOURCE}")/../.. +source "${KUBE_ROOT}/cluster/kube-env.sh" +source "${KUBE_ROOT}/cluster/${KUBERNETES_PROVIDER}/util.sh" + +function error() { + echo "$@" >&2 + exit 1 +} + +function sort_args() { + printf "%s\n" "$@" | sort -n | tr '\n\r' ' ' | sed 's/ */ /g' +} + +svcs_to_clean=() +function do_teardown() { + local svc + for svc in "${svcs_to_clean[@]:+${svcs_to_clean[@]}}"; do + stop_service "${svc}" + done +} + +# Args: +# $1: service name +# $2: service port +# $3: service replica count +function start_service() { + echo "Starting service '$1' on port $2 with $3 replicas" + ${KUBECFG} -s "$2" -p 9376 run kubernetes/serve_hostname "$3" "$1" + svcs_to_clean+=("$1") +} + +# Args: +# $1: service name +function stop_service() { + echo "Stopping service '$1'" + ${KUBECFG} stop "$1" || true + ${KUBECFG} delete "/replicationControllers/$1" || true + ${KUBECFG} delete "/services/$1" || true +} + +# Args: +# $1: service name +# $2: expected pod count +function query_pods() { + # This fails very occasionally, so retry a bit. + pods_unsorted=() + local i + for i in $(seq 1 10); do + pods_unsorted=($(${KUBECFG} \ + '-template={{range.Items}}{{.ID}} {{end}}' \ + -l replicationController="$1" list pods)) + found="${#pods_unsorted[*]}" + if [[ "${found}" == "$2" ]]; then + break + fi + sleep 3 + done + if [[ "${found}" != "$2" ]]; then + error "Failed to query pods for $1: expected $2, found ${found}" + fi + + # The "return" is a sorted list of pod IDs. + sort_args "${pods_unsorted[@]}" +} + +# Args: +# $1: service name +# $2: pod count +function wait_for_pods() { + echo "Querying pods in $1" + local pods_sorted=$(query_pods "$1" "$2") + printf '\t%s\n' ${pods_sorted} + + # Container turn up on a clean cluster can take a while for the docker image + # pulls. Wait a generous amount of time. + # TODO: Sometimes pods change underneath us, which makes the GET fail (404). + # Maybe this test can be loosened and still be useful? 
+ pods_needed=$2 + local i + for i in $(seq 1 30); do + echo "Waiting for ${pods_needed} pods to become 'running'" + pods_needed="$2" + for id in ${pods_sorted}; do + status=$(${KUBECFG} -template '{{.CurrentState.Status}}' get "pods/${id}") + if [[ "${status}" == "Running" ]]; then + pods_needed=$((pods_needed-1)) + fi + done + if [[ "${pods_needed}" == 0 ]]; then + break + fi + sleep 3 + done + if [[ "${pods_needed}" -gt 0 ]]; then + error "Pods for $1 did not come up in time" + fi +} + +# Args: +# $1: service name +# $2: service IP +# $3: service port +# $4: pod count +# $5: pod IDs +function wait_for_service_up() { + local i + for i in $(seq 1 20); do + results=($(ssh-to-node "${test_node}" " + set -e; + for i in $(seq -s' ' 1 $4); do + curl -s --connect-timeout 1 http://$2:$3; + done | sort | uniq + ")) + found_pods=$(sort_args "${results[@]:+${results[@]}}") + if [[ "${found_pods}" == "$5" ]]; then + break + fi + echo "Waiting for endpoints to propagate" + sleep 3 + done + if [[ "${found_pods}" != "$5" ]]; then + error "Endpoints did not propagate in time" + fi +} + +# Args: +# $1: service name +# $2: service IP +# $3: service port +function wait_for_service_down() { + local i + for i in $(seq 1 15); do + $(ssh-to-node "${test_node}" " + curl -s --connect-timeout 2 "http://$2:$3" >/dev/null 2>&1 && exit 1 || exit 0; + ") && break + echo "Waiting for $1 to go down" + sleep 2 + done +} + +# Args: +# $1: service name +# $2: service IP +# $3: service port +# $4: pod count +# $5: pod IDs +function verify_from_container() { + results=($(ssh-to-node "${test_node}" " + set -e; + sudo docker pull busybox >/dev/null; + sudo docker run busybox sh -c ' + for i in $(seq -s' ' 1 $4); do + wget -q -T 1 -O - http://$2:$3; + done + '")) \ + || error "testing $1 portal from container failed" + found_pods=$(sort_args "${results[@]}") + if [[ "${found_pods}" != "$5" ]]; then + error -e "$1 portal failed from container, expected:\n + $(printf '\t%s\n' $5)\n + got:\n + $(printf '\t%s\n' ${found_pods}) + " + fi +} + +trap "do_teardown" EXIT + +# Get node IP addresses and pick one as our test point. +detect-minions +test_node="${MINION_NAMES[0]}" +master="${MASTER_NAME}" + +# Launch some pods and services. +svc1_name="service1" +svc1_port=80 +svc1_count=3 +start_service "${svc1_name}" "${svc1_port}" "${svc1_count}" + +svc2_name="service2" +svc2_port=80 +svc2_count=3 +start_service "${svc2_name}" "${svc2_port}" "${svc2_count}" + +# Wait for the pods to become "running". +wait_for_pods "${svc1_name}" "${svc1_count}" +wait_for_pods "${svc2_name}" "${svc2_count}" + +# Get the sorted lists of pods. +svc1_pods=$(query_pods "${svc1_name}" "${svc1_count}") +svc2_pods=$(query_pods "${svc2_name}" "${svc2_count}") + +# Get the portal IPs. +svc1_ip=$(${KUBECFG} -template '{{.PortalIP}}' get "services/${svc1_name}") +test -n "${svc1_ip}" || error "Service1 IP is blank" +svc2_ip=$(${KUBECFG} -template '{{.PortalIP}}' get "services/${svc2_name}") +test -n "${svc2_ip}" || error "Service2 IP is blank" +if [[ "${svc1_ip}" == "${svc2_ip}" ]]; then + error "Portal IPs conflict: ${svc1_ip}" +fi + +# +# Test 1: Prove that the service portal is alive. 
+# +echo "Verifying the portals from the host" +wait_for_service_up "${svc1_name}" "${svc1_ip}" "${svc1_port}" \ + "${svc1_count}" "${svc1_pods}" +wait_for_service_up "${svc2_name}" "${svc2_ip}" "${svc2_port}" \ + "${svc2_count}" "${svc2_pods}" +echo "Verifying the portals from a container" +verify_from_container "${svc1_name}" "${svc1_ip}" "${svc1_port}" \ + "${svc1_count}" "${svc1_pods}" +verify_from_container "${svc2_name}" "${svc2_ip}" "${svc2_port}" \ + "${svc2_count}" "${svc2_pods}" + +# +# Test 2: Bounce the proxy and make sure the portal comes back. +# +echo "Restarting kube-proxy" +restart-kube-proxy "${test_node}" +echo "Verifying the portals from the host" +wait_for_service_up "${svc1_name}" "${svc1_ip}" "${svc1_port}" \ + "${svc1_count}" "${svc1_pods}" +wait_for_service_up "${svc2_name}" "${svc2_ip}" "${svc2_port}" \ + "${svc2_count}" "${svc2_pods}" +echo "Verifying the portals from a container" +verify_from_container "${svc1_name}" "${svc1_ip}" "${svc1_port}" \ + "${svc1_count}" "${svc1_pods}" +verify_from_container "${svc2_name}" "${svc2_ip}" "${svc2_port}" \ + "${svc2_count}" "${svc2_pods}" + +# +# Test 3: Stop one service and make sure it is gone. +# +stop_service "${svc1_name}" +wait_for_service_down "${svc1_name}" "${svc1_ip}" "${svc1_port}" + +# +# Test 4: Bring up another service, make sure it re-uses Portal IPs. +# +svc3_name="service3" +svc3_port=80 +svc3_count=3 +start_service "${svc3_name}" "${svc3_port}" "${svc3_count}" + +# Wait for the pods to become "running". +wait_for_pods "${svc3_name}" "${svc3_count}" + +# Get the sorted lists of pods. +svc3_pods=$(query_pods "${svc3_name}" "${svc3_count}") + +# Get the portal IP. +svc3_ip=$(${KUBECFG} -template '{{.PortalIP}}' get "services/${svc3_name}") +test -n "${svc3_ip}" || error "Service3 IP is blank" +if [[ "${svc3_ip}" != "${svc1_ip}" ]]; then + error "Portal IPs not resued: ${svc3_ip} != ${svc1_ip}" +fi + +echo "Verifying the portals from the host" +wait_for_service_up "${svc3_name}" "${svc3_ip}" "${svc3_port}" \ + "${svc3_count}" "${svc3_pods}" +echo "Verifying the portals from a container" +verify_from_container "${svc3_name}" "${svc3_ip}" "${svc3_port}" \ + "${svc3_count}" "${svc3_pods}" + +# +# Test 5: Remove the iptables rules, make sure they come back. +# +echo "Manually removing iptables rules" +ssh-to-node "${test_node}" "sudo iptables -t nat -F KUBE-PROXY" +echo "Verifying the portals from the host" +wait_for_service_up "${svc3_name}" "${svc3_ip}" "${svc3_port}" \ + "${svc3_count}" "${svc3_pods}" +echo "Verifying the portals from a container" +verify_from_container "${svc3_name}" "${svc3_ip}" "${svc3_port}" \ + "${svc3_count}" "${svc3_pods}" + +# +# Test 6: Restart the master, make sure portals come back. +# +echo "Restarting the master" +ssh-to-node "${master}" "sudo /etc/init.d/apiserver restart" +sleep 5 +echo "Verifying the portals from the host" +wait_for_service_up "${svc3_name}" "${svc3_ip}" "${svc3_port}" \ + "${svc3_count}" "${svc3_pods}" +echo "Verifying the portals from a container" +verify_from_container "${svc3_name}" "${svc3_ip}" "${svc3_port}" \ + "${svc3_count}" "${svc3_pods}" + +# +# Test 7: Bring up another service, make sure it does not re-use Portal IPs. +# +svc4_name="service4" +svc4_port=80 +svc4_count=3 +start_service "${svc4_name}" "${svc4_port}" "${svc4_count}" + +# Wait for the pods to become "running". +wait_for_pods "${svc4_name}" "${svc4_count}" + +# Get the sorted lists of pods. +svc4_pods=$(query_pods "${svc4_name}" "${svc4_count}") + +# Get the portal IP. 
+svc4_ip=$(${KUBECFG} -template '{{.PortalIP}}' get "services/${svc4_name}") +test -n "${svc4_ip}" || error "Service4 IP is blank" +if [[ "${svc4_ip}" == "${svc2_ip}" || "${svc4_ip}" == "${svc3_ip}" ]]; then + error "Portal IPs conflict: ${svc4_ip}" +fi + +echo "Verifying the portals from the host" +wait_for_service_up "${svc4_name}" "${svc4_ip}" "${svc4_port}" \ + "${svc4_count}" "${svc4_pods}" +echo "Verifying the portals from a container" +verify_from_container "${svc4_name}" "${svc4_ip}" "${svc4_port}" \ + "${svc4_count}" "${svc4_pods}" + +# TODO: test createExternalLoadBalancer + +exit 0 diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index c5f651a82de..53d114fc68b 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -53,6 +53,7 @@ APISERVER_LOG=/tmp/apiserver.log --port="${API_PORT}" \ --etcd_servers="http://127.0.0.1:4001" \ --machines="127.0.0.1" \ + --portal_net="10.0.0.0/24" \ --cors_allowed_origins="${API_CORS_ALLOWED_ORIGINS}" >"${APISERVER_LOG}" 2>&1 & APISERVER_PID=$! diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index ca5e1f50900..d5fb2cd3a72 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -71,7 +71,8 @@ ${GO_OUT}/apiserver \ --port="${API_PORT}" \ --etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \ --machines="127.0.0.1" \ - --minion_port=${KUBELET_PORT} 1>&2 & + --minion_port=${KUBELET_PORT} \ + --portal_net="10.0.0.0/24" 1>&2 & APISERVER_PID=$! wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: " diff --git a/icebox/cluster/rackspace/cloud-config/master-cloud-config.yaml b/icebox/cluster/rackspace/cloud-config/master-cloud-config.yaml index cec8bf2e13a..a9b3800eea9 100644 --- a/icebox/cluster/rackspace/cloud-config/master-cloud-config.yaml +++ b/icebox/cluster/rackspace/cloud-config/master-cloud-config.yaml @@ -7,6 +7,7 @@ write_files: - kubernetes-master cloud: rackspace etcd_servers: KUBE_MASTER + portal_net: PORTAL_NET path: /etc/salt/minion.d/grains.conf - content: | auto_accept: True diff --git a/icebox/cluster/rackspace/config-default.sh b/icebox/cluster/rackspace/config-default.sh index 4febe577102..6d4837a6fc7 100644 --- a/icebox/cluster/rackspace/config-default.sh +++ b/icebox/cluster/rackspace/config-default.sh @@ -36,3 +36,4 @@ RAX_NUM_MINIONS="${RAX_NUM_MINIONS-4}" MINION_TAG="tag=${INSTANCE_PREFIX}-minion" MINION_NAMES=($(eval echo ${INSTANCE_PREFIX}-minion-{1..${RAX_NUM_MINIONS}})) KUBE_NETWORK=($(eval echo "10.240.{1..${RAX_NUM_MINIONS}}.0/24")) +PORTAL_NET="10.0.0.0/16" diff --git a/icebox/cluster/rackspace/util.sh b/icebox/cluster/rackspace/util.sh index fc667e13632..30a3293894f 100644 --- a/icebox/cluster/rackspace/util.sh +++ b/icebox/cluster/rackspace/util.sh @@ -70,9 +70,11 @@ rax-boot-master() { grep -v "^#" $(dirname $0)/templates/download-release.sh ) > ${KUBE_TEMP}/masterStart.sh -# Copy cloud-config to KUBE_TEMP and work some sed magic - sed -e "s/KUBE_MASTER/$MASTER_NAME/g" \ - -e "s/MASTER_HTPASSWD/$HTPASSWD/" \ +# Copy cloud-config to KUBE_TEMP and work some sed magic. Some vars can have +# '/' embedded, so don't use that for sed. + sed -e "s|KUBE_MASTER|$MASTER_NAME|g" \ + -e "s|MASTER_HTPASSWD|$HTPASSWD|" \ + -e "s|PORTAL_NET|$PORTAL_NET|" \ $(dirname $0)/cloud-config/master-cloud-config.yaml > $KUBE_TEMP/master-cloud-config.yaml diff --git a/pkg/api/types.go b/pkg/api/types.go index ad47551e85d..5c828b267aa 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -423,6 +423,13 @@ type Service struct { // ContainerPort is the name of the port on the container to direct traffic to. 
// Optional, if unspecified use the first port on the container. ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"` + + // PortalIP is assigned by the master. If specified by the user it will be ignored. + // TODO: This is awkward - if we had a BoundService, it would be better factored. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If specified by the user it will be ignored. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // Endpoints is a collection of endpoints that implement the actual service, for example: diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index f8c136e29ce..468dde2c1db 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -451,6 +451,12 @@ type Service struct { // ContainerPort is the name of the port on the container to direct traffic to. // Optional, if unspecified use the first port on the container. ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"` + + // PortalIP is assigned by the master. If specified by the user it will be ignored. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If specified by the user it will be ignored. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // Endpoints is a collection of endpoints that implement the actual service, for example: diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 93f2710bd1b..6375c0450ad 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -416,6 +416,12 @@ type Service struct { // ContainerPort is the name of the port on the container to direct traffic to. // Optional, if unspecified use the first port on the container. ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"` + + // PortalIP is assigned by the master. If specified by the user it will be ignored. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If specified by the user it will be ignored. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // Endpoints is a collection of endpoints that implement the actual service, for example: diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index f2b89fbd2c2..23016bd7c98 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -570,6 +570,11 @@ type ReplicationControllerList struct { // ServiceStatus represents the current status of a service type ServiceStatus struct { + // PortalIP is assigned by the master. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If 0, the proxy will choose an ephemeral port. 
+ ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // ServiceSpec describes the attributes that a user creates on a service diff --git a/pkg/kubecfg/resource_printer.go b/pkg/kubecfg/resource_printer.go index f54532a3f94..e528f82177f 100644 --- a/pkg/kubecfg/resource_printer.go +++ b/pkg/kubecfg/resource_printer.go @@ -140,7 +140,7 @@ func (h *HumanReadablePrinter) validatePrintHandlerFunc(printFunc reflect.Value) var podColumns = []string{"ID", "Image(s)", "Host", "Labels", "Status"} var replicationControllerColumns = []string{"ID", "Image(s)", "Selector", "Replicas"} -var serviceColumns = []string{"ID", "Labels", "Selector", "Port"} +var serviceColumns = []string{"ID", "Labels", "Selector", "IP", "Port"} var minionColumns = []string{"Minion identifier"} var statusColumns = []string{"Status"} var eventColumns = []string{"Name", "Kind", "Status", "Reason", "Message"} @@ -226,8 +226,8 @@ func printReplicationControllerList(list *api.ReplicationControllerList, w io.Wr } func printService(svc *api.Service, w io.Writer) error { - _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), - labels.Set(svc.Selector), svc.Port) + _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), + labels.Set(svc.Selector), svc.PortalIP, svc.Port) return err } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index c8652913152..fb5bb3f168a 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -152,7 +152,7 @@ func (h *HumanReadablePrinter) validatePrintHandlerFunc(printFunc reflect.Value) var podColumns = []string{"ID", "IMAGE(S)", "HOST", "LABELS", "STATUS"} var replicationControllerColumns = []string{"ID", "IMAGE(S)", "SELECTOR", "REPLICAS"} -var serviceColumns = []string{"ID", "LABELS", "SELECTOR", "PORT"} +var serviceColumns = []string{"ID", "LABELS", "SELECTOR", "IP", "PORT"} var minionColumns = []string{"ID"} var statusColumns = []string{"STATUS"} @@ -222,8 +222,8 @@ func printReplicationControllerList(list *api.ReplicationControllerList, w io.Wr } func printService(svc *api.Service, w io.Writer) error { - _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), - labels.Set(svc.Selector), svc.Port) + _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), + labels.Set(svc.Selector), svc.PortalIP, svc.Port) return err } diff --git a/pkg/master/master.go b/pkg/master/master.go index 366e8a3557c..37bb8c6915f 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -17,6 +17,7 @@ limitations under the License. package master import ( + "net" "net/http" "time" @@ -54,6 +55,7 @@ type Config struct { MinionRegexp string PodInfoGetter client.PodInfoGetter NodeResources api.NodeResources + PortalNet *net.IPNet } // Master contains state for a Kubernetes cluster master/api server. 
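The master.Config change above carries the -portal_net CIDR (parsed in cmd/apiserver) into the master, and the hunks that follow hand it to service.NewREST, which is where each `service` gets its PortalIP. The allocator itself is outside this excerpt; the sketch below only illustrates the general idea of handing out unused IPs from a *net.IPNet. The package, type, and method names are invented for the example, and a real registry would also persist assignments rather than keep them in memory.

```go
// Hypothetical illustration of portal IP assignment; not the registry code in this patch.
package ipalloc

import (
	"fmt"
	"net"
	"sync"
)

// Allocator hands out unused IPs from a CIDR such as the -portal_net range.
type Allocator struct {
	subnet *net.IPNet
	mu     sync.Mutex
	used   map[string]bool
}

func New(subnet *net.IPNet) *Allocator {
	return &Allocator{subnet: subnet, used: map[string]bool{}}
}

// Allocate returns the lowest free IP in the subnet (10.0.0.1, 10.0.0.2, ...),
// skipping the network address itself.
func (a *Allocator) Allocate() (net.IP, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	ip := dupIP(a.subnet.IP)
	for incIP(ip); a.subnet.Contains(ip); incIP(ip) {
		if !a.used[ip.String()] {
			a.used[ip.String()] = true
			return dupIP(ip), nil
		}
	}
	return nil, fmt.Errorf("portal net %s is exhausted", a.subnet)
}

// Release frees an IP, e.g. when its service is deleted.
func (a *Allocator) Release(ip net.IP) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.used, ip.String())
}

func dupIP(ip net.IP) net.IP { return append(net.IP(nil), ip...) }

// incIP adds one to ip in place, carrying across octets.
func incIP(ip net.IP) {
	for i := len(ip) - 1; i >= 0; i-- {
		ip[i]++
		if ip[i] != 0 {
			return
		}
	}
}
```

With PORTAL_NET="10.0.0.0/16" from the cluster configs above, this style of allocation yields 10.0.0.1, 10.0.0.2, and so on, consistent with the 10.0.0.1 example in docs/services.md.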
@@ -67,6 +69,7 @@ type Master struct { eventRegistry generic.Registry storage map[string]apiserver.RESTStorage client *client.Client + portalNet *net.IPNet } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -98,6 +101,7 @@ func New(c *Config) *Master { eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), minionRegistry: minionRegistry, client: c.Client, + portalNet: c.PortalNet, } m.init(c) return m @@ -137,7 +141,7 @@ func (m *Master) init(c *Config) { Minions: m.client, }), "replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry), - "services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry), + "services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet), "endpoints": endpoint.NewREST(m.endpointRegistry), "minions": minion.NewREST(m.minionRegistry), "events": event.NewREST(m.eventRegistry), diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 720992dc1e5..539691a0979 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -27,16 +27,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" "github.com/golang/glog" ) type serviceInfo struct { - port int - protocol api.Protocol - socket proxySocket - timeout time.Duration - mu sync.Mutex // protects active - active bool + portalIP net.IP + portalPort int + protocol api.Protocol + proxyPort int + socket proxySocket + timeout time.Duration + mu sync.Mutex // protects active + active bool } func (si *serviceInfo) isActive() bool { @@ -64,7 +67,7 @@ type proxySocket interface { // on the impact of calling Close while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service string, proxier *Proxier) + ProxyLoop(service string, info *serviceInfo, proxier *Proxier) } // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, @@ -73,12 +76,7 @@ type tcpProxySocket struct { net.Listener } -func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { - info, found := proxier.getServiceInfo(service) - if !found { - glog.Errorf("Failed to find service: %s", service) - return - } +func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { for { if !info.isActive() { break @@ -97,7 +95,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { inConn.Close() continue } - glog.V(3).Infof("Mapped service %s to endpoint %s", service, endpoint) + glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. 
outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout) @@ -118,22 +116,23 @@ func proxyTCP(in, out *net.TCPConn) { wg.Add(2) glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - go copyBytes(in, out, &wg) - go copyBytes(out, in, &wg) + go copyBytes("from backend", in, out, &wg) + go copyBytes("to backend", out, in, &wg) wg.Wait() in.Close() out.Close() } -func copyBytes(in, out *net.TCPConn, wg *sync.WaitGroup) { +func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { defer wg.Done() - glog.V(4).Infof("Copying from %v <-> %v <-> %v <-> %v", - in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - if _, err := io.Copy(in, out); err != nil { + glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) + n, err := io.Copy(dest, src) + if err != nil { glog.Errorf("I/O error: %v", err) } - in.CloseRead() - out.CloseWrite() + glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) + dest.CloseWrite() + src.CloseRead() } // udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, @@ -157,12 +156,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { - info, found := proxier.getServiceInfo(service) - if !found { - glog.Errorf("Failed to find service: %s", service) - return - } +func (udp *udpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { @@ -220,7 +214,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne glog.Errorf("Couldn't find an endpoint for %s %v", service, err) return nil, err } - glog.V(4).Infof("Mapped service %s to endpoint %s", service, endpoint) + glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout) if err != nil { // TODO: Try another endpoint? @@ -237,6 +231,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne } // This function is expected to be called as a goroutine. +// TODO: Track and log bytes copied, like TCP func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { defer svrConn.Close() var buffer [4096]byte @@ -302,19 +297,64 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { - loadBalancer LoadBalancer - mu sync.Mutex // protects serviceMap - serviceMap map[string]*serviceInfo - address net.IP + loadBalancer LoadBalancer + mu sync.Mutex // protects serviceMap + serviceMap map[string]*serviceInfo + listenAddress net.IP + iptables iptables.Interface } -// NewProxier returns a new Proxier given a LoadBalancer and an -// address on which to listen -func NewProxier(loadBalancer LoadBalancer, address net.IP) *Proxier { +// NewProxier returns a new Proxier given a LoadBalancer and an address on +// which to listen. Because of the iptables logic, It is assumed that there +// is only a single Proxier active on a machine. 
+func NewProxier(loadBalancer LoadBalancer, listenAddress net.IP, iptables iptables.Interface) *Proxier { + glog.Infof("Initializing iptables") + // Set up the iptables foundations we need. + if err := iptablesInit(iptables); err != nil { + glog.Errorf("Failed to initialize iptables: %s", err) + return nil + } + // Flush old iptables rules (since the bound ports will be invalid after a restart). + // When OnUpdate() is first called, the rules will be recreated. + if err := iptablesFlush(iptables); err != nil { + glog.Errorf("Failed to flush iptables: %s", err) + return nil + } return &Proxier{ - loadBalancer: loadBalancer, - serviceMap: make(map[string]*serviceInfo), - address: address, + loadBalancer: loadBalancer, + serviceMap: make(map[string]*serviceInfo), + listenAddress: listenAddress, + iptables: iptables, + } +} + +// The periodic interval for checking the state of things. +const syncInterval = 5 * time.Second + +// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. +func (proxier *Proxier) SyncLoop() { + for { + select { + case <-time.After(syncInterval): + glog.V(2).Infof("Periodic sync") + if err := iptablesInit(proxier.iptables); err != nil { + glog.Errorf("Failed to ensure iptables: %s", err) + } + proxier.ensurePortals() + } + } +} + +// Ensure that portals exist for all services. +func (proxier *Proxier) ensurePortals() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + // NB: This does not remove rules that should not be present. + for name, info := range proxier.serviceMap { + err := proxier.openPortal(name, info) + if err != nil { + glog.Errorf("Failed to ensure portal for %q: %s", name, err) + } } } @@ -330,7 +370,6 @@ func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) err if !info.setActive(false) { return nil } - glog.V(3).Infof("Removing service: %s", service) delete(proxier.serviceMap, service) return info.socket.Close() } @@ -348,39 +387,40 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { proxier.serviceMap[service] = info } -// addServiceOnUnusedPort starts listening for a new service, returning the -// port it's using. For testing on a system with unknown ports used. The timeout only applies to UDP +// addServiceOnPort starts listening for a new service, returning the serviceInfo. +// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. 
-func (proxier *Proxier) addServiceOnUnusedPort(service string, protocol api.Protocol, timeout time.Duration) (string, error) { - sock, err := newProxySocket(protocol, proxier.address, 0) +func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { + sock, err := newProxySocket(protocol, proxier.listenAddress, proxyPort) if err != nil { - return "", err + return nil, err } - _, port, err := net.SplitHostPort(sock.Addr().String()) + _, portStr, err := net.SplitHostPort(sock.Addr().String()) if err != nil { - return "", err + sock.Close() + return nil, err } - portNum, err := strconv.Atoi(port) + portNum, err := strconv.Atoi(portStr) if err != nil { - return "", err + sock.Close() + return nil, err } - proxier.setServiceInfo(service, &serviceInfo{ - port: portNum, - protocol: protocol, - active: true, - socket: sock, - timeout: timeout, - }) - proxier.startAccepting(service, sock) - return port, nil -} + si := &serviceInfo{ + proxyPort: portNum, + protocol: protocol, + active: true, + socket: sock, + timeout: timeout, + } + proxier.setServiceInfo(service, si) -func (proxier *Proxier) startAccepting(service string, sock proxySocket) { - glog.V(1).Infof("Listening for %s on %s:%s", service, sock.Addr().Network(), sock.Addr().String()) - go func(service string, proxier *Proxier) { + glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) + go func(service string, info *serviceInfo, proxier *Proxier) { defer util.HandleCrash() - sock.ProxyLoop(service, proxier) - }(service, proxier) + sock.ProxyLoop(service, info, proxier) + }(service, si, proxier) + + return si, nil } // How long we leave idle UDP connections open. @@ -395,39 +435,133 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { for _, service := range services { activeServices.Insert(service.ID) info, exists := proxier.getServiceInfo(service.ID) + serviceIP := net.ParseIP(service.PortalIP) // TODO: check health of the socket? What if ProxyLoop exited? 
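+		// Nothing to do if the existing proxy is still active on the same portal IP and port.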
- if exists && info.isActive() && info.port == service.Port { + if exists && info.isActive() && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) { continue } - if exists && info.port != service.Port { - err := proxier.stopProxy(service.ID, info) + if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) { + glog.V(4).Infof("Something changed for service %q: stopping it", service.ID) + err := proxier.closePortal(service.ID, info) if err != nil { - glog.Errorf("error stopping %s: %v", service.ID, err) + glog.Errorf("Failed to close portal for %q: %s", service.ID, err) + } + err = proxier.stopProxy(service.ID, info) + if err != nil { + glog.Errorf("Failed to stop service %q: %s", service.ID, err) } } - glog.V(3).Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port) - sock, err := newProxySocket(service.Protocol, proxier.address, service.Port) + glog.V(1).Infof("Adding new service %q at %s:%d/%s (local :%d)", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort) + info, err := proxier.addServiceOnPort(service.ID, service.Protocol, service.ProxyPort, udpIdleTimeout) if err != nil { - glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err) + glog.Errorf("Failed to start proxy for %q: %+v", service.ID, err) continue } - proxier.setServiceInfo(service.ID, &serviceInfo{ - port: service.Port, - protocol: service.Protocol, - active: true, - socket: sock, - timeout: udpIdleTimeout, - }) - proxier.startAccepting(service.ID, sock) + info.portalIP = serviceIP + info.portalPort = service.Port + err = proxier.openPortal(service.ID, info) + if err != nil { + glog.Errorf("Failed to open portal for %q: %s", service.ID, err) + } } proxier.mu.Lock() defer proxier.mu.Unlock() for name, info := range proxier.serviceMap { if !activeServices.Has(name) { - err := proxier.stopProxyInternal(name, info) + glog.V(1).Infof("Stopping service %q", name) + err := proxier.closePortal(name, info) if err != nil { - glog.Errorf("error stopping %s: %v", name, err) + glog.Errorf("Failed to close portal for %q: %s", name, err) + } + err = proxier.stopProxyInternal(name, info) + if err != nil { + glog.Errorf("Failed to stop service %q: %s", name, err) } } } } + +func (proxier *Proxier) openPortal(service string, info *serviceInfo) error { + args := iptablesPortalArgs(info.portalIP, info.portalPort, proxier.listenAddress, info.proxyPort, service) + existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesProxyChain, args...) + if err != nil { + glog.Errorf("Failed to install iptables %s rule for service %q", iptablesProxyChain, service) + return err + } + if !existed { + glog.Infof("Opened iptables portal for service %q on %s:%d", service, info.portalIP, info.portalPort) + } + return nil +} + +func (proxier *Proxier) closePortal(service string, info *serviceInfo) error { + args := iptablesPortalArgs(info.portalIP, info.portalPort, proxier.listenAddress, info.proxyPort, service) + if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesProxyChain, args...); err != nil { + glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesProxyChain, service) + return err + } + glog.Infof("Closed iptables portal for service %q", service) + return nil +} + +var iptablesProxyChain iptables.Chain = "KUBE-PROXY" + +// Ensure that the iptables infrastructure we use is set up. This can safely be called periodically. 
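+// It ensures the KUBE-PROXY chain exists in the nat table and adds jumps to it from the PREROUTING and OUTPUT chains.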
+func iptablesInit(ipt iptables.Interface) error {
+	// TODO: There is almost certainly room for optimization here. E.g., if
+	// we knew the portal_net CIDR we could fast-track outbound packets not
+	// destined for a service. There's probably more, help wanted.
+	if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesProxyChain); err != nil {
+		return err
+	}
+	if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesProxyChain)); err != nil {
+		return err
+	}
+	if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesProxyChain)); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Flush all of our custom iptables rules.
+func iptablesFlush(ipt iptables.Interface) error {
+	return ipt.FlushChain(iptables.TableNAT, iptablesProxyChain)
+}
+
+// Used below.
+var zeroIP = net.ParseIP("0.0.0.0")
+var localhostIP = net.ParseIP("127.0.0.1")
+
+// Build a slice of iptables args for a portal rule.
+func iptablesPortalArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service string) []string {
+	args := []string{
+		"-m", "comment",
+		"--comment", service,
+		"-p", "tcp",
+		"-d", destIP.String(),
+		"--dport", fmt.Sprintf("%d", destPort),
+	}
+	// This is tricky. If the proxy is bound (see Proxier.listenAddress)
+	// to 0.0.0.0 ("any interface") or 127.0.0.1, we can use REDIRECT,
+	// which will bring packets back to the host's loopback interface. If
+	// the proxy is bound to any other interface, then it is not listening
+	// on the host's loopback, so we have to use DNAT to that specific
+	// IP. We cannot simply use DNAT to 127.0.0.1 in the first case
+	// because from within a container, 127.0.0.1 is the container's
+	// loopback interface, not the host's.
+	//
+	// Why would anyone bind to an address that is not inclusive of
+	// localhost? Apparently some cloud environments have their public IP
+	// exposed as a real network interface AND do not have firewalling. We
+	// don't want to expose everything out to the world.
+	//
+	// Unfortunately, I don't know of any way to listen on some (N > 1)
+	// interfaces but not ALL interfaces, short of doing it manually, and
+	// this is simpler than that.
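+	// The resulting rule is roughly:
+	//   iptables -t nat -A KUBE-PROXY -m comment --comment <service> -p tcp -d <destIP> --dport <destPort> -j REDIRECT --to-ports <proxyPort>
+	// (or -j DNAT --to-destination <proxyIP>:<proxyPort> when the proxy is not reachable on loopback).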
+ if proxyIP.Equal(zeroIP) || proxyIP.Equal(localhostIP) { + args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort)) + } else { + args = append(args, "-j", "DNAT", "--to-destination", fmt.Sprintf("%s:%d", proxyIP.String(), proxyPort)) + } + return args +} diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index b3af1a0f140..809d7c5de75 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -28,23 +28,28 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" ) -func waitForClosedPortTCP(p *Proxier, proxyPort string) error { +func joinHostPort(host string, port int) string { + return net.JoinHostPort(host, fmt.Sprintf("%d", port)) +} + +func waitForClosedPortTCP(p *Proxier, proxyPort int) error { for i := 0; i < 50; i++ { - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", proxyPort)) if err != nil { return nil } conn.Close() time.Sleep(1 * time.Millisecond) } - return fmt.Errorf("port %s still open", proxyPort) + return fmt.Errorf("port %d still open", proxyPort) } -func waitForClosedPortUDP(p *Proxier, proxyPort string) error { +func waitForClosedPortUDP(p *Proxier, proxyPort int) error { for i := 0; i < 50; i++ { - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", proxyPort)) if err != nil { return nil } @@ -66,7 +71,26 @@ func waitForClosedPortUDP(p *Proxier, proxyPort string) error { conn.Close() time.Sleep(1 * time.Millisecond) } - return fmt.Errorf("port %s still open", proxyPort) + return fmt.Errorf("port %d still open", proxyPort) +} + +// The iptables logic has to be tested in a proper end-to-end test, so this just stubs everything out. 
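+// fakeIptables satisfies the iptables.Interface that NewProxier expects, with no-op methods.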
+type fakeIptables struct{} + +func (fake *fakeIptables) EnsureChain(table iptables.Table, chain iptables.Chain) (bool, error) { + return false, nil +} + +func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain) error { + return nil +} + +func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) { + return false, nil +} + +func (fake *fakeIptables) DeleteRule(table iptables.Table, chain iptables.Chain, args ...string) error { + return nil } var tcpServerPort string @@ -99,9 +123,9 @@ func init() { go udp.Loop() } -func testEchoTCP(t *testing.T, address, port string) { +func testEchoTCP(t *testing.T, address string, port int) { path := "aaaaa" - res, err := http.Get("http://" + address + ":" + port + "/" + path) + res, err := http.Get("http://" + address + ":" + fmt.Sprintf("%d", port) + "/" + path) if err != nil { t.Fatalf("error connecting to server: %v", err) } @@ -115,10 +139,10 @@ func testEchoTCP(t *testing.T, address, port string) { } } -func testEchoUDP(t *testing.T, address, port string) { +func testEchoUDP(t *testing.T, address string, port int) { data := "abc123" - conn, err := net.Dial("udp", net.JoinHostPort(address, port)) + conn, err := net.Dial("udp", joinHostPort(address, port)) if err != nil { t.Fatalf("error connecting to server: %v", err) } @@ -144,13 +168,13 @@ func TestTCPProxy(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", proxyPort) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxy(t *testing.T) { @@ -162,13 +186,13 @@ func TestUDPProxy(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoUDP(t, "127.0.0.1", proxyPort) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } // Helper: Stops the proxy for the named service. @@ -189,13 +213,13 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -203,7 +227,7 @@ func TestTCPProxyStop(t *testing.T) { stopProxyByName(p, "echo") // Wait for the port to really close. 
- if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -217,13 +241,13 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -231,7 +255,7 @@ func TestUDPProxyStop(t *testing.T) { stopProxyByName(p, "echo") // Wait for the port to really close. - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -245,20 +269,20 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -272,20 +296,20 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -299,27 +323,26 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - proxyPortNum, _ := strconv.Atoi(proxyPort) p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: proxyPortNum, Protocol: "TCP"}, + 
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "TCP"}, }) - testEchoTCP(t, "127.0.0.1", proxyPort) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { @@ -331,27 +354,26 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - proxyPortNum, _ := strconv.Atoi(proxyPort) p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: proxyPortNum, Protocol: "UDP"}, + {TypeMeta: api.TypeMeta{ID: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "UDP"}, }) - testEchoUDP(t, "127.0.0.1", proxyPort) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestTCPProxyUpdatePort(t *testing.T) { @@ -363,36 +385,36 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } // add a new dummy listener in order to get a port that is free l, _ := net.Listen("tcp", ":0") - _, newPort, _ := net.SplitHostPort(l.Addr().String()) - newPortNum, _ := strconv.Atoi(newPort) + _, newPortStr, _ := net.SplitHostPort(l.Addr().String()) + newPort, _ := strconv.Atoi(newPortStr) l.Close() // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, newPort); err != nil { t.Fatalf(err.Error()) } - if proxyPort == newPort { - t.Errorf("expected difference, got %s %s", newPort, proxyPort) + if svcInfo.proxyPort == newPort { + t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort) } p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPortNum, Protocol: "TCP"}, + {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "TCP"}, }) - if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } testEchoTCP(t, "127.0.0.1", newPort) // Ensure the old port is released and re-usable. 
- l, err = net.Listen("tcp", net.JoinHostPort("", proxyPort)) + l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("can't claim released port: %s", err) } @@ -408,36 +430,36 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } // add a new dummy listener in order to get a port that is free pc, _ := net.ListenPacket("udp", ":0") - _, newPort, _ := net.SplitHostPort(pc.LocalAddr().String()) - newPortNum, _ := strconv.Atoi(newPort) + _, newPortStr, _ := net.SplitHostPort(pc.LocalAddr().String()) + newPort, _ := strconv.Atoi(newPortStr) pc.Close() // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, newPort); err != nil { t.Fatalf(err.Error()) } - if proxyPort == newPort { - t.Errorf("expected difference, got %s %s", newPort, proxyPort) + if svcInfo.proxyPort == newPort { + t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort) } p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPortNum, Protocol: "UDP"}, + {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "UDP"}, }) - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } testEchoUDP(t, "127.0.0.1", newPort) // Ensure the old port is released and re-usable. - pc, err = net.ListenPacket("udp", net.JoinHostPort("", proxyPort)) + pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("can't claim released port: %s", err) } diff --git a/pkg/registry/pod/manifest_factory_test.go b/pkg/registry/pod/manifest_factory_test.go index af1ccec3a80..e24d127c296 100644 --- a/pkg/registry/pod/manifest_factory_test.go +++ b/pkg/registry/pod/manifest_factory_test.go @@ -48,10 +48,8 @@ func TestMakeManifestNoServices(t *testing.T) { } container := manifest.Containers[0] - if len(container.Env) != 1 || - container.Env[0].Name != "SERVICE_HOST" || - container.Env[0].Value != "machine" { - t.Errorf("Expected one env vars, got: %#v", manifest) + if len(container.Env) != 0 { + t.Errorf("Expected zero env vars, got: %#v", manifest) } if manifest.ID != "foobar" { t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID) @@ -69,6 +67,7 @@ func TestMakeManifestServices(t *testing.T) { Kind: util.IntstrInt, IntVal: 900, }, + PortalIP: "1.2.3.4", }, }, }, @@ -96,7 +95,7 @@ func TestMakeManifestServices(t *testing.T) { envs := []api.EnvVar{ { Name: "TEST_SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, { Name: "TEST_SERVICE_PORT", @@ -104,11 +103,11 @@ func TestMakeManifestServices(t *testing.T) { }, { Name: "TEST_PORT", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP_PROTO", @@ -120,11 +119,7 @@ func TestMakeManifestServices(t *testing.T) { }, { Name: "TEST_PORT_8080_TCP_ADDR", - Value: "machine", - }, - { - Name: "SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, } if len(container.Env) != len(envs) { @@ -149,6 +144,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { Kind: util.IntstrInt, IntVal: 900, }, + PortalIP: 
"1.2.3.4", }, }, }, @@ -186,7 +182,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, { Name: "TEST_SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, { Name: "TEST_SERVICE_PORT", @@ -194,11 +190,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, { Name: "TEST_PORT", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP_PROTO", @@ -210,11 +206,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, { Name: "TEST_PORT_8080_TCP_ADDR", - Value: "machine", - }, - { - Name: "SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, } if len(container.Env) != len(envs) { diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 8578d738a20..2871620c752 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -17,6 +17,8 @@ limitations under the License. package registrytest import ( + "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -27,6 +29,7 @@ func NewServiceRegistry() *ServiceRegistry { } type ServiceRegistry struct { + mu sync.Mutex List api.ServiceList Service *api.Service Err error @@ -39,48 +42,84 @@ type ServiceRegistry struct { } func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) { - return &r.List, r.Err + r.mu.Lock() + defer r.mu.Unlock() + + // Return by copy to avoid data races + res := new(api.ServiceList) + *res = r.List + res.Items = append([]api.Service{}, r.List.Items...) + return res, r.Err } func (r *ServiceRegistry) CreateService(ctx api.Context, svc *api.Service) error { + r.mu.Lock() + defer r.mu.Unlock() + r.Service = svc r.List.Items = append(r.List.Items, *svc) return r.Err } func (r *ServiceRegistry) GetService(ctx api.Context, id string) (*api.Service, error) { + r.mu.Lock() + defer r.mu.Unlock() + r.GottenID = id return r.Service, r.Err } func (r *ServiceRegistry) DeleteService(ctx api.Context, id string) error { + r.mu.Lock() + defer r.mu.Unlock() + r.DeletedID = id + r.Service = nil return r.Err } func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error { + r.mu.Lock() + defer r.mu.Unlock() + r.UpdatedID = svc.ID + r.Service = svc return r.Err } func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion string) (watch.Interface, error) { + r.mu.Lock() + defer r.mu.Unlock() + return nil, r.Err } func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { + r.mu.Lock() + defer r.mu.Unlock() + return &r.EndpointsList, r.Err } func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) { + r.mu.Lock() + defer r.mu.Unlock() + r.GottenID = id return &r.Endpoints, r.Err } func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { + r.mu.Lock() + defer r.mu.Unlock() + r.Endpoints = *e return r.Err } func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + r.mu.Lock() + defer r.mu.Unlock() + return nil, r.Err } diff --git a/pkg/registry/service/ip_allocator.go b/pkg/registry/service/ip_allocator.go index e6717a7540c..f48511803f4 100644 --- a/pkg/registry/service/ip_allocator.go +++ 
b/pkg/registry/service/ip_allocator.go @@ -32,7 +32,6 @@ type ipAllocator struct { } // newIPAllocator creates and intializes a new ipAllocator object. -// FIXME: resync from storage at startup. func newIPAllocator(subnet *net.IPNet) *ipAllocator { if subnet == nil || subnet.IP == nil || subnet.Mask == nil { return nil diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 3742c31dff0..42f26810d53 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -19,6 +19,7 @@ package service import ( "fmt" "math/rand" + "net" "strconv" "strings" @@ -32,21 +33,49 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" ) // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - registry Registry - cloud cloudprovider.Interface - machines minion.Registry + registry Registry + cloud cloudprovider.Interface + machines minion.Registry + portalMgr *ipAllocator } // NewREST returns a new REST. -func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry) *REST { +func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST { + // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) + ipa := newIPAllocator(portalNet) + reloadIPsFromStorage(ipa, registry) + return &REST{ - registry: registry, - cloud: cloud, - machines: machines, + registry: registry, + cloud: cloud, + machines: machines, + portalMgr: ipa, + } +} + +// Helper: mark all previously allocated IPs in the allocator. +func reloadIPsFromStorage(ipa *ipAllocator, registry Registry) { + services, err := registry.ListServices(api.NewContext()) + if err != nil { + // This is really bad. + glog.Errorf("can't list services to init service REST: %s", err) + return + } + for i := range services.Items { + s := &services.Items[i] + if s.PortalIP == "" { + glog.Warningf("service %q has no PortalIP", s.ID) + continue + } + if err := ipa.Allocate(net.ParseIP(s.PortalIP)); err != nil { + // This is really bad. + glog.Errorf("service %q PortalIP %s could not be allocated: %s", s.ID, s.PortalIP, err) + } } } @@ -61,9 +90,16 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje srv.CreationTimestamp = util.Now() + if ip, err := rs.portalMgr.AllocateNext(); err != nil { + return nil, err + } else { + srv.PortalIP = ip.String() + } + return apiserver.MakeAsync(func() (runtime.Object, error) { // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers // correctly no matter what http operations happen. + srv.ProxyPort = 0 if srv.CreateExternalLoadBalancer { if rs.cloud == nil { return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") @@ -88,6 +124,9 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje if err != nil { return nil, err } + // External load-balancers require a known port for the service proxy. + // TODO: If we end up brokering HostPorts between Pods and Services, this can be any port. 
+ srv.ProxyPort = srv.Port } err := rs.registry.CreateService(ctx, srv) if err != nil { @@ -110,6 +149,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error if err != nil { return nil, err } + rs.portalMgr.Release(net.ParseIP(service.PortalIP)) return apiserver.MakeAsync(func() (runtime.Object, error) { rs.deleteExternalLoadBalancer(service) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) @@ -161,16 +201,13 @@ func GetServiceEnvironmentVariables(ctx api.Context, registry Registry, machine for _, service := range services.Items { // Host name := makeEnvVariableName(service.ID) + "_SERVICE_HOST" - result = append(result, api.EnvVar{Name: name, Value: machine}) + result = append(result, api.EnvVar{Name: name, Value: service.PortalIP}) // Port name = makeEnvVariableName(service.ID) + "_SERVICE_PORT" result = append(result, api.EnvVar{Name: name, Value: strconv.Itoa(service.Port)}) // Docker-compatible vars. - result = append(result, makeLinkVariables(service, machine)...) + result = append(result, makeLinkVariables(service)...) } - // The 'SERVICE_HOST' variable is deprecated. - // TODO(thockin): get rid of it once ip-per-service is in and "deployed". - result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine}) return result, nil } @@ -183,8 +220,15 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje return nil, errors.NewInvalid("service", srv.ID, errs) } return apiserver.MakeAsync(func() (runtime.Object, error) { + cur, err := rs.registry.GetService(ctx, srv.ID) + if err != nil { + return nil, err + } + // Copy over non-user fields. + srv.PortalIP = cur.PortalIP + srv.ProxyPort = cur.ProxyPort // TODO: check to see if external load balancer status changed - err := rs.registry.UpdateService(ctx, srv) + err = rs.registry.UpdateService(ctx, srv) if err != nil { return nil, err } @@ -234,7 +278,7 @@ func makeEnvVariableName(str string) string { return strings.ToUpper(strings.Replace(str, "-", "_", -1)) } -func makeLinkVariables(service api.Service, machine string) []api.EnvVar { +func makeLinkVariables(service api.Service) []api.EnvVar { prefix := makeEnvVariableName(service.ID) protocol := string(api.ProtocolTCP) if service.Protocol != "" { @@ -244,11 +288,11 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar { return []api.EnvVar{ { Name: prefix + "_PORT", - Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), machine, service.Port), + Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), service.PortalIP, service.Port), }, { Name: portPrefix, - Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), machine, service.Port), + Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), service.PortalIP, service.Port), }, { Name: portPrefix + "_PROTO", @@ -260,7 +304,7 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar { }, { Name: portPrefix + "_ADDR", - Value: machine, + Value: service.PortalIP, }, } } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 1e82e197b81..5907146b016 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -18,6 +18,7 @@ package service import ( "fmt" + "net" "reflect" "testing" @@ -29,11 +30,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) +func makeIPNet(t *testing.T) *net.IPNet { + _, net, err := net.ParseCIDR("1.2.3.0/24") + if err != nil { + t.Error(err) + } + return net +} + 
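+// NOTE: makeIPNet yields 1.2.3.0/24, so the first portal IP the allocator hands out in these tests is 1.2.3.1.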
func TestServiceRegistryCreate(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) svc := &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -49,6 +58,12 @@ func TestServiceRegistryCreate(t *testing.T) { if created_service.CreationTimestamp.IsZero() { t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp) } + if created_service.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } + if created_service.ProxyPort != 0 { + t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort) + } if len(fakeCloud.Calls) != 0 { t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls) } @@ -63,7 +78,7 @@ func TestServiceRegistryCreate(t *testing.T) { func TestServiceStorageValidatesCreate(t *testing.T) { registry := registrytest.NewServiceRegistry() - storage := NewREST(registry, nil, nil) + storage := NewREST(registry, nil, nil, makeIPNet(t)) failureCases := map[string]api.Service{ "empty ID": { Port: 6502, @@ -96,7 +111,7 @@ func TestServiceRegistryUpdate(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz1"}, }) - storage := NewREST(registry, nil, nil) + storage := NewREST(registry, nil, nil, makeIPNet(t)) c, err := storage.Update(ctx, &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -126,7 +141,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, }) - storage := NewREST(registry, nil, nil) + storage := NewREST(registry, nil, nil, makeIPNet(t)) failureCases := map[string]api.Service{ "empty ID": { Port: 6502, @@ -155,7 +170,7 @@ func TestServiceRegistryExternalService(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) svc := &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -182,7 +197,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { Err: fmt.Errorf("test error"), } machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) svc := &api.Service{ Port: 6502, TypeMeta: api.TypeMeta{ID: "foo"}, @@ -205,7 +220,7 @@ func TestServiceRegistryDelete(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) svc := &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -226,7 +241,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} 
machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) svc := &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -253,18 +268,21 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) { Selector: map[string]string{"bar": "baz"}, Port: 8080, Protocol: "TCP", + PortalIP: "1.2.3.4", }, { TypeMeta: api.TypeMeta{ID: "abc-123"}, Selector: map[string]string{"bar": "baz"}, Port: 8081, Protocol: "UDP", + PortalIP: "5.6.7.8", }, { TypeMeta: api.TypeMeta{ID: "q-u-u-x"}, Selector: map[string]string{"bar": "baz"}, Port: 8082, Protocol: "", + PortalIP: "9.8.7.6", }, }, } @@ -274,28 +292,27 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) { t.Errorf("Unexpected err: %v", err) } expected := []api.EnvVar{ - {Name: "FOO_BAR_SERVICE_HOST", Value: "machine"}, + {Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"}, {Name: "FOO_BAR_SERVICE_PORT", Value: "8080"}, - {Name: "FOO_BAR_PORT", Value: "tcp://machine:8080"}, - {Name: "FOO_BAR_PORT_8080_TCP", Value: "tcp://machine:8080"}, + {Name: "FOO_BAR_PORT", Value: "tcp://1.2.3.4:8080"}, + {Name: "FOO_BAR_PORT_8080_TCP", Value: "tcp://1.2.3.4:8080"}, {Name: "FOO_BAR_PORT_8080_TCP_PROTO", Value: "tcp"}, {Name: "FOO_BAR_PORT_8080_TCP_PORT", Value: "8080"}, - {Name: "FOO_BAR_PORT_8080_TCP_ADDR", Value: "machine"}, - {Name: "ABC_123_SERVICE_HOST", Value: "machine"}, + {Name: "FOO_BAR_PORT_8080_TCP_ADDR", Value: "1.2.3.4"}, + {Name: "ABC_123_SERVICE_HOST", Value: "5.6.7.8"}, {Name: "ABC_123_SERVICE_PORT", Value: "8081"}, - {Name: "ABC_123_PORT", Value: "udp://machine:8081"}, - {Name: "ABC_123_PORT_8081_UDP", Value: "udp://machine:8081"}, + {Name: "ABC_123_PORT", Value: "udp://5.6.7.8:8081"}, + {Name: "ABC_123_PORT_8081_UDP", Value: "udp://5.6.7.8:8081"}, {Name: "ABC_123_PORT_8081_UDP_PROTO", Value: "udp"}, {Name: "ABC_123_PORT_8081_UDP_PORT", Value: "8081"}, - {Name: "ABC_123_PORT_8081_UDP_ADDR", Value: "machine"}, - {Name: "Q_U_U_X_SERVICE_HOST", Value: "machine"}, + {Name: "ABC_123_PORT_8081_UDP_ADDR", Value: "5.6.7.8"}, + {Name: "Q_U_U_X_SERVICE_HOST", Value: "9.8.7.6"}, {Name: "Q_U_U_X_SERVICE_PORT", Value: "8082"}, - {Name: "Q_U_U_X_PORT", Value: "tcp://machine:8082"}, - {Name: "Q_U_U_X_PORT_8082_TCP", Value: "tcp://machine:8082"}, + {Name: "Q_U_U_X_PORT", Value: "tcp://9.8.7.6:8082"}, + {Name: "Q_U_U_X_PORT_8082_TCP", Value: "tcp://9.8.7.6:8082"}, {Name: "Q_U_U_X_PORT_8082_TCP_PROTO", Value: "tcp"}, {Name: "Q_U_U_X_PORT_8082_TCP_PORT", Value: "8082"}, - {Name: "Q_U_U_X_PORT_8082_TCP_ADDR", Value: "machine"}, - {Name: "SERVICE_HOST", Value: "machine"}, + {Name: "Q_U_U_X_PORT_8082_TCP_ADDR", Value: "9.8.7.6"}, } if len(vars) != len(expected) { t.Errorf("Expected %d env vars, got: %+v", len(expected), vars) @@ -313,7 +330,7 @@ func TestServiceRegistryGet(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -333,7 +350,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { 
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -362,7 +379,7 @@ func TestServiceRegistryList(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -390,3 +407,194 @@ func TestServiceRegistryList(t *testing.T) { t.Errorf("Unexpected resource version: %#v", sl) } } + +func TestServiceRegistryIPAllocation(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc1 := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx := api.NewDefaultContext() + c1, _ := rest.Create(ctx, svc1) + created_svc1 := <-c1 + created_service_1 := created_svc1.(*api.Service) + if created_service_1.ID != "foo" { + t.Errorf("Expected foo, but got %v", created_service_1.ID) + } + if created_service_1.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service_1.PortalIP) + } + + svc2 := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "bar"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx = api.NewDefaultContext() + c2, _ := rest.Create(ctx, svc2) + created_svc2 := <-c2 + created_service_2 := created_svc2.(*api.Service) + if created_service_2.ID != "bar" { + t.Errorf("Expected bar, but got %v", created_service_2.ID) + } + if created_service_2.PortalIP != "1.2.3.2" { // new IP + t.Errorf("Unexpected PortalIP: %s", created_service_2.PortalIP) + } +} + +func TestServiceRegistryIPReallocation(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc1 := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx := api.NewDefaultContext() + c1, _ := rest.Create(ctx, svc1) + created_svc1 := <-c1 + created_service_1 := created_svc1.(*api.Service) + if created_service_1.ID != "foo" { + t.Errorf("Expected foo, but got %v", created_service_1.ID) + } + if created_service_1.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service_1.PortalIP) + } + + c, _ := rest.Delete(ctx, created_service_1.ID) + <-c + + svc2 := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "bar"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx = api.NewDefaultContext() + c2, _ := rest.Create(ctx, svc2) + created_svc2 := <-c2 + created_service_2 := created_svc2.(*api.Service) + if created_service_2.ID != 
"bar" { + t.Errorf("Expected bar, but got %v", created_service_2.ID) + } + if created_service_2.PortalIP != "1.2.3.1" { // same IP as before + t.Errorf("Unexpected PortalIP: %s", created_service_2.PortalIP) + } +} + +func TestServiceRegistryIPUpdate(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx := api.NewDefaultContext() + c, _ := rest.Create(ctx, svc) + created_svc := <-c + created_service := created_svc.(*api.Service) + if created_service.Port != 6502 { + t.Errorf("Expected port 6502, but got %v", created_service.Port) + } + if created_service.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } + if created_service.ProxyPort != 0 { + t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort) + } + + update := new(api.Service) + *update = *created_service + update.Port = 6503 + update.PortalIP = "8.6.7.5" + update.ProxyPort = 309 + + c, _ = rest.Update(ctx, update) + updated_svc := <-c + updated_service := updated_svc.(*api.Service) + if updated_service.Port != 6503 { + t.Errorf("Expected port 6503, but got %v", updated_service.Port) + } + if updated_service.PortalIP != "1.2.3.1" { // unchanged, despite trying + t.Errorf("Unexpected PortalIP: %s", updated_service.PortalIP) + } + if updated_service.ProxyPort != 0 { // unchanged, despite trying + t.Errorf("Unexpected ProxyPort: %d", updated_service.ProxyPort) + } +} + +func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + CreateExternalLoadBalancer: true, + } + ctx := api.NewDefaultContext() + c, _ := rest.Create(ctx, svc) + created_svc := <-c + created_service := created_svc.(*api.Service) + if created_service.Port != 6502 { + t.Errorf("Expected port 6502, but got %v", created_service.Port) + } + if created_service.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } + if created_service.ProxyPort != 6502 { + t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort) + } +} + +func TestServiceRegistryIPReloadFromStorage(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest1 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx := api.NewDefaultContext() + c, _ := rest1.Create(ctx, svc) + <-c + svc = &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + c, _ = rest1.Create(ctx, svc) + <-c + + // This will reload from storage, finding the previous 2 + rest2 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc = &api.Service{ + Port: 6502, + TypeMeta: 
api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + c, _ = rest2.Create(ctx, svc) + created_svc := <-c + created_service := created_svc.(*api.Service) + if created_service.PortalIP != "1.2.3.3" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } +}