/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
	"crypto/sha1"
	"fmt"
	"reflect"
	"strconv"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/ec2"
	"github.com/aws/aws-sdk-go/service/elb"
	"github.com/aws/aws-sdk-go/service/elbv2"
	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
)

const (
	// ProxyProtocolPolicyName is the name of the ELB policy that enables the PROXY protocol
	ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"

	// SSLNegotiationPolicyNameFormat is the format string for names of SSL negotiation policies
	SSLNegotiationPolicyNameFormat = "k8s-SSLNegotiationPolicy-%s"
)

var (
	// Defaults for ELB Healthcheck
	defaultHCHealthyThreshold   = int64(2)
	defaultHCUnhealthyThreshold = int64(6)
	defaultHCTimeout            = int64(5)
	defaultHCInterval           = int64(10)
)

func isNLB(annotations map[string]string) bool {
	return annotations[ServiceAnnotationLoadBalancerType] == "nlb"
}

type nlbPortMapping struct {
	FrontendPort int64
	TrafficPort  int64
	ClientCIDR   string

	HealthCheckPort     int64
	HealthCheckPath     string
	HealthCheckProtocol string
}

// getLoadBalancerAdditionalTags converts the comma separated list of key-value
// pairs in the ServiceAnnotationLoadBalancerAdditionalTags annotation and returns
// it as a map.
func getLoadBalancerAdditionalTags(annotations map[string]string) map[string]string {
	additionalTags := make(map[string]string)
	if additionalTagsList, ok := annotations[ServiceAnnotationLoadBalancerAdditionalTags]; ok {
		additionalTagsList = strings.TrimSpace(additionalTagsList)

		// Break up list of "Key1=Val,Key2=Val2"
		tagList := strings.Split(additionalTagsList, ",")

		// Break up "Key=Val"
		for _, tagSet := range tagList {
			tag := strings.Split(strings.TrimSpace(tagSet), "=")

			// Accept "Key=val" or "Key=" or just "Key"
			if len(tag) >= 2 && len(tag[0]) != 0 {
				// There is a key and a value, so save it
				additionalTags[tag[0]] = tag[1]
			} else if len(tag) == 1 && len(tag[0]) != 0 {
				// Just "Key"
				additionalTags[tag[0]] = ""
			}
		}
	}

	return additionalTags
}

