Merge pull request #8366 from justinsb/idempotent_loadbalancer

Make LoadBalancer creation more self-healing; don't delete on AWS
This commit is contained in:
Saad Ali
2015-08-20 14:30:55 -07:00
13 changed files with 629 additions and 182 deletions

View File

@@ -80,8 +80,8 @@ type TCPLoadBalancer interface {
// GetTCPLoadBalancer returns whether the specified load balancer exists, and // GetTCPLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is. // if so, what its status is.
GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
// CreateTCPLoadBalancer creates a new tcp load balancer. Returns the status of the balancer // EnsureTCPLoadBalancer creates a new tcp load balancer, or updates an existing one. Returns the status of the balancer
CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
// UpdateTCPLoadBalancer updates hosts under the specified load balancer. // UpdateTCPLoadBalancer updates hosts under the specified load balancer.
UpdateTCPLoadBalancer(name, region string, hosts []string) error UpdateTCPLoadBalancer(name, region string, hosts []string) error
// EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it // EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it

View File

@@ -109,6 +109,16 @@ type ELB interface {
DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error)
RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error)
DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error)
DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error)
AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error)
CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error)
DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error)
ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error)
ConfigureHealthCheck(*elb.ConfigureHealthCheckInput) (*elb.ConfigureHealthCheckOutput, error)
} }
// This is a simple pass-through of the Autoscaling client interface, which allows for testing // This is a simple pass-through of the Autoscaling client interface, which allows for testing
@@ -1581,11 +1591,10 @@ func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp
} }
} }
// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer // EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer
// TODO(justinsb): This must be idempotent
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay.
func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(2).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
elbClient, err := s.getELBClient(region) elbClient, err := s.getELBClient(region)
if err != nil { if err != nil {
@@ -1616,7 +1625,7 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
} }
// Construct list of configured subnets // Construct list of configured subnets
subnetIds := []*string{} subnetIDs := []string{}
{ {
request := &ec2.DescribeSubnetsInput{} request := &ec2.DescribeSubnetsInput{}
filters := []*ec2.Filter{} filters := []*ec2.Filter{}
@@ -1632,7 +1641,7 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
// zones := []string{} // zones := []string{}
for _, subnet := range subnets { for _, subnet := range subnets {
subnetIds = append(subnetIds, subnet.SubnetID) subnetIDs = append(subnetIDs, orEmpty(subnet.SubnetID))
if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) {
glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region) glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region)
return nil, fmt.Errorf("invalid AZ for region") return nil, fmt.Errorf("invalid AZ for region")
@@ -1671,19 +1680,9 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
return nil, err return nil, err
} }
} }
securityGroupIDs := []string{securityGroupID}
// Build the load balancer itself // Figure out what mappings we want on the load balancer
var loadBalancer *elb.LoadBalancerDescription
{
loadBalancer, err = s.describeLoadBalancer(region, name)
if err != nil {
return nil, err
}
if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(name)
listeners := []*elb.Listener{} listeners := []*elb.Listener{}
for _, port := range ports { for _, port := range ports {
if port.NodePort == 0 { if port.NodePort == 0 {
@@ -1703,29 +1702,16 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
listeners = append(listeners, listener) listeners = append(listeners, listener)
} }
createRequest.Listeners = listeners // Build the load balancer itself
loadBalancer, err := s.ensureLoadBalancer(region, name, listeners, subnetIDs, securityGroupIDs)
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.Subnets = subnetIds
createRequest.SecurityGroups = []*string{&securityGroupID}
glog.Info("Creating load balancer with name: ", name)
_, err := elbClient.CreateLoadBalancer(createRequest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
loadBalancer, err = s.describeLoadBalancer(region, name) err = s.ensureLoadBalancerHealthCheck(region, loadBalancer, listeners)
if err != nil { if err != nil {
glog.Warning("Unable to retrieve load balancer immediately after creation")
return nil, err return nil, err
} }
} else {
// TODO: Verify that load balancer configuration matches?
}
}
err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances)
if err != nil { if err != nil {
@@ -1733,22 +1719,12 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p
return nil, err return nil, err
} }
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} err = s.ensureLoadBalancerInstances(elbClient, orEmpty(loadBalancer.LoadBalancerName), loadBalancer.Instances, instances)
registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName
for _, instance := range instances {
registerInstance := &elb.Instance{}
registerInstance.InstanceID = instance.InstanceID
registerRequest.Instances = append(registerRequest.Instances, registerInstance)
}
registerResponse, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil { if err != nil {
// TODO: Is it better to delete the load balancer entirely? glog.Warning("Error registering instances with the load balancer: %v", err)
glog.Warningf("Error registering instances with load-balancer %s: %v", name, err)
return nil, err return nil, err
} }
glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances)
glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName)) glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName))
// TODO: Wait for creation? // TODO: Wait for creation?
@@ -1995,6 +1971,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error {
} }
if len(securityGroupIDs) == 0 { if len(securityGroupIDs) == 0 {
glog.V(2).Info("deleted all security groups for load balancer: ", name)
break break
} }
@@ -2032,51 +2009,9 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er
return fmt.Errorf("Load balancer not found") return fmt.Errorf("Load balancer not found")
} }
existingInstances := map[string]*elb.Instance{} err = s.ensureLoadBalancerInstances(elbClient, orEmpty(lb.LoadBalancerName), lb.Instances, instances)
for _, instance := range lb.Instances {
existingInstances[orEmpty(instance.InstanceID)] = instance
}
wantInstances := map[string]*ec2.Instance{}
for _, instance := range instances {
wantInstances[orEmpty(instance.InstanceID)] = instance
}
addInstances := []*elb.Instance{}
for instanceId := range wantInstances {
addInstance := &elb.Instance{}
addInstance.InstanceID = aws.String(instanceId)
addInstances = append(addInstances, addInstance)
}
removeInstances := []*elb.Instance{}
for instanceId := range existingInstances {
_, found := wantInstances[instanceId]
if !found {
removeInstance := &elb.Instance{}
removeInstance.InstanceID = aws.String(instanceId)
removeInstances = append(removeInstances, removeInstance)
}
}
if len(addInstances) > 0 {
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.Instances = addInstances
registerRequest.LoadBalancerName = lb.LoadBalancerName
_, err = elbClient.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil { if err != nil {
return err return nil
}
}
if len(removeInstances) > 0 {
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
deregisterRequest.Instances = removeInstances
deregisterRequest.LoadBalancerName = lb.LoadBalancerName
_, err = elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest)
if err != nil {
return err
}
} }
err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances) err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances)

