kubernetes/pkg/cloudprovider/openstack/openstack.go
Justin Santa Barbara 3884d5fc59 Add LoadBalancer status to ServiceStatus
This will replace publicIPs
2015-05-22 18:27:05 -04:00

694 lines
18 KiB
Go

/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openstack
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"regexp"
"time"
"code.google.com/p/gcfg"
"github.com/rackspace/gophercloud"
"github.com/rackspace/gophercloud/openstack"
"github.com/rackspace/gophercloud/openstack/compute/v2/flavors"
"github.com/rackspace/gophercloud/openstack/compute/v2/servers"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips"
"github.com/rackspace/gophercloud/pagination"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/golang/glog"
)
// Sentinel errors returned by the lookup helpers in this package.
var (
	// ErrNotFound is returned when a lookup matches no object.
	ErrNotFound = errors.New("Failed to find object")

	// ErrMultipleResults is returned when a lookup that should match exactly
	// one object matches several.
	ErrMultipleResults = errors.New("Multiple results where only one expected")

	// ErrNoAddressFound is returned when no address can be determined for a
	// host.
	ErrNoAddressFound = errors.New("No address found for host")

	// ErrAttrNotFound is returned when an expected attribute is missing or
	// does not have the expected type.
	ErrAttrNotFound = errors.New("Expected attribute not found")
)
// Byte multipliers applied to flavor quantities when building resource lists.
const (
	// MiB is the number of bytes in one mebibyte (2^20).
	MiB = 1 << 20

	// GB is the number of bytes in one decimal gigabyte (10^9).
	GB = 1000000000
)
// MyDuration embeds time.Duration and adds an UnmarshalText method, giving
// time.Duration an encoding.TextUnmarshaler implementation.
type MyDuration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration and stores the result in
// d. If parsing fails, d is left untouched and the parse error is returned.
func (d *MyDuration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err == nil {
		d.Duration = parsed
	}
	return err
}
// LoadBalancerOpts holds the load balancer settings read from the cloud
// provider config file (it is the type of Config.LoadBalancer).
type LoadBalancerOpts struct {
	// SubnetId is the subnet the LBaaS pool is created on.
	SubnetId string `gcfg:"subnet-id"` // required

	// LBMethod is the pool's balancing method; when empty, round robin is
	// used. The tag key must be spelled "gcfg", otherwise the "lb-method"
	// config key is never bound to this field.
	LBMethod string `gcfg:"lb-method"`

	// CreateMonitor enables creation of a TCP health monitor for each pool.
	CreateMonitor bool `gcfg:"create-monitor"`

	// MonitorDelay is passed to the monitor as a whole number of seconds.
	MonitorDelay MyDuration `gcfg:"monitor-delay"`

	// MonitorTimeout is passed to the monitor as a whole number of seconds.
	MonitorTimeout MyDuration `gcfg:"monitor-timeout"`

	// MonitorMaxRetries is passed to the monitor as its retry limit.
	MonitorMaxRetries uint `gcfg:"monitor-max-retries"`
}
// OpenStack is an implementation of cloud provider Interface for OpenStack.
type OpenStack struct {
// provider is the authenticated client from which the compute and network
// service clients are created.
provider *gophercloud.ProviderClient
// region is passed as the endpoint region when creating service clients and
// is reported as the zone's region by GetZone.
region string
// lbOpts is the load balancer configuration handed to each LoadBalancer
// returned by TCPLoadBalancer.
lbOpts LoadBalancerOpts
}
// Config is the structure the cloud provider config file is parsed into by
// readConfig (via gcfg.ReadInto).
type Config struct {
// Global carries the identity/authentication settings and the region.
Global struct {
// AuthUrl is used as the identity endpoint when authenticating.
AuthUrl string `gcfg:"auth-url"`
Username string
UserId string `gcfg:"user-id"`
Password string
ApiKey string `gcfg:"api-key"`
TenantId string `gcfg:"tenant-id"`
TenantName string `gcfg:"tenant-name"`
DomainId string `gcfg:"domain-id"`
DomainName string `gcfg:"domain-name"`
// Region is the region used when locating compute and network endpoints.
Region string
}
// LoadBalancer carries the load balancer settings; see LoadBalancerOpts.
LoadBalancer LoadBalancerOpts
}
// init registers the "openstack" cloud provider. The registered factory parses
// the supplied config and authenticates against OpenStack.
func init() {
	factory := func(config io.Reader) (cloudprovider.Interface, error) {
		cfg, err := readConfig(config)
		if err != nil {
			return nil, err
		}
		return newOpenStack(cfg)
	}
	cloudprovider.RegisterCloudProvider("openstack", factory)
}
// toAuthOptions converts the [Global] section of the config into the
// gophercloud.AuthOptions used to authenticate.
//
// The domain-id and domain-name settings are forwarded as well; previously
// they were parsed from the config file but never reached the identity
// service.
func (cfg Config) toAuthOptions() gophercloud.AuthOptions {
	return gophercloud.AuthOptions{
		IdentityEndpoint: cfg.Global.AuthUrl,
		Username:         cfg.Global.Username,
		UserID:           cfg.Global.UserId,
		Password:         cfg.Global.Password,
		APIKey:           cfg.Global.ApiKey,
		TenantID:         cfg.Global.TenantId,
		TenantName:       cfg.Global.TenantName,
		DomainID:         cfg.Global.DomainId,
		DomainName:       cfg.Global.DomainName,

		// Persistent service, so we need to be able to renew tokens.
		AllowReauth: true,
	}
}
// readConfig parses the cloud provider config from config using gcfg.
// A nil reader is rejected with an error. The returned Config is whatever
// gcfg.ReadInto populated, together with its error.
func readConfig(config io.Reader) (Config, error) {
	var cfg Config
	if config == nil {
		return cfg, fmt.Errorf("no OpenStack cloud provider config file given")
	}

	err := gcfg.ReadInto(&cfg, config)
	return cfg, err
}
// newOpenStack authenticates with the options derived from cfg and returns an
// OpenStack provider bound to the configured region and load balancer
// options. Authentication errors are returned unchanged.
func newOpenStack(cfg Config) (*OpenStack, error) {
	client, err := openstack.AuthenticatedClient(cfg.toAuthOptions())
	if err != nil {
		return nil, err
	}

	return &OpenStack{
		provider: client,
		region:   cfg.Global.Region,
		lbOpts:   cfg.LoadBalancer,
	}, nil
}
// Instances answers instance queries using the compute service.
type Instances struct {
// compute is the compute service client used for server lookups.
compute *gophercloud.ServiceClient
// flavor_to_resource holds the node resources computed for each flavor when
// the Instances value was created.
flavor_to_resource map[string]*api.NodeResources // keyed by flavor id
}
// Instances returns an implementation of Instances for OpenStack.
//
// It creates a compute client for the configured region and lists all flavors
// up front, building a flavor-id -> resources table that GetNodeResources
// consults later. If the compute endpoint cannot be found or the flavor list
// fails, a warning is logged and (nil, false) is returned.
func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {
	glog.V(4).Info("openstack.Instances() called")

	compute, err := openstack.NewComputeV2(os.provider, gophercloud.EndpointOpts{
		Region: os.region,
	})
	if err != nil {
		glog.Warningf("Failed to find compute endpoint: %v", err)
		return nil, false
	}

	pager := flavors.ListDetail(compute, nil)

	flavor_to_resource := make(map[string]*api.NodeResources)
	err = pager.EachPage(func(page pagination.Page) (bool, error) {
		flavorList, err := flavors.ExtractFlavors(page)
		if err != nil {
			return false, err
		}
		for _, flavor := range flavorList {
			rsrc := api.NodeResources{
				Capacity: api.ResourceList{
					api.ResourceCPU:      *resource.NewQuantity(int64(flavor.VCPUs), resource.DecimalSI),
					api.ResourceMemory:   *resource.NewQuantity(int64(flavor.RAM)*MiB, resource.BinarySI),
					"openstack.org/disk": *resource.NewQuantity(int64(flavor.Disk)*GB, resource.DecimalSI),
					// Scale before converting to an integer: converting first
					// would drop the fractional part of the factor (1.5 would
					// be recorded as 1000m instead of 1500m).
					"openstack.org/rxTxFactor": *resource.NewMilliQuantity(int64(flavor.RxTxFactor*1000), resource.DecimalSI),
					"openstack.org/swap":       *resource.NewQuantity(int64(flavor.Swap)*MiB, resource.BinarySI),
				},
			}
			flavor_to_resource[flavor.ID] = &rsrc
		}
		return true, nil
	})
	if err != nil {
		glog.Warningf("Failed to find compute flavors: %v", err)
		return nil, false
	}

	glog.V(3).Infof("Found %v compute flavors", len(flavor_to_resource))
	glog.V(1).Info("Claiming to support Instances")

	return &Instances{compute, flavor_to_resource}, true
}
// List returns the names of all ACTIVE servers whose name matches
// name_filter. The filter is handed to the compute API as-is. On success the
// returned slice is never nil, even when nothing matched.
func (i *Instances) List(name_filter string) ([]string, error) {
	glog.V(4).Infof("openstack List(%v) called", name_filter)

	names := []string{}
	pager := servers.List(i.compute, servers.ListOpts{
		Name:   name_filter,
		Status: "ACTIVE",
	})

	// Accumulate the server names from every result page.
	collect := func(page pagination.Page) (bool, error) {
		found, err := servers.ExtractServers(page)
		if err != nil {
			return false, err
		}
		for _, srv := range found {
			names = append(names, srv.Name)
		}
		return true, nil
	}
	if err := pager.EachPage(collect); err != nil {
		return nil, err
	}

	glog.V(3).Infof("Found %v instances matching %v: %v",
		len(names), name_filter, names)

	return names, nil
}
// getServerByName returns the single ACTIVE server called name.
//
// The name is regex-escaped and anchored with ^...$ before being used as the
// list filter (presumably because the compute API treats the filter as a
// pattern). ErrNotFound is returned when nothing matches and
// ErrMultipleResults when more than one server matches.
func getServerByName(client *gophercloud.ServiceClient, name string) (*servers.Server, error) {
	filter := servers.ListOpts{
		Name:   fmt.Sprintf("^%s$", regexp.QuoteMeta(name)),
		Status: "ACTIVE",
	}
	pager := servers.List(client, filter)

	matches := make([]servers.Server, 0, 1)
	collect := func(page pagination.Page) (bool, error) {
		found, err := servers.ExtractServers(page)
		if err != nil {
			return false, err
		}
		matches = append(matches, found...)
		// Stop paging as soon as the result is known to be ambiguous.
		if len(matches) > 1 {
			return false, ErrMultipleResults
		}
		return true, nil
	}
	if err := pager.EachPage(collect); err != nil {
		return nil, err
	}

	switch len(matches) {
	case 0:
		return nil, ErrNotFound
	case 1:
		return &matches[0], nil
	default:
		return nil, ErrMultipleResults
	}
}
// findAddrs extracts the string "addr" values from an untyped address blob.
//
// netblob is expected to be a []interface{} whose elements are
// map[string]interface{} values. Elements of any other shape, and entries
// whose "addr" is missing or not a string, are skipped. The result is always
// a non-nil slice.
func findAddrs(netblob interface{}) []string {
	addrs := []string{}

	entries, isList := netblob.([]interface{})
	if !isList {
		return addrs
	}

	for _, entry := range entries {
		props, isMap := entry.(map[string]interface{})
		if !isMap {
			continue
		}
		// A missing key yields a nil interface, which fails the assertion too.
		if addr, isString := props["addr"].(string); isString {
			addrs = append(addrs, addr)
		}
	}
	return addrs
}
// getAddressByName returns one address for the server called name.
//
// Candidates are tried in a fixed order and the first non-empty one wins:
// the first "private" address, the first "public" address, AccessIPv4, then
// AccessIPv6. ErrNoAddressFound is returned when all of them are empty.
// Lookup errors from getServerByName are returned unchanged.
func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) {
	srv, err := getServerByName(api, name)
	if err != nil {
		return "", err
	}

	candidates := make([]string, 0, 4)
	for _, network := range []string{"private", "public"} {
		if found := findAddrs(srv.Addresses[network]); len(found) >= 1 {
			candidates = append(candidates, found[0])
		}
	}
	candidates = append(candidates, srv.AccessIPv4, srv.AccessIPv6)

	for _, addr := range candidates {
		if addr != "" {
			return addr, nil
		}
	}
	return "", ErrNoAddressFound
}
// NodeAddresses returns the addresses of the server called name.
//
// Every "private" address is reported as an internal IP and every "public"
// address as an external IP. The server's access IPs (IPv6 first, then IPv4)
// are then merged in as external IPs, but only when they are set: an unset
// access IP is an empty string and must not be reported as an address.
// Lookup errors from getServerByName are returned unchanged.
func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) {
	glog.V(4).Infof("NodeAddresses(%v) called", name)

	srv, err := getServerByName(i.compute, name)
	if err != nil {
		return nil, err
	}

	addrs := []api.NodeAddress{}

	for _, addr := range findAddrs(srv.Addresses["private"]) {
		addrs = append(addrs, api.NodeAddress{
			Type:    api.NodeInternalIP,
			Address: addr,
		})
	}

	for _, addr := range findAddrs(srv.Addresses["public"]) {
		addrs = append(addrs, api.NodeAddress{
			Type:    api.NodeExternalIP,
			Address: addr,
		})
	}

	// AccessIPs are usually duplicates of "public" addresses.
	for _, accessIP := range []string{srv.AccessIPv6, srv.AccessIPv4} {
		if accessIP == "" {
			continue
		}
		api.AddToNodeAddresses(&addrs, api.NodeAddress{
			Type:    api.NodeExternalIP,
			Address: accessIP,
		})
	}

	glog.V(4).Infof("NodeAddresses(%v) => %v", name, addrs)
	return addrs, nil
}
// ExternalID returns the cloud provider ID of the specified instance, i.e.
// the ID of the ACTIVE server whose name is name. Lookup failures are
// returned unchanged together with an empty ID.
func (i *Instances) ExternalID(name string) (string, error) {
	server, lookupErr := getServerByName(i.compute, name)
	if lookupErr != nil {
		return "", lookupErr
	}

	id := server.ID
	return id, nil
}
// GetNodeResources returns the resources recorded for the flavor of the
// server called name.
//
// ErrAttrNotFound is returned when the server's flavor has no string "id";
// ErrNotFound is returned when that id is not in the flavor table built when
// the Instances value was created.
func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
	glog.V(4).Infof("GetNodeResources(%v) called", name)

	srv, err := getServerByName(i.compute, name)
	if err != nil {
		return nil, err
	}

	// A missing key yields a nil interface, so one assertion covers both the
	// "absent" and the "wrong type" case.
	flavorID, isString := srv.Flavor["id"].(string)
	if !isString {
		return nil, ErrAttrNotFound
	}

	resources, known := i.flavor_to_resource[flavorID]
	if !known {
		return nil, ErrNotFound
	}

	glog.V(4).Infof("GetNodeResources(%v) => %v", name, resources)
	return resources, nil
}
// Clusters always returns (nil, false): this provider supplies no Clusters
// implementation.
func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
}
// LoadBalancer manages load balancers through the network service's LBaaS
// pools, members, monitors and VIPs.
type LoadBalancer struct {
// network is the network service client used for all LBaaS calls.
network *gophercloud.ServiceClient
// compute is the compute service client used to resolve host names to
// addresses.
compute *gophercloud.ServiceClient
// opts is the load balancer configuration (subnet, method, monitor settings).
opts LoadBalancerOpts
}
// TCPLoadBalancer returns a LoadBalancer backed by the network and compute
// services of the configured region. If either endpoint cannot be found, a
// warning is logged and (nil, false) is returned.
func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
	glog.V(4).Info("openstack.TCPLoadBalancer() called")

	endpoint := gophercloud.EndpointOpts{Region: os.region}

	// TODO: Search for and support Rackspace loadbalancer API, and others.
	networkClient, err := openstack.NewNetworkV2(os.provider, endpoint)
	if err != nil {
		glog.Warningf("Failed to find neutron endpoint: %v", err)
		return nil, false
	}

	computeClient, err := openstack.NewComputeV2(os.provider, endpoint)
	if err != nil {
		glog.Warningf("Failed to find compute endpoint: %v", err)
		return nil, false
	}

	glog.V(1).Info("Claiming to support TCPLoadBalancer")

	return &LoadBalancer{
		network: networkClient,
		compute: computeClient,
		opts:    os.lbOpts,
	}, true
}
// getVipByName returns the single VIP whose name filter is name.
// ErrNotFound is returned when no VIP matches and ErrMultipleResults when
// more than one does; listing errors are returned unchanged.
func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) {
	pager := vips.List(client, vips.ListOpts{Name: name})

	matches := make([]vips.VirtualIP, 0, 1)
	collect := func(page pagination.Page) (bool, error) {
		found, err := vips.ExtractVIPs(page)
		if err != nil {
			return false, err
		}
		matches = append(matches, found...)
		// Stop paging as soon as the result is known to be ambiguous.
		if len(matches) > 1 {
			return false, ErrMultipleResults
		}
		return true, nil
	}
	if err := pager.EachPage(collect); err != nil {
		return nil, err
	}

	switch len(matches) {
	case 0:
		return nil, ErrNotFound
	case 1:
		return &matches[0], nil
	default:
		return nil, ErrMultipleResults
	}
}
// GetTCPLoadBalancer looks up the VIP called name and reports whether it
// exists. When it does, the returned status has a single ingress entry
// carrying the VIP's address. A missing VIP yields (nil, false, nil); any
// other lookup failure yields (nil, false, err). The region argument is not
// used.
func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerStatus, bool, error) {
	vip, err := getVipByName(lb.network, name)
	switch {
	case err == ErrNotFound:
		// Absence is an answer, not an error.
		return nil, false, nil
	case vip == nil:
		return nil, false, err
	}

	return &api.LoadBalancerStatus{
		Ingress: []api.LoadBalancerIngress{{IP: vip.Address}},
	}, true, err
}
// CreateTCPLoadBalancer creates an LBaaS pool called name, adds one member
// per host, optionally attaches a TCP health monitor (when CreateMonitor is
// set in the options), and finally creates the VIP in front of the pool. The
// returned status has a single ingress entry carrying the VIP's address.
//
// Exactly one port must be given; zero or several ports are rejected with an
// error. When externalIP is empty, no address is requested for the VIP.
// Only the affinities None and ClientIP are supported.
//
// If a step fails after the pool has been created, the objects created so far
// are deleted on a best-effort basis (errors from those deletions are
// deliberately ignored) and the original error is returned.
//
// TODO: This code currently ignores 'region' and always creates a
// loadbalancer in only the current OpenStack region.  We should take
// a list of regions (from config) and query/create loadbalancers in
// each region.
func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []int, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
	glog.V(4).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity)

	// Validate the port list before touching the API; indexing an empty
	// slice below would panic.
	if len(ports) == 0 {
		return nil, fmt.Errorf("no ports provided to openstack load balancer")
	}
	if len(ports) > 1 {
		return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
	}
	port := ports[0] //TODO: need to handle multi-port

	var persistence *vips.SessionPersistence
	switch affinity {
	case api.ServiceAffinityNone:
		persistence = nil
	case api.ServiceAffinityClientIP:
		persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
	default:
		return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
	}

	lbmethod := lb.opts.LBMethod
	if lbmethod == "" {
		lbmethod = pools.LBMethodRoundRobin
	}
	pool, err := pools.Create(lb.network, pools.CreateOpts{
		Name:     name,
		Protocol: pools.ProtocolTCP,
		SubnetID: lb.opts.SubnetId,
		LBMethod: lbmethod,
	}).Extract()
	if err != nil {
		return nil, err
	}

	for _, host := range hosts {
		addr, err := getAddressByName(lb.compute, host)
		if err != nil {
			// Do not leave the freshly created pool behind.
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}

		_, err = members.Create(lb.network, members.CreateOpts{
			PoolID:       pool.ID,
			ProtocolPort: port,
			Address:      addr,
		}).Extract()
		if err != nil {
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}
	}

	var mon *monitors.Monitor
	if lb.opts.CreateMonitor {
		mon, err = monitors.Create(lb.network, monitors.CreateOpts{
			Type:       monitors.TypeTCP,
			Delay:      int(lb.opts.MonitorDelay.Duration.Seconds()),
			Timeout:    int(lb.opts.MonitorTimeout.Duration.Seconds()),
			MaxRetries: int(lb.opts.MonitorMaxRetries),
		}).Extract()
		if err != nil {
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}

		_, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract()
		if err != nil {
			monitors.Delete(lb.network, mon.ID)
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}
	}

	vipOpts := vips.CreateOpts{
		Name:         name,
		Description:  fmt.Sprintf("Kubernetes external service %s", name),
		Protocol:     "TCP",
		ProtocolPort: port,
		PoolID:       pool.ID,
		Persistence:  persistence,
	}
	// net.IP.String() renders an empty IP as "<nil>", which is not a valid
	// address; only request an address when one was actually supplied.
	if len(externalIP) > 0 {
		vipOpts.Address = externalIP.String()
	}

	vip, err := vips.Create(lb.network, vipOpts).Extract()
	if err != nil {
		if mon != nil {
			monitors.Delete(lb.network, mon.ID)
		}
		pools.Delete(lb.network, pool.ID)
		return nil, err
	}

	status := &api.LoadBalancerStatus{}
	status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}

	return status, nil
}
// UpdateTCPLoadBalancer reconciles the members of the pool behind the VIP
// called name with hosts: members whose address is not wanted are deleted,
// and a member is created for every wanted address that has none. New
// members use the VIP's protocol port. The first error encountered aborts the
// update and is returned; the region argument is only logged.
func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
	glog.V(4).Infof("UpdateTCPLoadBalancer(%v, %v, %v)", name, region, hosts)

	vip, err := getVipByName(lb.network, name)
	if err != nil {
		return err
	}

	// Addresses that should be pool members. Entries are removed as existing
	// members are matched, so what remains afterwards still has to be created.
	wanted := map[string]bool{}
	for _, host := range hosts {
		addr, err := getAddressByName(lb.compute, host)
		if err != nil {
			return err
		}
		wanted[addr] = true
	}

	// Walk the members that currently exist and drop the unwanted ones.
	prune := func(page pagination.Page) (bool, error) {
		existing, err := members.ExtractMembers(page)
		if err != nil {
			return false, err
		}
		for _, member := range existing {
			if _, keep := wanted[member.Address]; keep {
				// Already present; nothing to create for this address.
				delete(wanted, member.Address)
				continue
			}
			if err := members.Delete(lb.network, member.ID).ExtractErr(); err != nil {
				return false, err
			}
		}
		return true, nil
	}
	pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID})
	if err := pager.EachPage(prune); err != nil {
		return err
	}

	// Create members for the addresses that were not matched above.
	for addr := range wanted {
		createOpts := members.CreateOpts{
			PoolID:       vip.PoolID,
			Address:      addr,
			ProtocolPort: vip.ProtocolPort,
		}
		if _, err := members.Create(lb.network, createOpts).Extract(); err != nil {
			return err
		}
	}

	return nil
}
// EnsureTCPLoadBalancerDeleted removes the VIP called name and then, on a
// best-effort basis, the pool behind it. A missing VIP is treated as success.
// Errors looking up the VIP, fetching the pool (other than HTTP 404) or
// deleting the VIP are returned; errors while disassociating monitors or
// deleting the pool are ignored. The region argument is only logged.
func (lb *LoadBalancer) EnsureTCPLoadBalancerDeleted(name, region string) error {
	glog.V(4).Infof("EnsureTCPLoadBalancerDeleted(%v, %v)", name, region)

	// TODO(#8352): Because we look up the pool using the VIP object, if the VIP
	// is already gone we can't attempt to delete the pool. We should instead
	// continue even if the VIP doesn't exist and attempt to delete the pool by
	// name.
	vip, vipErr := getVipByName(lb.network, name)
	switch {
	case vipErr == ErrNotFound:
		return nil
	case vipErr != nil:
		return vipErr
	}

	// It's ok if the pool doesn't exist, as we may still need to delete the vip
	// (although I don't believe the system should ever be in that state).
	pool, poolErr := pools.Get(lb.network, vip.PoolID).Extract()
	poolExists := poolErr == nil
	if !poolExists {
		// Only a 404 response is tolerated; anything else is a real failure.
		respErr, isRespErr := poolErr.(*gophercloud.UnexpectedResponseCodeError)
		if !(isRespErr && respErr.Actual == http.StatusNotFound) {
			return poolErr
		}
	}

	// We have to delete the VIP before the pool can be deleted, so we can't
	// continue on if this fails.
	// TODO(#8352): Only do this if the VIP exists once we can delete pools by
	// name rather than by ID.
	if delErr := vips.Delete(lb.network, vip.ID).ExtractErr(); delErr != nil && delErr != ErrNotFound {
		return delErr
	}

	if !poolExists {
		return nil
	}

	// Ignore errors for everything following here
	for _, monitorID := range pool.MonitorIDs {
		// TODO(#8352): Delete the monitor, don't just disassociate it.
		pools.DisassociateMonitor(lb.network, pool.ID, monitorID)
	}
	pools.Delete(lb.network, pool.ID)

	return nil
}
// Zones returns the receiver itself as the Zones implementation, together
// with true; zone queries are answered by GetZone.
func (os *OpenStack) Zones() (cloudprovider.Zones, bool) {
glog.V(1).Info("Claiming to support Zones")
return os, true
}
// GetZone reports the configured region as the zone's Region. No other zone
// field is set and the returned error is always nil.
func (os *OpenStack) GetZone() (cloudprovider.Zone, error) {
	region := os.region
	glog.V(1).Infof("Current zone is %v", region)

	zone := cloudprovider.Zone{Region: region}
	return zone, nil
}
// Routes always returns (nil, false): this provider supplies no Routes
// implementation.
func (os *OpenStack) Routes() (cloudprovider.Routes, bool) {
return nil, false
}