Merge pull request #85675 from jsafrane/aws-attach-consistency
Fix AWS eventual consistency of AttachDisk
commit b920b388e9
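The change below makes waitForAttachmentStatus verify not only the attachment state but also which instance and device AWS reports, because DescribeVolumes can briefly return stale, eventually consistent data after AttachVolume/DetachVolume. As a rough illustration of why the state alone is not a safe signal, here is a small self-contained sketch (not part of the commit) using aws-sdk-go types; all IDs and device names are made up:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
    // A stale DescribeVolumes answer: the volume still shows the *previous*
    // attachment (different instance, different device), yet its state is
    // already "attached" -- exactly the state the attach path is waiting for.
    stale := &ec2.VolumeAttachment{
        State:      aws.String("attached"),
        InstanceId: aws.String("i-0aaaaaaaaaaaaaaaa"), // old node (made-up ID)
        Device:     aws.String("/dev/xvdbb"),          // old device
    }

    wantInstance := "i-0bbbbbbbbbbbbbbbb" // node we are attaching to (made-up ID)
    wantDevice := "/dev/xvdba"            // device we asked for

    // Matching on state alone would wrongly report success here.
    fmt.Println("state matches:", aws.StringValue(stale.State) == "attached")

    // The fix additionally cross-checks instance and device before trusting the answer.
    ok := aws.StringValue(stale.InstanceId) == wantInstance &&
        aws.StringValue(stale.Device) == wantDevice
    fmt.Println("attachment really ours:", ok)
}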
@@ -2105,17 +2105,18 @@ func (c *Cloud) applyUnSchedulableTaint(nodeName types.NodeName, reason string)
 // waitForAttachmentStatus polls until the attachment status is the expected value
 // On success, it returns the last attachment state.
-func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment, error) {
+func (d *awsDisk) waitForAttachmentStatus(status string, expectedInstance, expectedDevice string) (*ec2.VolumeAttachment, error) {
     backoff := wait.Backoff{
         Duration: volumeAttachmentStatusPollDelay,
         Factor:   volumeAttachmentStatusFactor,
         Steps:    volumeAttachmentStatusSteps,
     }
 
-    // Because of rate limiting, we often see errors from describeVolume
+    // Because of rate limiting, we often see errors from describeVolume.
+    // Or AWS eventual consistency returns unexpected data.
     // So we tolerate a limited number of failures.
-    // But once we see more than 10 errors in a row, we return the error
-    describeErrorCount := 0
+    // But once we see more than 10 errors in a row, we return the error.
+    errorCount := 0
 
     // Attach/detach usually takes time. It does not make sense to start
     // polling DescribeVolumes before some initial delay to let AWS
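For context, wait.Backoff is the apimachinery type that is typically passed to wait.ExponentialBackoff (the call itself sits outside the hunks shown): the condition is polled up to Steps times, with the sleep between polls starting at Duration and multiplied by Factor after each attempt. The actual volumeAttachmentStatus* constants are defined elsewhere in aws.go and are not part of this hunk; the sketch below uses made-up numbers just to show how the three fields bound the overall polling window:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Illustrative values only -- not the real volumeAttachmentStatus* constants.
    duration := 10 * time.Second // wait.Backoff.Duration: first sleep between polls
    factor := 1.2                // wait.Backoff.Factor: sleep multiplier
    steps := 10                  // wait.Backoff.Steps: maximum number of polls

    total := time.Duration(0)
    sleep := duration
    for i := 0; i < steps-1; i++ { // no sleep after the final poll
        total += sleep
        sleep = time.Duration(float64(sleep) * factor)
    }
    fmt.Printf("up to %d polls over roughly %s\n", steps, total.Round(time.Second))
}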
@@ -2144,8 +2145,8 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
                     return false, err
                 }
             }
-            describeErrorCount++
-            if describeErrorCount > volumeAttachmentStatusConsecutiveErrorLimit {
+            errorCount++
+            if errorCount > volumeAttachmentStatusConsecutiveErrorLimit {
                 // report the error
                 return false, err
             }
@@ -2154,8 +2155,6 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
             return false, nil
         }
 
-        describeErrorCount = 0
-
         if len(info.Attachments) > 1 {
             // Shouldn't happen; log so we know if it is
             klog.Warningf("Found multiple attachments for volume %q: %v", d.awsID, info)
@@ -2177,11 +2176,39 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
         if attachmentStatus == "" {
             attachmentStatus = "detached"
         }
+        if attachment != nil {
+            // AWS eventual consistency can go back in time.
+            // For example, we're waiting for a volume to be attached as /dev/xvdba, but AWS can tell us it's
+            // attached as /dev/xvdbb, where it was attached before and it was already detached.
+            // Retry couple of times, hoping AWS starts reporting the right status.
+            device := aws.StringValue(attachment.Device)
+            if expectedDevice != "" && device != "" && device != expectedDevice {
+                klog.Warningf("Expected device %s %s for volume %s, but found device %s %s", expectedDevice, status, d.name, device, attachmentStatus)
+                errorCount++
+                if errorCount > volumeAttachmentStatusConsecutiveErrorLimit {
+                    // report the error
+                    return false, fmt.Errorf("attachment of disk %q failed: requested device %q but found %q", d.name, expectedDevice, device)
+                }
+                return false, nil
+            }
+            instanceID := aws.StringValue(attachment.InstanceId)
+            if expectedInstance != "" && instanceID != "" && instanceID != expectedInstance {
+                klog.Warningf("Expected instance %s/%s for volume %s, but found instance %s/%s", expectedInstance, status, d.name, instanceID, attachmentStatus)
+                errorCount++
+                if errorCount > volumeAttachmentStatusConsecutiveErrorLimit {
+                    // report the error
+                    return false, fmt.Errorf("attachment of disk %q failed: requested device %q but found %q", d.name, expectedDevice, device)
+                }
+                return false, nil
+            }
+        }
 
         if attachmentStatus == status {
             // Attachment is in requested state, finish waiting
             return true, nil
         }
         // continue waiting
+        errorCount = 0
         klog.V(2).Infof("Waiting for volume %q state: actual=%s, desired=%s", d.awsID, attachmentStatus, status)
         return false, nil
     })
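The added block tolerates stale answers the same way describe errors are tolerated: each inconsistent poll bumps errorCount, a consistent-but-not-yet-final poll resets it, and only volumeAttachmentStatusConsecutiveErrorLimit consecutive bad answers turn into a hard failure. A minimal, self-contained sketch of that pattern (the limit, timings, and the fake describe function are made-up stand-ins, not values from this commit):

package main

import (
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// consecutiveErrorLimit is a made-up stand-in for volumeAttachmentStatusConsecutiveErrorLimit.
const consecutiveErrorLimit = 3

// waitForDevice polls describe() until it reports wantDevice, tolerating up to
// consecutiveErrorLimit bad answers in a row before giving up. (Simplified: the
// real code also resets the counter when the data is consistent but the state
// is still changing.)
func waitForDevice(describe func() (string, error), wantDevice string) error {
    backoff := wait.Backoff{Duration: 10 * time.Millisecond, Factor: 1.5, Steps: 10}
    errorCount := 0
    return wait.ExponentialBackoff(backoff, func() (bool, error) {
        device, err := describe()
        if err != nil || device != wantDevice {
            // Rate limiting or eventual consistency: tolerate it a few times.
            errorCount++
            if errorCount > consecutiveErrorLimit {
                return false, fmt.Errorf("requested device %q but kept seeing %q (err=%v)", wantDevice, device, err)
            }
            return false, nil // keep polling
        }
        return true, nil // consistent answer, stop waiting
    })
}

func main() {
    calls := 0
    // Fake DescribeVolumes: stale for two polls, then catches up.
    catchesUp := func() (string, error) {
        calls++
        if calls <= 2 {
            return "/dev/xvdbb", nil // previous attachment still visible
        }
        return "/dev/xvdba", nil
    }
    fmt.Println("recovers:", waitForDevice(catchesUp, "/dev/xvdba"))

    // Fake DescribeVolumes that never becomes consistent: the limit trips.
    alwaysStale := func() (string, error) { return "", errors.New("RequestLimitExceeded") }
    fmt.Println("gives up:", waitForDevice(alwaysStale, "/dev/xvdba"))
}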
@@ -2321,7 +2348,7 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
         klog.V(2).Infof("AttachVolume volume=%q instance=%q request returned %v", disk.awsID, awsInstance.awsID, attachResponse)
     }
 
-    attachment, err := disk.waitForAttachmentStatus("attached")
+    attachment, err := disk.waitForAttachmentStatus("attached", awsInstance.awsID, ec2Device)
 
     if err != nil {
         if err == wait.ErrWaitTimeout {
@@ -2341,6 +2368,7 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
         return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", diskName, nodeName)
     }
     if ec2Device != aws.StringValue(attachment.Device) {
+        // Already checked in waitForAttachmentStatus(), but just to be sure...
         return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", diskName, nodeName, ec2Device, aws.StringValue(attachment.Device))
     }
     if awsInstance.awsID != aws.StringValue(attachment.InstanceId) {
@@ -2398,7 +2426,7 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
         return "", errors.New("no response from DetachVolume")
     }
 
-    attachment, err := diskInfo.disk.waitForAttachmentStatus("detached")
+    attachment, err := diskInfo.disk.waitForAttachmentStatus("detached", awsInstance.awsID, "")
     if err != nil {
         return "", err
     }