View File

@@ -0,0 +1,308 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws_cloud
import (
"fmt"
"strconv"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
)
func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) {
elbClient, err := s.getELBClient(region)
if err != nil {
return nil, err
}
loadBalancer, err := s.describeLoadBalancer(region, name)
if err != nil {
return nil, err
}
dirty := false
if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(name)
createRequest.Listeners = listeners
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.Subnets = stringPointerArray(subnetIDs)
createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.Info("Creating load balancer with name: ", name)
_, err := elbClient.CreateLoadBalancer(createRequest)
if err != nil {
return nil, err
}
dirty = true
} else {
{
// Sync subnets
expected := util.NewStringSet(subnetIDs...)
actual := stringSetFromPointers(loadBalancer.Subnets)
additions := expected.Difference(actual)
removals := actual.Difference(expected)
if len(removals) != 0 {
request := &elb.DetachLoadBalancerFromSubnetsInput{}
request.LoadBalancerName = aws.String(name)
request.Subnets = stringSetToPointers(removals)
glog.V(2).Info("Detaching load balancer from removed subnets")
_, err := elbClient.DetachLoadBalancerFromSubnets(request)
if err != nil {
return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %v", err)
}
dirty = true
}
if len(additions) != 0 {
request := &elb.AttachLoadBalancerToSubnetsInput{}
request.LoadBalancerName = aws.String(name)
request.Subnets = stringSetToPointers(additions)
glog.V(2).Info("Attaching load balancer to added subnets")
_, err := elbClient.AttachLoadBalancerToSubnets(request)
if err != nil {
return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %v", err)
}
dirty = true
}
}
{
// Sync security groups
expected := util.NewStringSet(securityGroupIDs...)
actual := stringSetFromPointers(loadBalancer.SecurityGroups)
if !expected.Equal(actual) {
// This call just replaces the security groups, unlike e.g. subnets (!)
request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
request.LoadBalancerName = aws.String(name)
request.SecurityGroups = stringPointerArray(securityGroupIDs)
glog.V(2).Info("Applying updated security groups to load balancer")
_, err := elbClient.ApplySecurityGroupsToLoadBalancer(request)
if err != nil {
return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %v", err)
}
dirty = true
}
}
{
// Sync listeners
listenerDescriptions := loadBalancer.ListenerDescriptions
foundSet := make(map[int]bool)
removals := []*int64{}
for _, listenerDescription := range listenerDescriptions {
actual := listenerDescription.Listener
if actual == nil {
glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name)
continue
}
found := -1
for i, expected := range listeners {
if orEmpty(actual.Protocol) != orEmpty(expected.Protocol) {
continue
}
if orEmpty(actual.InstanceProtocol) != orEmpty(expected.InstanceProtocol) {
continue
}
if orZero(actual.InstancePort) != orZero(expected.InstancePort) {
continue
}
if orZero(actual.LoadBalancerPort) != orZero(expected.LoadBalancerPort) {
continue
}
if orEmpty(actual.SSLCertificateID) != orEmpty(expected.SSLCertificateID) {
continue
}
found = i
}
if found != -1 {
foundSet[found] = true
} else {
removals = append(removals, actual.LoadBalancerPort)
}
}
additions := []*elb.Listener{}
for i := range listeners {
if foundSet[i] {
continue
}
additions = append(additions, listeners[i])
}
if len(removals) != 0 {
request := &elb.DeleteLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name)
request.LoadBalancerPorts = removals
glog.V(2).Info("Deleting removed load balancer listeners")
_, err := elbClient.DeleteLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %v", err)
}
dirty = true
}
if len(additions) != 0 {
request := &elb.CreateLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(name)
request.Listeners = additions
glog.V(2).Info("Creating added load balancer listeners")
_, err := elbClient.CreateLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %v", err)
}
dirty = true
}
}
}
if dirty {
loadBalancer, err = s.describeLoadBalancer(region, name)
if err != nil {
glog.Warning("Unable to retrieve load balancer after creation/update")
return nil, err
}
}
return loadBalancer, nil
}
// Makes sure that the health check for an ELB matches the configured listeners
func (s *AWSCloud) ensureLoadBalancerHealthCheck(region string, loadBalancer *elb.LoadBalancerDescription, listeners []*elb.Listener) error {
elbClient, err := s.getELBClient(region)
if err != nil {
return err
}
actual := loadBalancer.HealthCheck
// Default AWS settings
expectedHealthyThreshold := int64(10)
expectedUnhealthyThreshold := int64(2)
expectedTimeout := int64(5)
expectedInterval := int64(30)
// We only a TCP health-check on the first port
expectedTarget := ""
for _, listener := range listeners {
if listener.InstancePort == nil {
continue
}
expectedTarget = "TCP:" + strconv.FormatInt(*listener.InstancePort, 10)
break
}
if expectedTarget == "" {
return fmt.Errorf("unable to determine health check port (no valid listeners)")
}
if expectedTarget == orEmpty(actual.Target) &&
expectedHealthyThreshold == orZero(actual.HealthyThreshold) &&
expectedUnhealthyThreshold == orZero(actual.UnhealthyThreshold) &&
expectedTimeout == orZero(actual.Timeout) &&
expectedInterval == orZero(actual.Interval) {
return nil
}
glog.V(2).Info("Updating load-balancer health-check")
healthCheck := &elb.HealthCheck{}
healthCheck.HealthyThreshold = &expectedHealthyThreshold
healthCheck.UnhealthyThreshold = &expectedUnhealthyThreshold
healthCheck.Timeout = &expectedTimeout
healthCheck.Interval = &expectedInterval
healthCheck.Target = &expectedTarget
request := &elb.ConfigureHealthCheckInput{}
request.HealthCheck = healthCheck
request.LoadBalancerName = loadBalancer.LoadBalancerName
_, err = elbClient.ConfigureHealthCheck(request)
if err != nil {
return fmt.Errorf("error configuring load-balancer health-check: %v", err)
}
return nil
}
// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (s *AWSCloud) ensureLoadBalancerInstances(elbClient ELB, loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error {
expected := util.NewStringSet()
for _, instance := range instances {
expected.Insert(orEmpty(instance.InstanceID))
}
actual := util.NewStringSet()
for _, lbInstance := range lbInstances {
actual.Insert(orEmpty(lbInstance.InstanceID))
}
additions := expected.Difference(actual)
removals := actual.Difference(expected)
addInstances := []*elb.Instance{}
for instanceId := range additions {
addInstance := &elb.Instance{}
addInstance.InstanceID = aws.String(instanceId)
addInstances = append(addInstances, addInstance)
}
removeInstances := []*elb.Instance{}
for instanceId := range removals {
removeInstance := &elb.Instance{}
removeInstance.InstanceID = aws.String(instanceId)
removeInstances = append(removeInstances, removeInstance)
}
if len(addInstances) > 0 {
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.Instances = addInstances
registerRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil {
return err
}
glog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName)
}
if len(removeInstances) > 0 {
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
deregisterRequest.Instances = removeInstances
deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest)
if err != nil {
return err
}
glog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName)
}
return nil
}

View File

@@ -394,19 +394,46 @@ type FakeELB struct {
func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) { func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) {
panic("Not implemented") panic("Not implemented")
} }
func (ec2 *FakeELB) DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) {
panic("Not implemented")
}
func (ec2 *FakeELB) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) {
panic("Not implemented")
}
func (elb *FakeELB) ConfigureHealthCheck(*elb.ConfigureHealthCheckInput) (*elb.ConfigureHealthCheckOutput, error) {
panic("Not implemented")
}
type FakeASG struct { type FakeASG struct {
aws *FakeAWSServices aws *FakeAWSServices
} }

View File

@@ -0,0 +1,51 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws_cloud
import (
"github.com/aws/aws-sdk-go/aws"
"k8s.io/kubernetes/pkg/util"
)
func stringSetToPointers(in util.StringSet) []*string {
if in == nil {
return nil
}
out := make([]*string, len(in))
for k := range in {
out = append(out, aws.String(k))
}
return out
}
func stringSetFromPointers(in []*string) util.StringSet {
if in == nil {
return nil
}
out := util.NewStringSet()
for i := range in {
out.Insert(orEmpty(in[i]))
}
return out
}
func orZero(v *int64) int64 {
if v == nil {
return 0
}
return *v
}

View File

@@ -56,7 +56,7 @@ type FakeCloud struct {
ClusterList []string ClusterList []string
MasterName string MasterName string
ExternalIP net.IP ExternalIP net.IP
Balancers []FakeBalancer Balancers map[string]FakeBalancer
UpdateCalls []FakeUpdateBalancerCall UpdateCalls []FakeUpdateBalancerCall
RouteMap map[string]*FakeRoute RouteMap map[string]*FakeRoute
Lock sync.Mutex Lock sync.Mutex
@@ -123,11 +123,14 @@ func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerSt
return status, f.Exists, f.Err return status, f.Exists, f.Err
} }
// CreateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.CreateTCPLoadBalancer. // EnsureTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// It adds an entry "create" into the internal method call record. // It adds an entry "create" into the internal method call record.
func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
f.addCall("create") f.addCall("create")
f.Balancers = append(f.Balancers, FakeBalancer{name, region, externalIP, ports, hosts}) if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer)
}
f.Balancers[name] = FakeBalancer{name, region, externalIP, ports, hosts}
status := &api.LoadBalancerStatus{} status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}}

