refactor NLB securityGroup handling
This commit is contained in:
		| @@ -3579,7 +3579,7 @@ func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiS | |||||||
| 			sourceRangeCidrs = append(sourceRangeCidrs, "0.0.0.0/0") | 			sourceRangeCidrs = append(sourceRangeCidrs, "0.0.0.0/0") | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		err = c.updateInstanceSecurityGroupsForNLB(v2Mappings, instances, loadBalancerName, sourceRangeCidrs) | 		err = c.updateInstanceSecurityGroupsForNLB(loadBalancerName, instances, sourceRangeCidrs, v2Mappings) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			klog.Warningf("Error opening ingress rules for the load balancer to the instances: %q", err) | 			klog.Warningf("Error opening ingress rules for the load balancer to the instances: %q", err) | ||||||
| 			return nil, err | 			return nil, err | ||||||
| @@ -4158,99 +4158,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		{ | 		return c.updateInstanceSecurityGroupsForNLB(loadBalancerName, nil, nil, nil) | ||||||
| 			var matchingGroups []*ec2.SecurityGroup |  | ||||||
| 			{ |  | ||||||
| 				// Server side filter |  | ||||||
| 				describeRequest := &ec2.DescribeSecurityGroupsInput{} |  | ||||||
| 				describeRequest.Filters = []*ec2.Filter{ |  | ||||||
| 					newEc2Filter("ip-permission.protocol", "tcp"), |  | ||||||
| 				} |  | ||||||
| 				response, err := c.ec2.DescribeSecurityGroups(describeRequest) |  | ||||||
| 				if err != nil { |  | ||||||
| 					return fmt.Errorf("Error querying security groups for NLB: %q", err) |  | ||||||
| 				} |  | ||||||
| 				for _, sg := range response { |  | ||||||
| 					if !c.tagging.hasClusterTag(sg.Tags) { |  | ||||||
| 						continue |  | ||||||
| 					} |  | ||||||
| 					matchingGroups = append(matchingGroups, sg) |  | ||||||
| 				} |  | ||||||
|  |  | ||||||
| 				// client-side filter out groups that don't have IP Rules we've |  | ||||||
| 				// annotated for this service |  | ||||||
| 				matchingGroups = filterForIPRangeDescription(matchingGroups, loadBalancerName) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			{ |  | ||||||
| 				clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, loadBalancerName) |  | ||||||
| 				mtuRule := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, loadBalancerName) |  | ||||||
| 				healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, loadBalancerName) |  | ||||||
|  |  | ||||||
| 				for i := range matchingGroups { |  | ||||||
| 					removes := []*ec2.IpPermission{} |  | ||||||
| 					for j := range matchingGroups[i].IpPermissions { |  | ||||||
|  |  | ||||||
| 						v4rangesToRemove := []*ec2.IpRange{} |  | ||||||
| 						v6rangesToRemove := []*ec2.Ipv6Range{} |  | ||||||
|  |  | ||||||
| 						// Find IpPermission that contains k8s description |  | ||||||
| 						// If we removed the whole IpPermission, it could contain other non-k8s specified ranges |  | ||||||
| 						for k := range matchingGroups[i].IpPermissions[j].IpRanges { |  | ||||||
| 							description := aws.StringValue(matchingGroups[i].IpPermissions[j].IpRanges[k].Description) |  | ||||||
| 							if description == clientRule || description == mtuRule || description == healthRule { |  | ||||||
| 								v4rangesToRemove = append(v4rangesToRemove, matchingGroups[i].IpPermissions[j].IpRanges[k]) |  | ||||||
| 							} |  | ||||||
| 						} |  | ||||||
|  |  | ||||||
| 						// Find IpPermission that contains k8s description |  | ||||||
| 						// If we removed the whole IpPermission, it could contain other non-k8s specified rangesk |  | ||||||
| 						for k := range matchingGroups[i].IpPermissions[j].Ipv6Ranges { |  | ||||||
| 							description := aws.StringValue(matchingGroups[i].IpPermissions[j].Ipv6Ranges[k].Description) |  | ||||||
| 							if description == clientRule || description == mtuRule || description == healthRule { |  | ||||||
| 								v6rangesToRemove = append(v6rangesToRemove, matchingGroups[i].IpPermissions[j].Ipv6Ranges[k]) |  | ||||||
| 							} |  | ||||||
| 						} |  | ||||||
|  |  | ||||||
| 						// ipv4 and ipv6 removals cannot be included in the same permission |  | ||||||
| 						if len(v4rangesToRemove) > 0 { |  | ||||||
| 							// create a new *IpPermission to not accidentally remove UserIdGroupPairs |  | ||||||
| 							removedPermission := &ec2.IpPermission{ |  | ||||||
| 								FromPort:   matchingGroups[i].IpPermissions[j].FromPort, |  | ||||||
| 								IpProtocol: matchingGroups[i].IpPermissions[j].IpProtocol, |  | ||||||
| 								IpRanges:   v4rangesToRemove, |  | ||||||
| 								ToPort:     matchingGroups[i].IpPermissions[j].ToPort, |  | ||||||
| 							} |  | ||||||
| 							removes = append(removes, removedPermission) |  | ||||||
| 						} |  | ||||||
| 						if len(v6rangesToRemove) > 0 { |  | ||||||
| 							// create a new *IpPermission to not accidentally remove UserIdGroupPairs |  | ||||||
| 							removedPermission := &ec2.IpPermission{ |  | ||||||
| 								FromPort:   matchingGroups[i].IpPermissions[j].FromPort, |  | ||||||
| 								IpProtocol: matchingGroups[i].IpPermissions[j].IpProtocol, |  | ||||||
| 								Ipv6Ranges: v6rangesToRemove, |  | ||||||
| 								ToPort:     matchingGroups[i].IpPermissions[j].ToPort, |  | ||||||
| 							} |  | ||||||
| 							removes = append(removes, removedPermission) |  | ||||||
| 						} |  | ||||||
|  |  | ||||||
| 					} |  | ||||||
| 					if len(removes) > 0 { |  | ||||||
| 						changed, err := c.removeSecurityGroupIngress(aws.StringValue(matchingGroups[i].GroupId), removes) |  | ||||||
| 						if err != nil { |  | ||||||
| 							return err |  | ||||||
| 						} |  | ||||||
| 						if !changed { |  | ||||||
| 							klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", *matchingGroups[i].GroupId) |  | ||||||
| 						} |  | ||||||
| 					} |  | ||||||
|  |  | ||||||
| 				} |  | ||||||
|  |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 		} |  | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	lb, err := c.describeLoadBalancer(loadBalancerName) | 	lb, err := c.describeLoadBalancer(loadBalancerName) | ||||||
|   | |||||||
| @@ -68,7 +68,6 @@ type nlbPortMapping struct { | |||||||
|  |  | ||||||
| 	TrafficPort     int64 | 	TrafficPort     int64 | ||||||
| 	TrafficProtocol string | 	TrafficProtocol string | ||||||
| 	ClientCIDR      string |  | ||||||
|  |  | ||||||
| 	HealthCheckPort     int64 | 	HealthCheckPort     int64 | ||||||
| 	HealthCheckPath     string | 	HealthCheckPath     string | ||||||
| @@ -648,50 +647,6 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty | |||||||
| 	return targetGroup, nil | 	return targetGroup, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func portsForNLB(lbName string, sg *ec2.SecurityGroup, clientTraffic bool) sets.Int64 { |  | ||||||
| 	response := sets.NewInt64() |  | ||||||
| 	var annotation string |  | ||||||
| 	if clientTraffic { |  | ||||||
| 		annotation = fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) |  | ||||||
| 	} else { |  | ||||||
| 		annotation = fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for i := range sg.IpPermissions { |  | ||||||
| 		for j := range sg.IpPermissions[i].IpRanges { |  | ||||||
| 			description := aws.StringValue(sg.IpPermissions[i].IpRanges[j].Description) |  | ||||||
| 			if description == annotation { |  | ||||||
| 				// TODO  should probably check FromPort == ToPort |  | ||||||
| 				response.Insert(aws.Int64Value(sg.IpPermissions[i].FromPort)) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return response |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // filterForIPRangeDescription filters in security groups that have IpRange Descriptions that match a loadBalancerName |  | ||||||
| func filterForIPRangeDescription(securityGroups []*ec2.SecurityGroup, lbName string) []*ec2.SecurityGroup { |  | ||||||
| 	response := []*ec2.SecurityGroup{} |  | ||||||
| 	clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) |  | ||||||
| 	healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) |  | ||||||
| 	alreadyAdded := sets.NewString() |  | ||||||
| 	for i := range securityGroups { |  | ||||||
| 		for j := range securityGroups[i].IpPermissions { |  | ||||||
| 			for k := range securityGroups[i].IpPermissions[j].IpRanges { |  | ||||||
| 				description := aws.StringValue(securityGroups[i].IpPermissions[j].IpRanges[k].Description) |  | ||||||
| 				if description == clientRule || description == healthRule { |  | ||||||
| 					sgIDString := aws.StringValue(securityGroups[i].GroupId) |  | ||||||
| 					if !alreadyAdded.Has(sgIDString) { |  | ||||||
| 						response = append(response, securityGroups[i]) |  | ||||||
| 						alreadyAdded.Insert(sgIDString) |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return response |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (c *Cloud) getVpcCidrBlocks() ([]string, error) { | func (c *Cloud) getVpcCidrBlocks() ([]string, error) { | ||||||
| 	vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{ | 	vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{ | ||||||
| 		VpcIds: []*string{aws.String(c.vpcID)}, | 		VpcIds: []*string{aws.String(c.vpcID)}, | ||||||
| @@ -710,203 +665,76 @@ func (c *Cloud) getVpcCidrBlocks() ([]string, error) { | |||||||
| 	return cidrBlocks, nil | 	return cidrBlocks, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // abstraction for updating SG rules | // updateInstanceSecurityGroupsForNLB will adjust securityGroup's settings to allow inbound traffic into instances from clientCIDRs and portMappings. | ||||||
| // if clientTraffic is false, then only update HealthCheck rules | // TIP: if either instances or clientCIDRs or portMappings are nil, then the securityGroup rules for lbName are cleared. | ||||||
| func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.SecurityGroup, desiredSgIds []string, ports []int64, lbName string, clientCidrs []string, clientTraffic bool) error { | func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, clientCIDRs []string, portMappings []nlbPortMapping) error { | ||||||
|  | 	if c.cfg.Global.DisableSecurityGroupIngress { | ||||||
| 	klog.V(8).Infof("updateInstanceSecurityGroupsForNLBTraffic: actualGroups=%v, desiredSgIds=%v, ports=%v, clientTraffic=%v", actualGroups, desiredSgIds, ports, clientTraffic) | 		return nil | ||||||
| 	// Map containing the groups we want to make changes on; the ports to make |  | ||||||
| 	// changes on; and whether to add or remove it. true to add, false to remove |  | ||||||
| 	portChanges := map[string]map[int64]bool{} |  | ||||||
|  |  | ||||||
| 	for _, id := range desiredSgIds { |  | ||||||
| 		// consider everything an addition for now |  | ||||||
| 		if _, ok := portChanges[id]; !ok { |  | ||||||
| 			portChanges[id] = make(map[int64]bool) |  | ||||||
| 		} |  | ||||||
| 		for _, port := range ports { |  | ||||||
| 			portChanges[id][port] = true |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Compare to actual groups | 	clusterSGs, err := c.getTaggedSecurityGroups() | ||||||
| 	for _, actualGroup := range actualGroups { | 	if err != nil { | ||||||
| 		actualGroupID := aws.StringValue(actualGroup.GroupId) | 		return fmt.Errorf("error querying for tagged security groups: %q", err) | ||||||
| 		if actualGroupID == "" { | 	} | ||||||
| 			klog.Warning("Ignoring group without ID: ", actualGroup) | 	// scan instances for groups we want to open | ||||||
|  | 	desiredSGIDs := sets.String{} | ||||||
|  | 	for _, instance := range instances { | ||||||
|  | 		sg, err := findSecurityGroupForInstance(instance, clusterSGs) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		if sg == nil { | ||||||
|  | 			klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId)) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		desiredSGIDs.Insert(aws.StringValue(sg.GroupId)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 		addingMap, ok := portChanges[actualGroupID] | 	// TODO(@M00nF1sh): do we really needs to support SG without cluster tag at current version? | ||||||
| 		if ok { | 	// findSecurityGroupForInstance might return SG that are not tagged. | ||||||
| 			desiredSet := sets.NewInt64() | 	{ | ||||||
| 			for port := range addingMap { | 		for sgID := range desiredSGIDs.Difference(sets.StringKeySet(clusterSGs)) { | ||||||
| 				desiredSet.Insert(port) | 			sg, err := c.findSecurityGroup(sgID) | ||||||
| 			} | 			if err != nil { | ||||||
| 			existingSet := portsForNLB(lbName, actualGroup, clientTraffic) | 				return fmt.Errorf("error finding instance group: %q", err) | ||||||
|  |  | ||||||
| 			// remove from portChanges ports that are already allowed |  | ||||||
| 			if intersection := desiredSet.Intersection(existingSet); intersection.Len() > 0 { |  | ||||||
| 				for p := range intersection { |  | ||||||
| 					delete(portChanges[actualGroupID], p) |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// allowed ports that need to be removed |  | ||||||
| 			if difference := existingSet.Difference(desiredSet); difference.Len() > 0 { |  | ||||||
| 				for p := range difference { |  | ||||||
| 					portChanges[actualGroupID][p] = false |  | ||||||
| 				} |  | ||||||
| 			} | 			} | ||||||
|  | 			clusterSGs[sgID] = sg | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Make changes we've planned on | 	{ | ||||||
| 	for instanceSecurityGroupID, portMap := range portChanges { | 		clientPorts := sets.Int64{} | ||||||
| 		adds := []*ec2.IpPermission{} | 		healthCheckPorts := sets.Int64{} | ||||||
| 		removes := []*ec2.IpPermission{} | 		for _, port := range portMappings { | ||||||
| 		for port, add := range portMap { | 			clientPorts.Insert(port.TrafficPort) | ||||||
| 			if add { | 			healthCheckPorts.Insert(port.HealthCheckPort) | ||||||
| 				if clientTraffic { |  | ||||||
| 					klog.V(2).Infof("Adding rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID) |  | ||||||
| 					klog.V(2).Infof("Adding rule for client traffic from the network load balancer (%s) to instances (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port) |  | ||||||
| 				} else { |  | ||||||
| 					klog.V(2).Infof("Adding rule for health check traffic from the network load balancer (%s) to instances (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port) |  | ||||||
| 				} |  | ||||||
| 			} else { |  | ||||||
| 				if clientTraffic { |  | ||||||
| 					klog.V(2).Infof("Removing rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID) |  | ||||||
| 					klog.V(2).Infof("Removing rule for client traffic from the network load balancer (%s) to instance (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port) |  | ||||||
| 				} |  | ||||||
| 				klog.V(2).Infof("Removing rule for health check traffic from the network load balancer (%s) to instance (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if clientTraffic { |  | ||||||
| 				clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) |  | ||||||
| 				// Client Traffic |  | ||||||
| 				permission := &ec2.IpPermission{ |  | ||||||
| 					FromPort:   aws.Int64(port), |  | ||||||
| 					ToPort:     aws.Int64(port), |  | ||||||
| 					IpProtocol: aws.String("tcp"), |  | ||||||
| 				} |  | ||||||
| 				ranges := []*ec2.IpRange{} |  | ||||||
| 				for _, cidr := range clientCidrs { |  | ||||||
| 					ranges = append(ranges, &ec2.IpRange{ |  | ||||||
| 						CidrIp:      aws.String(cidr), |  | ||||||
| 						Description: aws.String(clientRuleAnnotation), |  | ||||||
| 					}) |  | ||||||
| 				} |  | ||||||
| 				permission.IpRanges = ranges |  | ||||||
| 				if add { |  | ||||||
| 					adds = append(adds, permission) |  | ||||||
| 				} else { |  | ||||||
| 					removes = append(removes, permission) |  | ||||||
| 				} |  | ||||||
| 			} else { |  | ||||||
| 				healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) |  | ||||||
|  |  | ||||||
| 				// NLB HealthCheck |  | ||||||
| 				permission := &ec2.IpPermission{ |  | ||||||
| 					FromPort:   aws.Int64(port), |  | ||||||
| 					ToPort:     aws.Int64(port), |  | ||||||
| 					IpProtocol: aws.String("tcp"), |  | ||||||
| 				} |  | ||||||
| 				ranges := []*ec2.IpRange{} |  | ||||||
| 				for _, cidr := range clientCidrs { |  | ||||||
| 					ranges = append(ranges, &ec2.IpRange{ |  | ||||||
| 						CidrIp:      aws.String(cidr), |  | ||||||
| 						Description: aws.String(healthRuleAnnotation), |  | ||||||
| 					}) |  | ||||||
| 				} |  | ||||||
| 				permission.IpRanges = ranges |  | ||||||
| 				if add { |  | ||||||
| 					adds = append(adds, permission) |  | ||||||
| 				} else { |  | ||||||
| 					removes = append(removes, permission) |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
|  | 		clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) | ||||||
| 		if len(adds) > 0 { | 		healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) | ||||||
| 			changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, adds) | 		vpcCIDRs, err := c.getVpcCidrBlocks() | ||||||
| 			if err != nil { | 		if err != nil { | ||||||
| 				return err | 			return err | ||||||
| 			} |  | ||||||
| 			if !changed { |  | ||||||
| 				klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID) |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
|  | 		for sgID, sg := range clusterSGs { | ||||||
| 		if len(removes) > 0 { | 			sgPerms := NewIPPermissionSet(sg.IpPermissions...).Ungroup() | ||||||
| 			changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, removes) | 			if desiredSGIDs.Has(sgID) { | ||||||
| 			if err != nil { | 				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", healthCheckPorts, vpcCIDRs); err != nil { | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
| 			if !changed { |  | ||||||
| 				klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID) |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		if clientTraffic { |  | ||||||
| 			// MTU discovery |  | ||||||
| 			mtuRuleAnnotation := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, lbName) |  | ||||||
| 			mtuPermission := &ec2.IpPermission{ |  | ||||||
| 				IpProtocol: aws.String("icmp"), |  | ||||||
| 				FromPort:   aws.Int64(3), |  | ||||||
| 				ToPort:     aws.Int64(4), |  | ||||||
| 			} |  | ||||||
| 			ranges := []*ec2.IpRange{} |  | ||||||
| 			for _, cidr := range clientCidrs { |  | ||||||
| 				ranges = append(ranges, &ec2.IpRange{ |  | ||||||
| 					CidrIp:      aws.String(cidr), |  | ||||||
| 					Description: aws.String(mtuRuleAnnotation), |  | ||||||
| 				}) |  | ||||||
| 			} |  | ||||||
| 			mtuPermission.IpRanges = ranges |  | ||||||
|  |  | ||||||
| 			group, err := c.findSecurityGroup(instanceSecurityGroupID) |  | ||||||
| 			if err != nil { |  | ||||||
| 				klog.Warningf("Error retrieving security group: %q", err) |  | ||||||
| 				return err |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if group == nil { |  | ||||||
| 				klog.Warning("Security group not found: ", instanceSecurityGroupID) |  | ||||||
| 				return nil |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			icmpExists := false |  | ||||||
| 			permCount := 0 |  | ||||||
| 			for _, perm := range group.IpPermissions { |  | ||||||
| 				if *perm.IpProtocol == "icmp" { |  | ||||||
| 					icmpExists = true |  | ||||||
| 					continue |  | ||||||
| 				} |  | ||||||
|  |  | ||||||
| 				if perm.FromPort != nil { |  | ||||||
| 					permCount++ |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if !icmpExists && permCount > 0 { |  | ||||||
| 				// the icmp permission is missing |  | ||||||
| 				changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission}) |  | ||||||
| 				if err != nil { |  | ||||||
| 					klog.Warningf("Error adding MTU permission to security group: %q", err) |  | ||||||
| 					return err | 					return err | ||||||
| 				} | 				} | ||||||
| 				if !changed { | 				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, "tcp", clientPorts, clientCIDRs); err != nil { | ||||||
| 					klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID) |  | ||||||
| 				} |  | ||||||
| 			} else if icmpExists && permCount == 0 { |  | ||||||
| 				// there is no additional permissions, remove icmp |  | ||||||
| 				changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission}) |  | ||||||
| 				if err != nil { |  | ||||||
| 					klog.Warningf("Error removing MTU permission to security group: %q", err) |  | ||||||
| 					return err | 					return err | ||||||
| 				} | 				} | ||||||
| 				if !changed { | 			} else { | ||||||
| 					klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID) | 				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, healthRuleAnnotation, "tcp", nil, nil); err != nil { | ||||||
|  | 					return err | ||||||
|  | 				} | ||||||
|  | 				if err := c.updateInstanceSecurityGroupForNLBTraffic(sgID, sgPerms, clientRuleAnnotation, "tcp", nil, nil); err != nil { | ||||||
|  | 					return err | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			if !sgPerms.Equal(NewIPPermissionSet(sg.IpPermissions...).Ungroup()) { | ||||||
|  | 				if err := c.updateInstanceSecurityGroupForNLBMTU(sgID, sgPerms); err != nil { | ||||||
|  | 					return err | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -914,102 +742,105 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.Se | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Add SG rules for a given NLB | // updateInstanceSecurityGroupForNLBTraffic will manage permissions set(identified by ruleDesc) on securityGroup to match desired set(allow protocol traffic from ports/cidr). | ||||||
| func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, instances map[InstanceID]*ec2.Instance, lbName string, clientCidrs []string) error { | // Note: sgPerms will be updated to reflect the current permission set on SG after update. | ||||||
| 	if c.cfg.Global.DisableSecurityGroupIngress { | func (c *Cloud) updateInstanceSecurityGroupForNLBTraffic(sgID string, sgPerms IPPermissionSet, ruleDesc string, protocol string, ports sets.Int64, cidrs []string) error { | ||||||
| 		return nil | 	desiredPerms := NewIPPermissionSet() | ||||||
| 	} | 	for port := range ports { | ||||||
|  | 		for _, cidr := range cidrs { | ||||||
| 	vpcCidrBlocks, err := c.getVpcCidrBlocks() | 			desiredPerms.Insert(&ec2.IpPermission{ | ||||||
| 	if err != nil { | 				IpProtocol: aws.String(protocol), | ||||||
| 		return err | 				FromPort:   aws.Int64(port), | ||||||
| 	} | 				ToPort:     aws.Int64(port), | ||||||
|  | 				IpRanges: []*ec2.IpRange{ | ||||||
| 	// Unlike the classic ELB, NLB does not have a security group that we can | 					{ | ||||||
| 	// filter against all existing groups to see if they allow access. Instead | 						CidrIp:      aws.String(cidr), | ||||||
| 	// we use the IpRange.Description field to annotate NLB health check and | 						Description: aws.String(ruleDesc), | ||||||
| 	// client traffic rules | 					}, | ||||||
|  | 				}, | ||||||
| 	// Get the actual list of groups that allow ingress for the load-balancer | 			}) | ||||||
| 	var actualGroups []*ec2.SecurityGroup |  | ||||||
| 	{ |  | ||||||
| 		// Server side filter |  | ||||||
| 		describeRequest := &ec2.DescribeSecurityGroupsInput{} |  | ||||||
| 		describeRequest.Filters = []*ec2.Filter{ |  | ||||||
| 			newEc2Filter("ip-permission.protocol", "tcp"), |  | ||||||
| 			newEc2Filter("vpc-id", c.vpcID), |  | ||||||
| 		} | 		} | ||||||
| 		response, err := c.ec2.DescribeSecurityGroups(describeRequest) | 	} | ||||||
| 		if err != nil { |  | ||||||
| 			return fmt.Errorf("Error querying security groups for NLB: %q", err) | 	permsToGrant := desiredPerms.Difference(sgPerms) | ||||||
| 		} | 	permsToRevoke := sgPerms.Difference(desiredPerms) | ||||||
| 		for _, sg := range response { | 	permsToRevoke.DeleteIf(IPPermissionNotMatch{IPPermissionMatchDesc{ruleDesc}}) | ||||||
| 			if !c.tagging.hasClusterTag(sg.Tags) { | 	if len(permsToRevoke) > 0 { | ||||||
| 				continue | 		permsToRevokeList := permsToRevoke.List() | ||||||
| 			} | 		changed, err := c.removeSecurityGroupIngress(sgID, permsToRevokeList) | ||||||
| 			actualGroups = append(actualGroups, sg) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// client-side filter |  | ||||||
| 		// Filter out groups that don't have IP Rules we've annotated for this service |  | ||||||
| 		actualGroups = filterForIPRangeDescription(actualGroups, lbName) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	taggedSecurityGroups, err := c.getTaggedSecurityGroups() |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("Error querying for tagged security groups: %q", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	externalTrafficPolicyIsLocal := false |  | ||||||
| 	trafficPorts := []int64{} |  | ||||||
| 	for i := range mappings { |  | ||||||
| 		trafficPorts = append(trafficPorts, mappings[i].TrafficPort) |  | ||||||
| 		if mappings[i].TrafficPort != mappings[i].HealthCheckPort { |  | ||||||
| 			externalTrafficPolicyIsLocal = true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	healthCheckPorts := trafficPorts |  | ||||||
| 	// if externalTrafficPolicy is Local, all listeners use the same health |  | ||||||
| 	// check port |  | ||||||
| 	if externalTrafficPolicyIsLocal && len(mappings) > 0 { |  | ||||||
| 		healthCheckPorts = []int64{mappings[0].HealthCheckPort} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	desiredGroupIds := []string{} |  | ||||||
| 	// Scan instances for groups we want open |  | ||||||
| 	for _, instance := range instances { |  | ||||||
| 		securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups) |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | 			klog.Warningf("Error remove traffic permission from security group: %q", err) | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  | 		if !changed { | ||||||
|  | 			klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", sgID) | ||||||
|  | 		} | ||||||
|  | 		sgPerms.Delete(permsToRevokeList...) | ||||||
|  | 	} | ||||||
|  | 	if len(permsToGrant) > 0 { | ||||||
|  | 		permsToGrantList := permsToGrant.List() | ||||||
|  | 		changed, err := c.addSecurityGroupIngress(sgID, permsToGrantList) | ||||||
|  | 		if err != nil { | ||||||
|  | 			klog.Warningf("Error add traffic permission to security group: %q", err) | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		if !changed { | ||||||
|  | 			klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", sgID) | ||||||
|  | 		} | ||||||
|  | 		sgPerms.Insert(permsToGrantList...) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| 		if securityGroup == nil { | // Note: sgPerms will be updated to reflect the current permission set on SG after update. | ||||||
| 			klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId)) | func (c *Cloud) updateInstanceSecurityGroupForNLBMTU(sgID string, sgPerms IPPermissionSet) error { | ||||||
| 			continue | 	desiredPerms := NewIPPermissionSet() | ||||||
|  | 	for _, perm := range sgPerms { | ||||||
|  | 		for _, ipRange := range perm.IpRanges { | ||||||
|  | 			if strings.Contains(aws.StringValue(ipRange.Description), NLBClientRuleDescription) { | ||||||
|  | 				desiredPerms.Insert(&ec2.IpPermission{ | ||||||
|  | 					IpProtocol: aws.String("icmp"), | ||||||
|  | 					FromPort:   aws.Int64(3), | ||||||
|  | 					ToPort:     aws.Int64(4), | ||||||
|  | 					IpRanges: []*ec2.IpRange{ | ||||||
|  | 						{ | ||||||
|  | 							CidrIp:      ipRange.CidrIp, | ||||||
|  | 							Description: aws.String(NLBMtuDiscoveryRuleDescription), | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 				}) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	permsToGrant := desiredPerms.Difference(sgPerms) | ||||||
|  | 	permsToRevoke := sgPerms.Difference(desiredPerms) | ||||||
|  | 	permsToRevoke.DeleteIf(IPPermissionNotMatch{IPPermissionMatchDesc{NLBMtuDiscoveryRuleDescription}}) | ||||||
|  | 	if len(permsToRevoke) > 0 { | ||||||
|  | 		permsToRevokeList := permsToRevoke.List() | ||||||
|  | 		changed, err := c.removeSecurityGroupIngress(sgID, permsToRevokeList) | ||||||
|  | 		if err != nil { | ||||||
|  | 			klog.Warningf("Error remove MTU permission from security group: %q", err) | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		if !changed { | ||||||
|  | 			klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", sgID) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		id := aws.StringValue(securityGroup.GroupId) | 		sgPerms.Delete(permsToRevokeList...) | ||||||
| 		if id == "" { | 	} | ||||||
| 			klog.Warningf("found security group without id: %v", securityGroup) | 	if len(permsToGrant) > 0 { | ||||||
| 			continue | 		permsToGrantList := permsToGrant.List() | ||||||
|  | 		changed, err := c.addSecurityGroupIngress(sgID, permsToGrantList) | ||||||
|  | 		if err != nil { | ||||||
|  | 			klog.Warningf("Error add MTU permission to security group: %q", err) | ||||||
|  | 			return err | ||||||
| 		} | 		} | ||||||
|  | 		if !changed { | ||||||
| 		desiredGroupIds = append(desiredGroupIds, id) | 			klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", sgID) | ||||||
|  | 		} | ||||||
|  | 		sgPerms.Insert(permsToGrantList...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Run once for Client traffic |  | ||||||
| 	err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, trafficPorts, lbName, clientCidrs, true) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Run once for health check traffic |  | ||||||
| 	err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, healthCheckPorts, lbName, vpcCidrBlocks, false) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -17,11 +17,9 @@ limitations under the License. | |||||||
| package aws | package aws | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"fmt" |  | ||||||
| 	"testing" | 	"testing" | ||||||
|  |  | ||||||
| 	"github.com/aws/aws-sdk-go/aws" | 	"github.com/aws/aws-sdk-go/aws" | ||||||
| 	"github.com/aws/aws-sdk-go/service/ec2" |  | ||||||
| 	"github.com/aws/aws-sdk-go/service/elb" | 	"github.com/aws/aws-sdk-go/service/elb" | ||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
| ) | ) | ||||||
| @@ -165,66 +163,6 @@ func TestIsNLB(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestSecurityGroupFiltering(t *testing.T) { |  | ||||||
| 	grid := []struct { |  | ||||||
| 		in          []*ec2.SecurityGroup |  | ||||||
| 		name        string |  | ||||||
| 		expected    int |  | ||||||
| 		description string |  | ||||||
| 	}{ |  | ||||||
| 		{ |  | ||||||
| 			in: []*ec2.SecurityGroup{ |  | ||||||
| 				{ |  | ||||||
| 					IpPermissions: []*ec2.IpPermission{ |  | ||||||
| 						{ |  | ||||||
| 							IpRanges: []*ec2.IpRange{ |  | ||||||
| 								{ |  | ||||||
| 									Description: aws.String("an unmanaged"), |  | ||||||
| 								}, |  | ||||||
| 							}, |  | ||||||
| 						}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			name:        "unmanaged", |  | ||||||
| 			expected:    0, |  | ||||||
| 			description: "An environment without managed LBs should have %d, but found %d SecurityGroups", |  | ||||||
| 		}, |  | ||||||
| 		{ |  | ||||||
| 			in: []*ec2.SecurityGroup{ |  | ||||||
| 				{ |  | ||||||
| 					IpPermissions: []*ec2.IpPermission{ |  | ||||||
| 						{ |  | ||||||
| 							IpRanges: []*ec2.IpRange{ |  | ||||||
| 								{ |  | ||||||
| 									Description: aws.String("an unmanaged"), |  | ||||||
| 								}, |  | ||||||
| 								{ |  | ||||||
| 									Description: aws.String(fmt.Sprintf("%s=%s", NLBClientRuleDescription, "managedlb")), |  | ||||||
| 								}, |  | ||||||
| 								{ |  | ||||||
| 									Description: aws.String(fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, "managedlb")), |  | ||||||
| 								}, |  | ||||||
| 							}, |  | ||||||
| 						}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			name:        "managedlb", |  | ||||||
| 			expected:    1, |  | ||||||
| 			description: "Found %d, but should have %d Security Groups", |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for _, g := range grid { |  | ||||||
| 		actual := len(filterForIPRangeDescription(g.in, g.name)) |  | ||||||
| 		if actual != g.expected { |  | ||||||
| 			t.Errorf(g.description, actual, g.expected) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestSyncElbListeners(t *testing.T) { | func TestSyncElbListeners(t *testing.T) { | ||||||
| 	tests := []struct { | 	tests := []struct { | ||||||
| 		name                 string | 		name                 string | ||||||
|   | |||||||
| @@ -20,12 +20,19 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  |  | ||||||
|  | 	"github.com/aws/aws-sdk-go/aws" | ||||||
| 	"github.com/aws/aws-sdk-go/service/ec2" | 	"github.com/aws/aws-sdk-go/service/ec2" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // IPPermissionSet maps IP strings of strings to EC2 IpPermissions | // IPPermissionSet maps IP strings of strings to EC2 IpPermissions | ||||||
| type IPPermissionSet map[string]*ec2.IpPermission | type IPPermissionSet map[string]*ec2.IpPermission | ||||||
|  |  | ||||||
|  | // IPPermissionPredicate is an predicate to test whether IPPermission matches some condition. | ||||||
|  | type IPPermissionPredicate interface { | ||||||
|  | 	// Test checks whether specified IPPermission matches condition. | ||||||
|  | 	Test(perm *ec2.IpPermission) bool | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewIPPermissionSet creates a new IPPermissionSet | // NewIPPermissionSet creates a new IPPermissionSet | ||||||
| func NewIPPermissionSet(items ...*ec2.IpPermission) IPPermissionSet { | func NewIPPermissionSet(items ...*ec2.IpPermission) IPPermissionSet { | ||||||
| 	s := make(IPPermissionSet) | 	s := make(IPPermissionSet) | ||||||
| @@ -90,6 +97,23 @@ func (s IPPermissionSet) Insert(items ...*ec2.IpPermission) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Delete delete permission from the set. | ||||||
|  | func (s IPPermissionSet) Delete(items ...*ec2.IpPermission) { | ||||||
|  | 	for _, p := range items { | ||||||
|  | 		k := keyForIPPermission(p) | ||||||
|  | 		delete(s, k) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // DeleteIf delete permission from the set if permission matches predicate. | ||||||
|  | func (s IPPermissionSet) DeleteIf(predicate IPPermissionPredicate) { | ||||||
|  | 	for k, p := range s { | ||||||
|  | 		if predicate.Test(p) { | ||||||
|  | 			delete(s, k) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // List returns the contents as a slice.  Order is not defined. | // List returns the contents as a slice.  Order is not defined. | ||||||
| func (s IPPermissionSet) List() []*ec2.IpPermission { | func (s IPPermissionSet) List() []*ec2.IpPermission { | ||||||
| 	res := make([]*ec2.IpPermission, 0, len(s)) | 	res := make([]*ec2.IpPermission, 0, len(s)) | ||||||
| @@ -146,3 +170,47 @@ func keyForIPPermission(p *ec2.IpPermission) string { | |||||||
| 	} | 	} | ||||||
| 	return string(v) | 	return string(v) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | var _ IPPermissionPredicate = IPPermissionMatchDesc{} | ||||||
|  |  | ||||||
|  | // IPPermissionMatchDesc checks whether specific IPPermission contains description. | ||||||
|  | type IPPermissionMatchDesc struct { | ||||||
|  | 	Description string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Test whether specific IPPermission contains description. | ||||||
|  | func (p IPPermissionMatchDesc) Test(perm *ec2.IpPermission) bool { | ||||||
|  | 	for _, v4Range := range perm.IpRanges { | ||||||
|  | 		if aws.StringValue(v4Range.Description) == p.Description { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	for _, v6Range := range perm.Ipv6Ranges { | ||||||
|  | 		if aws.StringValue(v6Range.Description) == p.Description { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	for _, prefixListID := range perm.PrefixListIds { | ||||||
|  | 		if aws.StringValue(prefixListID.Description) == p.Description { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	for _, group := range perm.UserIdGroupPairs { | ||||||
|  | 		if aws.StringValue(group.Description) == p.Description { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var _ IPPermissionPredicate = IPPermissionNotMatch{} | ||||||
|  |  | ||||||
|  | // IPPermissionNotMatch is the *not* operator for Predicate | ||||||
|  | type IPPermissionNotMatch struct { | ||||||
|  | 	Predicate IPPermissionPredicate | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Test whether specific IPPermission not match the embed predicate. | ||||||
|  | func (p IPPermissionNotMatch) Test(perm *ec2.IpPermission) bool { | ||||||
|  | 	return !p.Predicate.Test(perm) | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 M00nF1sh
					M00nF1sh