Merge pull request #135 from brendandburns/lb
Add load balancing support to services.
This commit is contained in:
@@ -140,9 +140,10 @@ type ServiceList struct {
|
||||
// Defines a service abstraction by a name (for example, mysql) consisting of local port
|
||||
// (for example 3306) that the proxy listens on, and the labels that define the service.
|
||||
type Service struct {
|
||||
JSONBase `json:",inline" yaml:",inline"`
|
||||
Port int `json:"port,omitempty" yaml:"port,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
|
||||
JSONBase `json:",inline" yaml:",inline"`
|
||||
Port int `json:"port,omitempty" yaml:"port,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
|
||||
CreateExternalLoadBalancer bool `json:"createExternalLoadBalancer,omitempty" yaml:"createExternalLoadBalancer,omitempty"`
|
||||
}
|
||||
|
||||
// Defines the endpoints that implement the actual service, for example:
|
||||
|
||||
@@ -190,8 +190,12 @@ func (server *ApiServer) handleREST(parts []string, requestUrl *url.URL, req *ht
|
||||
server.error(err, w)
|
||||
return
|
||||
}
|
||||
storage.Create(obj)
|
||||
server.write(200, obj, w)
|
||||
err = storage.Create(obj)
|
||||
if err != nil {
|
||||
server.error(err, w)
|
||||
} else {
|
||||
server.write(200, obj, w)
|
||||
}
|
||||
return
|
||||
case "DELETE":
|
||||
if len(parts) != 2 {
|
||||
|
||||
31
pkg/cloudprovider/cloud.go
Normal file
31
pkg/cloudprovider/cloud.go
Normal file
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloudprovider
|
||||
|
||||
// CloudInterface is an abstract, pluggable interface for cloud providers
|
||||
type Interface interface {
|
||||
// TCPLoadBalancer returns a balancer interface, or nil if none is supported. Returns an error if one occurs.
|
||||
TCPLoadBalancer() (TCPLoadBalancer, error)
|
||||
}
|
||||
|
||||
type TCPLoadBalancer interface {
|
||||
// TODO: Break this up into different interfaces (LB, etc) when we have more than one type of service
|
||||
TCPLoadBalancerExists(name, region string) (bool, error)
|
||||
CreateTCPLoadBalancer(name, region string, port int, hosts []string) error
|
||||
UpdateTCPLoadBalancer(name, region string, hosts []string) error
|
||||
DeleteTCPLoadBalancer(name, region string) error
|
||||
}
|
||||
20
pkg/cloudprovider/doc.go
Normal file
20
pkg/cloudprovider/doc.go
Normal file
@@ -0,0 +1,20 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package cloudprovider supplies interfaces and implementations for cloud service providers
|
||||
package cloudprovider
|
||||
|
||||
import ()
|
||||
164
pkg/cloudprovider/gce.go
Normal file
164
pkg/cloudprovider/gce.go
Normal file
@@ -0,0 +1,164 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloudprovider
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/goauth2/compute/serviceaccount"
|
||||
compute "code.google.com/p/google-api-go-client/compute/v1"
|
||||
)
|
||||
|
||||
type GCECloud struct {
|
||||
service *compute.Service
|
||||
projectID string
|
||||
zone string
|
||||
}
|
||||
|
||||
func getProjectAndZone() (string, string, error) {
|
||||
client := http.Client{}
|
||||
url := "http://metadata/computeMetadata/v1/instance/zone"
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
req.Header.Add("X-Google-Metadata-Request", "True")
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
data, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
parts := strings.Split(string(data), "/")
|
||||
if len(parts) != 4 {
|
||||
return "", "", fmt.Errorf("Unexpected response: %s", string(data))
|
||||
}
|
||||
return parts[1], parts[3], nil
|
||||
}
|
||||
|
||||
func NewGCECloud() (*GCECloud, error) {
|
||||
projectID, zone, err := getProjectAndZone()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := serviceaccount.NewClient(&serviceaccount.Options{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
svc, err := compute.New(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &GCECloud{
|
||||
service: svc,
|
||||
projectID: projectID,
|
||||
zone: zone,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) TCPLoadBalancer() (TCPLoadBalancer, error) {
|
||||
return gce, nil
|
||||
}
|
||||
|
||||
func makeHostLink(projectID, zone, host string) string {
|
||||
ix := strings.Index(host, ".")
|
||||
if ix != -1 {
|
||||
host = host[:ix]
|
||||
}
|
||||
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s",
|
||||
projectID, zone, host)
|
||||
}
|
||||
|
||||
func (gce *GCECloud) makeTargetPool(name, region string, hosts []string) (string, error) {
|
||||
var instances []string
|
||||
for _, host := range hosts {
|
||||
instances = append(instances, makeHostLink(gce.projectID, gce.zone, host))
|
||||
}
|
||||
pool := &compute.TargetPool{
|
||||
Name: name,
|
||||
Instances: instances,
|
||||
}
|
||||
_, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
link := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
|
||||
return link, nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
|
||||
pollOp := op
|
||||
for pollOp.Status != "DONE" {
|
||||
var err error
|
||||
time.Sleep(time.Second * 10)
|
||||
pollOp, err = gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) TCPLoadBalancerExists(name, region string) (bool, error) {
|
||||
_, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, port int, hosts []string) error {
|
||||
pool, err := gce.makeTargetPool(name, region, hosts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req := &compute.ForwardingRule{
|
||||
Name: name,
|
||||
IPProtocol: "TCP",
|
||||
PortRange: strconv.Itoa(port),
|
||||
Target: pool,
|
||||
}
|
||||
_, err = gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
|
||||
return err
|
||||
}
|
||||
|
||||
func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
|
||||
var refs []*compute.InstanceReference
|
||||
for _, host := range hosts {
|
||||
refs = append(refs, &compute.InstanceReference{host})
|
||||
}
|
||||
req := &compute.TargetPoolsAddInstanceRequest{
|
||||
Instances: refs,
|
||||
}
|
||||
|
||||
_, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, req).Do()
|
||||
return err
|
||||
}
|
||||
|
||||
func (gce *GCECloud) DeleteTCPLoadBalancer(name, region string) error {
|
||||
_, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
|
||||
return err
|
||||
}
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
@@ -40,29 +41,29 @@ type Master struct {
|
||||
}
|
||||
|
||||
// Returns a memory (not etcd) backed apiserver.
|
||||
func NewMemoryServer(minions []string) *Master {
|
||||
func NewMemoryServer(minions []string, cloud cloudprovider.Interface) *Master {
|
||||
m := &Master{
|
||||
podRegistry: registry.MakeMemoryRegistry(),
|
||||
controllerRegistry: registry.MakeMemoryRegistry(),
|
||||
serviceRegistry: registry.MakeMemoryRegistry(),
|
||||
}
|
||||
m.init(minions)
|
||||
m.init(minions, cloud)
|
||||
return m
|
||||
}
|
||||
|
||||
// Returns a new apiserver.
|
||||
func New(etcdServers, minions []string) *Master {
|
||||
func New(etcdServers, minions []string, cloud cloudprovider.Interface) *Master {
|
||||
etcdClient := etcd.NewClient(etcdServers)
|
||||
m := &Master{
|
||||
podRegistry: registry.MakeEtcdRegistry(etcdClient, minions),
|
||||
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minions),
|
||||
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minions),
|
||||
}
|
||||
m.init(minions)
|
||||
m.init(minions, cloud)
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *Master) init(minions []string) {
|
||||
func (m *Master) init(minions []string, cloud cloudprovider.Interface) {
|
||||
containerInfo := &client.HTTPContainerInfo{
|
||||
Client: http.DefaultClient,
|
||||
Port: 10250,
|
||||
@@ -73,7 +74,7 @@ func (m *Master) init(minions []string) {
|
||||
m.storage = map[string]apiserver.RESTStorage{
|
||||
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random)),
|
||||
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
|
||||
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry),
|
||||
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,20 +17,28 @@ package registry
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
type ServiceRegistryStorage struct {
|
||||
registry ServiceRegistry
|
||||
cloud cloudprovider.Interface
|
||||
hosts []string
|
||||
}
|
||||
|
||||
func MakeServiceRegistryStorage(registry ServiceRegistry) apiserver.RESTStorage {
|
||||
return &ServiceRegistryStorage{registry: registry}
|
||||
func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, hosts []string) apiserver.RESTStorage {
|
||||
return &ServiceRegistryStorage{
|
||||
registry: registry,
|
||||
cloud: cloud,
|
||||
hosts: hosts,
|
||||
}
|
||||
}
|
||||
|
||||
// GetServiceEnvironmentVariables populates a list of environment variables that are use
|
||||
@@ -76,6 +84,25 @@ func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) {
|
||||
}
|
||||
|
||||
func (sr *ServiceRegistryStorage) Delete(id string) error {
|
||||
svc, err := sr.Get(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if svc.(api.Service).CreateExternalLoadBalancer {
|
||||
var balancer cloudprovider.TCPLoadBalancer
|
||||
if sr.cloud != nil {
|
||||
balancer, err = sr.cloud.TCPLoadBalancer()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if balancer != nil {
|
||||
err = balancer.DeleteTCPLoadBalancer(id, "us-central1")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return sr.registry.DeleteService(id)
|
||||
}
|
||||
|
||||
@@ -87,7 +114,26 @@ func (sr *ServiceRegistryStorage) Extract(body string) (interface{}, error) {
|
||||
}
|
||||
|
||||
func (sr *ServiceRegistryStorage) Create(obj interface{}) error {
|
||||
return sr.registry.CreateService(obj.(api.Service))
|
||||
srv := obj.(api.Service)
|
||||
if srv.CreateExternalLoadBalancer {
|
||||
var balancer cloudprovider.TCPLoadBalancer
|
||||
if sr.cloud != nil {
|
||||
var err error
|
||||
balancer, err = sr.cloud.TCPLoadBalancer()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if balancer != nil {
|
||||
err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("requested an external service, but no cloud provider supplied.")
|
||||
}
|
||||
}
|
||||
return sr.registry.CreateService(srv)
|
||||
}
|
||||
|
||||
func (sr *ServiceRegistryStorage) Update(obj interface{}) error {
|
||||
|
||||
Reference in New Issue
Block a user