// ensureLoadBalancerv2 ensures a v2 load balancer is created
func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) {
	loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName)
	if err != nil {
		return nil, err
	}

	dirty := false

	// Get additional tags set by the user
	tags := getLoadBalancerAdditionalTags(annotations)
	// Add default tags
	tags[TagNameKubernetesService] = namespacedName.String()
	tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)

	if loadBalancer == nil {
		// Create the LB
		createRequest := &elbv2.CreateLoadBalancerInput{
			Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork),
			Name: aws.String(loadBalancerName),
		}

		if internalELB {
			createRequest.Scheme = aws.String("internal")
		}

		// We are supposed to specify one subnet per AZ.
		// TODO: What happens if we have more than one subnet per AZ?
		createRequest.SubnetMappings = createSubnetMappings(subnetIDs)

		for k, v := range tags {
			createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{
				Key: aws.String(k), Value: aws.String(v),
			})
		}

		glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
		createResponse, err := c.elbv2.CreateLoadBalancer(createRequest)
		if err != nil {
			return nil, fmt.Errorf("Error creating load balancer: %q", err)
		}

		loadBalancer = createResponse.LoadBalancers[0]

		// Create Target Groups
		addTagsInput := &elbv2.AddTagsInput{
			ResourceArns: []*string{},
			Tags:         []*elbv2.Tag{},
		}

		for i := range mappings {
			// It is easier to keep track of updates by having possibly
			// duplicate target groups where the backend port is the same
			_, targetGroupArn, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId)
			if err != nil {
				return nil, fmt.Errorf("Error creating listener: %q", err)
			}
			addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn)
		}

		// Add tags to targets
		for k, v := range tags {
			addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{
				Key: aws.String(k), Value: aws.String(v),
			})
		}
		if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 {
			_, err = c.elbv2.AddTags(addTagsInput)
			if err != nil {
				return nil, fmt.Errorf("Error adding tags after creating Load Balancer: %q", err)
			}
		}
	} else {
		// TODO: Sync internal vs non-internal

		// sync mappings
		{
			listenerDescriptions, err := c.elbv2.DescribeListeners(
				&elbv2.DescribeListenersInput{
					LoadBalancerArn: loadBalancer.LoadBalancerArn,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("Error describing listeners: %q", err)
			}

			// actual maps FrontendPort to an elbv2.Listener
			actual := map[int64]*elbv2.Listener{}
			for _, listener := range listenerDescriptions.Listeners {
				actual[*listener.Port] = listener
			}

			actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
				&elbv2.DescribeTargetGroupsInput{
					LoadBalancerArn: loadBalancer.LoadBalancerArn,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("Error listing target groups: %q", err)
			}

			nodePortTargetGroup := map[int64]*elbv2.TargetGroup{}
			for _, targetGroup := range actualTargetGroups.TargetGroups {
				nodePortTargetGroup[*targetGroup.Port] = targetGroup
			}

			// Create Target Groups
			addTagsInput := &elbv2.AddTagsInput{
				ResourceArns: []*string{},
				Tags:         []*elbv2.Tag{},
			}

			// Handle additions/modifications
			for _, mapping := range mappings {
				frontendPort := mapping.FrontendPort
				nodePort := mapping.TrafficPort

				// modifications
				if listener, ok := actual[frontendPort]; ok {
					// nodePort must have changed, we'll need to delete old TG
					// and recreate
					if targetGroup, ok := nodePortTargetGroup[nodePort]; !ok {
						// Create new Target group
						targetName := createTargetName(namespacedName, frontendPort, nodePort)
						targetGroup, err = c.ensureTargetGroup(
							nil,
							mapping,
							instanceIDs,
							targetName,
							*loadBalancer.VpcId,
						)
						if err != nil {
							return nil, err
						}

						// Associate new target group to LB
						_, err := c.elbv2.ModifyListener(&elbv2.ModifyListenerInput{
							ListenerArn: listener.ListenerArn,
							Port:        aws.Int64(frontendPort),
							Protocol:    aws.String("TCP"),
							DefaultActions: []*elbv2.Action{{
								TargetGroupArn: targetGroup.TargetGroupArn,
								Type:           aws.String("forward"),
							}},
						})
						if err != nil {
							return nil, fmt.Errorf("Error updating load balancer listener: %q", err)
						}

						// Delete old target group
						_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
							TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
						})
						if err != nil {
return nil, fmt.Errorf("Error deleting old target group: %q", err) } } else { // Run ensureTargetGroup to make sure instances in service are up-to-date targetName := createTargetName(namespacedName, frontendPort, nodePort) _, err = c.ensureTargetGroup( targetGroup, mapping, instanceIDs, targetName, *loadBalancer.VpcId, ) if err != nil { return nil, err } } dirty = true continue } // Additions _, targetGroupArn, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId) if err != nil { return nil, err } addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn) dirty = true } frontEndPorts := map[int64]bool{} for i := range mappings { frontEndPorts[mappings[i].FrontendPort] = true } // handle deletions for port, listener := range actual { if _, ok := frontEndPorts[port]; !ok { err := c.deleteListenerV2(listener) if err != nil { return nil, err } dirty = true } } // Add tags to new targets for k, v := range tags { addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{ Key: aws.String(k), Value: aws.String(v), }) } if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 { _, err = c.elbv2.AddTags(addTagsInput) if err != nil { return nil, fmt.Errorf("Error adding tags after modifying load balancer targets: %q", err) } } } // Subnets cannot be modified on NLBs if dirty { loadBalancers, err := c.elbv2.DescribeLoadBalancers( &elbv2.DescribeLoadBalancersInput{ LoadBalancerArns: []*string{ loadBalancer.LoadBalancerArn, }, }, ) if err != nil { return nil, fmt.Errorf("Error retrieving load balancer after update: %q", err) } loadBalancer = loadBalancers.LoadBalancers[0] } } return loadBalancer, nil } // create a valid target group name - ensure name is not over 32 characters func createTargetName(namespacedName types.NamespacedName, frontendPort, nodePort int64) string { sha := fmt.Sprintf("%x", sha1.Sum([]byte(namespacedName.String())))[:13] return fmt.Sprintf("k8s-tg-%s-%d-%d", sha, frontendPort, nodePort) } func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string) (listener *elbv2.Listener, targetGroupArn *string, err error) { targetName := createTargetName(namespacedName, mapping.FrontendPort, mapping.TrafficPort) glog.Infof("Creating load balancer target group for %v with name: %s", namespacedName, targetName) target, err := c.ensureTargetGroup( nil, mapping, instanceIDs, targetName, vpcID, ) if err != nil { return nil, aws.String(""), err } createListernerInput := &elbv2.CreateListenerInput{ LoadBalancerArn: loadBalancerArn, Port: aws.Int64(mapping.FrontendPort), Protocol: aws.String("TCP"), DefaultActions: []*elbv2.Action{{ TargetGroupArn: target.TargetGroupArn, Type: aws.String(elbv2.ActionTypeEnumForward), }}, } glog.Infof("Creating load balancer listener for %v", namespacedName) createListenerOutput, err := c.elbv2.CreateListener(createListernerInput) if err != nil { return nil, aws.String(""), fmt.Errorf("Error creating load balancer listener: %q", err) } return createListenerOutput.Listeners[0], target.TargetGroupArn, nil } // cleans up listener and corresponding target group func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error { _, err := c.elbv2.DeleteListener(&elbv2.DeleteListenerInput{ListenerArn: listener.ListenerArn}) if err != nil { return fmt.Errorf("Error deleting load balancer listener: %q", err) } _, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{TargetGroupArn: 
	if err != nil {
		return fmt.Errorf("Error deleting load balancer target group: %q", err)
	}
	return nil
}

// ensureTargetGroup creates a target group with a set of instances
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPortMapping, instances []string, name string, vpcID string) (*elbv2.TargetGroup, error) {
	dirty := false
	if targetGroup == nil {
		input := &elbv2.CreateTargetGroupInput{
			VpcId:                      aws.String(vpcID),
			Name:                       aws.String(name),
			Port:                       aws.Int64(mapping.TrafficPort),
			Protocol:                   aws.String("TCP"),
			TargetType:                 aws.String("instance"),
			HealthCheckIntervalSeconds: aws.Int64(30),
			HealthCheckPort:            aws.String("traffic-port"),
			HealthCheckProtocol:        aws.String("TCP"),
			HealthyThresholdCount:      aws.Int64(3),
			UnhealthyThresholdCount:    aws.Int64(3),
		}

		input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
		if mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
			input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
		}

		// Account for externalTrafficPolicy = "Local"
		if mapping.HealthCheckPort != mapping.TrafficPort {
			input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
		}

		result, err := c.elbv2.CreateTargetGroup(input)
		if err != nil {
			return nil, fmt.Errorf("Error creating load balancer target group: %q", err)
		}
		if len(result.TargetGroups) != 1 {
			return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups))
		}

		registerInput := &elbv2.RegisterTargetsInput{
			TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
			Targets:        []*elbv2.TargetDescription{},
		}
		for _, instanceID := range instances {
			registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
				Id:   aws.String(string(instanceID)),
				Port: aws.Int64(mapping.TrafficPort),
			})
		}

		_, err = c.elbv2.RegisterTargets(registerInput)
		if err != nil {
			return nil, fmt.Errorf("Error registering targets for load balancer: %q", err)
		}

		return result.TargetGroups[0], nil
	}

	// handle instances in service
	{
		healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
		if err != nil {
			return nil, fmt.Errorf("Error describing target group health: %q", err)
		}
		actualIDs := []string{}
		for _, healthDescription := range healthResponse.TargetHealthDescriptions {
			if healthDescription.TargetHealth.Reason != nil {
				switch aws.StringValue(healthDescription.TargetHealth.Reason) {
				case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
					// We don't need to count this instance in service if it is
					// on its way out
				default:
					actualIDs = append(actualIDs, *healthDescription.Target.Id)
				}
			}
		}

		actual := sets.NewString(actualIDs...)
		expected := sets.NewString(instances...)
		additions := expected.Difference(actual)
		removals := actual.Difference(expected)

		if len(additions) > 0 {
			registerInput := &elbv2.RegisterTargetsInput{
				TargetGroupArn: targetGroup.TargetGroupArn,
				Targets:        []*elbv2.TargetDescription{},
			}
			for instanceID := range additions {
				registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
					Id:   aws.String(instanceID),
					Port: aws.Int64(mapping.TrafficPort),
				})
			}
			_, err := c.elbv2.RegisterTargets(registerInput)
			if err != nil {
				return nil, fmt.Errorf("Error registering new targets in target group: %q", err)
			}
			dirty = true
		}

		if len(removals) > 0 {
			deregisterInput := &elbv2.DeregisterTargetsInput{
				TargetGroupArn: targetGroup.TargetGroupArn,
				Targets:        []*elbv2.TargetDescription{},
			}
			for instanceID := range removals {
				deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
					Id:   aws.String(instanceID),
					Port: aws.Int64(mapping.TrafficPort),
				})
			}
			_, err := c.elbv2.DeregisterTargets(deregisterInput)
			if err != nil {
				return nil, fmt.Errorf("Error trying to deregister targets in target group: %q", err)
			}
			dirty = true
		}
	}

	// ensure the health check is correct
	{
		dirtyHealthCheck := false

		input := &elbv2.ModifyTargetGroupInput{
			TargetGroupArn: targetGroup.TargetGroupArn,
		}

		if aws.StringValue(targetGroup.HealthCheckProtocol) != mapping.HealthCheckProtocol {
			input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
			dirtyHealthCheck = true
		}
		if aws.StringValue(targetGroup.HealthCheckPort) != strconv.Itoa(int(mapping.HealthCheckPort)) {
			input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
			dirtyHealthCheck = true
		}
		if mapping.HealthCheckPath != "" && mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
			input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
			dirtyHealthCheck = true
		}

		if dirtyHealthCheck {
			_, err := c.elbv2.ModifyTargetGroup(input)
			if err != nil {
				return nil, fmt.Errorf("Error modifying target group health check: %q", err)
			}

			dirty = true
		}
	}

	if dirty {
		result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
			Names: []*string{aws.String(name)},
		})
		if err != nil {
			return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err)
		}
		targetGroup = result.TargetGroups[0]
	}

	return targetGroup, nil
}

func portsForNLB(lbName string, sg *ec2.SecurityGroup, clientTraffic bool) sets.Int64 {
	response := sets.NewInt64()
	var annotation string
	if clientTraffic {
		annotation = fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
	} else {
		annotation = fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
	}

	for i := range sg.IpPermissions {
		for j := range sg.IpPermissions[i].IpRanges {
			description := aws.StringValue(sg.IpPermissions[i].IpRanges[j].Description)
			if description == annotation {
				// TODO should probably check FromPort == ToPort
				response.Insert(aws.Int64Value(sg.IpPermissions[i].FromPort))
			}
		}
	}
	return response
}

// filterForIPRangeDescription filters in security groups that have IpRange Descriptions that match a loadBalancerName
func filterForIPRangeDescription(securityGroups []*ec2.SecurityGroup, lbName string) []*ec2.SecurityGroup {
	response := []*ec2.SecurityGroup{}
	clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
	healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
	for i := range securityGroups {
		for j := range securityGroups[i].IpPermissions {
			for k := range securityGroups[i].IpPermissions[j].IpRanges {
				description := aws.StringValue(securityGroups[i].IpPermissions[j].IpRanges[k].Description)
				if description == clientRule || description == healthRule {
					response = append(response, securityGroups[i])
				}
			}
		}
	}
	return response
}

func (c *Cloud) getVpcCidrBlock() (*string, error) {
	vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{
		VpcIds: []*string{aws.String(c.vpcID)},
	})
	if err != nil {
		return nil, fmt.Errorf("Error querying VPC for ELB: %q", err)
	}
	if len(vpcs.Vpcs) != 1 {
		return nil, fmt.Errorf("Error querying VPC for ELB, got %d vpcs for %s", len(vpcs.Vpcs), c.vpcID)
	}
	return vpcs.Vpcs[0].CidrBlock, nil
}

