Merge pull request #41695 from justinsb/shared_tag
Automatic merge from submit-queue (batch tested with PRs 41921, 41695, 42139, 42090, 41949) AWS: Support shared tag `kubernetes.io/cluster/<clusterid>` We recognize an additional cluster tag: kubernetes.io/cluster/<clusterid> This now allows us to share resources, in particular subnets. In addition, the value is used to track ownership/lifecycle. When we create objects, we record the value as "owned". We also refactor out tags into its own file & class, as we are touching most of these functions anyway. ```release-note AWS: Support shared tag `kubernetes.io/cluster/<clusterid>` ```
This commit is contained in:
		| @@ -21,6 +21,7 @@ go_library( | ||||
|         "regions.go", | ||||
|         "retry_handler.go", | ||||
|         "sets_ippermissions.go", | ||||
|         "tags.go", | ||||
|         "volumes.go", | ||||
|     ], | ||||
|     tags = ["automanaged"], | ||||
| @@ -56,6 +57,7 @@ go_test( | ||||
|         "device_allocator_test.go", | ||||
|         "regions_test.go", | ||||
|         "retry_handler_test.go", | ||||
|         "tags_test.go", | ||||
|     ], | ||||
|     library = ":go_default_library", | ||||
|     tags = ["automanaged"], | ||||
|   | ||||
| @@ -54,10 +54,6 @@ import ( | ||||
| // ProviderName is the name of this cloud provider. | ||||
| const ProviderName = "aws" | ||||
|  | ||||
| // TagNameKubernetesCluster is the tag name we use to differentiate multiple | ||||
| // logically independent clusters running in the same AZ | ||||
| const TagNameKubernetesCluster = "KubernetesCluster" | ||||
|  | ||||
| // TagNameKubernetesService is the tag name we use to differentiate multiple | ||||
| // services. Used currently for ELBs only. | ||||
| const TagNameKubernetesService = "kubernetes.io/service-name" | ||||
| @@ -363,7 +359,7 @@ type Cloud struct { | ||||
| 	region   string | ||||
| 	vpcID    string | ||||
|  | ||||
| 	filterTags map[string]string | ||||
| 	tagging awsTagging | ||||
|  | ||||
| 	// The AWS instance that we are running on | ||||
| 	// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance | ||||
| @@ -392,7 +388,10 @@ type CloudConfig struct { | ||||
| 		// Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful | ||||
| 		Zone string | ||||
|  | ||||
| 		// KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources | ||||
| 		KubernetesClusterTag string | ||||
| 		// KubernetesClusterTag is the cluster id we'll use to identify our cluster resources | ||||
| 		KubernetesClusterID string | ||||
|  | ||||
| 		//The aws provider creates an inbound rule per load balancer on the node security | ||||
| 		//group. However, this can run into the AWS security group rule limit of 50 if | ||||
| @@ -538,12 +537,12 @@ func orEmpty(s *string) string { | ||||
| 	return aws.StringValue(s) | ||||
| } | ||||
|  | ||||
| func newEc2Filter(name string, value string) *ec2.Filter { | ||||
| func newEc2Filter(name string, values ...string) *ec2.Filter { | ||||
| 	filter := &ec2.Filter{ | ||||
| 		Name: aws.String(name), | ||||
| 		Values: []*string{ | ||||
| 			aws.String(value), | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, value := range values { | ||||
| 		filter.Values = append(filter.Values, aws.String(value)) | ||||
| 	} | ||||
| 	return filter | ||||
| } | ||||
| @@ -821,33 +820,21 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { | ||||
| 	awsCloud.selfAWSInstance = selfAWSInstance | ||||
| 	awsCloud.vpcID = selfAWSInstance.vpcID | ||||
|  | ||||
| 	filterTags := map[string]string{} | ||||
| 	if cfg.Global.KubernetesClusterTag != "" { | ||||
| 		filterTags[TagNameKubernetesCluster] = cfg.Global.KubernetesClusterTag | ||||
| 	if cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "" { | ||||
| 		if err := awsCloud.tagging.init(cfg.Global.KubernetesClusterTag, cfg.Global.KubernetesClusterID); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} else { | ||||
| 		// TODO: Clean up double-API query | ||||
| 		info, err := selfAWSInstance.describeInstance() | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		for _, tag := range info.Tags { | ||||
| 			if orEmpty(tag.Key) == TagNameKubernetesCluster { | ||||
| 				filterTags[TagNameKubernetesCluster] = orEmpty(tag.Value) | ||||
| 			} | ||||
| 		if err := awsCloud.tagging.initFromTags(info.Tags); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if filterTags[TagNameKubernetesCluster] == "" { | ||||
| 		glog.Errorf("Tag %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesCluster) | ||||
| 	} | ||||
|  | ||||
| 	awsCloud.filterTags = filterTags | ||||
| 	if len(filterTags) > 0 { | ||||
| 		glog.Infof("AWS cloud filtering on tags: %v", filterTags) | ||||
| 	} else { | ||||
| 		glog.Infof("AWS cloud - no tag filtering") | ||||
| 	} | ||||
|  | ||||
| 	// Register regions, in particular for ECR credentials | ||||
| 	once.Do(func() { | ||||
| 		RecognizeWellKnownRegions() | ||||
| @@ -1020,15 +1007,12 @@ func (c *Cloud) InstanceType(nodeName types.NodeName) (string, error) { | ||||
| // Return a list of instances matching regex string. | ||||
| func (c *Cloud) getInstancesByRegex(regex string) ([]types.NodeName, error) { | ||||
| 	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")} | ||||
| 	filters = c.addFilters(filters) | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
| 		Filters: filters, | ||||
| 	} | ||||
|  | ||||
| 	instances, err := c.ec2.DescribeInstances(request) | ||||
| 	instances, err := c.describeInstances(filters) | ||||
| 	if err != nil { | ||||
| 		return []types.NodeName{}, err | ||||
| 	} | ||||
|  | ||||
| 	if len(instances) == 0 { | ||||
| 		return []types.NodeName{}, fmt.Errorf("no instances returned") | ||||
| 	} | ||||
| @@ -1080,15 +1064,12 @@ func (c *Cloud) getCandidateZonesForDynamicVolume() (sets.String, error) { | ||||
| 	// TODO: We could also query for subnets, I think | ||||
|  | ||||
| 	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")} | ||||
| 	filters = c.addFilters(filters) | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
| 		Filters: filters, | ||||
| 	} | ||||
|  | ||||
| 	instances, err := c.ec2.DescribeInstances(request) | ||||
| 	instances, err := c.describeInstances(filters) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if len(instances) == 0 { | ||||
| 		return nil, fmt.Errorf("no instances returned") | ||||
| 	} | ||||
| @@ -1686,26 +1667,16 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, er | ||||
| 	volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID)) | ||||
|  | ||||
| 	// apply tags | ||||
| 	tags := make(map[string]string) | ||||
| 	for k, v := range volumeOptions.Tags { | ||||
| 		tags[k] = v | ||||
| 	} | ||||
|  | ||||
| 	if c.getClusterName() != "" { | ||||
| 		tags[TagNameKubernetesCluster] = c.getClusterName() | ||||
| 	} | ||||
|  | ||||
| 	if len(tags) != 0 { | ||||
| 		if err := c.createTags(string(awsID), tags); err != nil { | ||||
| 			// delete the volume and hope it succeeds | ||||
| 			_, delerr := c.DeleteDisk(volumeName) | ||||
| 			if delerr != nil { | ||||
| 				// delete did not succeed, we have a stray volume! | ||||
| 				return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr) | ||||
| 			} | ||||
| 			return "", fmt.Errorf("error tagging volume %s: %v", volumeName, err) | ||||
| 	if err := c.tagging.createTags(c.ec2, string(awsID), ResourceLifecycleOwned, volumeOptions.Tags); err != nil { | ||||
| 		// delete the volume and hope it succeeds | ||||
| 		_, delerr := c.DeleteDisk(volumeName) | ||||
| 		if delerr != nil { | ||||
| 			// delete did not succeed, we have a stray volume! | ||||
| 			return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr) | ||||
| 		} | ||||
| 		return "", fmt.Errorf("error tagging volume %s: %v", volumeName, err) | ||||
| 	} | ||||
|  | ||||
| 	return volumeName, nil | ||||
| } | ||||
|  | ||||
| @@ -2154,36 +2125,6 @@ func (c *Cloud) removeSecurityGroupIngress(securityGroupID string, removePermiss | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| // Ensure that a resource has the correct tags | ||||
| // If it has no tags, we assume that this was a problem caused by an error in between creation and tagging, | ||||
| // and we add the tags.  If it has a different cluster's tags, that is an error. | ||||
| func (c *Cloud) ensureClusterTags(resourceID string, tags []*ec2.Tag) error { | ||||
| 	actualTags := make(map[string]string) | ||||
| 	for _, tag := range tags { | ||||
| 		actualTags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value) | ||||
| 	} | ||||
|  | ||||
| 	addTags := make(map[string]string) | ||||
| 	for k, expected := range c.filterTags { | ||||
| 		actual := actualTags[k] | ||||
| 		if actual == expected { | ||||
| 			continue | ||||
| 		} | ||||
| 		if actual == "" { | ||||
| 			glog.Warningf("Resource %q was missing expected cluster tag %q.  Will add (with value %q)", resourceID, k, expected) | ||||
| 			addTags[k] = expected | ||||
| 		} else { | ||||
| 			return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := c.createTags(resourceID, addTags); err != nil { | ||||
| 		return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Makes sure the security group exists. | ||||
| // For multi-cluster isolation, name must be globally unique, for example derived from the service UUID. | ||||
| // Returns the security group id or error | ||||
| @@ -2214,7 +2155,9 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er | ||||
| 			if len(securityGroups) > 1 { | ||||
| 				glog.Warningf("Found multiple security groups with name: %q", name) | ||||
| 			} | ||||
| 			err := c.ensureClusterTags(aws.StringValue(securityGroups[0].GroupId), securityGroups[0].Tags) | ||||
| 			err := c.tagging.readRepairClusterTags( | ||||
| 				c.ec2, aws.StringValue(securityGroups[0].GroupId), | ||||
| 				ResourceLifecycleOwned, nil, securityGroups[0].Tags) | ||||
| 			if err != nil { | ||||
| 				return "", err | ||||
| 			} | ||||
| @@ -2251,7 +2194,7 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er | ||||
| 		return "", fmt.Errorf("created security group, but id was not returned: %s", name) | ||||
| 	} | ||||
|  | ||||
| 	err := c.createTags(groupID, c.filterTags) | ||||
| 	err := c.tagging.createTags(c.ec2, groupID, ResourceLifecycleOwned, nil) | ||||
| 	if err != nil { | ||||
| 		// If we retry, ensureClusterTags will recover from this - it | ||||
| 		// will add the missing tags.  We could delete the security | ||||
| @@ -2262,52 +2205,6 @@ func (c *Cloud) ensureSecurityGroup(name string, description string) (string, er | ||||
| 	return groupID, nil | ||||
| } | ||||
|  | ||||
| // createTags calls EC2 CreateTags, but adds retry-on-failure logic | ||||
| // We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency) | ||||
| // The error code varies though (depending on what we are tagging), so we simply retry on all errors | ||||
| func (c *Cloud) createTags(resourceID string, tags map[string]string) error { | ||||
| 	if tags == nil || len(tags) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var awsTags []*ec2.Tag | ||||
| 	for k, v := range tags { | ||||
| 		tag := &ec2.Tag{ | ||||
| 			Key:   aws.String(k), | ||||
| 			Value: aws.String(v), | ||||
| 		} | ||||
| 		awsTags = append(awsTags, tag) | ||||
| 	} | ||||
|  | ||||
| 	backoff := wait.Backoff{ | ||||
| 		Duration: createTagInitialDelay, | ||||
| 		Factor:   createTagFactor, | ||||
| 		Steps:    createTagSteps, | ||||
| 	} | ||||
| 	request := &ec2.CreateTagsInput{} | ||||
| 	request.Resources = []*string{&resourceID} | ||||
| 	request.Tags = awsTags | ||||
|  | ||||
| 	var lastErr error | ||||
| 	err := wait.ExponentialBackoff(backoff, func() (bool, error) { | ||||
| 		_, err := c.ec2.CreateTags(request) | ||||
| 		if err == nil { | ||||
| 			return true, nil | ||||
| 		} | ||||
|  | ||||
| 		// We could check that the error is retryable, but the error code changes based on what we are tagging | ||||
| 		// SecurityGroup: InvalidGroup.NotFound | ||||
| 		glog.V(2).Infof("Failed to create tags; will retry.  Error was %v", err) | ||||
| 		lastErr = err | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err == wait.ErrWaitTimeout { | ||||
| 		// return real CreateTags error instead of timeout | ||||
| 		err = lastErr | ||||
| 	} | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // Finds the value for a given tag. | ||||
| func findTag(tags []*ec2.Tag, key string) (string, bool) { | ||||
| 	for _, tag := range tags { | ||||
| @@ -2323,18 +2220,23 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) { | ||||
| // However, in future this will likely be treated as an error. | ||||
| func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) { | ||||
| 	request := &ec2.DescribeSubnetsInput{} | ||||
| 	vpcIDFilter := newEc2Filter("vpc-id", c.vpcID) | ||||
| 	filters := []*ec2.Filter{vpcIDFilter} | ||||
| 	filters = c.addFilters(filters) | ||||
| 	request.Filters = filters | ||||
| 	filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)} | ||||
| 	request.Filters = c.tagging.addFilters(filters) | ||||
|  | ||||
| 	subnets, err := c.ec2.DescribeSubnets(request) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error describing subnets: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	if len(subnets) != 0 { | ||||
| 		return subnets, nil | ||||
| 	var matches []*ec2.Subnet | ||||
| 	for _, subnet := range subnets { | ||||
| 		if c.tagging.hasClusterTag(subnet.Tags) { | ||||
| 			matches = append(matches, subnet) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if len(matches) != 0 { | ||||
| 		return matches, nil | ||||
| 	} | ||||
|  | ||||
| 	// Fall back to the current instance subnets, if nothing is tagged | ||||
| @@ -2889,7 +2791,7 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m | ||||
| // Return all the security groups that are tagged as being part of our cluster | ||||
| func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) { | ||||
| 	request := &ec2.DescribeSecurityGroupsInput{} | ||||
| 	request.Filters = c.addFilters(nil) | ||||
| 	request.Filters = c.tagging.addFilters(nil) | ||||
| 	groups, err := c.ec2.DescribeSecurityGroups(request) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error querying security groups: %v", err) | ||||
| @@ -2897,6 +2799,10 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) | ||||
|  | ||||
| 	m := make(map[string]*ec2.SecurityGroup) | ||||
| 	for _, group := range groups { | ||||
| 		if !c.tagging.hasClusterTag(group.Tags) { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		id := aws.StringValue(group.GroupId) | ||||
| 		if id == "" { | ||||
| 			glog.Warningf("Ignoring group without id: %v", group) | ||||
| @@ -2931,13 +2837,23 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer | ||||
| 	} | ||||
|  | ||||
| 	// Get the actual list of groups that allow ingress from the load-balancer | ||||
| 	describeRequest := &ec2.DescribeSecurityGroupsInput{} | ||||
| 	filters := []*ec2.Filter{} | ||||
| 	filters = append(filters, newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID)) | ||||
| 	describeRequest.Filters = c.addFilters(filters) | ||||
| 	actualGroups, err := c.ec2.DescribeSecurityGroups(describeRequest) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error querying security groups for ELB: %v", err) | ||||
| 	var actualGroups []*ec2.SecurityGroup | ||||
| 	{ | ||||
| 		describeRequest := &ec2.DescribeSecurityGroupsInput{} | ||||
| 		filters := []*ec2.Filter{ | ||||
| 			newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID), | ||||
| 		} | ||||
| 		describeRequest.Filters = c.tagging.addFilters(filters) | ||||
| 		response, err := c.ec2.DescribeSecurityGroups(describeRequest) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("error querying security groups for ELB: %v", err) | ||||
| 		} | ||||
| 		for _, sg := range response { | ||||
| 			if !c.tagging.hasClusterTag(sg.Tags) { | ||||
| 				continue | ||||
| 			} | ||||
| 			actualGroups = append(actualGroups, sg) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	taggedSecurityGroups, err := c.getTaggedSecurityGroups() | ||||
| @@ -3226,12 +3142,7 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins | ||||
| 		newEc2Filter("instance-state-name", "running"), | ||||
| 	} | ||||
|  | ||||
| 	filters = c.addFilters(filters) | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
| 		Filters: filters, | ||||
| 	} | ||||
|  | ||||
| 	instances, err := c.ec2.DescribeInstances(request) | ||||
| 	instances, err := c.describeInstances(filters) | ||||
| 	if err != nil { | ||||
| 		glog.V(2).Infof("Failed to describe instances %v", nodeNames) | ||||
| 		return nil, err | ||||
| @@ -3248,6 +3159,26 @@ func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Ins | ||||
| 	return instances, nil | ||||
| } | ||||
|  | ||||
| func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) { | ||||
| 	filters = c.tagging.addFilters(filters) | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
| 		Filters: filters, | ||||
| 	} | ||||
|  | ||||
| 	response, err := c.ec2.DescribeInstances(request) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var matches []*ec2.Instance | ||||
| 	for _, instance := range response { | ||||
| 		if c.tagging.hasClusterTag(instance.Tags) { | ||||
| 			matches = append(matches, instance) | ||||
| 		} | ||||
| 	} | ||||
| 	return matches, nil | ||||
| } | ||||
|  | ||||
| // mapNodeNameToPrivateDNSName maps a k8s NodeName to an AWS Instance PrivateDNSName | ||||
| // This is a simple string cast | ||||
| func mapNodeNameToPrivateDNSName(nodeName types.NodeName) string { | ||||
| @@ -3267,15 +3198,12 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance, | ||||
| 		newEc2Filter("private-dns-name", privateDNSName), | ||||
| 		newEc2Filter("instance-state-name", "running"), | ||||
| 	} | ||||
| 	filters = c.addFilters(filters) | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
| 		Filters: filters, | ||||
| 	} | ||||
|  | ||||
| 	instances, err := c.ec2.DescribeInstances(request) | ||||
| 	instances, err := c.describeInstances(filters) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if len(instances) == 0 { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| @@ -3307,23 +3235,3 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins | ||||
| 	awsInstance := newAWSInstance(c.ec2, instance) | ||||
| 	return awsInstance, instance, err | ||||
| } | ||||
|  | ||||
| // Add additional filters, to match on our tags | ||||
| // This lets us run multiple k8s clusters in a single EC2 AZ | ||||
| func (c *Cloud) addFilters(filters []*ec2.Filter) []*ec2.Filter { | ||||
| 	for k, v := range c.filterTags { | ||||
| 		filters = append(filters, newEc2Filter("tag:"+k, v)) | ||||
| 	} | ||||
| 	if len(filters) == 0 { | ||||
| 		// We can't pass a zero-length Filters to AWS (it's an error) | ||||
| 		// So if we end up with no filters; just return nil | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return filters | ||||
| } | ||||
|  | ||||
| // Returns the cluster name or an empty string | ||||
| func (c *Cloud) getClusterName() string { | ||||
| 	return c.filterTags[TagNameKubernetesCluster] | ||||
| } | ||||
|   | ||||
| @@ -55,9 +55,14 @@ func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBala | ||||
|  | ||||
| 		createRequest.SecurityGroups = stringPointerArray(securityGroupIDs) | ||||
|  | ||||
| 		createRequest.Tags = []*elb.Tag{ | ||||
| 			{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(c.getClusterName())}, | ||||
| 			{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())}, | ||||
| 		tags := c.tagging.buildTags(ResourceLifecycleOwned, map[string]string{ | ||||
| 			TagNameKubernetesService: namespacedName.String(), | ||||
| 		}) | ||||
|  | ||||
| 		for k, v := range tags { | ||||
| 			createRequest.Tags = append(createRequest.Tags, &elb.Tag{ | ||||
| 				Key: aws.String(k), Value: aws.String(v), | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName) | ||||
|   | ||||
| @@ -29,14 +29,20 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) { | ||||
| 	// This should be unnecessary (we already filter on TagNameKubernetesCluster, | ||||
| 	// and something is broken if cluster name doesn't match, but anyway... | ||||
| 	// TODO: All clouds should be cluster-aware by default | ||||
| 	filters := []*ec2.Filter{newEc2Filter("tag:"+TagNameKubernetesCluster, clusterName)} | ||||
| 	request := &ec2.DescribeRouteTablesInput{Filters: c.addFilters(filters)} | ||||
| 	request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)} | ||||
|  | ||||
| 	tables, err := c.ec2.DescribeRouteTables(request) | ||||
| 	response, err := c.ec2.DescribeRouteTables(request) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var tables []*ec2.RouteTable | ||||
| 	for _, table := range response { | ||||
| 		if c.tagging.hasClusterTag(table.Tags) { | ||||
| 			tables = append(tables, table) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if len(tables) == 0 { | ||||
| 		return nil, fmt.Errorf("unable to find route table for AWS cluster: %s", clusterName) | ||||
| 	} | ||||
|   | ||||
| @@ -146,7 +146,7 @@ func NewFakeAWSServices() *FakeAWSServices { | ||||
| 	s.instances = []*ec2.Instance{selfInstance} | ||||
|  | ||||
| 	var tag ec2.Tag | ||||
| 	tag.Key = aws.String(TagNameKubernetesCluster) | ||||
| 	tag.Key = aws.String(TagNameKubernetesClusterLegacy) | ||||
| 	tag.Value = aws.String(TestClusterId) | ||||
| 	selfInstance.Tags = []*ec2.Tag{&tag} | ||||
|  | ||||
| @@ -177,24 +177,6 @@ func (s *FakeAWSServices) Metadata() (EC2Metadata, error) { | ||||
| 	return s.metadata, nil | ||||
| } | ||||
|  | ||||
| func TestFilterTags(t *testing.T) { | ||||
| 	awsServices := NewFakeAWSServices() | ||||
| 	c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Error building aws cloud: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if len(c.filterTags) != 1 { | ||||
| 		t.Errorf("unexpected filter tags: %v", c.filterTags) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if c.filterTags[TagNameKubernetesCluster] != TestClusterId { | ||||
| 		t.Errorf("unexpected filter tags: %v", c.filterTags) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestNewAWSCloud(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name string | ||||
| @@ -279,6 +261,15 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool { | ||||
| 		return contains(filter.Values, *instance.State.Name) | ||||
| 	} | ||||
|  | ||||
| 	if name == "tag-key" { | ||||
| 		for _, instanceTag := range instance.Tags { | ||||
| 			if contains(filter.Values, aws.StringValue(instanceTag.Key)) { | ||||
| 				return true | ||||
| 			} | ||||
| 		} | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	if strings.HasPrefix(name, "tag:") { | ||||
| 		tagName := name[4:] | ||||
| 		for _, instanceTag := range instance.Tags { | ||||
| @@ -286,7 +277,9 @@ func instanceMatchesFilter(instance *ec2.Instance, filter *ec2.Filter) bool { | ||||
| 				return true | ||||
| 			} | ||||
| 		} | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	panic("Unknown filter name: " + name) | ||||
| } | ||||
|  | ||||
| @@ -974,13 +967,14 @@ func TestIpPermissionExistsHandlesMultipleGroupIdsWithUserIds(t *testing.T) { | ||||
| 		t.Errorf("Should have not been considered equal since first is not in the second array of groups") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { | ||||
| 	awsServices := NewFakeAWSServices() | ||||
|  | ||||
| 	nodeName := types.NodeName("my-dns.internal") | ||||
|  | ||||
| 	var tag ec2.Tag | ||||
| 	tag.Key = aws.String(TagNameKubernetesCluster) | ||||
| 	tag.Key = aws.String(TagNameKubernetesClusterLegacy) | ||||
| 	tag.Value = aws.String(TestClusterId) | ||||
| 	tags := []*ec2.Tag{&tag} | ||||
|  | ||||
| @@ -1024,8 +1018,8 @@ func TestFindInstancesByNodeNameCached(t *testing.T) { | ||||
| 	nodeNameTwo := "my-dns-two.internal" | ||||
|  | ||||
| 	var tag ec2.Tag | ||||
| 	tag.Key = aws.String(TagNameKubernetesCluster) | ||||
| 	tag.Value = aws.String(TestClusterId) | ||||
| 	tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId) | ||||
| 	tag.Value = aws.String("") | ||||
| 	tags := []*ec2.Tag{&tag} | ||||
|  | ||||
| 	var runningInstance ec2.Instance | ||||
|   | ||||
							
								
								
									
										256
									
								
								pkg/cloudprovider/providers/aws/tags.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										256
									
								
								pkg/cloudprovider/providers/aws/tags.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,256 @@ | ||||
| /* | ||||
| Copyright 2017 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package aws | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
|  | ||||
| 	"github.com/aws/aws-sdk-go/aws" | ||||
| 	"github.com/aws/aws-sdk-go/service/ec2" | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"strings" | ||||
| ) | ||||
|  | ||||
| // TagNameKubernetesClusterPrefix is the tag name we use to differentiate multiple | ||||
| // logically independent clusters running in the same AZ. | ||||
| // The tag key = TagNameKubernetesClusterPrefix + clusterID | ||||
| // The tag value is an ownership value | ||||
| const TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/" | ||||
|  | ||||
| // TagNameKubernetesClusterLegacy is the legacy tag name we use to differentiate multiple | ||||
| // logically independent clusters running in the same AZ.  The problem with it was that it | ||||
| // did not allow shared resources. | ||||
| const TagNameKubernetesClusterLegacy = "KubernetesCluster" | ||||
|  | ||||
| type ResourceLifecycle string | ||||
|  | ||||
| const ( | ||||
| 	// ResourceLifecycleOwned is the value we use when tagging resources to indicate | ||||
| 	// that the resource is considered owned and managed by the cluster, | ||||
| 	// and in particular that the lifecycle is tied to the lifecycle of the cluster. | ||||
| 	ResourceLifecycleOwned = "owned" | ||||
| 	// ResourceLifecycleShared is the value we use when tagging resources to indicate | ||||
| 	// that the resource is shared between multiple clusters, and should not be destroyed | ||||
| 	// if the cluster is destroyed. | ||||
| 	ResourceLifecycleShared = "shared" | ||||
| ) | ||||
|  | ||||
| type awsTagging struct { | ||||
| 	// ClusterID is our cluster identifier: we tag AWS resources with this value, | ||||
| 	// and thus we can run two independent clusters in the same VPC or subnets. | ||||
| 	// This gives us similar functionality to GCE projects. | ||||
| 	ClusterID string | ||||
|  | ||||
| 	// usesLegacyTags is true if we are using the legacy TagNameKubernetesClusterLegacy tags | ||||
| 	usesLegacyTags bool | ||||
| } | ||||
|  | ||||
| func (t *awsTagging) init(legacyClusterID string, clusterID string) error { | ||||
| 	if legacyClusterID != "" { | ||||
| 		if clusterID != "" && legacyClusterID != clusterID { | ||||
| 			return fmt.Errorf("ClusterID tags did not match: %q vs %q", clusterID, legacyClusterID) | ||||
| 		} | ||||
| 		t.usesLegacyTags = true | ||||
| 		clusterID = legacyClusterID | ||||
| 	} | ||||
|  | ||||
| 	t.ClusterID = clusterID | ||||
|  | ||||
| 	if clusterID != "" { | ||||
| 		glog.Infof("AWS cloud filtering on ClusterID: %v", clusterID) | ||||
| 	} else { | ||||
| 		glog.Infof("AWS cloud - no clusterID filtering") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Extracts a clusterID from the given tags, if one is present | ||||
| // If no clusterID is found, returns "", nil | ||||
| // If multiple (different) clusterIDs are found, returns an error | ||||
| func (t *awsTagging) initFromTags(tags []*ec2.Tag) error { | ||||
| 	legacyClusterID, newClusterID, err := findClusterIDs(tags) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if legacyClusterID == "" && newClusterID == "" { | ||||
| 		glog.Errorf("Tag %q nor %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterLegacy, TagNameKubernetesClusterPrefix+"...") | ||||
| 	} | ||||
|  | ||||
| 	return t.init(legacyClusterID, newClusterID) | ||||
| } | ||||
|  | ||||
| // Extracts the legacy & new cluster ids from the given tags, if they are present | ||||
| // If duplicate tags are found, returns an error | ||||
| func findClusterIDs(tags []*ec2.Tag) (string, string, error) { | ||||
| 	legacyClusterID := "" | ||||
| 	newClusterID := "" | ||||
|  | ||||
| 	for _, tag := range tags { | ||||
| 		tagKey := aws.StringValue(tag.Key) | ||||
| 		if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) { | ||||
| 			id := strings.TrimPrefix(tagKey, TagNameKubernetesClusterPrefix) | ||||
| 			if newClusterID != "" { | ||||
| 				return "", "", fmt.Errorf("Found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, newClusterID, id) | ||||
| 			} | ||||
| 			newClusterID = id | ||||
| 		} | ||||
|  | ||||
| 		if tagKey == TagNameKubernetesClusterLegacy { | ||||
| 			id := aws.StringValue(tag.Value) | ||||
| 			if legacyClusterID != "" { | ||||
| 				return "", "", fmt.Errorf("Found multiple %s tags (%q and %q)", TagNameKubernetesClusterLegacy, legacyClusterID, id) | ||||
| 			} | ||||
| 			legacyClusterID = id | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return legacyClusterID, newClusterID, nil | ||||
| } | ||||
|  | ||||
| func (t *awsTagging) clusterTagKey() string { | ||||
| 	return TagNameKubernetesClusterPrefix + t.ClusterID | ||||
| } | ||||
|  | ||||
| func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool { | ||||
| 	clusterTagKey := t.clusterTagKey() | ||||
| 	for _, tag := range tags { | ||||
| 		tagKey := aws.StringValue(tag.Key) | ||||
| 		// For 1.6, we continue to recognize the legacy tags, for the 1.5 -> 1.6 upgrade | ||||
| 		if tagKey == TagNameKubernetesClusterLegacy { | ||||
| 			return aws.StringValue(tag.Value) == t.ClusterID | ||||
| 		} | ||||
|  | ||||
| 		if tagKey == clusterTagKey { | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| // Ensure that a resource has the correct tags | ||||
| // If it has no tags, we assume that this was a problem caused by an error in between creation and tagging, | ||||
| // and we add the tags.  If it has a different cluster's tags, that is an error. | ||||
| func (c *awsTagging) readRepairClusterTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string, observedTags []*ec2.Tag) error { | ||||
| 	actualTagMap := make(map[string]string) | ||||
| 	for _, tag := range observedTags { | ||||
| 		actualTagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value) | ||||
| 	} | ||||
|  | ||||
| 	expectedTags := c.buildTags(lifecycle, additionalTags) | ||||
|  | ||||
| 	addTags := make(map[string]string) | ||||
| 	for k, expected := range expectedTags { | ||||
| 		actual := actualTagMap[k] | ||||
| 		if actual == expected { | ||||
| 			continue | ||||
| 		} | ||||
| 		if actual == "" { | ||||
| 			glog.Warningf("Resource %q was missing expected cluster tag %q.  Will add (with value %q)", resourceID, k, expected) | ||||
| 			addTags[k] = expected | ||||
| 		} else { | ||||
| 			return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := c.createTags(client, resourceID, lifecycle, additionalTags); err != nil { | ||||
| 		return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // createTags calls EC2 CreateTags, but adds retry-on-failure logic | ||||
| // We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency) | ||||
| // The error code varies though (depending on what we are tagging), so we simply retry on all errors | ||||
| func (t *awsTagging) createTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string) error { | ||||
| 	tags := t.buildTags(lifecycle, additionalTags) | ||||
|  | ||||
| 	if tags == nil || len(tags) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var awsTags []*ec2.Tag | ||||
| 	for k, v := range tags { | ||||
| 		tag := &ec2.Tag{ | ||||
| 			Key:   aws.String(k), | ||||
| 			Value: aws.String(v), | ||||
| 		} | ||||
| 		awsTags = append(awsTags, tag) | ||||
| 	} | ||||
|  | ||||
| 	backoff := wait.Backoff{ | ||||
| 		Duration: createTagInitialDelay, | ||||
| 		Factor:   createTagFactor, | ||||
| 		Steps:    createTagSteps, | ||||
| 	} | ||||
| 	request := &ec2.CreateTagsInput{} | ||||
| 	request.Resources = []*string{&resourceID} | ||||
| 	request.Tags = awsTags | ||||
|  | ||||
| 	var lastErr error | ||||
| 	err := wait.ExponentialBackoff(backoff, func() (bool, error) { | ||||
| 		_, err := client.CreateTags(request) | ||||
| 		if err == nil { | ||||
| 			return true, nil | ||||
| 		} | ||||
|  | ||||
| 		// We could check that the error is retryable, but the error code changes based on what we are tagging | ||||
| 		// SecurityGroup: InvalidGroup.NotFound | ||||
| 		glog.V(2).Infof("Failed to create tags; will retry.  Error was %v", err) | ||||
| 		lastErr = err | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	if err == wait.ErrWaitTimeout { | ||||
| 		// return real CreateTags error instead of timeout | ||||
| 		err = lastErr | ||||
| 	} | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // Add additional filters, to match on our tags | ||||
| // This lets us run multiple k8s clusters in a single EC2 AZ | ||||
| func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter { | ||||
| 	// For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade | ||||
| 	// There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter | ||||
| 	f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey()) | ||||
| 	filters = append(filters, f) | ||||
|  | ||||
| 	if len(filters) == 0 { | ||||
| 		// We can't pass a zero-length Filters to AWS (it's an error) | ||||
| 		// So if we end up with no filters; just return nil | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return filters | ||||
| } | ||||
|  | ||||
| func (t *awsTagging) buildTags(lifecycle ResourceLifecycle, additionalTags map[string]string) map[string]string { | ||||
| 	tags := make(map[string]string) | ||||
| 	for k, v := range additionalTags { | ||||
| 		tags[k] = v | ||||
| 	} | ||||
| 	// We only create legacy tags if we are using legacy tags, i.e. if we have seen a legacy tag on our instance | ||||
| 	if t.usesLegacyTags { | ||||
| 		tags[TagNameKubernetesClusterLegacy] = t.ClusterID | ||||
| 	} | ||||
| 	tags[t.clusterTagKey()] = string(lifecycle) | ||||
|  | ||||
| 	return tags | ||||
| } | ||||
							
								
								
									
										111
									
								
								pkg/cloudprovider/providers/aws/tags_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										111
									
								
								pkg/cloudprovider/providers/aws/tags_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,111 @@ | ||||
| /* | ||||
| Copyright 2014 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package aws | ||||
|  | ||||
| import ( | ||||
| 	"github.com/aws/aws-sdk-go/aws" | ||||
| 	"github.com/aws/aws-sdk-go/service/ec2" | ||||
| 	"strings" | ||||
| 	"testing" | ||||
| ) | ||||
|  | ||||
| func TestFilterTags(t *testing.T) { | ||||
| 	awsServices := NewFakeAWSServices() | ||||
| 	c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Error building aws cloud: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if c.tagging.ClusterID != TestClusterId { | ||||
| 		t.Errorf("unexpected ClusterID: %v", c.tagging.ClusterID) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestFindClusterID(t *testing.T) { | ||||
| 	grid := []struct { | ||||
| 		Tags           map[string]string | ||||
| 		ExpectedNew    string | ||||
| 		ExpectedLegacy string | ||||
| 		ExpectError    bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			Tags: map[string]string{}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			Tags: map[string]string{ | ||||
| 				TagNameKubernetesClusterLegacy: "a", | ||||
| 			}, | ||||
| 			ExpectedLegacy: "a", | ||||
| 		}, | ||||
| 		{ | ||||
| 			Tags: map[string]string{ | ||||
| 				TagNameKubernetesClusterPrefix + "a": "owned", | ||||
| 			}, | ||||
| 			ExpectedNew: "a", | ||||
| 		}, | ||||
| 		{ | ||||
| 			Tags: map[string]string{ | ||||
| 				TagNameKubernetesClusterPrefix + "a": "", | ||||
| 			}, | ||||
| 			ExpectedNew: "a", | ||||
| 		}, | ||||
| 		{ | ||||
| 			Tags: map[string]string{ | ||||
| 				TagNameKubernetesClusterLegacy:       "a", | ||||
| 				TagNameKubernetesClusterPrefix + "a": "", | ||||
| 			}, | ||||
| 			ExpectedLegacy: "a", | ||||
| 			ExpectedNew:    "a", | ||||
| 		}, | ||||
| 		{ | ||||
| 			Tags: map[string]string{ | ||||
| 				TagNameKubernetesClusterPrefix + "a": "", | ||||
| 				TagNameKubernetesClusterPrefix + "b": "", | ||||
| 			}, | ||||
| 			ExpectError: true, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, g := range grid { | ||||
| 		var ec2Tags []*ec2.Tag | ||||
| 		for k, v := range g.Tags { | ||||
| 			ec2Tags = append(ec2Tags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)}) | ||||
| 		} | ||||
| 		actualLegacy, actualNew, err := findClusterIDs(ec2Tags) | ||||
| 		if g.ExpectError { | ||||
| 			if err == nil { | ||||
| 				t.Errorf("expected error for tags %v", g.Tags) | ||||
| 				continue | ||||
| 			} | ||||
| 		} else { | ||||
| 			if err != nil { | ||||
| 				t.Errorf("unexpected error for tags %v: %v", g.Tags, err) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			if g.ExpectedNew != actualNew { | ||||
| 				t.Errorf("unexpected new clusterid for tags %v: %s vs %s", g.Tags, g.ExpectedNew, actualNew) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			if g.ExpectedLegacy != actualLegacy { | ||||
| 				t.Errorf("unexpected new clusterid for tags %v: %s vs %s", g.Tags, g.ExpectedLegacy, actualLegacy) | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue