Merge pull request #21627 from justinsb/fix_11324
Auto commit by PR queue bot
This commit is contained in:
		@@ -1411,17 +1411,7 @@ func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// apply tags
 | 
						// apply tags
 | 
				
			||||||
	if volumeOptions.Tags != nil {
 | 
						if volumeOptions.Tags != nil {
 | 
				
			||||||
		tags := []*ec2.Tag{}
 | 
							if err := s.createTags(awsID, *volumeOptions.Tags); err != nil {
 | 
				
			||||||
		for k, v := range *volumeOptions.Tags {
 | 
					 | 
				
			||||||
			tag := &ec2.Tag{}
 | 
					 | 
				
			||||||
			tag.Key = aws.String(k)
 | 
					 | 
				
			||||||
			tag.Value = aws.String(v)
 | 
					 | 
				
			||||||
			tags = append(tags, tag)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		tagRequest := &ec2.CreateTagsInput{}
 | 
					 | 
				
			||||||
		tagRequest.Resources = []*string{&awsID}
 | 
					 | 
				
			||||||
		tagRequest.Tags = tags
 | 
					 | 
				
			||||||
		if _, err := s.createTags(tagRequest); err != nil {
 | 
					 | 
				
			||||||
			// delete the volume and hope it succeeds
 | 
								// delete the volume and hope it succeeds
 | 
				
			||||||
			_, delerr := s.DeleteDisk(volumeName)
 | 
								_, delerr := s.DeleteDisk(volumeName)
 | 
				
			||||||
			if delerr != nil {
 | 
								if delerr != nil {
 | 
				
			||||||
@@ -1726,6 +1716,36 @@ func (s *AWSCloud) removeSecurityGroupIngress(securityGroupId string, removePerm
 | 
				
			|||||||
	return true, nil
 | 
						return true, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Ensure that a resource has the correct tags
 | 
				
			||||||
 | 
					// If it has no tags, we assume that this was a problem caused by an error in between creation and tagging,
 | 
				
			||||||
 | 
					// and we add the tags.  If it has a different cluster's tags, that is an error.
 | 
				
			||||||
 | 
					func (s *AWSCloud) ensureClusterTags(resourceID string, tags []*ec2.Tag) error {
 | 
				
			||||||
 | 
						actualTags := make(map[string]string)
 | 
				
			||||||
 | 
						for _, tag := range tags {
 | 
				
			||||||
 | 
							actualTags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						addTags := make(map[string]string)
 | 
				
			||||||
 | 
						for k, expected := range s.filterTags {
 | 
				
			||||||
 | 
							actual := actualTags[k]
 | 
				
			||||||
 | 
							if actual == expected {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if actual == "" {
 | 
				
			||||||
 | 
								glog.Warningf("Resource %q was missing expected cluster tag %q.  Will add (with value %q)", resourceID, k, expected)
 | 
				
			||||||
 | 
								addTags[k] = expected
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err := s.createTags(resourceID, addTags); err != nil {
 | 
				
			||||||
 | 
							return fmt.Errorf("error adding missing tags to resource %q: %v", resourceID, err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Makes sure the security group exists
 | 
					// Makes sure the security group exists
 | 
				
			||||||
// Returns the security group id or error
 | 
					// Returns the security group id or error
 | 
				
			||||||
func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID string) (string, error) {
 | 
					func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID string) (string, error) {
 | 
				
			||||||
@@ -1739,7 +1759,12 @@ func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID st
 | 
				
			|||||||
			newEc2Filter("group-name", name),
 | 
								newEc2Filter("group-name", name),
 | 
				
			||||||
			newEc2Filter("vpc-id", vpcID),
 | 
								newEc2Filter("vpc-id", vpcID),
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		request.Filters = s.addFilters(filters)
 | 
							// Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key.
 | 
				
			||||||
 | 
							// However, we do check that it matches our tags.
 | 
				
			||||||
 | 
							// If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before.
 | 
				
			||||||
 | 
							// If it has a different cluster's tags, that is an error.
 | 
				
			||||||
 | 
							// This shouldn't happen because name is expected to be globally unique (UUID derived)
 | 
				
			||||||
 | 
							request.Filters = filters
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		securityGroups, err := s.ec2.DescribeSecurityGroups(request)
 | 
							securityGroups, err := s.ec2.DescribeSecurityGroups(request)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
@@ -1750,7 +1775,12 @@ func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID st
 | 
				
			|||||||
			if len(securityGroups) > 1 {
 | 
								if len(securityGroups) > 1 {
 | 
				
			||||||
				glog.Warning("Found multiple security groups with name:", name)
 | 
									glog.Warning("Found multiple security groups with name:", name)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			return orEmpty(securityGroups[0].GroupId), nil
 | 
								err := s.ensureClusterTags(aws.StringValue(securityGroups[0].GroupId), securityGroups[0].Tags)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return "", err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								return aws.StringValue(securityGroups[0].GroupId), nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		createRequest := &ec2.CreateSecurityGroupInput{}
 | 
							createRequest := &ec2.CreateSecurityGroupInput{}
 | 
				
			||||||
@@ -1782,22 +1812,13 @@ func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID st
 | 
				
			|||||||
		return "", fmt.Errorf("created security group, but id was not returned: %s", name)
 | 
							return "", fmt.Errorf("created security group, but id was not returned: %s", name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	tags := []*ec2.Tag{}
 | 
						err := s.createTags(groupID, s.filterTags)
 | 
				
			||||||
	for k, v := range s.filterTags {
 | 
						if err != nil {
 | 
				
			||||||
		tag := &ec2.Tag{}
 | 
							// If we retry, ensureClusterTags will recover from this - it
 | 
				
			||||||
		tag.Key = aws.String(k)
 | 
							// will add the missing tags.  We could delete the security
 | 
				
			||||||
		tag.Value = aws.String(v)
 | 
							// group here, but that doesn't feel like the right thing, as
 | 
				
			||||||
		tags = append(tags, tag)
 | 
							// the caller is likely to retry the create
 | 
				
			||||||
	}
 | 
							return "", fmt.Errorf("error tagging security group: %v", err)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	if len(tags) > 0 {
 | 
					 | 
				
			||||||
		tagRequest := &ec2.CreateTagsInput{}
 | 
					 | 
				
			||||||
		tagRequest.Resources = []*string{&groupID}
 | 
					 | 
				
			||||||
		tagRequest.Tags = tags
 | 
					 | 
				
			||||||
		if _, err := s.createTags(tagRequest); err != nil {
 | 
					 | 
				
			||||||
			// Not clear how to recover fully from this; we're OK because we don't match on tags, but that is a little odd
 | 
					 | 
				
			||||||
			return "", fmt.Errorf("error tagging security group: %v", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return groupID, nil
 | 
						return groupID, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -1805,15 +1826,32 @@ func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID st
 | 
				
			|||||||
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
 | 
					// createTags calls EC2 CreateTags, but adds retry-on-failure logic
 | 
				
			||||||
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
 | 
					// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
 | 
				
			||||||
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
 | 
					// The error code varies though (depending on what we are tagging), so we simply retry on all errors
 | 
				
			||||||
func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
 | 
					func (s *AWSCloud) createTags(resourceID string, tags map[string]string) error {
 | 
				
			||||||
 | 
						if tags == nil || len(tags) == 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var awsTags []*ec2.Tag
 | 
				
			||||||
 | 
						for k, v := range tags {
 | 
				
			||||||
 | 
							tag := &ec2.Tag{
 | 
				
			||||||
 | 
								Key:   aws.String(k),
 | 
				
			||||||
 | 
								Value: aws.String(v),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							awsTags = append(awsTags, tag)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						request := &ec2.CreateTagsInput{}
 | 
				
			||||||
 | 
						request.Resources = []*string{&resourceID}
 | 
				
			||||||
 | 
						request.Tags = awsTags
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// TODO: We really should do exponential backoff here
 | 
						// TODO: We really should do exponential backoff here
 | 
				
			||||||
	attempt := 0
 | 
						attempt := 0
 | 
				
			||||||
	maxAttempts := 60
 | 
						maxAttempts := 60
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		response, err := s.ec2.CreateTags(request)
 | 
							_, err := s.ec2.CreateTags(request)
 | 
				
			||||||
		if err == nil {
 | 
							if err == nil {
 | 
				
			||||||
			return response, err
 | 
								return nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// We could check that the error is retryable, but the error code changes based on what we are tagging
 | 
							// We could check that the error is retryable, but the error code changes based on what we are tagging
 | 
				
			||||||
@@ -1821,7 +1859,7 @@ func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutp
 | 
				
			|||||||
		attempt++
 | 
							attempt++
 | 
				
			||||||
		if attempt > maxAttempts {
 | 
							if attempt > maxAttempts {
 | 
				
			||||||
			glog.Warningf("Failed to create tags (too many attempts): %v", err)
 | 
								glog.Warningf("Failed to create tags (too many attempts): %v", err)
 | 
				
			||||||
			return response, err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		glog.V(2).Infof("Failed to create tags; will retry.  Error was %v", err)
 | 
							glog.V(2).Infof("Failed to create tags; will retry.  Error was %v", err)
 | 
				
			||||||
		time.Sleep(1 * time.Second)
 | 
							time.Sleep(1 * time.Second)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user