// abstraction for updating SG rules
// if clientTraffic is false, then only update HealthCheck rules
func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.SecurityGroup, desiredSgIds []string, ports []int64, lbName string, clientCidrs []string, clientTraffic bool) error {
	// Map containing the groups we want to make changes on; the ports to make
	// changes on; and whether to add or remove it. true to add, false to remove
	portChanges := map[string]map[int64]bool{}

	for _, id := range desiredSgIds {
		// consider everything an addition for now
		if _, ok := portChanges[id]; !ok {
			portChanges[id] = make(map[int64]bool)
		}
		for _, port := range ports {
			portChanges[id][port] = true
		}
	}

	// Compare to actual groups
	for _, actualGroup := range actualGroups {
		actualGroupID := aws.StringValue(actualGroup.GroupId)
		if actualGroupID == "" {
			glog.Warning("Ignoring group without ID: ", actualGroup)
			continue
		}

		addingMap, ok := portChanges[actualGroupID]
		if ok {
			desiredSet := sets.NewInt64()
			for port := range addingMap {
				desiredSet.Insert(port)
			}
			existingSet := portsForNLB(lbName, actualGroup, clientTraffic)

			// remove from portChanges ports that are already allowed
			if intersection := desiredSet.Intersection(existingSet); intersection.Len() > 0 {
				for p := range intersection {
					delete(portChanges[actualGroupID], p)
				}
			}

			// allowed ports that need to be removed
			if difference := existingSet.Difference(desiredSet); difference.Len() > 0 {
				for p := range difference {
					portChanges[actualGroupID][p] = false
				}
			}
		}
	}

	// Make changes we've planned on
	for instanceSecurityGroupID, portMap := range portChanges {
		adds := []*ec2.IpPermission{}
		removes := []*ec2.IpPermission{}
		for port, add := range portMap {
			if add {
				if clientTraffic {
					glog.V(2).Infof("Adding rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
					glog.V(2).Infof("Adding rule for client traffic from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
				} else {
					glog.V(2).Infof("Adding rule for health check traffic from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
				}
			} else {
				if clientTraffic {
					glog.V(2).Infof("Removing rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
					glog.V(2).Infof("Removing rule for client traffic from the network load balancer (%s) to instance (%s)", clientCidrs, instanceSecurityGroupID)
				}
				glog.V(2).Infof("Removing rule for health check traffic from the network load balancer (%s) to instance (%s)", clientCidrs, instanceSecurityGroupID)
			}

			if clientTraffic {
				clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
				// Client Traffic
				permission := &ec2.IpPermission{
					FromPort:   aws.Int64(port),
					ToPort:     aws.Int64(port),
					IpProtocol: aws.String("tcp"),
				}
				ranges := []*ec2.IpRange{}
				for _, cidr := range clientCidrs {
					ranges = append(ranges, &ec2.IpRange{
						CidrIp:      aws.String(cidr),
						Description: aws.String(clientRuleAnnotation),
					})
				}
				permission.IpRanges = ranges
				if add {
					adds = append(adds, permission)
				} else {
					removes = append(removes, permission)
				}
			} else {
				healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)

				// NLB HealthCheck
				permission := &ec2.IpPermission{
					FromPort:   aws.Int64(port),
					ToPort:     aws.Int64(port),
					IpProtocol: aws.String("tcp"),
				}
				ranges := []*ec2.IpRange{}
				for _, cidr := range clientCidrs {
					ranges = append(ranges, &ec2.IpRange{
						CidrIp:      aws.String(cidr),
						Description: aws.String(healthRuleAnnotation),
					})
				}
				permission.IpRanges = ranges
				if add {
					adds = append(adds, permission)
				} else {
					removes = append(removes, permission)
				}
			}
		}

		if len(adds) > 0 {
			changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, adds)
			if err != nil {
				return err
			}
			if !changed {
				glog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
			}
		}

		if len(removes) > 0 {
			changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, removes)
			if err != nil {
				return err
			}
			if !changed {
				glog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
			}
		}

		if clientTraffic {
			// MTU discovery
			mtuRuleAnnotation := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, lbName)
			mtuPermission := &ec2.IpPermission{
				IpProtocol: aws.String("icmp"),
				FromPort:   aws.Int64(3),
				ToPort:     aws.Int64(4),
			}
			ranges := []*ec2.IpRange{}
			for _, cidr := range clientCidrs {
				ranges = append(ranges, &ec2.IpRange{
					CidrIp:      aws.String(cidr),
					Description: aws.String(mtuRuleAnnotation),
				})
			}
			mtuPermission.IpRanges = ranges

			group, err := c.findSecurityGroup(instanceSecurityGroupID)
			if err != nil {
				glog.Warningf("Error retrieving security group: %q", err)
				return err
			}

			if group == nil {
				glog.Warning("Security group not found: ", instanceSecurityGroupID)
				return nil
			}

			icmpExists := false
			permCount := 0
			for _, perm := range group.IpPermissions {
				if *perm.IpProtocol == "icmp" {
					icmpExists = true
					continue
				}

				if perm.FromPort != nil {
					permCount++
				}
			}

			if !icmpExists && permCount > 0 {
				// the icmp permission is missing
				changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission})
				if err != nil {
					glog.Warningf("Error adding MTU permission to security group: %q", err)
					return err
				}
				if !changed {
					glog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
				}
			} else if icmpExists && permCount == 0 {
				// there are no additional permissions, remove icmp
				changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission})
				if err != nil {
					glog.Warningf("Error removing MTU permission from security group: %q", err)
					return err
				}
				if !changed {
					glog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
				}
			}
		}
	}
	return nil
}