View File

@@ -348,11 +348,26 @@ func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name) return fmt.Sprintf("k8s-fw-%s", name)
} }
// CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer. // EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer.
// TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're // TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're
// owned by the project and available to be used, and use them if they are. // owned by the project and available to be used, and use them if they are.
func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) glog.V(2).Info("Checking if load balancer already exists: %s", name)
_, exists, err := gce.GetTCPLoadBalancer(name, region)
if err != nil {
return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := gce.EnsureTCPLoadBalancerDeleted(name, region)
if err != nil {
return nil, fmt.Errorf("error deleting existing GCE load balancer: %v", err)
}
}
err = gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType))
if err != nil { if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) { if !isHTTPErrorCode(err, http.StatusConflict) {
return nil, err return nil, err

View File

@@ -521,8 +521,8 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalanc
// a list of regions (from config) and query/create loadbalancers in // a list of regions (from config) and query/create loadbalancers in
// each region. // each region.
func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity) glog.V(4).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity)
if len(ports) > 1 { if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
@@ -538,6 +538,21 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
} }
glog.V(2).Info("Checking if openstack load balancer already exists: %s", name)
_, exists, err := lb.GetTCPLoadBalancer(name, region)
if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := lb.EnsureTCPLoadBalancerDeleted(name, region)
if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
}
}
lbmethod := lb.opts.LBMethod lbmethod := lb.opts.LBMethod
if lbmethod == "" { if lbmethod == "" {
lbmethod = pools.LBMethodRoundRobin lbmethod = pools.LBMethodRoundRobin

View File

@@ -238,7 +238,14 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
case cache.Sync: case cache.Sync:
err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState) err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState)
if err != nil { if err != nil {
s.eventRecorder.Event(service, "creating loadbalancer failed", err.Error()) message := "error creating load balancer"
if retry {
message += " (will retry): "
} else {
message += " (will not retry): "
}
message += err.Error()
s.eventRecorder.Event(service, "creating loadbalancer failed", message)
return err, retry return err, retry
} }
// Always update the cache upon success. // Always update the cache upon success.
@@ -248,11 +255,14 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
cachedService.appliedState = service cachedService.appliedState = service
s.cache.set(namespacedName.String(), cachedService) s.cache.set(namespacedName.String(), cachedService)
case cache.Deleted: case cache.Deleted:
s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer")
err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region)
if err != nil { if err != nil {
s.eventRecorder.Event(service, "deleting loadbalancer failed", err.Error()) message := "error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(service, "deleting loadbalancer failed", message)
return err, retryable return err, retryable
} }
s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer")
s.cache.delete(namespacedName.String()) s.cache.delete(namespacedName.String())
default: default:
glog.Errorf("Unexpected delta type: %v", delta.Type) glog.Errorf("Unexpected delta type: %v", delta.Type)
@@ -262,54 +272,59 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
// Returns whatever error occurred along with a boolean indicator of whether it // Returns whatever error occurred along with a boolean indicator of whether it
// should be retried. // should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) { func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) {
if cachedService != nil && !needsUpdate(cachedService, service) { if appliedState != nil && !needsUpdate(appliedState, service) {
glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) glog.Infof("LB already exists and doesn't need update for service %s", namespacedName)
return nil, notRetryable return nil, notRetryable
} }
if cachedService != nil {
// If the service already exists but needs to be updated, delete it so that // Note: It is safe to just call EnsureTCPLoadBalancer. But, on some clouds that requires a delete & create,
// we can recreate it cleanly. // which may involve service interruption. Also, we would like user-friendly events.
if wantsExternalLoadBalancer(cachedService) {
glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName)
if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(cachedService), s.zone.Region); err != nil {
return err, retryable
}
}
} else {
// If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it.
status, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
}
if exists && api.LoadBalancerStatusEqual(status, &service.Status.LoadBalancer) {
glog.Infof("LB already exists with status %s for previously uncached service %s", status, namespacedName)
return nil, notRetryable
} else if exists {
glog.Infof("Deleting old LB for previously uncached service %s whose endpoint %s doesn't match the service's desired IPs %v",
namespacedName, status, service.Spec.DeprecatedPublicIPs)
if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
return err, retryable
}
}
}
// Save the state so we can avoid a write if it doesn't change // Save the state so we can avoid a write if it doesn't change
previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
if !wantsExternalLoadBalancer(service) { if !wantsExternalLoadBalancer(service) {
glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName) needDelete := true
if appliedState != nil {
if !wantsExternalLoadBalancer(appliedState) {
needDelete = false
}
} else {
// If we don't have any cached memory of the load balancer, we have to ask
// the cloud provider for what it knows about it.
// Technically EnsureTCPLoadBalancerDeleted can cope, but we want to post meaningful events
_, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region)
if err != nil {
return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable
}
if !exists {
needDelete = false
}
}
if needDelete {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName)
s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer")
if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil {
return err, retryable
}
s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer")
}
service.Status.LoadBalancer = api.LoadBalancerStatus{} service.Status.LoadBalancer = api.LoadBalancerStatus{}
} else { } else {
glog.V(2).Infof("Creating LB for service %s", namespacedName) glog.V(2).Infof("Ensuring LB for service %s", namespacedName)
// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
// The load balancer doesn't exist yet, so create it. // The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, "creating loadbalancer", "creating loadbalancer")
err := s.createExternalLoadBalancer(service) err := s.createExternalLoadBalancer(service)
if err != nil { if err != nil {
return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable
} }
s.eventRecorder.Event(service, "created loadbalancer", "created loadbalancer")
} }
// Write the state if changed // Write the state if changed
@@ -319,7 +334,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
} }
} else { } else {
glog.Infof("Not persisting unchanged LoadBalancerStatus to registry.") glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.")
} }
return nil, notRetryable return nil, notRetryable
@@ -367,7 +382,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
for _, publicIP := range service.Spec.DeprecatedPublicIPs { for _, publicIP := range service.Spec.DeprecatedPublicIPs {
// TODO: Make this actually work for multiple IPs by using different // TODO: Make this actually work for multiple IPs by using different
// names for each. For now, we'll just create the first and break. // names for each. For now, we'll just create the first and break.
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP),
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil { if err != nil {
return err return err
@@ -377,7 +392,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err
break break
} }
} else { } else {
status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil, status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, nil,
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil { if err != nil {
return err return err
@@ -628,7 +643,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h
func() { func() {
service.mu.Lock() service.mu.Lock()
defer service.mu.Unlock() defer service.mu.Unlock()
// If the service is nil, that means it hasn't yet been successfully dealt // If the applied state is nil, that means it hasn't yet been successfully dealt
// with by the load balancer reconciler. We can trust the load balancer // with by the load balancer reconciler. We can trust the load balancer
// reconciler to ensure the service's load balancer is created to target // reconciler to ensure the service's load balancer is created to target
// the correct nodes. // the correct nodes.
@@ -651,9 +666,11 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
return nil return nil
} }
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
name := cloudprovider.GetLoadBalancerName(service) name := cloudprovider.GetLoadBalancerName(service)
err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts) err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts)
if err == nil { if err == nil {
s.eventRecorder.Event(service, "updated loadbalancer", "updated loadbalancer with new hosts")
return nil return nil
} }
@@ -663,6 +680,9 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
} else if !exists { } else if !exists {
return nil return nil
} }
message := "error updating loadbalancer with new hosts: " + err.Error()
s.eventRecorder.Event(service, "updating loadbalancer failed", message)
return err return err
} }

