diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 39ca7493606..50241fd28de 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -80,8 +80,8 @@ type TCPLoadBalancer interface { // GetTCPLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) - // CreateTCPLoadBalancer creates a new tcp load balancer. Returns the status of the balancer - CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) + // EnsureTCPLoadBalancer creates a new tcp load balancer, or updates an existing one. Returns the status of the balancer + EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) // UpdateTCPLoadBalancer updates hosts under the specified load balancer. UpdateTCPLoadBalancer(name, region string, hosts []string) error // EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 3a049718e56..1ffd48c7a98 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -109,6 +109,16 @@ type ELB interface { DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) + + DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) + AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) + + CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) + DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) + + ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) + + ConfigureHealthCheck(*elb.ConfigureHealthCheckInput) (*elb.ConfigureHealthCheckOutput, error) } // This is a simple pass-through of the Autoscaling client interface, which allows for testing @@ -1581,11 +1591,10 @@ func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp } } -// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer -// TODO(justinsb): This must be idempotent +// EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. -func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(2).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) +func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) elbClient, err := s.getELBClient(region) if err != nil { @@ -1616,7 +1625,7 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p } // Construct list of configured subnets - subnetIds := []*string{} + subnetIDs := []string{} { request := &ec2.DescribeSubnetsInput{} filters := []*ec2.Filter{} @@ -1632,7 +1641,7 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p // zones := []string{} for _, subnet := range subnets { - subnetIds = append(subnetIds, subnet.SubnetID) + subnetIDs = append(subnetIDs, orEmpty(subnet.SubnetID)) if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", region) return nil, fmt.Errorf("invalid AZ for region") @@ -1671,60 +1680,37 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } } + securityGroupIDs := []string{securityGroupID} + + // Figure out what mappings we want on the load balancer + listeners := []*elb.Listener{} + for _, port := range ports { + if port.NodePort == 0 { + glog.Errorf("Ignoring port without NodePort defined: %v", port) + continue + } + instancePort := int64(port.NodePort) + loadBalancerPort := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + + listener := &elb.Listener{} + listener.InstancePort = &instancePort + listener.LoadBalancerPort = &loadBalancerPort + listener.Protocol = &protocol + listener.InstanceProtocol = &protocol + + listeners = append(listeners, listener) + } // Build the load balancer itself - var loadBalancer *elb.LoadBalancerDescription - { - loadBalancer, err = s.describeLoadBalancer(region, name) - if err != nil { - return nil, err - } + loadBalancer, err := s.ensureLoadBalancer(region, name, listeners, subnetIDs, securityGroupIDs) + if err != nil { + return nil, err + } - if loadBalancer == nil { - createRequest := &elb.CreateLoadBalancerInput{} - createRequest.LoadBalancerName = aws.String(name) - - listeners := []*elb.Listener{} - for _, port := range ports { - if port.NodePort == 0 { - glog.Errorf("Ignoring port without NodePort defined: %v", port) - continue - } - instancePort := int64(port.NodePort) - loadBalancerPort := int64(port.Port) - protocol := strings.ToLower(string(port.Protocol)) - - listener := &elb.Listener{} - listener.InstancePort = &instancePort - listener.LoadBalancerPort = &loadBalancerPort - listener.Protocol = &protocol - listener.InstanceProtocol = &protocol - - listeners = append(listeners, listener) - } - - createRequest.Listeners = listeners - - // We are supposed to specify one subnet per AZ. - // TODO: What happens if we have more than one subnet per AZ? - createRequest.Subnets = subnetIds - - createRequest.SecurityGroups = []*string{&securityGroupID} - - glog.Info("Creating load balancer with name: ", name) - _, err := elbClient.CreateLoadBalancer(createRequest) - if err != nil { - return nil, err - } - - loadBalancer, err = s.describeLoadBalancer(region, name) - if err != nil { - glog.Warning("Unable to retrieve load balancer immediately after creation") - return nil, err - } - } else { - // TODO: Verify that load balancer configuration matches? - } + err = s.ensureLoadBalancerHealthCheck(region, loadBalancer, listeners) + if err != nil { + return nil, err } err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) @@ -1733,22 +1719,12 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } - registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName - for _, instance := range instances { - registerInstance := &elb.Instance{} - registerInstance.InstanceID = instance.InstanceID - registerRequest.Instances = append(registerRequest.Instances, registerInstance) - } - - registerResponse, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + err = s.ensureLoadBalancerInstances(elbClient, orEmpty(loadBalancer.LoadBalancerName), loadBalancer.Instances, instances) if err != nil { - // TODO: Is it better to delete the load balancer entirely? - glog.Warningf("Error registering instances with load-balancer %s: %v", name, err) + glog.Warning("Error registering instances with the load balancer: %v", err) return nil, err } - glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances) glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName)) // TODO: Wait for creation? @@ -1995,6 +1971,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { } if len(securityGroupIDs) == 0 { + glog.V(2).Info("deleted all security groups for load balancer: ", name) break } @@ -2032,51 +2009,9 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er return fmt.Errorf("Load balancer not found") } - existingInstances := map[string]*elb.Instance{} - for _, instance := range lb.Instances { - existingInstances[orEmpty(instance.InstanceID)] = instance - } - - wantInstances := map[string]*ec2.Instance{} - for _, instance := range instances { - wantInstances[orEmpty(instance.InstanceID)] = instance - } - - addInstances := []*elb.Instance{} - for instanceId := range wantInstances { - addInstance := &elb.Instance{} - addInstance.InstanceID = aws.String(instanceId) - addInstances = append(addInstances, addInstance) - } - - removeInstances := []*elb.Instance{} - for instanceId := range existingInstances { - _, found := wantInstances[instanceId] - if !found { - removeInstance := &elb.Instance{} - removeInstance.InstanceID = aws.String(instanceId) - removeInstances = append(removeInstances, removeInstance) - } - } - - if len(addInstances) > 0 { - registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.Instances = addInstances - registerRequest.LoadBalancerName = lb.LoadBalancerName - _, err = elbClient.RegisterInstancesWithLoadBalancer(registerRequest) - if err != nil { - return err - } - } - - if len(removeInstances) > 0 { - deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} - deregisterRequest.Instances = removeInstances - deregisterRequest.LoadBalancerName = lb.LoadBalancerName - _, err = elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) - if err != nil { - return err - } + err = s.ensureLoadBalancerInstances(elbClient, orEmpty(lb.LoadBalancerName), lb.Instances, instances) + if err != nil { + return nil } err = s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances) diff --git a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go new file mode 100644 index 00000000000..eef723be8cf --- /dev/null +++ b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go @@ -0,0 +1,308 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_cloud + +import ( + "fmt" + "strconv" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util" +) + +func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) { + elbClient, err := s.getELBClient(region) + if err != nil { + return nil, err + } + + loadBalancer, err := s.describeLoadBalancer(region, name) + if err != nil { + return nil, err + } + + dirty := false + + if loadBalancer == nil { + createRequest := &elb.CreateLoadBalancerInput{} + createRequest.LoadBalancerName = aws.String(name) + + createRequest.Listeners = listeners + + // We are supposed to specify one subnet per AZ. + // TODO: What happens if we have more than one subnet per AZ? + createRequest.Subnets = stringPointerArray(subnetIDs) + + createRequest.SecurityGroups = stringPointerArray(securityGroupIDs) + + glog.Info("Creating load balancer with name: ", name) + _, err := elbClient.CreateLoadBalancer(createRequest) + if err != nil { + return nil, err + } + dirty = true + } else { + { + // Sync subnets + expected := util.NewStringSet(subnetIDs...) + actual := stringSetFromPointers(loadBalancer.Subnets) + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + if len(removals) != 0 { + request := &elb.DetachLoadBalancerFromSubnetsInput{} + request.LoadBalancerName = aws.String(name) + request.Subnets = stringSetToPointers(removals) + glog.V(2).Info("Detaching load balancer from removed subnets") + _, err := elbClient.DetachLoadBalancerFromSubnets(request) + if err != nil { + return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %v", err) + } + dirty = true + } + + if len(additions) != 0 { + request := &elb.AttachLoadBalancerToSubnetsInput{} + request.LoadBalancerName = aws.String(name) + request.Subnets = stringSetToPointers(additions) + glog.V(2).Info("Attaching load balancer to added subnets") + _, err := elbClient.AttachLoadBalancerToSubnets(request) + if err != nil { + return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %v", err) + } + dirty = true + } + } + + { + // Sync security groups + expected := util.NewStringSet(securityGroupIDs...) + actual := stringSetFromPointers(loadBalancer.SecurityGroups) + + if !expected.Equal(actual) { + // This call just replaces the security groups, unlike e.g. subnets (!) + request := &elb.ApplySecurityGroupsToLoadBalancerInput{} + request.LoadBalancerName = aws.String(name) + request.SecurityGroups = stringPointerArray(securityGroupIDs) + glog.V(2).Info("Applying updated security groups to load balancer") + _, err := elbClient.ApplySecurityGroupsToLoadBalancer(request) + if err != nil { + return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %v", err) + } + dirty = true + } + } + + { + // Sync listeners + listenerDescriptions := loadBalancer.ListenerDescriptions + + foundSet := make(map[int]bool) + removals := []*int64{} + for _, listenerDescription := range listenerDescriptions { + actual := listenerDescription.Listener + if actual == nil { + glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name) + continue + } + + found := -1 + for i, expected := range listeners { + if orEmpty(actual.Protocol) != orEmpty(expected.Protocol) { + continue + } + if orEmpty(actual.InstanceProtocol) != orEmpty(expected.InstanceProtocol) { + continue + } + if orZero(actual.InstancePort) != orZero(expected.InstancePort) { + continue + } + if orZero(actual.LoadBalancerPort) != orZero(expected.LoadBalancerPort) { + continue + } + if orEmpty(actual.SSLCertificateID) != orEmpty(expected.SSLCertificateID) { + continue + } + found = i + } + if found != -1 { + foundSet[found] = true + } else { + removals = append(removals, actual.LoadBalancerPort) + } + } + + additions := []*elb.Listener{} + for i := range listeners { + if foundSet[i] { + continue + } + additions = append(additions, listeners[i]) + } + + if len(removals) != 0 { + request := &elb.DeleteLoadBalancerListenersInput{} + request.LoadBalancerName = aws.String(name) + request.LoadBalancerPorts = removals + glog.V(2).Info("Deleting removed load balancer listeners") + _, err := elbClient.DeleteLoadBalancerListeners(request) + if err != nil { + return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %v", err) + } + dirty = true + } + + if len(additions) != 0 { + request := &elb.CreateLoadBalancerListenersInput{} + request.LoadBalancerName = aws.String(name) + request.Listeners = additions + glog.V(2).Info("Creating added load balancer listeners") + _, err := elbClient.CreateLoadBalancerListeners(request) + if err != nil { + return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %v", err) + } + dirty = true + } + } + } + + if dirty { + loadBalancer, err = s.describeLoadBalancer(region, name) + if err != nil { + glog.Warning("Unable to retrieve load balancer after creation/update") + return nil, err + } + } + + return loadBalancer, nil +} + +// Makes sure that the health check for an ELB matches the configured listeners +func (s *AWSCloud) ensureLoadBalancerHealthCheck(region string, loadBalancer *elb.LoadBalancerDescription, listeners []*elb.Listener) error { + elbClient, err := s.getELBClient(region) + if err != nil { + return err + } + + actual := loadBalancer.HealthCheck + + // Default AWS settings + expectedHealthyThreshold := int64(10) + expectedUnhealthyThreshold := int64(2) + expectedTimeout := int64(5) + expectedInterval := int64(30) + + // We only a TCP health-check on the first port + expectedTarget := "" + for _, listener := range listeners { + if listener.InstancePort == nil { + continue + } + expectedTarget = "TCP:" + strconv.FormatInt(*listener.InstancePort, 10) + break + } + + if expectedTarget == "" { + return fmt.Errorf("unable to determine health check port (no valid listeners)") + } + + if expectedTarget == orEmpty(actual.Target) && + expectedHealthyThreshold == orZero(actual.HealthyThreshold) && + expectedUnhealthyThreshold == orZero(actual.UnhealthyThreshold) && + expectedTimeout == orZero(actual.Timeout) && + expectedInterval == orZero(actual.Interval) { + return nil + } + + glog.V(2).Info("Updating load-balancer health-check") + + healthCheck := &elb.HealthCheck{} + healthCheck.HealthyThreshold = &expectedHealthyThreshold + healthCheck.UnhealthyThreshold = &expectedUnhealthyThreshold + healthCheck.Timeout = &expectedTimeout + healthCheck.Interval = &expectedInterval + healthCheck.Target = &expectedTarget + + request := &elb.ConfigureHealthCheckInput{} + request.HealthCheck = healthCheck + request.LoadBalancerName = loadBalancer.LoadBalancerName + + _, err = elbClient.ConfigureHealthCheck(request) + if err != nil { + return fmt.Errorf("error configuring load-balancer health-check: %v", err) + } + + return nil +} + +// Makes sure that exactly the specified hosts are registered as instances with the load balancer +func (s *AWSCloud) ensureLoadBalancerInstances(elbClient ELB, loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error { + expected := util.NewStringSet() + for _, instance := range instances { + expected.Insert(orEmpty(instance.InstanceID)) + } + + actual := util.NewStringSet() + for _, lbInstance := range lbInstances { + actual.Insert(orEmpty(lbInstance.InstanceID)) + } + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + addInstances := []*elb.Instance{} + for instanceId := range additions { + addInstance := &elb.Instance{} + addInstance.InstanceID = aws.String(instanceId) + addInstances = append(addInstances, addInstance) + } + + removeInstances := []*elb.Instance{} + for instanceId := range removals { + removeInstance := &elb.Instance{} + removeInstance.InstanceID = aws.String(instanceId) + removeInstances = append(removeInstances, removeInstance) + } + + if len(addInstances) > 0 { + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} + registerRequest.Instances = addInstances + registerRequest.LoadBalancerName = aws.String(loadBalancerName) + _, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + if err != nil { + return err + } + glog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName) + } + + if len(removeInstances) > 0 { + deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} + deregisterRequest.Instances = removeInstances + deregisterRequest.LoadBalancerName = aws.String(loadBalancerName) + _, err := elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) + if err != nil { + return err + } + glog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName) + } + + return nil +} diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 817363f7c8d..29bbd8a2e07 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -394,19 +394,46 @@ type FakeELB struct { func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { panic("Not implemented") } func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) { panic("Not implemented") } +func (ec2 *FakeELB) DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) { + panic("Not implemented") +} + +func (elb *FakeELB) ConfigureHealthCheck(*elb.ConfigureHealthCheckInput) (*elb.ConfigureHealthCheckOutput, error) { + panic("Not implemented") +} + type FakeASG struct { aws *FakeAWSServices } diff --git a/pkg/cloudprovider/providers/aws/aws_utils.go b/pkg/cloudprovider/providers/aws/aws_utils.go new file mode 100644 index 00000000000..99baeeac3ef --- /dev/null +++ b/pkg/cloudprovider/providers/aws/aws_utils.go @@ -0,0 +1,51 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws_cloud + +import ( + "github.com/aws/aws-sdk-go/aws" + "k8s.io/kubernetes/pkg/util" +) + +func stringSetToPointers(in util.StringSet) []*string { + if in == nil { + return nil + } + out := make([]*string, len(in)) + for k := range in { + out = append(out, aws.String(k)) + } + return out +} + +func stringSetFromPointers(in []*string) util.StringSet { + if in == nil { + return nil + } + out := util.NewStringSet() + for i := range in { + out.Insert(orEmpty(in[i])) + } + return out +} + +func orZero(v *int64) int64 { + if v == nil { + return 0 + } + return *v +} diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index c78bf3d5f50..f66eee0c96c 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -56,7 +56,7 @@ type FakeCloud struct { ClusterList []string MasterName string ExternalIP net.IP - Balancers []FakeBalancer + Balancers map[string]FakeBalancer UpdateCalls []FakeUpdateBalancerCall RouteMap map[string]*FakeRoute Lock sync.Mutex @@ -123,11 +123,14 @@ func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerSt return status, f.Exists, f.Err } -// CreateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.CreateTCPLoadBalancer. +// EnsureTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // It adds an entry "create" into the internal method call record. -func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { f.addCall("create") - f.Balancers = append(f.Balancers, FakeBalancer{name, region, externalIP, ports, hosts}) + if f.Balancers == nil { + f.Balancers = make(map[string]FakeBalancer) + } + f.Balancers[name] = FakeBalancer{name, region, externalIP, ports, hosts} status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index fdbb8c595c0..8d9d0b42987 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -348,11 +348,26 @@ func makeFirewallName(name string) string { return fmt.Sprintf("k8s-fw-%s", name) } -// CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer. +// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're // owned by the project and available to be used, and use them if they are. -func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) +func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Info("Checking if load balancer already exists: %s", name) + _, exists, err := gce.GetTCPLoadBalancer(name, region) + if err != nil { + return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := gce.EnsureTCPLoadBalancerDeleted(name, region) + if err != nil { + return nil, fmt.Errorf("error deleting existing GCE load balancer: %v", err) + } + } + + err = gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) if err != nil { if !isHTTPErrorCode(err, http.StatusConflict) { return nil, err diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index b563c6544b7..048c8174053 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -521,8 +521,8 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalanc // a list of regions (from config) and query/create loadbalancers in // each region. -func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(4).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity) +func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity) if len(ports) > 1 { return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") @@ -538,6 +538,21 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) } + glog.V(2).Info("Checking if openstack load balancer already exists: %s", name) + _, exists, err := lb.GetTCPLoadBalancer(name, region) + if err != nil { + return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := lb.EnsureTCPLoadBalancerDeleted(name, region) + if err != nil { + return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) + } + } + lbmethod := lb.opts.LBMethod if lbmethod == "" { lbmethod = pools.LBMethodRoundRobin diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 7e831bbcae0..4159edca834 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -238,7 +238,14 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { case cache.Sync: err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState) if err != nil { - s.eventRecorder.Event(service, "creating loadbalancer failed", err.Error()) + message := "error creating load balancer" + if retry { + message += " (will retry): " + } else { + message += " (will not retry): " + } + message += err.Error() + s.eventRecorder.Event(service, "creating loadbalancer failed", message) return err, retry } // Always update the cache upon success. @@ -248,11 +255,14 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { cachedService.appliedState = service s.cache.set(namespacedName.String(), cachedService) case cache.Deleted: + s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer") err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) if err != nil { - s.eventRecorder.Event(service, "deleting loadbalancer failed", err.Error()) + message := "error deleting load balancer (will retry): " + err.Error() + s.eventRecorder.Event(service, "deleting loadbalancer failed", message) return err, retryable } + s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") s.cache.delete(namespacedName.String()) default: glog.Errorf("Unexpected delta type: %v", delta.Type) @@ -262,54 +272,59 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { // Returns whatever error occurred along with a boolean indicator of whether it // should be retried. -func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) { - if cachedService != nil && !needsUpdate(cachedService, service) { +func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) { + if appliedState != nil && !needsUpdate(appliedState, service) { glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) return nil, notRetryable } - if cachedService != nil { - // If the service already exists but needs to be updated, delete it so that - // we can recreate it cleanly. - if wantsExternalLoadBalancer(cachedService) { - glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName) - if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(cachedService), s.zone.Region); err != nil { - return err, retryable - } - } - } else { - // If we don't have any cached memory of the load balancer, we have to ask - // the cloud provider for what it knows about it. - status, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region) - if err != nil { - return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable - } - if exists && api.LoadBalancerStatusEqual(status, &service.Status.LoadBalancer) { - glog.Infof("LB already exists with status %s for previously uncached service %s", status, namespacedName) - return nil, notRetryable - } else if exists { - glog.Infof("Deleting old LB for previously uncached service %s whose endpoint %s doesn't match the service's desired IPs %v", - namespacedName, status, service.Spec.DeprecatedPublicIPs) - if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { - return err, retryable - } - } - } + + // Note: It is safe to just call EnsureTCPLoadBalancer. But, on some clouds that requires a delete & create, + // which may involve service interruption. Also, we would like user-friendly events. // Save the state so we can avoid a write if it doesn't change previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) if !wantsExternalLoadBalancer(service) { - glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName) + needDelete := true + if appliedState != nil { + if !wantsExternalLoadBalancer(appliedState) { + needDelete = false + } + } else { + // If we don't have any cached memory of the load balancer, we have to ask + // the cloud provider for what it knows about it. + // Technically EnsureTCPLoadBalancerDeleted can cope, but we want to post meaningful events + _, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region) + if err != nil { + return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable + } + if !exists { + needDelete = false + } + } + + if needDelete { + glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName) + s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer") + if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { + return err, retryable + } + s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") + } service.Status.LoadBalancer = api.LoadBalancerStatus{} } else { - glog.V(2).Infof("Creating LB for service %s", namespacedName) + glog.V(2).Infof("Ensuring LB for service %s", namespacedName) + + // TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart // The load balancer doesn't exist yet, so create it. + s.eventRecorder.Event(service, "creating loadbalancer", "creating loadbalancer") err := s.createExternalLoadBalancer(service) if err != nil { return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable } + s.eventRecorder.Event(service, "created loadbalancer", "created loadbalancer") } // Write the state if changed @@ -319,7 +334,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable } } else { - glog.Infof("Not persisting unchanged LoadBalancerStatus to registry.") + glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.") } return nil, notRetryable @@ -367,7 +382,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err for _, publicIP := range service.Spec.DeprecatedPublicIPs { // TODO: Make this actually work for multiple IPs by using different // names for each. For now, we'll just create the first and break. - status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), + status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err @@ -377,7 +392,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err break } } else { - status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil, + status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, nil, ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err @@ -628,7 +643,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h func() { service.mu.Lock() defer service.mu.Unlock() - // If the service is nil, that means it hasn't yet been successfully dealt + // If the applied state is nil, that means it hasn't yet been successfully dealt // with by the load balancer reconciler. We can trust the load balancer // reconciler to ensure the service's load balancer is created to target // the correct nodes. @@ -651,9 +666,11 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, return nil } + // This operation doesn't normally take very long (and happens pretty often), so we only record the final event name := cloudprovider.GetLoadBalancerName(service) err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts) if err == nil { + s.eventRecorder.Event(service, "updated loadbalancer", "updated loadbalancer with new hosts") return nil } @@ -663,6 +680,9 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, } else if !exists { return nil } + + message := "error updating loadbalancer with new hosts: " + err.Error() + s.eventRecorder.Event(service, "updating loadbalancer failed", message) return err } diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go index a775638a51f..7c7da8b3ef3 100644 --- a/pkg/controller/service/servicecontroller_test.go +++ b/pkg/controller/service/servicecontroller_test.go @@ -110,12 +110,22 @@ func TestCreateExternalLoadBalancer(t *testing.T) { t.Errorf("unexpected client actions: %v", actions) } } else { - if len(cloud.Balancers) != 1 { - t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) - } else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) || - cloud.Balancers[0].Region != region || - cloud.Balancers[0].Ports[0].Port != item.service.Spec.Ports[0].Port { - t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0]) + var balancer *fake_cloud.FakeBalancer + for k := range cloud.Balancers { + if balancer == nil { + b := cloud.Balancers[k] + balancer = &b + } else { + t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) + break + } + } + if balancer == nil { + t.Errorf("expected one load balancer to be created, got none") + } else if balancer.Name != controller.loadBalancerName(item.service) || + balancer.Region != region || + balancer.Ports[0].Port != item.service.Spec.Ports[0].Port { + t.Errorf("created load balancer has incorrect parameters: %v", balancer) } actionFound := false for _, action := range actions { diff --git a/pkg/util/set.go b/pkg/util/set.go index 084593f06ba..431312c6d51 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -131,6 +131,21 @@ func (s1 StringSet) IsSuperset(s2 StringSet) bool { return true } +// Equal returns true iff s1 is equal (as a set) to s2. +// Two sets are equal if their membership is identical. +// (In practice, this means same elements, order doesn't matter) +func (s1 StringSet) Equal(s2 StringSet) bool { + if len(s1) != len(s2) { + return false + } + for item := range s2 { + if !s1.Has(item) { + return false + } + } + return true +} + // List returns the contents as a sorted string slice. func (s StringSet) List() []string { res := make([]string, 0, len(s)) diff --git a/pkg/util/set_test.go b/pkg/util/set_test.go index dbd65a9774e..936891feebf 100644 --- a/pkg/util/set_test.go +++ b/pkg/util/set_test.go @@ -129,3 +129,57 @@ func TestStringSetHasAny(t *testing.T) { t.Errorf("expected false, got true") } } + +func TestStringSetEquals(t *testing.T) { + // Simple case (order doesn't matter) + a := NewStringSet("1", "2") + b := NewStringSet("2", "1") + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + // It is a set; duplicates are ignored + b = NewStringSet("2", "2", "1") + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + // Edge cases around empty sets / empty strings + a = NewStringSet() + b = NewStringSet() + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + b = NewStringSet("1", "2", "3") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + b = NewStringSet("1", "2", "") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + // Check for equality after mutation + a = NewStringSet() + a.Insert("1") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + a.Insert("2") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + a.Insert("") + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + a.Delete("") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } +} diff --git a/test/e2e/service.go b/test/e2e/service.go index c4a900bc424..c73425d332d 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -586,21 +586,15 @@ var _ = Describe("Services", func() { if len(service.Status.LoadBalancer.Ingress) != 1 { Failf("got unexpected len(Status.LoadBalancer.Ingresss) for NodePort service: %v", service) } + + // TODO: Make this less of a hack. Watch for events? + Logf("Waiting 2 minutes to give service time to settle after changing configuration") + time.Sleep(time.Second * 120) + service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) + Expect(err).NotTo(HaveOccurred()) + ingress2 := service.Status.LoadBalancer.Ingress[0] - - // TODO: Fix the issue here: http://issue.k8s.io/11002 - if providerIs("aws") { - // TODO: Make this less of a hack (or fix the underlying bug) - time.Sleep(time.Second * 120) - service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) - Expect(err).NotTo(HaveOccurred()) - - // We don't want the ingress point to change, but we should verify that the new ingress point still works - ingress2 = service.Status.LoadBalancer.Ingress[0] - Expect(ingress1).NotTo(Equal(ingress2)) - } else { - Expect(ingress1).To(Equal(ingress2)) - } + Expect(ingress1).To(Equal(ingress2)) By("hitting the pod through the service's updated NodePort") testReachable(ip, nodePort2)