In OpenStack Mitaka, the name field for members was added as an optional field but does not exist in Liberty. Therefore the current implementation for lbaas v2 will not work in Liberty.
993 lines
28 KiB
Go
993 lines
28 KiB
Go
/*
|
|
Copyright 2016 The Kubernetes Authors All rights reserved.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package openstack
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/rackspace/gophercloud"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
|
|
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
|
|
v2_monitors "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
|
|
v2_pools "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
|
|
neutron_ports "github.com/rackspace/gophercloud/openstack/networking/v2/ports"
|
|
"github.com/rackspace/gophercloud/pagination"
|
|
|
|
"fmt"
|
|
|
|
"github.com/golang/glog"
|
|
"k8s.io/kubernetes/pkg/api"
|
|
"k8s.io/kubernetes/pkg/api/service"
|
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
|
)
|
|
|
|
// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
const loadbalancerActiveTimeoutSeconds = 120

// Maximum number of seconds to wait for a load balancer to disappear after
// a delete request has been issued (see waitLoadbalancerDeleted).
const loadbalancerDeleteTimeoutSeconds = 30
|
|
|
|
// LbaasV1 is the cloudprovider LoadBalancer implementation backed by
// OpenStack Neutron LBaaS v1 (pools/vips/members/monitors extensions).
type LbaasV1 struct {
	// Embedded LoadBalancer supplies the shared handles the methods use
	// below: lb.network, lb.compute and lb.opts.
	LoadBalancer
}
|
|
|
|
// LbaasV2 is the cloudprovider LoadBalancer implementation backed by
// OpenStack Neutron LBaaS v2 (loadbalancers/listeners/pools/monitors).
type LbaasV2 struct {
	// Embedded LoadBalancer supplies the shared handles the methods use
	// below: lbaas.network, lbaas.compute and lbaas.opts.
	LoadBalancer
}
|
|
|
|
func getPortIDByIP(client *gophercloud.ServiceClient, ipAddress string) (string, error) {
|
|
var portID string
|
|
|
|
err := neutron_ports.List(client, neutron_ports.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
|
|
portList, err := neutron_ports.ExtractPorts(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, port := range portList {
|
|
for _, ip := range port.FixedIPs {
|
|
if ip.IPAddress == ipAddress {
|
|
portID = port.ID
|
|
return false, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
})
|
|
|
|
return portID, err
|
|
}
|
|
|
|
func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
|
|
opts := floatingips.ListOpts{
|
|
PortID: portID,
|
|
}
|
|
pager := floatingips.List(client, opts)
|
|
|
|
floatingIPList := make([]floatingips.FloatingIP, 0, 1)
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
f, err := floatingips.ExtractFloatingIPs(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
floatingIPList = append(floatingIPList, f...)
|
|
if len(floatingIPList) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(floatingIPList) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(floatingIPList) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &floatingIPList[0], nil
|
|
}
|
|
|
|
func getPoolByName(client *gophercloud.ServiceClient, name string) (*pools.Pool, error) {
|
|
opts := pools.ListOpts{
|
|
Name: name,
|
|
}
|
|
pager := pools.List(client, opts)
|
|
|
|
poolList := make([]pools.Pool, 0, 1)
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
p, err := pools.ExtractPools(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
poolList = append(poolList, p...)
|
|
if len(poolList) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(poolList) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(poolList) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &poolList[0], nil
|
|
}
|
|
|
|
func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) {
|
|
opts := vips.ListOpts{
|
|
Name: name,
|
|
}
|
|
pager := vips.List(client, opts)
|
|
|
|
vipList := make([]vips.VirtualIP, 0, 1)
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
v, err := vips.ExtractVIPs(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
vipList = append(vipList, v...)
|
|
if len(vipList) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(vipList) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(vipList) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &vipList[0], nil
|
|
}
|
|
|
|
func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
|
|
opts := loadbalancers.ListOpts{
|
|
Name: name,
|
|
}
|
|
pager := loadbalancers.List(client, opts)
|
|
|
|
loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
v, err := loadbalancers.ExtractLoadbalancers(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
loadbalancerList = append(loadbalancerList, v...)
|
|
if len(loadbalancerList) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(loadbalancerList) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(loadbalancerList) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &loadbalancerList[0], nil
|
|
}
|
|
|
|
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) error {
|
|
start := time.Now().Second()
|
|
for {
|
|
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if loadbalancer.ProvisioningStatus == "ACTIVE" {
|
|
return nil
|
|
}
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
|
|
return fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
|
|
start := time.Now().Second()
|
|
for {
|
|
_, err := loadbalancers.Get(client, loadbalancerID).Extract()
|
|
if err != nil {
|
|
if err == ErrNotFound {
|
|
return nil
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds {
|
|
return fmt.Errorf("Loadbalancer failed to delete within the alloted time")
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func (lbaas *LbaasV2) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
|
|
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
|
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
|
|
if err == ErrNotFound {
|
|
return nil, false, nil
|
|
}
|
|
if loadbalancer == nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
status := &api.LoadBalancerStatus{}
|
|
status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
|
|
|
|
return status, true, err
|
|
}
|
|
|
|
// TODO: This code currently ignores 'region' and always creates a
|
|
// loadbalancer in only the current OpenStack region. We should take
|
|
// a list of regions (from config) and query/create loadbalancers in
|
|
// each region.
|
|
|
|
// EnsureLoadBalancer builds a complete LBaaS v2 stack for the service: a
// load balancer, one listener, one pool, one member per host, an optional
// TCP health monitor, and an optional floating IP. Any pre-existing
// balancer with the same name is deleted first (delete & recreate rather
// than in-place update). Returns the status whose ingress holds the VIP
// address, plus the floating IP when one was allocated.
//
// Visible limitations: exactly one service port, TCP only, no session
// affinity, and no source-range restrictions.
func (lbaas *LbaasV2) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
	glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)

	ports := apiService.Spec.Ports
	if len(ports) > 1 {
		return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
	} else if len(ports) == 0 {
		return nil, fmt.Errorf("no ports provided to openstack load balancer")
	}

	// The service controller verified all the protocols match on the ports, just check and use the first one
	// TODO: Convert all error messages to use an event recorder
	if ports[0].Protocol != api.ProtocolTCP {
		return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
	}

	// Session affinity is deliberately forced off here; the service's own
	// SessionAffinity field is ignored (see commented-out expression).
	affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity
	var persistence *v2_pools.SessionPersistence
	switch affinity {
	case api.ServiceAffinityNone:
		persistence = nil
	case api.ServiceAffinityClientIP:
		persistence = &v2_pools.SessionPersistence{Type: "SOURCE_IP"}
	default:
		return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
	}

	sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
	if err != nil {
		return nil, err
	}

	// Only "allow all" is supported; any restriction is rejected up front.
	if !service.IsAllowAll(sourceRanges) {
		return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
	}

	glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
	_, exists, err := lbaas.GetLoadBalancer(apiService)
	if err != nil {
		return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
	}

	// TODO: Implement a more efficient update strategy for common changes than delete & create
	// In particular, if we implement hosts update, we can get rid of UpdateHosts
	if exists {
		err := lbaas.EnsureLoadBalancerDeleted(apiService)
		if err != nil {
			return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
		}
	}

	// Fall back to round robin when no LB method is configured.
	lbmethod := v2_pools.LBMethod(lbaas.opts.LBMethod)
	if lbmethod == "" {
		lbmethod = v2_pools.LBMethodRoundRobin
	}
	name := cloudprovider.GetLoadBalancerName(apiService)

	createOpts := loadbalancers.CreateOpts{
		Name:        name,
		Description: fmt.Sprintf("Kubernetes external service %s", name),
		VipSubnetID: lbaas.opts.SubnetId,
	}

	// Honor a user-requested VIP address when one was set on the service.
	loadBalancerIP := apiService.Spec.LoadBalancerIP
	if loadBalancerIP != "" {
		createOpts.VipAddress = loadBalancerIP
	}

	loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract()
	if err != nil {
		// cleanup what was created so far
		_ = lbaas.EnsureLoadBalancerDeleted(apiService)
		return nil, err
	}

	// The balancer must be ACTIVE before each dependent resource is created.
	// NOTE(review): errors from these waits are discarded here and below;
	// confirm whether a wait failure should abort the creation.
	waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

	listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
		Name:           name,
		Protocol:       listeners.ProtocolTCP,
		ProtocolPort:   (int)(ports[0].Port), //TODO: need to handle multi-port
		LoadbalancerID: loadbalancer.ID,
	}).Extract()
	if err != nil {
		// cleanup what was created so far
		_ = lbaas.EnsureLoadBalancerDeleted(apiService)
		return nil, err
	}

	waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

	pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
		Name:        name,
		Protocol:    v2_pools.ProtocolTCP,
		LBMethod:    lbmethod,
		ListenerID:  listener.ID,
		Persistence: persistence,
	}).Extract()
	if err != nil {
		// cleanup what was created so far
		_ = lbaas.EnsureLoadBalancerDeleted(apiService)
		return nil, err
	}

	// One pool member per host, targeting the service's NodePort.
	for _, host := range hosts {
		addr, err := getAddressByName(lbaas.compute, host)
		if err != nil {
			return nil, err
		}

		waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

		_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
			ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
			Address:      addr,
			SubnetID:     lbaas.opts.SubnetId,
		}).Extract()
		if err != nil {
			// cleanup what was created so far
			_ = lbaas.EnsureLoadBalancerDeleted(apiService)
			return nil, err
		}
	}

	// Optional TCP health monitor, configured from the cloud-provider opts.
	if lbaas.opts.CreateMonitor {
		waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)

		_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
			PoolID:     pool.ID,
			Type:       monitors.TypeTCP,
			Delay:      int(lbaas.opts.MonitorDelay.Duration.Seconds()),
			Timeout:    int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
			MaxRetries: int(lbaas.opts.MonitorMaxRetries),
		}).Extract()
		if err != nil {
			// cleanup what was created so far
			_ = lbaas.EnsureLoadBalancerDeleted(apiService)
			return nil, err
		}
	}

	status := &api.LoadBalancerStatus{}

	status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}

	// When a floating network is configured, attach a floating IP to the
	// VIP port and expose it as an additional ingress address.
	if lbaas.opts.FloatingNetworkId != "" {
		portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
		if err != nil {
			// cleanup what was created so far
			_ = lbaas.EnsureLoadBalancerDeleted(apiService)
			return nil, err
		}

		floatIPOpts := floatingips.CreateOpts{
			FloatingNetworkID: lbaas.opts.FloatingNetworkId,
			PortID:            portID,
		}
		floatIP, err := floatingips.Create(lbaas.network, floatIPOpts).Extract()
		if err != nil {
			// cleanup what was created so far
			_ = lbaas.EnsureLoadBalancerDeleted(apiService)
			return nil, err
		}

		status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
	}

	return status, nil
}
|
|
|
|
// UpdateLoadBalancer reconciles the member set of the service's LBaaS v2
// pools against the given host list: members whose address is not in the
// host set are deleted, and hosts with no matching member are added. The
// balancer itself, its listener and its pools are left untouched.
func (lbaas *LbaasV2) UpdateLoadBalancer(service *api.Service, hosts []string) error {
	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
	glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)

	ports := service.Spec.Ports
	if len(ports) > 1 {
		return fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
	} else if len(ports) == 0 {
		return fmt.Errorf("no ports provided to openstack load balancer")
	}

	loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
	if err != nil {
		return err
	}
	if loadbalancer == nil {
		return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName)
	}

	// Set of member (addresses) that _should_ exist
	addrs := map[string]bool{}
	for _, host := range hosts {
		addr, err := getAddressByName(lbaas.compute, host)
		if err != nil {
			return err
		}
		addrs[addr] = true
	}

	// Iterate over members in each pool that _do_ exist
	// NOTE(review): poolID keeps the ID of the *last* pool visited; the
	// create loop below adds all new members to that pool only — confirm
	// this is acceptable given a single pool is created per balancer.
	var poolID string
	err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
		poolsList, err := v2_pools.ExtractPools(page)
		if err != nil {
			return false, err
		}

		for _, pool := range poolsList {
			poolID = pool.ID
			err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
				membersList, err := v2_pools.ExtractMembers(page)
				if err != nil {
					return false, err
				}

				for _, member := range membersList {
					if _, found := addrs[member.Address]; found {
						// Member already exists, remove from update list
						delete(addrs, member.Address)
					} else {
						// Member needs to be deleted
						err = v2_pools.DeleteMember(lbaas.network, poolID, member.ID).ExtractErr()
						if err != nil {
							return false, err
						}
						// NOTE(review): the wait error is discarded here
						// and in the create loop below.
						waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
					}
				}

				return true, nil
			})
			if err != nil {
				return false, err
			}
		}
		return true, nil
	})
	if err != nil {
		return err
	}

	// Anything left in addrs is a new member that needs to be added to a pool
	for addr := range addrs {
		_, err := v2_pools.CreateAssociateMember(lbaas.network, poolID, v2_pools.MemberCreateOpts{
			Address:      addr,
			ProtocolPort: int(ports[0].NodePort),
			SubnetID:     lbaas.opts.SubnetId,
		}).Extract()
		if err != nil {
			return err
		}
		waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
	}

	return nil
}
|
|
|
|
// EnsureLoadBalancerDeleted tears down every LBaaS v2 resource created for
// the service, in dependency order: floating IP (when configured), health
// monitors, pool members, pools, listeners, and finally the load balancer
// itself. It is a no-op when the balancer no longer exists.
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(service *api.Service) error {
	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
	glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)

	loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
	if err != nil && err != ErrNotFound {
		return err
	}
	if loadbalancer == nil {
		// Nothing left to clean up.
		return nil
	}

	// Release the floating IP that was attached to the VIP port, if any.
	if lbaas.opts.FloatingNetworkId != "" && loadbalancer != nil {
		portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
		if err != nil {
			return err
		}

		floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
		if err != nil && err != ErrNotFound {
			return err
		}
		if floatingIP != nil {
			err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
			if err != nil && !isNotFound(err) {
				return err
			}
		}
	}

	// get all listeners associated with this loadbalancer
	var listenerIDs []string
	err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
		listenerList, err := listeners.ExtractListeners(page)
		if err != nil {
			return false, err
		}

		for _, listener := range listenerList {
			listenerIDs = append(listenerIDs, listener.ID)
		}

		return true, nil
	})
	if err != nil {
		return err
	}

	// get all pools associated with this loadbalancer
	var poolIDs []string
	err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
		poolsList, err := v2_pools.ExtractPools(page)
		if err != nil {
			return false, err
		}

		for _, pool := range poolsList {
			poolIDs = append(poolIDs, pool.ID)
		}

		return true, nil
	})
	if err != nil {
		return err
	}

	// get all members associated with each poolIDs
	// NOTE(review): member IDs from every pool are collected into one flat
	// list; the delete loop below attempts each member ID against each pool
	// and relies on isNotFound to skip the mismatched combinations.
	var memberIDs []string
	for _, poolID := range poolIDs {
		err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
			membersList, err := v2_pools.ExtractMembers(page)
			if err != nil {
				return false, err
			}

			for _, member := range membersList {
				memberIDs = append(memberIDs, member.ID)
			}

			return true, nil
		})
		if err != nil {
			return err
		}
	}

	// get all monitors associated with each poolIDs
	var monitorIDs []string
	for _, poolID := range poolIDs {
		err = v2_monitors.List(lbaas.network, v2_monitors.ListOpts{PoolID: poolID}).EachPage(func(page pagination.Page) (bool, error) {
			monitorsList, err := v2_monitors.ExtractMonitors(page)
			if err != nil {
				return false, err
			}

			for _, monitor := range monitorsList {
				monitorIDs = append(monitorIDs, monitor.ID)
			}

			return true, nil
		})
		if err != nil {
			return err
		}
	}

	// delete all monitors
	// NOTE(review): errors from the ACTIVE-status waits are discarded here
	// and in all of the delete loops below.
	for _, monitorID := range monitorIDs {
		err := v2_monitors.Delete(lbaas.network, monitorID).ExtractErr()
		if err != nil && !isNotFound(err) {
			return err
		}
		waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
	}

	// delete all members and pools
	for _, poolID := range poolIDs {
		// delete all members for this pool
		for _, memberID := range memberIDs {
			err := v2_pools.DeleteMember(lbaas.network, poolID, memberID).ExtractErr()
			if err != nil && !isNotFound(err) {
				return err
			}
			waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
		}

		// delete pool
		err := v2_pools.Delete(lbaas.network, poolID).ExtractErr()
		if err != nil && !isNotFound(err) {
			return err
		}
		waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
	}

	// delete all listeners
	for _, listenerID := range listenerIDs {
		err := listeners.Delete(lbaas.network, listenerID).ExtractErr()
		if err != nil && !isNotFound(err) {
			return err
		}
		waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
	}

	// delete loadbalancer
	err = loadbalancers.Delete(lbaas.network, loadbalancer.ID).ExtractErr()
	if err != nil && !isNotFound(err) {
		return err
	}
	// Best-effort wait for the balancer to actually disappear.
	waitLoadbalancerDeleted(lbaas.network, loadbalancer.ID)
	return nil
}
|
|
|
|
func (lb *LbaasV1) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
|
|
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
|
|
vip, err := getVipByName(lb.network, loadBalancerName)
|
|
if err == ErrNotFound {
|
|
return nil, false, nil
|
|
}
|
|
if vip == nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
status := &api.LoadBalancerStatus{}
|
|
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
|
|
|
|
return status, true, err
|
|
}
|
|
|
|
// TODO: This code currently ignores 'region' and always creates a
|
|
// loadbalancer in only the current OpenStack region. We should take
|
|
// a list of regions (from config) and query/create loadbalancers in
|
|
// each region.
|
|
|
|
// EnsureLoadBalancer builds a complete LBaaS v1 stack for the service: a
// pool, one member per host, an optional TCP health monitor, a VIP, and an
// optional floating IP. Any pre-existing balancer with the same name is
// deleted first (delete & recreate rather than in-place update). Returns
// the status whose ingress holds the VIP address, plus the floating IP when
// one was allocated.
//
// Visible limitations: exactly one service port, TCP only, and no
// source-range restrictions. Unlike the v2 path, session affinity from the
// service spec IS honored here (None or ClientIP).
func (lb *LbaasV1) EnsureLoadBalancer(apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
	glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)

	ports := apiService.Spec.Ports
	if len(ports) > 1 {
		return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
	} else if len(ports) == 0 {
		return nil, fmt.Errorf("no ports provided to openstack load balancer")
	}

	// The service controller verified all the protocols match on the ports, just check and use the first one
	// TODO: Convert all error messages to use an event recorder
	if ports[0].Protocol != api.ProtocolTCP {
		return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
	}

	affinity := apiService.Spec.SessionAffinity
	var persistence *vips.SessionPersistence
	switch affinity {
	case api.ServiceAffinityNone:
		persistence = nil
	case api.ServiceAffinityClientIP:
		persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
	default:
		return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
	}

	sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
	if err != nil {
		return nil, err
	}

	// Only "allow all" is supported; any restriction is rejected up front.
	if !service.IsAllowAll(sourceRanges) {
		return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
	}

	glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
	_, exists, err := lb.GetLoadBalancer(apiService)
	if err != nil {
		return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
	}

	// TODO: Implement a more efficient update strategy for common changes than delete & create
	// In particular, if we implement hosts update, we can get rid of UpdateHosts
	if exists {
		err := lb.EnsureLoadBalancerDeleted(apiService)
		if err != nil {
			return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
		}
	}

	// Fall back to round robin when no LB method is configured.
	lbmethod := lb.opts.LBMethod
	if lbmethod == "" {
		lbmethod = pools.LBMethodRoundRobin
	}
	name := cloudprovider.GetLoadBalancerName(apiService)
	pool, err := pools.Create(lb.network, pools.CreateOpts{
		Name:     name,
		Protocol: pools.ProtocolTCP,
		SubnetID: lb.opts.SubnetId,
		LBMethod: lbmethod,
	}).Extract()
	if err != nil {
		return nil, err
	}

	// One pool member per host, targeting the service's NodePort.
	for _, host := range hosts {
		addr, err := getAddressByName(lb.compute, host)
		if err != nil {
			return nil, err
		}

		_, err = members.Create(lb.network, members.CreateOpts{
			PoolID:       pool.ID,
			ProtocolPort: int(ports[0].NodePort), //TODO: need to handle multi-port
			Address:      addr,
		}).Extract()
		if err != nil {
			// Roll back the pool created above (best effort).
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}
	}

	// Optional TCP health monitor, configured from the cloud-provider opts.
	var mon *monitors.Monitor
	if lb.opts.CreateMonitor {
		mon, err = monitors.Create(lb.network, monitors.CreateOpts{
			Type:       monitors.TypeTCP,
			Delay:      int(lb.opts.MonitorDelay.Duration.Seconds()),
			Timeout:    int(lb.opts.MonitorTimeout.Duration.Seconds()),
			MaxRetries: int(lb.opts.MonitorMaxRetries),
		}).Extract()
		if err != nil {
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}

		_, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract()
		if err != nil {
			// Roll back monitor and pool (best effort).
			monitors.Delete(lb.network, mon.ID)
			pools.Delete(lb.network, pool.ID)
			return nil, err
		}
	}

	createOpts := vips.CreateOpts{
		Name:         name,
		Description:  fmt.Sprintf("Kubernetes external service %s", name),
		Protocol:     "TCP",
		ProtocolPort: int(ports[0].Port), //TODO: need to handle multi-port
		PoolID:       pool.ID,
		SubnetID:     lb.opts.SubnetId,
		Persistence:  persistence,
	}

	// Honor a user-requested VIP address when one was set on the service.
	loadBalancerIP := apiService.Spec.LoadBalancerIP
	if loadBalancerIP != "" {
		createOpts.Address = loadBalancerIP
	}

	vip, err := vips.Create(lb.network, createOpts).Extract()
	if err != nil {
		// Roll back monitor (if any) and pool (best effort).
		if mon != nil {
			monitors.Delete(lb.network, mon.ID)
		}
		pools.Delete(lb.network, pool.ID)
		return nil, err
	}

	status := &api.LoadBalancerStatus{}

	status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}

	// When a floating network is configured, attach a floating IP to the
	// VIP port and expose it as an additional ingress address.
	if lb.opts.FloatingNetworkId != "" {
		floatIPOpts := floatingips.CreateOpts{
			FloatingNetworkID: lb.opts.FloatingNetworkId,
			PortID:            vip.PortID,
		}
		floatIP, err := floatingips.Create(lb.network, floatIPOpts).Extract()
		if err != nil {
			return nil, err
		}

		status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
	}

	return status, nil

}
|
|
|
|
// UpdateLoadBalancer reconciles the member set of the service's LBaaS v1
// pool against the given host list: members whose address is not in the
// host set are deleted, and hosts with no matching member are added. The
// VIP and pool themselves are left untouched.
func (lb *LbaasV1) UpdateLoadBalancer(service *api.Service, hosts []string) error {
	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
	glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)

	vip, err := getVipByName(lb.network, loadBalancerName)
	if err != nil {
		return err
	}

	// Set of member (addresses) that _should_ exist
	addrs := map[string]bool{}
	for _, host := range hosts {
		addr, err := getAddressByName(lb.compute, host)
		if err != nil {
			return err
		}

		addrs[addr] = true
	}

	// Iterate over members that _do_ exist
	pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID})
	err = pager.EachPage(func(page pagination.Page) (bool, error) {
		memList, err := members.ExtractMembers(page)
		if err != nil {
			return false, err
		}

		for _, member := range memList {
			if _, found := addrs[member.Address]; found {
				// Member already exists
				delete(addrs, member.Address)
			} else {
				// Member needs to be deleted
				err = members.Delete(lb.network, member.ID).ExtractErr()
				if err != nil {
					return false, err
				}
			}
		}

		return true, nil
	})
	if err != nil {
		return err
	}

	// Anything left in addrs is a new member that needs to be added
	for addr := range addrs {
		_, err := members.Create(lb.network, members.CreateOpts{
			PoolID:       vip.PoolID,
			Address:      addr,
			ProtocolPort: vip.ProtocolPort,
		}).Extract()
		if err != nil {
			return err
		}
	}

	return nil
}
|
|
|
|
// EnsureLoadBalancerDeleted tears down the LBaaS v1 resources created for
// the service: the floating IP (when configured), the VIP, its monitors,
// and its pool. Missing resources are tolerated (treated as already
// deleted), so the call is idempotent.
func (lb *LbaasV1) EnsureLoadBalancerDeleted(service *api.Service) error {
	loadBalancerName := cloudprovider.GetLoadBalancerName(service)
	glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)

	vip, err := getVipByName(lb.network, loadBalancerName)
	if err != nil && err != ErrNotFound {
		return err
	}

	// Release the floating IP attached to the VIP port, if any.
	if lb.opts.FloatingNetworkId != "" && vip != nil {
		floatingIP, err := getFloatingIPByPortID(lb.network, vip.PortID)
		if err != nil && !isNotFound(err) {
			return err
		}
		if floatingIP != nil {
			err = floatingips.Delete(lb.network, floatingIP.ID).ExtractErr()
			if err != nil && !isNotFound(err) {
				return err
			}
		}
	}

	// We have to delete the VIP before the pool can be deleted,
	// so no point continuing if this fails.
	if vip != nil {
		err := vips.Delete(lb.network, vip.ID).ExtractErr()
		if err != nil && !isNotFound(err) {
			return err
		}
	}

	var pool *pools.Pool
	if vip != nil {
		pool, err = pools.Get(lb.network, vip.PoolID).Extract()
		if err != nil && !isNotFound(err) {
			return err
		}
	} else {
		// The VIP is gone, but it is conceivable that a Pool
		// still exists that we failed to delete on some
		// previous occasion. Make a best effort attempt to
		// cleanup any pools with the same name as the VIP.
		pool, err = getPoolByName(lb.network, service.Name)
		if err != nil && err != ErrNotFound {
			return err
		}
	}

	if pool != nil {
		// Monitors must be disassociated from the pool before either can
		// be deleted.
		for _, monId := range pool.MonitorIDs {
			_, err = pools.DisassociateMonitor(lb.network, pool.ID, monId).Extract()
			if err != nil {
				return err
			}

			err = monitors.Delete(lb.network, monId).ExtractErr()
			if err != nil && !isNotFound(err) {
				return err
			}
		}
		err = pools.Delete(lb.network, pool.ID).ExtractErr()
		if err != nil && !isNotFound(err) {
			return err
		}
	}

	return nil
}
|