From 1ab62e541bdaa3f5febd8d398a0656e7806e9c8e Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sat, 13 Jun 2015 11:39:52 -0400 Subject: [PATCH 1/9] Add more events to LoadBalancer management It would be nice to see these events in the service like we do with pods; also we've had trouble here so a few more events would be handy! --- pkg/controller/service/servicecontroller.go | 25 +++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index cb166d7b930..d8022dbd84b 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -238,7 +238,14 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { case cache.Sync: err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState) if err != nil { - s.eventRecorder.Event(service, "creating loadbalancer failed", err.Error()) + message := "error creating load balancer" + if retry { + message += " (will retry): " + } else { + message += " (will not retry): " + } + message += err.Error() + s.eventRecorder.Event(service, "creating loadbalancer failed", message) return err, retry } // Always update the cache upon success. 
@@ -248,11 +255,14 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { cachedService.appliedState = service s.cache.set(namespacedName.String(), cachedService) case cache.Deleted: + s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer") err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) if err != nil { - s.eventRecorder.Event(service, "deleting loadbalancer failed", err.Error()) + message := "error deleting load balancer (will retry): " + err.Error() + s.eventRecorder.Event(service, "deleting loadbalancer failed", message) return err, retryable } + s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") s.cache.delete(namespacedName.String()) default: glog.Errorf("Unexpected delta type: %v", delta.Type) @@ -272,9 +282,11 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name // we can recreate it cleanly. if wantsExternalLoadBalancer(cachedService) { glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName) + s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer, will recreate with updated configuration") if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(cachedService), s.zone.Region); err != nil { return err, retryable } + s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") } } else { // If we don't have any cached memory of the load balancer, we have to ask @@ -289,9 +301,11 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name } else if exists { glog.Infof("Deleting old LB for previously uncached service %s whose endpoint %s doesn't match the service's desired IPs %v", namespacedName, status, service.Spec.DeprecatedPublicIPs) + s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer, will recreate with updated IPs") if err := 
s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { return err, retryable } + s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") } } @@ -306,10 +320,12 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name glog.V(2).Infof("Creating LB for service %s", namespacedName) // The load balancer doesn't exist yet, so create it. + s.eventRecorder.Event(service, "creating loadbalancer", "creating loadbalancer") err := s.createExternalLoadBalancer(service) if err != nil { return fmt.Errorf("failed to create external load balancer for service %s: %v", namespacedName, err), retryable } + s.eventRecorder.Event(service, "created loadbalancer", "created loadbalancer") } // Write the state if changed @@ -651,9 +667,11 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, return nil } + // This operation doesn't normally take very long (and happens pretty often), so we only record the final event name := cloudprovider.GetLoadBalancerName(service) err := s.balancer.UpdateTCPLoadBalancer(name, s.zone.Region, hosts) if err == nil { + s.eventRecorder.Event(service, "updated loadbalancer", "updated loadbalancer with new hosts") return nil } @@ -663,6 +681,9 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, } else if !exists { return nil } + + message := "error updating loadbalancer with new hosts: " + err.Error() + s.eventRecorder.Event(service, "updating loadbalancer failed", message) return err } From 87df1d6fb687ca36cdae0a7738b83beaa7e5d9e6 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sat, 13 Jun 2015 11:58:39 -0400 Subject: [PATCH 2/9] Change CreateTCPLoadBalancer -> EnsureTCPLoadBalancer; implementations auto-delete if already exists Previously the servicecontroller would do the delete, but by having the cloudprovider take that task on, we can later remove it from the servicecontroller, and the 
cloudprovider can do something more efficient. --- pkg/cloudprovider/cloud.go | 4 ++-- pkg/cloudprovider/providers/aws/aws.go | 19 ++++++++++++++-- pkg/cloudprovider/providers/fake/fake.go | 11 ++++++---- pkg/cloudprovider/providers/gce/gce.go | 21 +++++++++++++++--- .../providers/openstack/openstack.go | 19 ++++++++++++++-- pkg/controller/service/servicecontroller.go | 6 ++--- .../service/servicecontroller_test.go | 22 ++++++++++++++----- 7 files changed, 80 insertions(+), 22 deletions(-) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 39ca7493606..50241fd28de 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -80,8 +80,8 @@ type TCPLoadBalancer interface { // GetTCPLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. GetTCPLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) - // CreateTCPLoadBalancer creates a new tcp load balancer. Returns the status of the balancer - CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) + // EnsureTCPLoadBalancer creates a new tcp load balancer, or updates an existing one. Returns the status of the balancer + EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) // UpdateTCPLoadBalancer updates hosts under the specified load balancer. 
UpdateTCPLoadBalancer(name, region string, hosts []string) error // EnsureTCPLoadBalancerDeleted deletes the specified load balancer if it diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 3a049718e56..cf8381fe552 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -1584,8 +1584,23 @@ func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp // CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer // TODO(justinsb): This must be idempotent // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. -func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(2).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) +func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) + + glog.V(2).Info("Checking if load balancer already exists: %s", name) + _, exists, err := s.GetTCPLoadBalancer(name, region) + if err != nil { + return nil, fmt.Errorf("error checking if AWS load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := s.EnsureTCPLoadBalancerDeleted(name, region) + if err != nil { + return nil, fmt.Errorf("error deleting existing AWS load balancer: %v", err) + } + } elbClient, err := s.getELBClient(region) if err != nil { diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go 
index c78bf3d5f50..f66eee0c96c 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -56,7 +56,7 @@ type FakeCloud struct { ClusterList []string MasterName string ExternalIP net.IP - Balancers []FakeBalancer + Balancers map[string]FakeBalancer UpdateCalls []FakeUpdateBalancerCall RouteMap map[string]*FakeRoute Lock sync.Mutex @@ -123,11 +123,14 @@ func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (*api.LoadBalancerSt return status, f.Exists, f.Err } -// CreateTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.CreateTCPLoadBalancer. +// EnsureTCPLoadBalancer is a test-spy implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // It adds an entry "create" into the internal method call record. -func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +func (f *FakeCloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { f.addCall("create") - f.Balancers = append(f.Balancers, FakeBalancer{name, region, externalIP, ports, hosts}) + if f.Balancers == nil { + f.Balancers = make(map[string]FakeBalancer) + } + f.Balancers[name] = FakeBalancer{name, region, externalIP, ports, hosts} status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index fdbb8c595c0..8d9d0b42987 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -348,11 +348,26 @@ func makeFirewallName(name string) string { return fmt.Sprintf("k8s-fw-%s", name) } -// CreateTCPLoadBalancer is an implementation of TCPLoadBalancer.CreateTCPLoadBalancer. 
+// EnsureTCPLoadBalancer is an implementation of TCPLoadBalancer.EnsureTCPLoadBalancer. // TODO(a-robinson): Don't just ignore specified IP addresses. Check if they're // owned by the project and available to be used, and use them if they are. -func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - err := gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) +func (gce *GCECloud) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("Checking if load balancer already exists: %s", name) + _, exists, err := gce.GetTCPLoadBalancer(name, region) + if err != nil { + return nil, fmt.Errorf("error checking if GCE load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := gce.EnsureTCPLoadBalancerDeleted(name, region) + if err != nil { + return nil, fmt.Errorf("error deleting existing GCE load balancer: %v", err) + } + } + + err = gce.makeTargetPool(name, region, hosts, translateAffinityType(affinityType)) if err != nil { if !isHTTPErrorCode(err, http.StatusConflict) { return nil, err diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index 49fdb1882ea..0f42f38b52f 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -521,8 +521,8 @@ func (lb *LoadBalancer) GetTCPLoadBalancer(name, region string) (*api.LoadBalanc // a list of regions (from config) and query/create loadbalancers in // each region. 
-func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(4).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity) +func (lb *LoadBalancer) EnsureTCPLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, ports, hosts, affinity) if len(ports) > 1 { return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") @@ -538,6 +538,21 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity) } + glog.V(2).Infof("Checking if openstack load balancer already exists: %s", name) + _, exists, err := lb.GetTCPLoadBalancer(name, region) + if err != nil { + return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err) + } + + // TODO: Implement a more efficient update strategy for common changes than delete & create + // In particular, if we implement hosts update, we can get rid of UpdateHosts + if exists { + err := lb.EnsureTCPLoadBalancerDeleted(name, region) + if err != nil { + return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err) + } + } + lbmethod := lb.opts.LBMethod if lbmethod == "" { lbmethod = pools.LBMethodRoundRobin diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index d8022dbd84b..9382109f812 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -335,7 +335,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name return fmt.Errorf("Failed to persist updated 
status to apiserver, even after retries. Giving up: %v", err), notRetryable } } else { - glog.Infof("Not persisting unchanged LoadBalancerStatus to registry.") + glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus to registry.") } return nil, notRetryable @@ -383,7 +383,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err for _, publicIP := range service.Spec.DeprecatedPublicIPs { // TODO: Make this actually work for multiple IPs by using different // names for each. For now, we'll just create the first and break. - status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), + status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, net.ParseIP(publicIP), ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err @@ -393,7 +393,7 @@ func (s *ServiceController) createExternalLoadBalancer(service *api.Service) err break } } else { - status, err := s.balancer.CreateTCPLoadBalancer(name, s.zone.Region, nil, + status, err := s.balancer.EnsureTCPLoadBalancer(name, s.zone.Region, nil, ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity) if err != nil { return err diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go index 28e55cd50ff..2d443494fce 100644 --- a/pkg/controller/service/servicecontroller_test.go +++ b/pkg/controller/service/servicecontroller_test.go @@ -110,12 +110,22 @@ func TestCreateExternalLoadBalancer(t *testing.T) { t.Errorf("unexpected client actions: %v", actions) } } else { - if len(cloud.Balancers) != 1 { - t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) - } else if cloud.Balancers[0].Name != controller.loadBalancerName(item.service) || - cloud.Balancers[0].Region != region || - cloud.Balancers[0].Ports[0].Port != item.service.Spec.Ports[0].Port { - t.Errorf("created load balancer has incorrect parameters: %v", cloud.Balancers[0]) + var 
balancer *fake_cloud.FakeBalancer + for k := range cloud.Balancers { + if balancer == nil { + b := cloud.Balancers[k] + balancer = &b + } else { + t.Errorf("expected one load balancer to be created, got %v", cloud.Balancers) + break + } + } + if balancer == nil { + t.Errorf("expected one load balancer to be created, got none") + } else if balancer.Name != controller.loadBalancerName(item.service) || + balancer.Region != region || + balancer.Ports[0].Port != item.service.Spec.Ports[0].Port { + t.Errorf("created load balancer has incorrect parameters: %v", balancer) } actionFound := false for _, action := range actions { From 8c365d51c77dd8a9e99864da5117acaace7af04d Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sat, 13 Jun 2015 12:24:26 -0400 Subject: [PATCH 3/9] servicecontroller relies on cloudprovider to delete LB if needed We previously made the cloudproviders take on the responsibility of deleting existing load balancers; this lets us simplify the servicecontroller logic and also lays the groundwork for more efficient cloudprovider LB implementations to do an in-place change on a LB. --- pkg/controller/service/servicecontroller.go | 71 ++++++++++----------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 9382109f812..6f936337f78 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -272,52 +272,51 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { // Returns whatever error occurred along with a boolean indicator of whether it // should be retried. 
-func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, cachedService *api.Service) (error, bool) { - if cachedService != nil && !needsUpdate(cachedService, service) { +func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) { + if appliedState != nil && !needsUpdate(appliedState, service) { glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) return nil, notRetryable } - if cachedService != nil { - // If the service already exists but needs to be updated, delete it so that - // we can recreate it cleanly. - if wantsExternalLoadBalancer(cachedService) { - glog.Infof("Deleting existing load balancer for service %s that needs an updated load balancer.", namespacedName) - s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer, will recreate with updated configuration") - if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(cachedService), s.zone.Region); err != nil { - return err, retryable - } - s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") - } - } else { - // If we don't have any cached memory of the load balancer, we have to ask - // the cloud provider for what it knows about it. 
- status, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region) - if err != nil { - return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable - } - if exists && api.LoadBalancerStatusEqual(status, &service.Status.LoadBalancer) { - glog.Infof("LB already exists with status %s for previously uncached service %s", status, namespacedName) - return nil, notRetryable - } else if exists { - glog.Infof("Deleting old LB for previously uncached service %s whose endpoint %s doesn't match the service's desired IPs %v", - namespacedName, status, service.Spec.DeprecatedPublicIPs) - s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer, will recreate with updated IPs") - if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { - return err, retryable - } - s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") - } - } + + // Note: It is safe to just call EnsureTCPLoadBalancer. But, on some clouds that requires a delete & create, + // which may involve service interruption. Also, we would like user-friendly events. // Save the state so we can avoid a write if it doesn't change previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) if !wantsExternalLoadBalancer(service) { - glog.Infof("Not creating LB for service %s that doesn't want one.", namespacedName) + needDelete := true + if appliedState != nil { + if !wantsExternalLoadBalancer(appliedState) { + needDelete = false + } + } else { + // If we don't have any cached memory of the load balancer, we have to ask + // the cloud provider for what it knows about it. 
+ // Technically EnsureTCPLoadBalancerDeleted can cope, but we want to post meaningful events + _, exists, err := s.balancer.GetTCPLoadBalancer(s.loadBalancerName(service), s.zone.Region) + if err != nil { + return fmt.Errorf("Error getting LB for service %s: %v", namespacedName, err), retryable + } + if !exists { + needDelete = false + } + } + + if needDelete { + glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", namespacedName) + s.eventRecorder.Event(service, "deleting loadbalancer", "deleting loadbalancer") + if err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region); err != nil { + return err, retryable + } + s.eventRecorder.Event(service, "deleted loadbalancer", "deleted loadbalancer") + } service.Status.LoadBalancer = api.LoadBalancerStatus{} } else { - glog.V(2).Infof("Creating LB for service %s", namespacedName) + glog.V(2).Infof("Ensuring LB for service %s", namespacedName) + + // TODO: We could do a dry-run here if we wanted to avoid the spurious cloud-calls & events when we restart // The load balancer doesn't exist yet, so create it. s.eventRecorder.Event(service, "creating loadbalancer", "creating loadbalancer") @@ -644,7 +643,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h func() { service.mu.Lock() defer service.mu.Unlock() - // If the service is nil, that means it hasn't yet been successfully dealt + // If the applied state is nil, that means it hasn't yet been successfully dealt // with by the load balancer reconciler. We can trust the load balancer // reconciler to ensure the service's load balancer is created to target // the correct nodes. 
From 924350d5f633d3aa8f305b127bca5ff91f25fdfb Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sat, 13 Jun 2015 14:45:38 -0400 Subject: [PATCH 4/9] AWS: Make load balancer creation idempotent on AWS This turned out to be a little convoluted, but is needed because deleting an ELB on AWS is a painful UX - it won't have the same endpoint when it is recreated. Also started splitting the provider into files, but only for new functions (so far!) --- pkg/cloudprovider/aws/aws_loadbalancer.go | 248 ++++++++++++++++++++ pkg/cloudprovider/aws/aws_utils.go | 51 ++++ pkg/cloudprovider/providers/aws/aws.go | 169 ++++--------- pkg/cloudprovider/providers/aws/aws_test.go | 23 ++ pkg/util/set.go | 15 ++ 5 files changed, 378 insertions(+), 128 deletions(-) create mode 100644 pkg/cloudprovider/aws/aws_loadbalancer.go create mode 100644 pkg/cloudprovider/aws/aws_utils.go diff --git a/pkg/cloudprovider/aws/aws_loadbalancer.go b/pkg/cloudprovider/aws/aws_loadbalancer.go new file mode 100644 index 00000000000..519968be676 --- /dev/null +++ b/pkg/cloudprovider/aws/aws_loadbalancer.go @@ -0,0 +1,248 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package aws_cloud + +import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/golang/glog" +) + +func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) { + elbClient, err := s.getELBClient(region) + if err != nil { + return nil, err + } + + loadBalancer, err := s.describeLoadBalancer(region, name) + if err != nil { + return nil, err + } + + dirty := false + + if loadBalancer == nil { + createRequest := &elb.CreateLoadBalancerInput{} + createRequest.LoadBalancerName = aws.String(name) + + createRequest.Listeners = listeners + + // We are supposed to specify one subnet per AZ. + // TODO: What happens if we have more than one subnet per AZ? + createRequest.Subnets = stringPointerArray(subnetIDs) + + createRequest.SecurityGroups = stringPointerArray(securityGroupIDs) + + glog.Info("Creating load balancer with name: ", name) + _, err := elbClient.CreateLoadBalancer(createRequest) + if err != nil { + return nil, err + } + dirty = true + } else { + { + // Sync subnets + expected := util.NewStringSet(subnetIDs...) 
+ actual := stringSetFromPointers(loadBalancer.Subnets) + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + if len(removals) != 0 { + request := &elb.DetachLoadBalancerFromSubnetsInput{} + request.LoadBalancerName = aws.String(name) + request.Subnets = stringSetToPointers(removals) + glog.V(2).Info("Detaching load balancer from removed subnets") + _, err := elbClient.DetachLoadBalancerFromSubnets(request) + if err != nil { + return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %v", err) + } + dirty = true + } + + if len(additions) != 0 { + request := &elb.AttachLoadBalancerToSubnetsInput{} + request.LoadBalancerName = aws.String(name) + request.Subnets = stringSetToPointers(additions) + glog.V(2).Info("Attaching load balancer to added subnets") + _, err := elbClient.AttachLoadBalancerToSubnets(request) + if err != nil { + return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %v", err) + } + dirty = true + } + } + + { + // Sync security groups + expected := util.NewStringSet(securityGroupIDs...) + actual := stringSetFromPointers(loadBalancer.SecurityGroups) + + if !expected.Equal(actual) { + // This call just replaces the security groups, unlike e.g. subnets (!) 
+ request := &elb.ApplySecurityGroupsToLoadBalancerInput{} + request.LoadBalancerName = aws.String(name) + request.SecurityGroups = stringPointerArray(securityGroupIDs) + glog.V(2).Info("Applying updated security groups to load balancer") + _, err := elbClient.ApplySecurityGroupsToLoadBalancer(request) + if err != nil { + return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %v", err) + } + dirty = true + } + } + + { + // Sync listeners + listenerDescriptions := loadBalancer.ListenerDescriptions + + foundSet := make(map[int]bool) + removals := []*int64{} + for _, listenerDescription := range listenerDescriptions { + actual := listenerDescription.Listener + if actual == nil { + glog.Warning("Ignoring empty listener in AWS loadbalancer: ", name) + continue + } + + found := -1 + for i, expected := range listeners { + if orEmpty(actual.Protocol) != orEmpty(expected.Protocol) { + continue + } + if orEmpty(actual.InstanceProtocol) != orEmpty(expected.InstanceProtocol) { + continue + } + if orZero(actual.InstancePort) != orZero(expected.InstancePort) { + continue + } + if orZero(actual.LoadBalancerPort) != orZero(expected.LoadBalancerPort) { + continue + } + if orEmpty(actual.SSLCertificateID) != orEmpty(expected.SSLCertificateID) { + continue + } + found = i + } + if found != -1 { + foundSet[found] = true + } else { + removals = append(removals, actual.LoadBalancerPort) + } + } + + additions := []*elb.Listener{} + for i := range listeners { + if foundSet[i] { + continue + } + additions = append(additions, listeners[i]) + } + + if len(removals) != 0 { + request := &elb.DeleteLoadBalancerListenersInput{} + request.LoadBalancerName = aws.String(name) + request.LoadBalancerPorts = removals + glog.V(2).Info("Deleting removed load balancer listeners") + _, err := elbClient.DeleteLoadBalancerListeners(request) + if err != nil { + return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %v", err) + } + dirty = true + } + + if len(additions) != 0 { + 
request := &elb.CreateLoadBalancerListenersInput{} + request.LoadBalancerName = aws.String(name) + request.Listeners = additions + glog.V(2).Info("Creating added load balancer listeners") + _, err := elbClient.CreateLoadBalancerListeners(request) + if err != nil { + return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %v", err) + } + dirty = true + } + } + } + + if dirty { + loadBalancer, err = s.describeLoadBalancer(region, name) + if err != nil { + glog.Warning("Unable to retrieve load balancer after creation/update") + return nil, err + } + } + + return loadBalancer, nil +} + +// Makes sure that exactly the specified hosts are registered as instances with the load balancer +func (s *AWSCloud) ensureLoadBalancerInstances(elbClient ELB, loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error { + expected := util.NewStringSet() + for _, instance := range instances { + expected.Insert(orEmpty(instance.InstanceID)) + } + + actual := util.NewStringSet() + for _, lbInstance := range lbInstances { + actual.Insert(orEmpty(lbInstance.InstanceID)) + } + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + addInstances := []*elb.Instance{} + for instanceId := range additions { + addInstance := &elb.Instance{} + addInstance.InstanceID = aws.String(instanceId) + addInstances = append(addInstances, addInstance) + } + + removeInstances := []*elb.Instance{} + for instanceId := range removals { + removeInstance := &elb.Instance{} + removeInstance.InstanceID = aws.String(instanceId) + removeInstances = append(removeInstances, removeInstance) + } + + if len(addInstances) > 0 { + registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} + registerRequest.Instances = addInstances + registerRequest.LoadBalancerName = aws.String(loadBalancerName) + _, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + if err != nil { + return err + } + glog.V(1).Infof("Instances added to 
load-balancer %s", loadBalancerName) + } + + if len(removeInstances) > 0 { + deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} + deregisterRequest.Instances = removeInstances + deregisterRequest.LoadBalancerName = aws.String(loadBalancerName) + _, err := elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) + if err != nil { + return err + } + glog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName) + } + + return nil +} diff --git a/pkg/cloudprovider/aws/aws_utils.go b/pkg/cloudprovider/aws/aws_utils.go new file mode 100644 index 00000000000..a4f80ad8a6b --- /dev/null +++ b/pkg/cloudprovider/aws/aws_utils.go @@ -0,0 +1,51 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package aws_cloud + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/aws/aws-sdk-go/aws" +) + +func stringSetToPointers(in util.StringSet) []*string { + if in == nil { + return nil + } + out := make([]*string, 0, len(in)) + for k := range in { + out = append(out, aws.String(k)) + } + return out +} + +func stringSetFromPointers(in []*string) util.StringSet { + if in == nil { + return nil + } + out := util.NewStringSet() + for i := range in { + out.Insert(orEmpty(in[i])) + } + return out +} + +func orZero(v *int64) int64 { + if v == nil { + return 0 + } + return *v +} diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index cf8381fe552..54cd9d4a586 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -109,6 +109,14 @@ type ELB interface { DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) + + DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) + AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) + + CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) + DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) + + ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) } // This is a simple pass-through of the Autoscaling client interface, which allows for testing @@ -1581,27 +1589,11 @@ func (s *AWSCloud) 
createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp } } -// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer -// TODO(justinsb): This must be idempotent +// EnsureTCPLoadBalancer implements TCPLoadBalancer.EnsureTCPLoadBalancer // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay. func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { glog.V(2).Infof("EnsureTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts) - glog.V(2).Info("Checking if load balancer already exists: %s", name) - _, exists, err := s.GetTCPLoadBalancer(name, region) - if err != nil { - return nil, fmt.Errorf("error checking if AWS load balancer already exists: %v", err) - } - - // TODO: Implement a more efficient update strategy for common changes than delete & create - // In particular, if we implement hosts update, we can get rid of UpdateHosts - if exists { - err := s.EnsureTCPLoadBalancerDeleted(name, region) - if err != nil { - return nil, fmt.Errorf("error deleting existing AWS load balancer: %v", err) - } - } - elbClient, err := s.getELBClient(region) if err != nil { return nil, err @@ -1631,7 +1623,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p } // Construct list of configured subnets - subnetIds := []*string{} + subnetIDs := []string{} { request := &ec2.DescribeSubnetsInput{} filters := []*ec2.Filter{} @@ -1647,7 +1639,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p // zones := []string{} for _, subnet := range subnets { - subnetIds = append(subnetIds, subnet.SubnetID) + subnetIDs = append(subnetIDs, orEmpty(subnet.SubnetID)) if !strings.HasPrefix(orEmpty(subnet.AvailabilityZone), region) { glog.Error("found AZ that did not match region", orEmpty(subnet.AvailabilityZone), " vs ", 
region) return nil, fmt.Errorf("invalid AZ for region") @@ -1686,60 +1678,32 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } } + securityGroupIDs := []string{securityGroupID} + + // Figure out what mappings we want on the load balancer + listeners := []*elb.Listener{} + for _, port := range ports { + if port.NodePort == 0 { + glog.Errorf("Ignoring port without NodePort defined: %v", port) + continue + } + instancePort := int64(port.NodePort) + loadBalancerPort := int64(port.Port) + protocol := strings.ToLower(string(port.Protocol)) + + listener := &elb.Listener{} + listener.InstancePort = &instancePort + listener.LoadBalancerPort = &loadBalancerPort + listener.Protocol = &protocol + listener.InstanceProtocol = &protocol + + listeners = append(listeners, listener) + } // Build the load balancer itself - var loadBalancer *elb.LoadBalancerDescription - { - loadBalancer, err = s.describeLoadBalancer(region, name) - if err != nil { - return nil, err - } - - if loadBalancer == nil { - createRequest := &elb.CreateLoadBalancerInput{} - createRequest.LoadBalancerName = aws.String(name) - - listeners := []*elb.Listener{} - for _, port := range ports { - if port.NodePort == 0 { - glog.Errorf("Ignoring port without NodePort defined: %v", port) - continue - } - instancePort := int64(port.NodePort) - loadBalancerPort := int64(port.Port) - protocol := strings.ToLower(string(port.Protocol)) - - listener := &elb.Listener{} - listener.InstancePort = &instancePort - listener.LoadBalancerPort = &loadBalancerPort - listener.Protocol = &protocol - listener.InstanceProtocol = &protocol - - listeners = append(listeners, listener) - } - - createRequest.Listeners = listeners - - // We are supposed to specify one subnet per AZ. - // TODO: What happens if we have more than one subnet per AZ? 
- createRequest.Subnets = subnetIds - - createRequest.SecurityGroups = []*string{&securityGroupID} - - glog.Info("Creating load balancer with name: ", name) - _, err := elbClient.CreateLoadBalancer(createRequest) - if err != nil { - return nil, err - } - - loadBalancer, err = s.describeLoadBalancer(region, name) - if err != nil { - glog.Warning("Unable to retrieve load balancer immediately after creation") - return nil, err - } - } else { - // TODO: Verify that load balancer configuration matches? - } + loadBalancer, err := s.ensureLoadBalancer(region, name, listeners, subnetIDs, securityGroupIDs) + if err != nil { + return nil, err } err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) @@ -1748,22 +1712,12 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } - registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.LoadBalancerName = loadBalancer.LoadBalancerName - for _, instance := range instances { - registerInstance := &elb.Instance{} - registerInstance.InstanceID = instance.InstanceID - registerRequest.Instances = append(registerRequest.Instances, registerInstance) - } - - registerResponse, err := elbClient.RegisterInstancesWithLoadBalancer(registerRequest) + err = s.ensureLoadBalancerInstances(elbClient, orEmpty(loadBalancer.LoadBalancerName), loadBalancer.Instances, instances) if err != nil { - // TODO: Is it better to delete the load balancer entirely? - glog.Warningf("Error registering instances with load-balancer %s: %v", name, err) + glog.Warningf("Error registering instances with the load balancer: %v", err) return nil, err } - glog.V(1).Infof("Updated instances registered with load-balancer %s: %v", name, registerResponse.Instances) glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName)) // TODO: Wait for creation? 
@@ -2010,6 +1964,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { } if len(securityGroupIDs) == 0 { + glog.V(2).Info("deleted all security groups for load balancer: ", name) break } @@ -2047,51 +2002,9 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er return fmt.Errorf("Load balancer not found") } - existingInstances := map[string]*elb.Instance{} - for _, instance := range lb.Instances { - existingInstances[orEmpty(instance.InstanceID)] = instance - } - - wantInstances := map[string]*ec2.Instance{} - for _, instance := range instances { - wantInstances[orEmpty(instance.InstanceID)] = instance - } - - addInstances := []*elb.Instance{} - for instanceId := range wantInstances { - addInstance := &elb.Instance{} - addInstance.InstanceID = aws.String(instanceId) - addInstances = append(addInstances, addInstance) - } - - removeInstances := []*elb.Instance{} - for instanceId := range existingInstances { - _, found := wantInstances[instanceId] - if !found { - removeInstance := &elb.Instance{} - removeInstance.InstanceID = aws.String(instanceId) - removeInstances = append(removeInstances, removeInstance) - } - } - - if len(addInstances) > 0 { - registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{} - registerRequest.Instances = addInstances - registerRequest.LoadBalancerName = lb.LoadBalancerName - _, err = elbClient.RegisterInstancesWithLoadBalancer(registerRequest) - if err != nil { - return err - } - } - - if len(removeInstances) > 0 { - deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{} - deregisterRequest.Instances = removeInstances - deregisterRequest.LoadBalancerName = lb.LoadBalancerName - _, err = elbClient.DeregisterInstancesFromLoadBalancer(deregisterRequest) - if err != nil { - return err - } + err = s.ensureLoadBalancerInstances(elbClient, orEmpty(lb.LoadBalancerName), lb.Instances, instances) + if err != nil { + return err } err = 
s.updateInstanceSecurityGroupsForLoadBalancer(lb, instances) diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 817363f7c8d..5066ca1407c 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -394,19 +394,42 @@ type FakeELB struct { func (ec2 *FakeELB) CreateLoadBalancer(*elb.CreateLoadBalancerInput) (*elb.CreateLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DeleteLoadBalancer(*elb.DeleteLoadBalancerInput) (*elb.DeleteLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DescribeLoadBalancers(*elb.DescribeLoadBalancersInput) (*elb.DescribeLoadBalancersOutput, error) { panic("Not implemented") } func (ec2 *FakeELB) RegisterInstancesWithLoadBalancer(*elb.RegisterInstancesWithLoadBalancerInput) (*elb.RegisterInstancesWithLoadBalancerOutput, error) { panic("Not implemented") } + func (ec2 *FakeELB) DeregisterInstancesFromLoadBalancer(*elb.DeregisterInstancesFromLoadBalancerInput) (*elb.DeregisterInstancesFromLoadBalancerOutput, error) { panic("Not implemented") } +func (ec2 *FakeELB) DetachLoadBalancerFromSubnets(*elb.DetachLoadBalancerFromSubnetsInput) (*elb.DetachLoadBalancerFromSubnetsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) AttachLoadBalancerToSubnets(*elb.AttachLoadBalancerToSubnetsInput) (*elb.AttachLoadBalancerToSubnetsOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) CreateLoadBalancerListeners(*elb.CreateLoadBalancerListenersInput) (*elb.CreateLoadBalancerListenersOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) { + panic("Not implemented") +} + +func (ec2 *FakeELB) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) { + panic("Not 
implemented") +} + type FakeASG struct { aws *FakeAWSServices } diff --git a/pkg/util/set.go b/pkg/util/set.go index 084593f06ba..431312c6d51 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -131,6 +131,21 @@ func (s1 StringSet) IsSuperset(s2 StringSet) bool { return true } +// Equal returns true iff s1 is equal (as a set) to s2. +// Two sets are equal if their membership is identical. +// (In practice, this means same elements, order doesn't matter) +func (s1 StringSet) Equal(s2 StringSet) bool { + if len(s1) != len(s2) { + return false + } + for item := range s2 { + if !s1.Has(item) { + return false + } + } + return true +} + // List returns the contents as a sorted string slice. func (s StringSet) List() []string { res := make([]string, 0, len(s)) From cb38b02f2c266e57d37a6a8afd88243d2068dc20 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Mon, 27 Jul 2015 11:23:27 -0400 Subject: [PATCH 5/9] Add unit test for StringSet Equal --- pkg/util/set_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pkg/util/set_test.go b/pkg/util/set_test.go index dbd65a9774e..936891feebf 100644 --- a/pkg/util/set_test.go +++ b/pkg/util/set_test.go @@ -129,3 +129,57 @@ func TestStringSetHasAny(t *testing.T) { t.Errorf("expected false, got true") } } + +func TestStringSetEquals(t *testing.T) { + // Simple case (order doesn't matter) + a := NewStringSet("1", "2") + b := NewStringSet("2", "1") + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + // It is a set; duplicates are ignored + b = NewStringSet("2", "2", "1") + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + // Edge cases around empty sets / empty strings + a = NewStringSet() + b = NewStringSet() + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + b = NewStringSet("1", "2", "3") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + b = NewStringSet("1", "2", 
"") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + // Check for equality after mutation + a = NewStringSet() + a.Insert("1") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + a.Insert("2") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } + + a.Insert("") + if !a.Equal(b) { + t.Errorf("Expected to be equal: %v vs %v", a, b) + } + + a.Delete("") + if a.Equal(b) { + t.Errorf("Expected to be not-equal: %v vs %v", a, b) + } +} From fde0a8884f57bd2383418e9556cc541450e431c6 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Mon, 27 Jul 2015 11:40:07 -0400 Subject: [PATCH 6/9] e2e: Test load-balanced service endpoint preserved Previously we weren't preserving the service endpoint on a load-balanced service, at least on AWS. The test had to test reality, not aspirations! Now we should have fixed this, so we can revert the e2e test to check that the service endpoint is indeed preserved. Fixes #11002 --- test/e2e/service.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/test/e2e/service.go b/test/e2e/service.go index 043337517ac..c09ae48dbfc 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -586,21 +586,15 @@ var _ = Describe("Services", func() { if len(service.Status.LoadBalancer.Ingress) != 1 { Failf("got unexpected len(Status.LoadBalancer.Ingresss) for NodePort service: %v", service) } + + // TODO: Make this less of a hack. Watch for events? 
+ Logf("Waiting 2 minutes to give service time to settle after changing configuration") + time.Sleep(time.Second * 120) + service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) + Expect(err).NotTo(HaveOccurred()) + ingress2 := service.Status.LoadBalancer.Ingress[0] - - // TODO: Fix the issue here: http://issue.k8s.io/11002 - if providerIs("aws") { - // TODO: Make this less of a hack (or fix the underlying bug) - time.Sleep(time.Second * 120) - service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) - Expect(err).NotTo(HaveOccurred()) - - // We don't want the ingress point to change, but we should verify that the new ingress point still works - ingress2 = service.Status.LoadBalancer.Ingress[0] - Expect(ingress1).NotTo(Equal(ingress2)) - } else { - Expect(ingress1).To(Equal(ingress2)) - } + Expect(ingress1).To(Equal(ingress2)) By("hitting the pod through the service's updated NodePort") testReachable(ip, nodePort2) From 08e904ad9611fa9233dc29c51c3d1ce5170382b0 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Fri, 31 Jul 2015 00:24:46 -0400 Subject: [PATCH 7/9] AWS: Configure LoadBalancer health checks ELB will automatically create a health check, but if we update the listeners the old health check port sticks around, and all the instances are marked offline. Update the health-checks to match the listeners: we just check the first valid service port, with some hard-coded options for timeouts / retries etc. 
--- pkg/cloudprovider/aws/aws_loadbalancer.go | 60 +++++++++++++++++++++ pkg/cloudprovider/providers/aws/aws.go | 7 +++ pkg/cloudprovider/providers/aws/aws_test.go | 4 ++ 3 files changed, 71 insertions(+) diff --git a/pkg/cloudprovider/aws/aws_loadbalancer.go b/pkg/cloudprovider/aws/aws_loadbalancer.go index 519968be676..2a353440e8d 100644 --- a/pkg/cloudprovider/aws/aws_loadbalancer.go +++ b/pkg/cloudprovider/aws/aws_loadbalancer.go @@ -18,6 +18,8 @@ package aws_cloud import ( "fmt" + "strconv" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" @@ -193,6 +195,64 @@ func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.List return loadBalancer, nil } +// Makes sure that the health check for an ELB matches the configured listeners +func (s *AWSCloud) ensureLoadBalancerHealthCheck(region string, loadBalancer *elb.LoadBalancerDescription, listeners []*elb.Listener) error { + elbClient, err := s.getELBClient(region) + if err != nil { + return err + } + + actual := loadBalancer.HealthCheck + + // Default AWS settings + expectedHealthyThreshold := int64(10) + expectedUnhealthyThreshold := int64(2) + expectedTimeout := int64(5) + expectedInterval := int64(30) + + // We only configure a TCP health-check on the first port + expectedTarget := "" + for _, listener := range listeners { + if listener.InstancePort == nil { + continue + } + expectedTarget = "TCP:" + strconv.FormatInt(*listener.InstancePort, 10) + break + } + + if expectedTarget == "" { + return fmt.Errorf("unable to determine health check port (no valid listeners)") + } + + if expectedTarget == orEmpty(actual.Target) && + expectedHealthyThreshold == orZero(actual.HealthyThreshold) && + expectedUnhealthyThreshold == orZero(actual.UnhealthyThreshold) && + expectedTimeout == orZero(actual.Timeout) && + expectedInterval == orZero(actual.Interval) { + return nil + } + + glog.V(2).Info("Updating load-balancer health-check") + + 
healthCheck := &elb.HealthCheck{} + healthCheck.HealthyThreshold = &expectedHealthyThreshold + healthCheck.UnhealthyThreshold = &expectedUnhealthyThreshold + healthCheck.Timeout = &expectedTimeout + healthCheck.Interval = &expectedInterval + healthCheck.Target = &expectedTarget + + request := &elb.ConfigureHealthCheckInput{} + request.HealthCheck = healthCheck + request.LoadBalancerName = loadBalancer.LoadBalancerName + + _, err = elbClient.ConfigureHealthCheck(request) + if err != nil { + return fmt.Errorf("error configuring load-balancer health-check: %v", err) + } + + return nil +} + // Makes sure that exactly the specified hosts are registered as instances with the load balancer func (s *AWSCloud) ensureLoadBalancerInstances(elbClient ELB, loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error { expected := util.NewStringSet() diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 54cd9d4a586..1ffd48c7a98 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -117,6 +117,8 @@ type ELB interface { DeleteLoadBalancerListeners(*elb.DeleteLoadBalancerListenersInput) (*elb.DeleteLoadBalancerListenersOutput, error) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsToLoadBalancerInput) (*elb.ApplySecurityGroupsToLoadBalancerOutput, error) + + ConfigureHealthCheck(*elb.ConfigureHealthCheckInput) (*elb.ConfigureHealthCheckOutput, error) } // This is a simple pass-through of the Autoscaling client interface, which allows for testing @@ -1706,6 +1708,11 @@ func (s *AWSCloud) EnsureTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, err } + err = s.ensureLoadBalancerHealthCheck(region, loadBalancer, listeners) + if err != nil { + return nil, err + } + err = s.updateInstanceSecurityGroupsForLoadBalancer(loadBalancer, instances) if err != nil { glog.Warning("Error opening ingress rules for the load balancer to the instances: ", err) 
diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 5066ca1407c..29bbd8a2e07 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -430,6 +430,10 @@ func (ec2 *FakeELB) ApplySecurityGroupsToLoadBalancer(*elb.ApplySecurityGroupsTo panic("Not implemented") } +func (elb *FakeELB) ConfigureHealthCheck(*elb.ConfigureHealthCheckInput) (*elb.ConfigureHealthCheckOutput, error) { + panic("Not implemented") +} + type FakeASG struct { aws *FakeAWSServices } From d947a8f78bbd6d3b7192a8cb953fd6719abc7c46 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sat, 8 Aug 2015 07:18:05 -0400 Subject: [PATCH 8/9] Update imports to new k8s.io style --- pkg/cloudprovider/aws/aws_loadbalancer.go | 2 +- pkg/cloudprovider/aws/aws_utils.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cloudprovider/aws/aws_loadbalancer.go b/pkg/cloudprovider/aws/aws_loadbalancer.go index 2a353440e8d..eef723be8cf 100644 --- a/pkg/cloudprovider/aws/aws_loadbalancer.go +++ b/pkg/cloudprovider/aws/aws_loadbalancer.go @@ -20,11 +20,11 @@ import ( "fmt" "strconv" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/elb" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/util" ) func (s *AWSCloud) ensureLoadBalancer(region, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) { diff --git a/pkg/cloudprovider/aws/aws_utils.go b/pkg/cloudprovider/aws/aws_utils.go index a4f80ad8a6b..99baeeac3ef 100644 --- a/pkg/cloudprovider/aws/aws_utils.go +++ b/pkg/cloudprovider/aws/aws_utils.go @@ -17,8 +17,8 @@ limitations under the License. 
package aws_cloud import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/aws/aws-sdk-go/aws" + "k8s.io/kubernetes/pkg/util" ) func stringSetToPointers(in util.StringSet) []*string { From 907090a777b7191ccce699f4956782490ef7f1cb Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Mon, 17 Aug 2015 08:59:29 -0400 Subject: [PATCH 9/9] Move new files into pkg/cloudprovider/providers/aws/ They were not caught by the general reorg because they weren't on master. --- pkg/cloudprovider/{ => providers}/aws/aws_loadbalancer.go | 0 pkg/cloudprovider/{ => providers}/aws/aws_utils.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename pkg/cloudprovider/{ => providers}/aws/aws_loadbalancer.go (100%) rename pkg/cloudprovider/{ => providers}/aws/aws_utils.go (100%) diff --git a/pkg/cloudprovider/aws/aws_loadbalancer.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go similarity index 100% rename from pkg/cloudprovider/aws/aws_loadbalancer.go rename to pkg/cloudprovider/providers/aws/aws_loadbalancer.go diff --git a/pkg/cloudprovider/aws/aws_utils.go b/pkg/cloudprovider/providers/aws/aws_utils.go similarity index 100% rename from pkg/cloudprovider/aws/aws_utils.go rename to pkg/cloudprovider/providers/aws/aws_utils.go