// Add SG rules for a given NLB
func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, instances map[awsInstanceID]*ec2.Instance, lbName string, clientCidrs []string) error {
	if c.cfg.Global.DisableSecurityGroupIngress {
		return nil
	}

	vpcCidr, err := c.getVpcCidrBlock()
	if err != nil {
		return err
	}

	// Unlike the classic ELB, NLB does not have a security group that we can
	// filter against all existing groups to see if they allow access. Instead
	// we use the IpRange.Description field to annotate NLB health check and
	// client traffic rules

	// Get the actual list of groups that allow ingress for the load-balancer
	var actualGroups []*ec2.SecurityGroup
	{
		// Server side filter
		describeRequest := &ec2.DescribeSecurityGroupsInput{}
		filters := []*ec2.Filter{
			newEc2Filter("ip-permission.protocol", "tcp"),
		}
		describeRequest.Filters = c.tagging.addFilters(filters)
		response, err := c.ec2.DescribeSecurityGroups(describeRequest)
		if err != nil {
			return fmt.Errorf("Error querying security groups for NLB: %q", err)
		}
		for _, sg := range response {
			if !c.tagging.hasClusterTag(sg.Tags) {
				continue
			}
			actualGroups = append(actualGroups, sg)
		}

		// client-side filter
		// Filter out groups that don't have IP Rules we've annotated for this service
		actualGroups = filterForIPRangeDescription(actualGroups, lbName)
	}

	taggedSecurityGroups, err := c.getTaggedSecurityGroups()
	if err != nil {
		return fmt.Errorf("Error querying for tagged security groups: %q", err)
	}

	externalTrafficPolicyIsLocal := false
	trafficPorts := []int64{}
	for i := range mappings {
		trafficPorts = append(trafficPorts, mappings[i].TrafficPort)
		if mappings[i].TrafficPort != mappings[i].HealthCheckPort {
			externalTrafficPolicyIsLocal = true
		}
	}

	healthCheckPorts := trafficPorts
	// if externalTrafficPolicy is Local, all listeners use the same health
	// check port
	if externalTrafficPolicyIsLocal && len(mappings) > 0 {
		healthCheckPorts = []int64{mappings[0].HealthCheckPort}
	}

	desiredGroupIds := []string{}
	// Scan instances for groups we want open
	for _, instance := range instances {
		securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups)
		if err != nil {
			return err
		}

		if securityGroup == nil {
			glog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
			continue
		}

		id := aws.StringValue(securityGroup.GroupId)
		if id == "" {
			glog.Warningf("found security group without id: %v", securityGroup)
			continue
		}

		desiredGroupIds = append(desiredGroupIds, id)
	}

	// Run once for Client traffic
	err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, trafficPorts, lbName, clientCidrs, true)
	if err != nil {
		return err
	}

	// Run once for health check traffic
	err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, healthCheckPorts, lbName, []string{aws.StringValue(vpcCidr)}, false)
	if err != nil {
		return err
	}

	return nil
}