View File

@@ -110,12 +110,22 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
t.Errorf("unexpected client actions: %v", actions) t.Errorf("unexpected client actions: %v", actions)
} }
} else { } else {
if len(cloud.Balancers) != 1 { var balancer *fake_cloud.FakeBalancer
for k := range cloud.Balancers {
if balancer == nil {
b := cloud.Balancers[k]
balancer = &b
} else {
t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers)
} else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) || break
cloud.Balancers[0].Region != region || }
cloud.Balancers[0].Ports[0].Port != item.service.Spec.Ports[0].Port { }
t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0]) if balancer == nil {
t.Errorf("expected one load balancer to be created, got none")
} else if balancer.Name != controller.loadBalancerName(item.service) ||
balancer.Region != region ||
balancer.Ports[0].Port != item.service.Spec.Ports[0].Port {
t.Errorf("created load balancer has incorrect parameters: %v", balancer)
} }
actionFound := false actionFound := false
for _, action := range actions { for _, action := range actions {

View File

@@ -131,6 +131,21 @@ func (s1 StringSet) IsSuperset(s2 StringSet) bool {
return true return true
} }
// Equal returns true iff s1 is equal (as a set) to s2.
// Two sets are equal if their membership is identical.
// (In practice, this means same elements, order doesn't matter)
func (s1 StringSet) Equal(s2 StringSet) bool {
if len(s1) != len(s2) {
return false
}
for item := range s2 {
if !s1.Has(item) {
return false
}
}
return true
}
// List returns the contents as a sorted string slice. // List returns the contents as a sorted string slice.
func (s StringSet) List() []string { func (s StringSet) List() []string {
res := make([]string, 0, len(s)) res := make([]string, 0, len(s))

View File

@@ -129,3 +129,57 @@ func TestStringSetHasAny(t *testing.T) {
t.Errorf("expected false, got true") t.Errorf("expected false, got true")
} }
} }
func TestStringSetEquals(t *testing.T) {
// Simple case (order doesn't matter)
a := NewStringSet("1", "2")
b := NewStringSet("2", "1")
if !a.Equal(b) {
t.Errorf("Expected to be equal: %v vs %v", a, b)
}
// It is a set; duplicates are ignored
b = NewStringSet("2", "2", "1")
if !a.Equal(b) {
t.Errorf("Expected to be equal: %v vs %v", a, b)
}
// Edge cases around empty sets / empty strings
a = NewStringSet()
b = NewStringSet()
if !a.Equal(b) {
t.Errorf("Expected to be equal: %v vs %v", a, b)
}
b = NewStringSet("1", "2", "3")
if a.Equal(b) {
t.Errorf("Expected to be not-equal: %v vs %v", a, b)
}
b = NewStringSet("1", "2", "")
if a.Equal(b) {
t.Errorf("Expected to be not-equal: %v vs %v", a, b)
}
// Check for equality after mutation
a = NewStringSet()
a.Insert("1")
if a.Equal(b) {
t.Errorf("Expected to be not-equal: %v vs %v", a, b)
}
a.Insert("2")
if a.Equal(b) {
t.Errorf("Expected to be not-equal: %v vs %v", a, b)
}
a.Insert("")
if !a.Equal(b) {
t.Errorf("Expected to be equal: %v vs %v", a, b)
}
a.Delete("")
if a.Equal(b) {
t.Errorf("Expected to be not-equal: %v vs %v", a, b)
}
}

View File

@@ -586,21 +586,15 @@ var _ = Describe("Services", func() {
if len(service.Status.LoadBalancer.Ingress) != 1 { if len(service.Status.LoadBalancer.Ingress) != 1 {
Failf("got unexpected len(Status.LoadBalancer.Ingresss) for NodePort service: %v", service) Failf("got unexpected len(Status.LoadBalancer.Ingresss) for NodePort service: %v", service)
} }
ingress2 := service.Status.LoadBalancer.Ingress[0]
// TODO: Fix the issue here: http://issue.k8s.io/11002 // TODO: Make this less of a hack. Watch for events?
if providerIs("aws") { Logf("Waiting 2 minutes to give service time to settle after changing configuration")
// TODO: Make this less of a hack (or fix the underlying bug)
time.Sleep(time.Second * 120) time.Sleep(time.Second * 120)
service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// We don't want the ingress point to change, but we should verify that the new ingress point still works ingress2 := service.Status.LoadBalancer.Ingress[0]
ingress2 = service.Status.LoadBalancer.Ingress[0]
Expect(ingress1).NotTo(Equal(ingress2))
} else {
Expect(ingress1).To(Equal(ingress2)) Expect(ingress1).To(Equal(ingress2))
}
By("hitting the pod through the service's updated NodePort") By("hitting the pod through the service's updated NodePort")
testReachable(ip, nodePort2) testReachable(ip, nodePort2)