func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool, loadBalancerAttributes *elb.LoadBalancerAttributes, annotations map[string]string) (*elb.LoadBalancerDescription, error) {
	loadBalancer, err := c.describeLoadBalancer(loadBalancerName)
	if err != nil {
		return nil, err
	}

	dirty := false

	if loadBalancer == nil {
		createRequest := &elb.CreateLoadBalancerInput{}
		createRequest.LoadBalancerName = aws.String(loadBalancerName)

		createRequest.Listeners = listeners

		if internalELB {
			createRequest.Scheme = aws.String("internal")
		}

		// We are supposed to specify one subnet per AZ.
		// TODO: What happens if we have more than one subnet per AZ?
		createRequest.Subnets = stringPointerArray(subnetIDs)

		createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)

		// Get additional tags set by the user
		tags := getLoadBalancerAdditionalTags(annotations)

		// Add default tags
		tags[TagNameKubernetesService] = namespacedName.String()
		tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)

		for k, v := range tags {
			createRequest.Tags = append(createRequest.Tags, &elb.Tag{
				Key: aws.String(k), Value: aws.String(v),
			})
		}

		glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
		_, err := c.elb.CreateLoadBalancer(createRequest)
		if err != nil {
			return nil, err
		}

		if proxyProtocol {
			err = c.createProxyProtocolPolicy(loadBalancerName)
			if err != nil {
				return nil, err
			}

			for _, listener := range listeners {
				glog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to true", *listener.InstancePort)
				err := c.setBackendPolicies(loadBalancerName, *listener.InstancePort, []*string{aws.String(ProxyProtocolPolicyName)})
				if err != nil {
					return nil, err
				}
			}
		}

		dirty = true
	} else {
		// TODO: Sync internal vs non-internal

		{
			// Sync subnets
			expected := sets.NewString(subnetIDs...)
			actual := stringSetFromPointers(loadBalancer.Subnets)

			additions := expected.Difference(actual)
			removals := actual.Difference(expected)

			if removals.Len() != 0 {
				request := &elb.DetachLoadBalancerFromSubnetsInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				request.Subnets = stringSetToPointers(removals)
				glog.V(2).Info("Detaching load balancer from removed subnets")
				_, err := c.elb.DetachLoadBalancerFromSubnets(request)
				if err != nil {
					return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %q", err)
				}
				dirty = true
			}

			if additions.Len() != 0 {
				request := &elb.AttachLoadBalancerToSubnetsInput{}
				request.LoadBalancerName = aws.String(loadBalancerName)
				request.Subnets = stringSetToPointers(additions)
				glog.V(2).Info("Attaching load balancer to added subnets")
				_, err := c.elb.AttachLoadBalancerToSubnets(request)
				if err != nil {
					return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %q", err)
				}
				dirty = true
			}
		}

		{
			// Sync security groups
			expected := sets.NewString(securityGroupIDs...)
			actual := stringSetFromPointers(loadBalancer.SecurityGroups)

			if !expected.Equal(actual) {
				// This call just replaces the security groups, unlike e.g. subnets (!)
request := &elb.ApplySecurityGroupsToLoadBalancerInput{} request.LoadBalancerName = aws.String(loadBalancerName) request.SecurityGroups = stringPointerArray(securityGroupIDs) glog.V(2).Info("Applying updated security groups to load balancer") _, err := c.elb.ApplySecurityGroupsToLoadBalancer(request) if err != nil { return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %q", err) } dirty = true } } { // Sync listeners listenerDescriptions := loadBalancer.ListenerDescriptions foundSet := make(map[int]bool) removals := []*int64{} for _, listenerDescription := range listenerDescriptions { actual := listenerDescription.Listener if actual == nil { glog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName) continue } found := -1 for i, expected := range listeners { if elbProtocolsAreEqual(actual.Protocol, expected.Protocol) { continue } if elbProtocolsAreEqual(actual.InstanceProtocol, expected.InstanceProtocol) { continue } if orZero(actual.InstancePort) != orZero(expected.InstancePort) { continue } if orZero(actual.LoadBalancerPort) != orZero(expected.LoadBalancerPort) { continue } if awsArnEquals(actual.SSLCertificateId, expected.SSLCertificateId) { continue } found = i } if found != -1 { foundSet[found] = true } else { removals = append(removals, actual.LoadBalancerPort) } } additions := []*elb.Listener{} for i := range listeners { if foundSet[i] { continue } additions = append(additions, listeners[i]) } if len(removals) != 0 { request := &elb.DeleteLoadBalancerListenersInput{} request.LoadBalancerName = aws.String(loadBalancerName) request.LoadBalancerPorts = removals glog.V(2).Info("Deleting removed load balancer listeners") _, err := c.elb.DeleteLoadBalancerListeners(request) if err != nil { return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %q", err) } dirty = true } if len(additions) != 0 { request := &elb.CreateLoadBalancerListenersInput{} request.LoadBalancerName = aws.String(loadBalancerName) request.Listeners = additions glog.V(2).Info("Creating added load balancer listeners") _, err := c.elb.CreateLoadBalancerListeners(request) if err != nil { return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %q", err) } dirty = true } } { // Sync proxy protocol state for new and existing listeners proxyPolicies := make([]*string, 0) if proxyProtocol { // Ensure the backend policy exists // NOTE The documentation for the AWS API indicates we could get an HTTP 400 // back if a policy of the same name already exists. However, the aws-sdk does not // seem to return an error to us in these cases. Therefore, this will issue an API // request every time. 
				err := c.createProxyProtocolPolicy(loadBalancerName)
				if err != nil {
					return nil, err
				}

				proxyPolicies = append(proxyPolicies, aws.String(ProxyProtocolPolicyName))
			}

			foundBackends := make(map[int64]bool)
			proxyProtocolBackends := make(map[int64]bool)
			for _, backendListener := range loadBalancer.BackendServerDescriptions {
				foundBackends[*backendListener.InstancePort] = false
				proxyProtocolBackends[*backendListener.InstancePort] = proxyProtocolEnabled(backendListener)
			}

			for _, listener := range listeners {
				setPolicy := false
				instancePort := *listener.InstancePort

				if currentState, ok := proxyProtocolBackends[instancePort]; !ok {
					// This is a new ELB backend so we only need to worry about
					// potentially adding a policy and not removing an
					// existing one
					setPolicy = proxyProtocol
				} else {
					foundBackends[instancePort] = true
					// This is an existing ELB backend so we need to determine
					// if the state changed
					setPolicy = (currentState != proxyProtocol)
				}

				if setPolicy {
					glog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to %t", instancePort, proxyProtocol)
					err := c.setBackendPolicies(loadBalancerName, instancePort, proxyPolicies)
					if err != nil {
						return nil, err
					}
					dirty = true
				}
			}

			// We now need to figure out if any backend policies need to be removed
			// because these old policies will stick around even if there is no
			// corresponding listener anymore
			for instancePort, found := range foundBackends {
				if !found {
					glog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to false", instancePort)
					err := c.setBackendPolicies(loadBalancerName, instancePort, []*string{})
					if err != nil {
						return nil, err
					}
					dirty = true
				}
			}
		}

		{
			// Add additional tags
			glog.V(2).Infof("Creating additional load balancer tags for %s", loadBalancerName)
			tags := getLoadBalancerAdditionalTags(annotations)
			if len(tags) > 0 {
				err := c.addLoadBalancerTags(loadBalancerName, tags)
				if err != nil {
					return nil, fmt.Errorf("unable to create additional load balancer tags: %v", err)
				}
			}
		}
	}

	// Whether the ELB was new or existing, sync attributes regardless. This accounts for things
	// that cannot be specified at the time of creation and can only be modified after the fact,
	// e.g. idle connection timeout.
	{
		describeAttributesRequest := &elb.DescribeLoadBalancerAttributesInput{}
		describeAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
		describeAttributesOutput, err := c.elb.DescribeLoadBalancerAttributes(describeAttributesRequest)
		if err != nil {
			glog.Warning("Unable to retrieve load balancer attributes during attribute sync")
			return nil, err
		}

		foundAttributes := &describeAttributesOutput.LoadBalancerAttributes

		// Update attributes if they're dirty
		if !reflect.DeepEqual(loadBalancerAttributes, foundAttributes) {
			glog.V(2).Infof("Updating load-balancer attributes for %q", loadBalancerName)

			modifyAttributesRequest := &elb.ModifyLoadBalancerAttributesInput{}
			modifyAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
			modifyAttributesRequest.LoadBalancerAttributes = loadBalancerAttributes

			_, err = c.elb.ModifyLoadBalancerAttributes(modifyAttributesRequest)
			if err != nil {
				return nil, fmt.Errorf("Unable to update load balancer attributes during attribute sync: %q", err)
			}
			dirty = true
		}
	}

	if dirty {
		loadBalancer, err = c.describeLoadBalancer(loadBalancerName)
		if err != nil {
			glog.Warning("Unable to retrieve load balancer after creation/update")
			return nil, err
		}
	}

	return loadBalancer, nil
}

func createSubnetMappings(subnetIDs []string) []*elbv2.SubnetMapping {
	response := []*elbv2.SubnetMapping{}
	for _, id := range subnetIDs {
		// Ignore AllocationId for now
		response = append(response, &elbv2.SubnetMapping{SubnetId: aws.String(id)})
	}

	return response
}

// elbProtocolsAreEqual checks if two ELB protocol strings are considered the same
// Comparison is case insensitive
func elbProtocolsAreEqual(l, r *string) bool {
	if l == nil || r == nil {
		return l == r
	}
	return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
}

// awsArnEquals checks if two ARN strings are considered the same
// Comparison is case insensitive
func awsArnEquals(l, r *string) bool {
	if l == nil || r == nil {
		return l == r
	}
	return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
}

// getExpectedHealthCheck returns an elb.HealthCheck for the provided target
// and using either sensible defaults or overrides via Service annotations
func (c *Cloud) getExpectedHealthCheck(target string, annotations map[string]string) (*elb.HealthCheck, error) {
	healthcheck := &elb.HealthCheck{Target: &target}
	getOrDefault := func(annotation string, defaultValue int64) (*int64, error) {
		i64 := defaultValue
		var err error
		if s, ok := annotations[annotation]; ok {
			i64, err = strconv.ParseInt(s, 10, 0)
			if err != nil {
				return nil, fmt.Errorf("failed parsing health check annotation value: %v", err)
			}
		}
		return &i64, nil
	}
	var err error
	healthcheck.HealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCHealthyThreshold, defaultHCHealthyThreshold)
	if err != nil {
		return nil, err
	}
	healthcheck.UnhealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCUnhealthyThreshold, defaultHCUnhealthyThreshold)
	if err != nil {
		return nil, err
	}
	healthcheck.Timeout, err = getOrDefault(ServiceAnnotationLoadBalancerHCTimeout, defaultHCTimeout)
	if err != nil {
		return nil, err
	}
	healthcheck.Interval, err = getOrDefault(ServiceAnnotationLoadBalancerHCInterval, defaultHCInterval)
	if err != nil {
		return nil, err
	}
	if err = healthcheck.Validate(); err != nil {
		return nil, fmt.Errorf("some of the load balancer health check parameters are invalid: %v", err)
	}
	return healthcheck, nil
}

// Makes sure that the health check for an ELB matches the configured health check node port
func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDescription, protocol string, port int32, path string, annotations map[string]string) error {
	name := aws.StringValue(loadBalancer.LoadBalancerName)

	actual := loadBalancer.HealthCheck
	expectedTarget := protocol + ":" + strconv.FormatInt(int64(port), 10) + path
	expected, err := c.getExpectedHealthCheck(expectedTarget, annotations)
	if err != nil {
		return fmt.Errorf("cannot update health check for load balancer %q: %q", name, err)
	}

	// comparing attributes 1 by 1 to avoid breakage in case a new field is
	// added to the HC which breaks the equality
	if aws.StringValue(expected.Target) == aws.StringValue(actual.Target) &&
		aws.Int64Value(expected.HealthyThreshold) == aws.Int64Value(actual.HealthyThreshold) &&
		aws.Int64Value(expected.UnhealthyThreshold) == aws.Int64Value(actual.UnhealthyThreshold) &&
		aws.Int64Value(expected.Interval) == aws.Int64Value(actual.Interval) &&
		aws.Int64Value(expected.Timeout) == aws.Int64Value(actual.Timeout) {
		return nil
	}

	request := &elb.ConfigureHealthCheckInput{}
	request.HealthCheck = expected
	request.LoadBalancerName = loadBalancer.LoadBalancerName

	_, err = c.elb.ConfigureHealthCheck(request)
	if err != nil {
		return fmt.Errorf("error configuring load balancer health check for %q: %q", name, err)
	}

	return nil
}

// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[awsInstanceID]*ec2.Instance) error {
	expected := sets.NewString()
	for id := range instanceIDs {
		expected.Insert(string(id))
	}

	actual := sets.NewString()
	for _, lbInstance := range lbInstances {
		actual.Insert(aws.StringValue(lbInstance.InstanceId))
	}

	additions := expected.Difference(actual)
	removals := actual.Difference(expected)

	addInstances := []*elb.Instance{}
	for _, instanceId := range additions.List() {
		addInstance := &elb.Instance{}
		addInstance.InstanceId = aws.String(instanceId)
		addInstances = append(addInstances, addInstance)
	}

	removeInstances := []*elb.Instance{}
	for _, instanceId := range removals.List() {
		removeInstance := &elb.Instance{}
		removeInstance.InstanceId = aws.String(instanceId)
		removeInstances = append(removeInstances, removeInstance)
	}

	if len(addInstances) > 0 {
		registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
		registerRequest.Instances = addInstances
		registerRequest.LoadBalancerName = aws.String(loadBalancerName)
		_, err := c.elb.RegisterInstancesWithLoadBalancer(registerRequest)
		if err != nil {
			return err
		}
		glog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName)
	}

	if len(removeInstances) > 0 {
		deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
		deregisterRequest.Instances = removeInstances
		deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
		_, err := c.elb.DeregisterInstancesFromLoadBalancer(deregisterRequest)
		if err != nil {
			return err
		}
		glog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName)
	}

	return nil
}

func (c *Cloud) getLoadBalancerTLSPorts(loadBalancer *elb.LoadBalancerDescription) []int64 {
	ports := []int64{}

	for _, listenerDescription := range loadBalancer.ListenerDescriptions {
		protocol := aws.StringValue(listenerDescription.Listener.Protocol)
		if protocol == "SSL" || protocol == "HTTPS" {
			ports = append(ports, aws.Int64Value(listenerDescription.Listener.LoadBalancerPort))
		}
	}

	return ports
}

func (c *Cloud) ensureSSLNegotiationPolicy(loadBalancer *elb.LoadBalancerDescription, policyName string) error {
	glog.V(2).Info("Describing load balancer policies on load balancer")
	result, err := c.elb.DescribeLoadBalancerPolicies(&elb.DescribeLoadBalancerPoliciesInput{
		LoadBalancerName: loadBalancer.LoadBalancerName,
		PolicyNames: []*string{
			aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
		},
	})
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case "PolicyNotFound":
				// TODO change from string to `elb.ErrCodePolicyNotFoundException` once the AWS SDK is updated
			default:
				return fmt.Errorf("error describing security policies on load balancer: %q", err)
			}
		}
	}

	if len(result.PolicyDescriptions) > 0 {
		return nil
	}

	glog.V(2).Infof("Creating SSL negotiation policy '%s' on load balancer", fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName))
	// there is an upper limit of 98 policies on an ELB, we're pretty safe from
	// running into it
	_, err = c.elb.CreateLoadBalancerPolicy(&elb.CreateLoadBalancerPolicyInput{
		LoadBalancerName: loadBalancer.LoadBalancerName,
		PolicyName:       aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
		PolicyTypeName:   aws.String("SSLNegotiationPolicyType"),
		PolicyAttributes: []*elb.PolicyAttribute{
			{
				AttributeName:  aws.String("Reference-Security-Policy"),
				AttributeValue: aws.String(policyName),
			},
		},
	})
	if err != nil {
		return fmt.Errorf("error creating security policy on load balancer: %q", err)
	}
	return nil
}

func (c *Cloud) setSSLNegotiationPolicy(loadBalancerName, sslPolicyName string, port int64) error {
	policyName := fmt.Sprintf(SSLNegotiationPolicyNameFormat, sslPolicyName)
	request := &elb.SetLoadBalancerPoliciesOfListenerInput{
		LoadBalancerName: aws.String(loadBalancerName),
		LoadBalancerPort: aws.Int64(port),
		PolicyNames: []*string{
			aws.String(policyName),
		},
	}
	glog.V(2).Infof("Setting SSL negotiation policy '%s' on load balancer", policyName)
	_, err := c.elb.SetLoadBalancerPoliciesOfListener(request)
	if err != nil {
		return fmt.Errorf("error setting SSL negotiation policy '%s' on load balancer: %q", policyName, err)
	}
	return nil
}

func (c *Cloud) createProxyProtocolPolicy(loadBalancerName string) error {
	request := &elb.CreateLoadBalancerPolicyInput{
		LoadBalancerName: aws.String(loadBalancerName),
		PolicyName:       aws.String(ProxyProtocolPolicyName),
		PolicyTypeName:   aws.String("ProxyProtocolPolicyType"),
		PolicyAttributes: []*elb.PolicyAttribute{
			{
				AttributeName:  aws.String("ProxyProtocol"),
				AttributeValue: aws.String("true"),
			},
		},
	}
	glog.V(2).Info("Creating proxy protocol policy on load balancer")
	_, err := c.elb.CreateLoadBalancerPolicy(request)
	if err != nil {
		return fmt.Errorf("error creating proxy protocol policy on load balancer: %q", err)
	}

	return nil
}

func (c *Cloud) setBackendPolicies(loadBalancerName string, instancePort int64, policies []*string) error {
	request := &elb.SetLoadBalancerPoliciesForBackendServerInput{
		InstancePort:     aws.Int64(instancePort),
		LoadBalancerName: aws.String(loadBalancerName),
		PolicyNames:      policies,
	}
	if len(policies) > 0 {
		glog.V(2).Infof("Adding AWS loadbalancer backend policies on node port %d", instancePort)
	} else {
		glog.V(2).Infof("Removing AWS loadbalancer backend policies on node port %d", instancePort)
	}
	_, err := c.elb.SetLoadBalancerPoliciesForBackendServer(request)
	if err != nil {
		return fmt.Errorf("error adjusting AWS loadbalancer backend policies: %q", err)
	}

	return nil
}

func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool {
	for _, policy := range backend.PolicyNames {
		if aws.StringValue(policy) == ProxyProtocolPolicyName {
			return true
		}
	}

	return false
}
// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB
// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider,
// and we ignore instances which are not found
func (c *Cloud) findInstancesForELB(nodes []*v1.Node) (map[awsInstanceID]*ec2.Instance, error) {
	// Map to instance ids ignoring Nodes where we cannot find the id (but logging)
	instanceIDs := mapToAWSInstanceIDsTolerant(nodes)

	cacheCriteria := cacheCriteria{
		// MaxAge not required, because we only care about security groups, which should not change
		HasInstances: instanceIDs, // Refresh if any of the instance ids are missing
	}
	snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria)
	if err != nil {
		return nil, err
	}

	instances := snapshot.FindInstances(instanceIDs)
	// We ignore instances that cannot be found

	return instances, nil
}