Merge pull request #86555 from feiskyer/azure-retry-error

Return typed errors for Azure client interfaces
This commit is contained in:
Kubernetes Prow Robot 2019-12-25 01:21:43 -08:00 committed by GitHub
commit 6c1080b3ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1294 additions and 997 deletions

View File

@ -68,6 +68,7 @@ go_library(
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2019-06-01/storage:go_default_library",
@ -86,7 +87,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"azure_backoff_test.go",
"azure_cache_test.go",
"azure_config_test.go",
"azure_controller_common_test.go",
@ -118,10 +118,10 @@ go_test(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library",
"//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2019-06-01/storage:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
@ -141,6 +141,7 @@ filegroup(
srcs = [
":package-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/auth:all-srcs",
"//staging/src/k8s.io/legacy-cloud-providers/azure/retry:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -19,7 +19,6 @@ limitations under the License.
package azure
import (
"fmt"
"net/http"
"strings"
@ -33,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/azure/retry"
)
const (
@ -90,7 +90,7 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadTy
func (az *Cloud) ListVirtualMachinesWithRetry(resourceGroup string) ([]compute.VirtualMachine, error) {
allNodes := []compute.VirtualMachine{}
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
var retryErr error
var retryErr *retry.Error
ctx, cancel := getContextWithCancel()
defer cancel()
allNodes, retryErr = az.VirtualMachinesClient.List(ctx, resourceGroup)
@ -98,7 +98,7 @@ func (az *Cloud) ListVirtualMachinesWithRetry(resourceGroup string) ([]compute.V
klog.Errorf("VirtualMachinesClient.List(%v) - backoff: failure, will retry,err=%v",
resourceGroup,
retryErr)
return false, retryErr
return false, retryErr.Error()
}
klog.V(2).Infof("VirtualMachinesClient.List(%v) - backoff: success", resourceGroup)
return true, nil
@ -107,7 +107,7 @@ func (az *Cloud) ListVirtualMachinesWithRetry(resourceGroup string) ([]compute.V
return nil, err
}
return allNodes, err
return allNodes, nil
}
// ListVirtualMachines invokes az.VirtualMachinesClient.List with exponential backoff retry
@ -116,10 +116,10 @@ func (az *Cloud) ListVirtualMachines(resourceGroup string) ([]compute.VirtualMac
ctx, cancel := getContextWithCancel()
defer cancel()
allNodes, err := az.VirtualMachinesClient.List(ctx, resourceGroup)
if err != nil {
klog.Errorf("VirtualMachinesClient.List(%v) failure with err=%v", resourceGroup, err)
return nil, err
allNodes, rerr := az.VirtualMachinesClient.List(ctx, resourceGroup)
if rerr != nil {
klog.Errorf("VirtualMachinesClient.List(%v) failure with err=%v", resourceGroup, rerr)
return nil, rerr.Error()
}
klog.V(2).Infof("VirtualMachinesClient.List(%v) success", resourceGroup)
return allNodes, nil
@ -187,28 +187,25 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
rerr := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
if err == nil {
if isSuccessHTTPResponse(resp) {
// Invalidate the cache right after updating
az.nsgCache.Delete(*sg.Name)
} else if resp != nil {
return fmt.Errorf("HTTP response %q", resp.Status)
}
if rerr == nil {
// Invalidate the cache right after updating
az.nsgCache.Delete(*sg.Name)
return nil
}
// Invalidate the cache because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*sg.Name)
}
// Invalidate the cache because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.nsgCache.Delete(*sg.Name)
}
return err
return rerr.Error()
}
return az.CreateOrUpdateSGWithRetry(service, sg)
@ -220,27 +217,27 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(service *v1.Service, sg network.Secur
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
rerr := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err)
if done && err == nil {
if rerr == nil {
// Invalidate the cache right after updating
az.nsgCache.Delete(*sg.Name)
return true, nil
}
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*sg.Name)
return true, err
return true, rerr.Error()
}
// Invalidate the cache and abort backoff because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.nsgCache.Delete(*sg.Name)
return true, err
return true, rerr.Error()
}
return done, retryError
return !rerr.Retriable, rerr.Error()
})
}
@ -251,26 +248,23 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer)
defer cancel()
rgName := az.getLoadBalancerResourceGroup()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, rgName, *lb.Name, lb, to.String(lb.Etag))
rerr := az.LoadBalancerClient.CreateOrUpdate(ctx, rgName, *lb.Name, lb, to.String(lb.Etag))
klog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
if err == nil {
if isSuccessHTTPResponse(resp) {
// Invalidate the cache right after updating
az.lbCache.Delete(*lb.Name)
} else if resp != nil {
return fmt.Errorf("HTTP response %q", resp.Status)
}
if rerr == nil {
// Invalidate the cache right after updating
az.lbCache.Delete(*lb.Name)
return nil
}
// Invalidate the cache because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.lbCache.Delete(*lb.Name)
}
// Invalidate the cache because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.lbCache.Delete(*lb.Name)
}
return err
return rerr.Error()
}
return az.createOrUpdateLBWithRetry(service, lb)
@ -283,25 +277,25 @@ func (az *Cloud) createOrUpdateLBWithRetry(service *v1.Service, lb network.LoadB
defer cancel()
rgName := az.getLoadBalancerResourceGroup()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, rgName, *lb.Name, lb, to.String(lb.Etag))
rerr := az.LoadBalancerClient.CreateOrUpdate(ctx, rgName, *lb.Name, lb, to.String(lb.Etag))
klog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateLoadBalancer", resp, err)
if done && err == nil {
if rerr == nil {
// Invalidate the cache right after updating
az.lbCache.Delete(*lb.Name)
return true, nil
}
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.lbCache.Delete(*lb.Name)
return true, err
return true, rerr.Error()
}
// Invalidate the cache and abort backoff because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.lbCache.Delete(*lb.Name)
return true, err
return true, rerr.Error()
}
return done, retryError
return !rerr.Retriable, rerr.Error()
})
}
@ -312,11 +306,11 @@ func (az *Cloud) ListLB(service *v1.Service) ([]network.LoadBalancer, error) {
defer cancel()
rgName := az.getLoadBalancerResourceGroup()
allLBs, err := az.LoadBalancerClient.List(ctx, rgName)
if err != nil {
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", err.Error())
klog.Errorf("LoadBalancerClient.List(%v) failure with err=%v", rgName, err)
return nil, err
allLBs, rerr := az.LoadBalancerClient.List(ctx, rgName)
if rerr != nil {
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", rerr.Error().Error())
klog.Errorf("LoadBalancerClient.List(%v) failure with err=%v", rgName, rerr)
return nil, rerr.Error()
}
klog.V(2).Infof("LoadBalancerClient.List(%v) success", rgName)
return allLBs, nil
@ -327,21 +321,21 @@ func (az *Cloud) ListLB(service *v1.Service) ([]network.LoadBalancer, error) {
// listLBWithRetry invokes az.LoadBalancerClient.List with exponential backoff retry
func (az *Cloud) listLBWithRetry(service *v1.Service) ([]network.LoadBalancer, error) {
var retryErr *retry.Error
var allLBs []network.LoadBalancer
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
var retryErr error
ctx, cancel := getContextWithCancel()
defer cancel()
rgName := az.getLoadBalancerResourceGroup()
allLBs, retryErr = az.LoadBalancerClient.List(ctx, rgName)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", retryErr.Error())
az.Event(service, v1.EventTypeWarning, "ListLoadBalancers", retryErr.Error().Error())
klog.Errorf("LoadBalancerClient.List(%v) - backoff: failure, will retry,err=%v",
rgName,
retryErr)
return false, retryErr
return false, retryErr.Error()
}
klog.V(2).Infof("LoadBalancerClient.List(%v) - backoff: success", az.ResourceGroup)
return true, nil
@ -359,11 +353,11 @@ func (az *Cloud) ListPIP(service *v1.Service, pipResourceGroup string) ([]networ
ctx, cancel := getContextWithCancel()
defer cancel()
allPIPs, err := az.PublicIPAddressesClient.List(ctx, pipResourceGroup)
if err != nil {
az.Event(service, v1.EventTypeWarning, "ListPublicIPs", err.Error())
klog.Errorf("PublicIPAddressesClient.List(%v) failure with err=%v", pipResourceGroup, err)
return nil, err
allPIPs, rerr := az.PublicIPAddressesClient.List(ctx, pipResourceGroup)
if rerr != nil {
az.Event(service, v1.EventTypeWarning, "ListPublicIPs", rerr.Error().Error())
klog.Errorf("PublicIPAddressesClient.List(%v) failure with err=%v", pipResourceGroup, rerr)
return nil, rerr.Error()
}
klog.V(2).Infof("PublicIPAddressesClient.List(%v) success", pipResourceGroup)
return allPIPs, nil
@ -377,17 +371,17 @@ func (az *Cloud) listPIPWithRetry(service *v1.Service, pipResourceGroup string)
var allPIPs []network.PublicIPAddress
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
var retryErr error
ctx, cancel := getContextWithCancel()
defer cancel()
var retryErr *retry.Error
allPIPs, retryErr = az.PublicIPAddressesClient.List(ctx, pipResourceGroup)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "ListPublicIPs", retryErr.Error())
az.Event(service, v1.EventTypeWarning, "ListPublicIPs", retryErr.Error().Error())
klog.Errorf("PublicIPAddressesClient.List(%v) - backoff: failure, will retry,err=%v",
pipResourceGroup,
retryErr)
return false, retryErr
return false, retryErr.Error()
}
klog.V(2).Infof("PublicIPAddressesClient.List(%v) - backoff: success", pipResourceGroup)
return true, nil
@ -405,9 +399,15 @@ func (az *Cloud) CreateOrUpdatePIP(service *v1.Service, pipResourceGroup string,
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, *pip.Name, pip)
rerr := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, *pip.Name, pip)
klog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s, %s): end", pipResourceGroup, *pip.Name)
return az.processHTTPResponse(service, "CreateOrUpdatePublicIPAddress", resp, err)
if rerr != nil {
klog.Errorf("PublicIPAddressesClient.CreateOrUpdate(%s, %s) failed: %s", pipResourceGroup, *pip.Name, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "CreateOrUpdatePublicIPAddress", rerr.Error().Error())
return rerr.Error()
}
return nil
}
return az.createOrUpdatePIPWithRetry(service, pipResourceGroup, pip)
@ -419,9 +419,15 @@ func (az *Cloud) createOrUpdatePIPWithRetry(service *v1.Service, pipResourceGrou
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, *pip.Name, pip)
rerr := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, *pip.Name, pip)
klog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s, %s): end", pipResourceGroup, *pip.Name)
return az.processHTTPRetryResponse(service, "CreateOrUpdatePublicIPAddress", resp, err)
if rerr != nil {
klog.Errorf("PublicIPAddressesClient.CreateOrUpdate(%s, %s) failed: %s", pipResourceGroup, *pip.Name, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "CreateOrUpdatePublicIPAddress", rerr.Error().Error())
return !rerr.Retriable, rerr.Error()
}
return true, nil
})
}
@ -431,9 +437,15 @@ func (az *Cloud) CreateOrUpdateInterface(service *v1.Service, nic network.Interf
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.InterfacesClient.CreateOrUpdate(ctx, az.ResourceGroup, *nic.Name, nic)
rerr := az.InterfacesClient.CreateOrUpdate(ctx, az.ResourceGroup, *nic.Name, nic)
klog.V(10).Infof("InterfacesClient.CreateOrUpdate(%s): end", *nic.Name)
return az.processHTTPResponse(service, "CreateOrUpdateInterface", resp, err)
if rerr != nil {
klog.Errorf("InterfacesClient.CreateOrUpdate(%s) failed: %s", *nic.Name, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "CreateOrUpdateInterface", rerr.Error().Error())
return rerr.Error()
}
return nil
}
return az.createOrUpdateInterfaceWithRetry(service, nic)
@ -445,9 +457,15 @@ func (az *Cloud) createOrUpdateInterfaceWithRetry(service *v1.Service, nic netwo
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.InterfacesClient.CreateOrUpdate(ctx, az.ResourceGroup, *nic.Name, nic)
rerr := az.InterfacesClient.CreateOrUpdate(ctx, az.ResourceGroup, *nic.Name, nic)
klog.V(10).Infof("InterfacesClient.CreateOrUpdate(%s): end", *nic.Name)
return az.processHTTPRetryResponse(service, "CreateOrUpdateInterface", resp, err)
if rerr != nil {
klog.Errorf("InterfacesClient.CreateOrUpdate(%s) faild: %s", *nic.Name, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "CreateOrUpdateInterface", rerr.Error().Error())
return !rerr.Retriable, rerr.Error()
}
return true, nil
})
}
@ -457,8 +475,14 @@ func (az *Cloud) DeletePublicIP(service *v1.Service, pipResourceGroup string, pi
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.Delete(ctx, pipResourceGroup, pipName)
return az.processHTTPResponse(service, "DeletePublicIPAddress", resp, err)
rerr := az.PublicIPAddressesClient.Delete(ctx, pipResourceGroup, pipName)
if rerr != nil {
klog.Errorf("PublicIPAddressesClient.Delete(%s) failed: %s", pipName, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "DeletePublicIPAddress", rerr.Error().Error())
return rerr.Error()
}
return nil
}
return az.deletePublicIPWithRetry(service, pipResourceGroup, pipName)
@ -470,8 +494,14 @@ func (az *Cloud) deletePublicIPWithRetry(service *v1.Service, pipResourceGroup s
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.Delete(ctx, pipResourceGroup, pipName)
return az.processHTTPRetryResponse(service, "DeletePublicIPAddress", resp, err)
rerr := az.PublicIPAddressesClient.Delete(ctx, pipResourceGroup, pipName)
if rerr != nil {
klog.Errorf("PublicIPAddressesClient.Delete(%s) failed: %s", pipName, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "DeletePublicIPAddress", rerr.Error().Error())
return !rerr.Retriable, rerr.Error()
}
return true, nil
})
}
@ -482,16 +512,16 @@ func (az *Cloud) DeleteLB(service *v1.Service, lbName string) error {
defer cancel()
rgName := az.getLoadBalancerResourceGroup()
resp, err := az.LoadBalancerClient.Delete(ctx, rgName, lbName)
if err == nil {
if isSuccessHTTPResponse(resp) {
// Invalidate the cache right after updating
az.lbCache.Delete(lbName)
} else if resp != nil {
return fmt.Errorf("HTTP response %q", resp.Status)
}
rerr := az.LoadBalancerClient.Delete(ctx, rgName, lbName)
if rerr == nil {
// Invalidate the cache right after updating
az.lbCache.Delete(lbName)
return nil
}
return err
klog.Errorf("LoadBalancerClient.Delete(%s) failed: %s", lbName, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "DeleteLoadBalancer", rerr.Error().Error())
return rerr.Error()
}
return az.deleteLBWithRetry(service, lbName)
@ -504,13 +534,16 @@ func (az *Cloud) deleteLBWithRetry(service *v1.Service, lbName string) error {
defer cancel()
rgName := az.getLoadBalancerResourceGroup()
resp, err := az.LoadBalancerClient.Delete(ctx, rgName, lbName)
done, err := az.processHTTPRetryResponse(service, "DeleteLoadBalancer", resp, err)
if done && err == nil {
rerr := az.LoadBalancerClient.Delete(ctx, rgName, lbName)
if rerr == nil {
// Invalidate the cache right after deleting
az.lbCache.Delete(lbName)
return true, nil
}
return done, err
klog.Errorf("LoadBalancerClient.Delete(%s) failed: %s", lbName, rerr.Error().Error())
az.Event(service, v1.EventTypeWarning, "CreateOrUpdateInterface", rerr.Error().Error())
return !rerr.Retriable, rerr.Error()
})
}
@ -520,15 +553,23 @@ func (az *Cloud) CreateOrUpdateRouteTable(routeTable network.RouteTable) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable, to.String(routeTable.Etag))
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
rerr := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable, to.String(routeTable.Etag))
if rerr == nil {
// Invalidate the cache right after updating
az.rtCache.Delete(*routeTable.Name)
return nil
}
// Invalidate the cache because etag mismatch.
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(*routeTable.Name)
}
// Invalidate the cache because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.rtCache.Delete(*routeTable.Name)
}
return az.processHTTPResponse(nil, "", resp, err)
klog.Errorf("RouteTablesClient.CreateOrUpdate(%s) failed: %v", az.RouteTableName, rerr.Error())
return rerr.Error()
}
return az.createOrUpdateRouteTableWithRetry(routeTable)
@ -540,24 +581,24 @@ func (az *Cloud) createOrUpdateRouteTableWithRetry(routeTable network.RouteTable
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable, to.String(routeTable.Etag))
done, retryError := az.processHTTPRetryResponse(nil, "", resp, err)
if done && err == nil {
rerr := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable, to.String(routeTable.Etag))
if rerr == nil {
az.rtCache.Delete(*routeTable.Name)
return done, nil
return true, nil
}
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(*routeTable.Name)
return true, err
return true, rerr.Error()
}
// Invalidate the cache and abort backoff because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.rtCache.Delete(*routeTable.Name)
return true, err
return true, rerr.Error()
}
return done, retryError
klog.Errorf("RouteTablesClient.CreateOrUpdate(%s) failed: %v", az.RouteTableName, rerr.Error())
return !rerr.Retriable, rerr.Error()
})
}
@ -567,16 +608,21 @@ func (az *Cloud) CreateOrUpdateRoute(route network.Route) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route, to.String(route.Etag))
rerr := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route, to.String(route.Etag))
klog.V(10).Infof("RoutesClient.CreateOrUpdate(%s): end", *route.Name)
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr == nil {
az.rtCache.Delete(az.RouteTableName)
return nil
}
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(az.RouteTableName)
}
// Invalidate the cache because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.rtCache.Delete(az.RouteTableName)
}
return az.processHTTPResponse(nil, "", resp, err)
return rerr.Error()
}
return az.createOrUpdateRouteWithRetry(route)
@ -588,27 +634,26 @@ func (az *Cloud) createOrUpdateRouteWithRetry(route network.Route) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route, to.String(route.Etag))
rerr := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route, to.String(route.Etag))
klog.V(10).Infof("RoutesClient.CreateOrUpdate(%s): end", *route.Name)
done, retryError := az.processHTTPRetryResponse(nil, "", resp, err)
if done && err == nil {
if rerr == nil {
az.rtCache.Delete(az.RouteTableName)
return done, nil
return true, nil
}
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(az.RouteTableName)
return true, err
return true, rerr.Error()
}
// Invalidate the cache and abort backoff because another new operation has canceled the current request.
if err != nil && strings.Contains(strings.ToLower(err.Error()), operationCancledErrorMessage) {
if strings.Contains(strings.ToLower(rerr.Error().Error()), operationCancledErrorMessage) {
az.rtCache.Delete(az.RouteTableName)
return true, err
return true, rerr.Error()
}
return done, retryError
return !rerr.Retriable, rerr.Error()
})
}
@ -618,9 +663,14 @@ func (az *Cloud) DeleteRouteWithName(routeName string) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RoutesClient.Delete(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeName)
rerr := az.RoutesClient.Delete(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeName)
klog.V(10).Infof("RoutesClient.Delete(%s,%s): end", az.RouteTableName, routeName)
return az.processHTTPResponse(nil, "", resp, err)
if rerr == nil {
return nil
}
klog.Errorf("RoutesClient.Delete(%s, %s) failed: %v", az.RouteTableName, routeName, rerr.Error())
return rerr.Error()
}
return az.deleteRouteWithRetry(routeName)
@ -632,9 +682,14 @@ func (az *Cloud) deleteRouteWithRetry(routeName string) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RoutesClient.Delete(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeName)
rerr := az.RoutesClient.Delete(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeName)
klog.V(10).Infof("RoutesClient.Delete(%s,%s): end", az.RouteTableName, routeName)
return az.processHTTPRetryResponse(nil, "", resp, err)
if rerr == nil {
return true, nil
}
klog.Errorf("RoutesClient.Delete(%s, %s) failed: %v", az.RouteTableName, routeName, rerr.Error())
return !rerr.Retriable, rerr.Error()
})
}
@ -644,17 +699,20 @@ func (az *Cloud) UpdateVmssVMWithRetry(resourceGroupName string, VMScaleSetName
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.VirtualMachineScaleSetVMsClient.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source)
rerr := az.VirtualMachineScaleSetVMsClient.Update(ctx, resourceGroupName, VMScaleSetName, instanceID, parameters, source)
klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetVMsClient.Update(%s,%s): end", VMScaleSetName, instanceID)
if rerr == nil {
return true, nil
}
if err != nil && strings.Contains(err.Error(), vmssVMNotActiveErrorMessage) {
if strings.Contains(rerr.Error().Error(), vmssVMNotActiveErrorMessage) {
// When instances are under deleting, updating API would report "not an active Virtual Machine Scale Set VM instanceId" error.
// Since they're under deleting, we shouldn't send more update requests for it.
klog.V(3).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetVMsClient.Update(%s,%s) gets error message %q, abort backoff because it's probably under deleting", VMScaleSetName, instanceID, vmssVMNotActiveErrorMessage)
return true, nil
}
return az.processHTTPRetryResponse(nil, "", resp, err)
return !rerr.Retriable, rerr.Error()
})
}
@ -667,26 +725,29 @@ func (az *Cloud) CreateOrUpdateVmssWithRetry(resourceGroupName string, VMScaleSe
// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error.
// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it.
klog.V(3).Infof("CreateOrUpdateVmssWithRetry: verify the status of the vmss being created or updated")
vmss, err := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
if err != nil {
klog.Warningf("CreateOrUpdateVmssWithRetry: error getting vmss: %s", err)
vmss, rerr := az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, VMScaleSetName)
if rerr != nil {
klog.Warningf("CreateOrUpdateVmssWithRetry: error getting vmss: %v", rerr)
}
if vmss.ProvisioningState != nil && strings.EqualFold(*vmss.ProvisioningState, virtualMachineScaleSetsDeallocating) {
klog.V(3).Infof("CreateOrUpdateVmssWithRetry: found vmss %s being deleted, skipping", VMScaleSetName)
return true, nil
}
resp, err := az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
rerr = az.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroupName, VMScaleSetName, parameters)
klog.V(10).Infof("UpdateVmssVMWithRetry: VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", VMScaleSetName)
if rerr == nil {
return true, nil
}
return az.processHTTPRetryResponse(nil, "", resp, err)
return !rerr.Retriable, rerr.Error()
})
}
// GetScaleSetWithRetry gets scale set with exponential backoff retry
func (az *Cloud) GetScaleSetWithRetry(service *v1.Service, resourceGroupName, vmssName string) (compute.VirtualMachineScaleSet, error) {
var result compute.VirtualMachineScaleSet
var retryErr error
var retryErr *retry.Error
err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
@ -694,7 +755,7 @@ func (az *Cloud) GetScaleSetWithRetry(service *v1.Service, resourceGroupName, vm
result, retryErr = az.VirtualMachineScaleSetsClient.Get(ctx, resourceGroupName, vmssName)
if retryErr != nil {
az.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error())
az.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error().Error())
klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", vmssName, retryErr)
return false, nil
}
@ -705,85 +766,6 @@ func (az *Cloud) GetScaleSetWithRetry(service *v1.Service, resourceGroupName, vm
return result, err
}
// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
func isSuccessHTTPResponse(resp *http.Response) bool {
if resp == nil {
return false
}
// HTTP 2xx suggests a successful response
if 199 < resp.StatusCode && resp.StatusCode < 300 {
return true
}
return false
}
func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
if resp != nil {
// HTTP 412 (StatusPreconditionFailed) means etag mismatch, hence we shouldn't retry.
if resp.StatusCode == http.StatusPreconditionFailed {
return false
}
// HTTP 4xx (except 412) or 5xx suggests we should retry.
if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true
}
}
if err != nil {
return true
}
return false
}
// processHTTPRetryResponse : return true means stop retry, false means continue retry
func (az *Cloud) processHTTPRetryResponse(service *v1.Service, reason string, resp *http.Response, err error) (bool, error) {
if err == nil && resp != nil && isSuccessHTTPResponse(resp) {
// HTTP 2xx suggests a successful response
return true, nil
}
if shouldRetryHTTPRequest(resp, err) {
message := "processHTTPRetryResponse: backoff failure, will retry"
if resp != nil {
message = fmt.Sprintf("%s, HTTP response: %d", message, resp.StatusCode)
}
if err != nil {
message = fmt.Sprintf("%s, error: %v", message, err)
}
az.Event(service, v1.EventTypeWarning, reason, message)
klog.Error(message)
// suppress the error object so that backoff process continues
return false, nil
}
// Fall-through: stop periodic backoff
return true, nil
}
func (az *Cloud) processHTTPResponse(service *v1.Service, reason string, resp *http.Response, err error) error {
if err == nil && isSuccessHTTPResponse(resp) {
// HTTP 2xx suggests a successful response
return nil
}
message := "processHTTPResponse failed"
if resp != nil {
message = fmt.Sprintf("%s, HTTP response: %d", message, resp.StatusCode)
}
if err != nil {
message = fmt.Sprintf("%s, error: %v", message, err)
}
az.Event(service, v1.EventTypeWarning, reason, message)
klog.Error(message)
return fmt.Errorf(message)
}
func (cfg *Config) shouldOmitCloudProviderBackoff() bool {
return cfg.CloudProviderBackoffMode == backoffModeV2
}

View File

@ -1,147 +0,0 @@
// +build !providerless
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"fmt"
"net/http"
"testing"
)
func TestShouldRetryHTTPRequest(t *testing.T) {
tests := []struct {
code int
err error
expected bool
}{
{
code: http.StatusBadRequest,
expected: true,
},
{
code: http.StatusInternalServerError,
expected: true,
},
{
code: http.StatusOK,
err: fmt.Errorf("some error"),
expected: true,
},
{
code: http.StatusOK,
expected: false,
},
{
code: 399,
expected: false,
},
}
for _, test := range tests {
resp := &http.Response{
StatusCode: test.code,
}
res := shouldRetryHTTPRequest(resp, test.err)
if res != test.expected {
t.Errorf("expected: %v, saw: %v", test.expected, res)
}
}
}
func TestIsSuccessResponse(t *testing.T) {
tests := []struct {
code int
expected bool
}{
{
code: http.StatusNotFound,
expected: false,
},
{
code: http.StatusInternalServerError,
expected: false,
},
{
code: http.StatusOK,
expected: true,
},
}
for _, test := range tests {
resp := http.Response{
StatusCode: test.code,
}
res := isSuccessHTTPResponse(&resp)
if res != test.expected {
t.Errorf("expected: %v, saw: %v", test.expected, res)
}
}
}
func TestProcessRetryResponse(t *testing.T) {
az := &Cloud{}
tests := []struct {
code int
err error
stop bool
}{
{
code: http.StatusBadRequest,
stop: false,
},
{
code: http.StatusInternalServerError,
stop: false,
},
{
code: http.StatusSeeOther,
err: fmt.Errorf("some error"),
stop: false,
},
{
code: http.StatusSeeOther,
stop: true,
},
{
code: http.StatusOK,
stop: true,
},
{
code: http.StatusOK,
err: fmt.Errorf("some error"),
stop: false,
},
{
code: 399,
stop: true,
},
}
for _, test := range tests {
resp := &http.Response{
StatusCode: test.code,
}
res, err := az.processHTTPRetryResponse(nil, "", resp, test.err)
if res != test.stop {
t.Errorf("expected: %v, saw: %v", test.stop, res)
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
}

View File

@ -287,9 +287,9 @@ func (c *BlobDiskController) getStorageAccountKey(SAName string) (string, error)
ctx, cancel := getContextWithCancel()
defer cancel()
listKeysResult, err := c.common.cloud.StorageAccountClient.ListKeys(ctx, c.common.resourceGroup, SAName)
if err != nil {
return "", err
listKeysResult, rerr := c.common.cloud.StorageAccountClient.ListKeys(ctx, c.common.resourceGroup, SAName)
if rerr != nil {
return "", rerr.Error()
}
if listKeysResult.Keys == nil {
return "", fmt.Errorf("azureDisk - empty listKeysResult in storage account:%s keys", SAName)
@ -443,9 +443,9 @@ func (c *BlobDiskController) getDiskCount(SAName string) (int, error) {
func (c *BlobDiskController) getAllStorageAccounts() (map[string]*storageAccountState, error) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
accountListResult, err := c.common.cloud.StorageAccountClient.ListByResourceGroup(ctx, c.common.resourceGroup)
if err != nil {
return nil, err
accountListResult, rerr := c.common.cloud.StorageAccountClient.ListByResourceGroup(ctx, c.common.resourceGroup)
if rerr != nil {
return nil, rerr.Error()
}
if accountListResult.Value == nil {
return nil, fmt.Errorf("azureDisk - empty accountListResult")
@ -502,9 +502,9 @@ func (c *BlobDiskController) createStorageAccount(storageAccountName string, sto
ctx, cancel := getContextWithCancel()
defer cancel()
_, err := c.common.cloud.StorageAccountClient.Create(ctx, c.common.resourceGroup, storageAccountName, cp)
err := c.common.cloud.StorageAccountClient.Create(ctx, c.common.resourceGroup, storageAccountName, cp)
if err != nil {
return fmt.Errorf(fmt.Sprintf("Create Storage Account: %s, error: %s", storageAccountName, err))
return fmt.Errorf(fmt.Sprintf("Create Storage Account: %s, error: %v", storageAccountName, err))
}
newAccountState := &storageAccountState{
@ -599,9 +599,9 @@ func (c *BlobDiskController) findSANameForDisk(storageAccountType storage.SkuNam
func (c *BlobDiskController) getStorageAccountState(storageAccountName string) (bool, storage.ProvisioningState, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
account, err := c.common.cloud.StorageAccountClient.GetProperties(ctx, c.common.resourceGroup, storageAccountName)
if err != nil {
return false, "", err
account, rerr := c.common.cloud.StorageAccountClient.GetProperties(ctx, c.common.resourceGroup, storageAccountName)
if rerr != nil {
return false, "", rerr.Error()
}
return true, account.AccountProperties.ProvisioningState, nil
}

View File

@ -34,6 +34,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/azure/retry"
)
const (
@ -122,9 +123,9 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
ctx, cancel := getContextWithCancel()
defer cancel()
disk, err := c.cloud.DisksClient.Get(ctx, resourceGroup, diskName)
if err != nil {
return -1, err
disk, rerr := c.cloud.DisksClient.Get(ctx, resourceGroup, diskName)
if rerr != nil {
return -1, rerr.Error()
}
if disk.ManagedBy != nil {
@ -193,19 +194,24 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
// make the lock here as small as possible
c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
err = vmset.DetachDisk(diskName, diskURI, nodeName)
c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
if err != nil && retry.IsErrorRetriable(err) && c.cloud.CloudProviderBackoff {
klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
retryErr := kwait.ExponentialBackoff(c.cloud.RequestBackoff(), func() (bool, error) {
c.vmLockMap.LockEntry(strings.ToLower(string(nodeName)))
c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
err := vmset.DetachDisk(diskName, diskURI, nodeName)
c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
c.vmLockMap.UnlockEntry(strings.ToLower(string(nodeName)))
return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
retriable := false
if err != nil && retry.IsErrorRetriable(err) {
retriable = true
}
return !retriable, err
})
if retryErr != nil {
err = retryErr
@ -214,11 +220,11 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
}
if err != nil {
klog.Errorf("azureDisk - detach disk(%s, %s) failed, err: %v", diskName, diskURI, err)
} else {
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
return err
}
return err
klog.V(2).Infof("azureDisk - detach disk(%s, %s) succeeded", diskName, diskURI)
return nil
}
// getNodeDataDisks invokes vmSet interfaces to get data disks for the node.

View File

@ -126,7 +126,7 @@ func TestCommonDetachDisk(t *testing.T) {
setTestVirtualMachines(testCloud, test.vmList, false)
err := common.DetachDisk(test.diskName, diskURI, test.nodeName)
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s, err: %v", i, test.desc, err)
}
}

View File

@ -19,7 +19,6 @@ limitations under the License.
package azure
import (
"net/http"
"strings"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
@ -94,35 +93,37 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
// Invalidate the cache right after updating
defer as.cloud.vmCache.Delete(vmName)
_, err = as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "attach_disk")
if err != nil {
klog.Errorf("azureDisk - attach disk(%s, %s) failed, err: %v", diskName, diskURI, err)
detail := err.Error()
rerr := as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "attach_disk")
if rerr != nil {
klog.Errorf("azureDisk - attach disk(%s, %s) failed, err: %v", diskName, diskURI, rerr)
detail := rerr.Error().Error()
if strings.Contains(detail, errLeaseFailed) || strings.Contains(detail, errDiskBlobNotFound) {
// if lease cannot be acquired or disk not found, immediately detach the disk and return the original error
klog.V(2).Infof("azureDisk - err %v, try detach disk(%s, %s)", err, diskName, diskURI)
klog.V(2).Infof("azureDisk - err %v, try detach disk(%s, %s)", rerr, diskName, diskURI)
as.DetachDisk(diskName, diskURI, nodeName)
}
} else {
klog.V(2).Infof("azureDisk - attach disk(%s, %s) succeeded", diskName, diskURI)
return rerr.Error()
}
return err
klog.V(2).Infof("azureDisk - attach disk(%s, %s) succeeded", diskName, diskURI)
return nil
}
// DetachDisk detaches a disk from host
// the vhd can be identified by diskName or diskURI
func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault)
if err != nil {
// if host doesn't exist, no need to detach
klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI)
return nil, nil
return nil
}
vmName := mapNodeNameToVMName(nodeName)
nodeResourceGroup, err := as.GetNodeResourceGroup(vmName)
if err != nil {
return nil, err
return err
}
disks := filterDetachingDisks(*vm.StorageProfile.DataDisks)
@ -160,7 +161,12 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N
// Invalidate the cache right after updating
defer as.cloud.vmCache.Delete(vmName)
return as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "detach_disk")
rerr := as.VirtualMachinesClient.Update(ctx, nodeResourceGroup, vmName, newVM, "detach_disk")
if rerr != nil {
return rerr.Error()
}
return nil
}
// GetDataDisks gets a list of data disks attached to the node.

View File

@ -54,7 +54,7 @@ func TestStandardAttachDisk(t *testing.T) {
err := vmSet.AttachDisk(true, "",
"uri", test.nodeName, 0, compute.CachingTypesReadOnly, "")
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedErr, err != nil, "TestCase[%d]: %s, err: %v", i, test.desc, err)
}
}
@ -89,7 +89,7 @@ func TestStandardDetachDisk(t *testing.T) {
vmSet := testCloud.vmSet
setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
_, err := vmSet.DetachDisk(test.diskName, "", test.nodeName)
err := vmSet.DetachDisk(test.diskName, "", test.nodeName)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
}
}

View File

@ -19,7 +19,6 @@ limitations under the License.
package azure
import (
"net/http"
"strings"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
@ -99,32 +98,34 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
defer ss.deleteCacheForNode(vmName)
klog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk(%s, %s) with DiskEncryptionSetID(%s)", nodeResourceGroup, nodeName, diskName, diskURI, diskEncryptionSetID)
_, err = ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "attach_disk")
if err != nil {
detail := err.Error()
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "attach_disk")
if rerr != nil {
detail := rerr.Error().Error()
if strings.Contains(detail, errLeaseFailed) || strings.Contains(detail, errDiskBlobNotFound) {
// if lease cannot be acquired or disk not found, immediately detach the disk and return the original error
klog.Infof("azureDisk - err %s, try detach disk(%s, %s)", detail, diskName, diskURI)
ss.DetachDisk(diskName, diskURI, nodeName)
}
} else {
klog.V(2).Infof("azureDisk - attach disk(%s, %s) succeeded", diskName, diskURI)
return rerr.Error()
}
return err
klog.V(2).Infof("azureDisk - attach disk(%s, %s) succeeded", diskName, diskURI)
return nil
}
// DetachDisk detaches a disk from host
// the vhd can be identified by diskName or diskURI
func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
vmName := mapNodeNameToVMName(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
if err != nil {
return nil, err
return err
}
nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
if err != nil {
return nil, err
return err
}
disks := []compute.DataDisk{}
@ -168,7 +169,12 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName
defer ss.deleteCacheForNode(vmName)
klog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk(%s, %s)", nodeResourceGroup, nodeName, diskName, diskURI)
return ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "detach_disk")
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "detach_disk")
if rerr != nil {
return rerr.Error()
}
return nil
}
// GetDataDisks gets a list of data disks attached to the node.

View File

@ -20,6 +20,7 @@ package azure
import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
@ -29,12 +30,12 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2019-06-01/storage"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/to"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/legacy-cloud-providers/azure/retry"
)
var (
@ -53,7 +54,7 @@ func newFakeAzureLBClient() *fakeAzureLBClient {
return fLBC
}
func (fLBC *fakeAzureLBClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) (resp *http.Response, err error) {
func (fLBC *fakeAzureLBClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) *retry.Error {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
@ -73,26 +74,24 @@ func (fLBC *fakeAzureLBClient) CreateOrUpdate(ctx context.Context, resourceGroup
}
fLBC.FakeStore[resourceGroupName][loadBalancerName] = parameters
return nil, nil
return nil
}
func (fLBC *fakeAzureLBClient) Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) (resp *http.Response, err error) {
func (fLBC *fakeAzureLBClient) Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) *retry.Error {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
if rgLBs, ok := fLBC.FakeStore[resourceGroupName]; ok {
if _, ok := rgLBs[loadBalancerName]; ok {
delete(rgLBs, loadBalancerName)
return nil, nil
return nil
}
}
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
return nil
}
func (fLBC *fakeAzureLBClient) Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) {
func (fLBC *fakeAzureLBClient) Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err *retry.Error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
if _, ok := fLBC.FakeStore[resourceGroupName]; ok {
@ -100,13 +99,14 @@ func (fLBC *fakeAzureLBClient) Get(ctx context.Context, resourceGroupName string
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such LB",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such LB"))
}
func (fLBC *fakeAzureLBClient) List(ctx context.Context, resourceGroupName string) (result []network.LoadBalancer, err error) {
func (fLBC *fakeAzureLBClient) List(ctx context.Context, resourceGroupName string) (result []network.LoadBalancer, err *retry.Error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
@ -144,7 +144,7 @@ func newFakeAzurePIPClient(subscriptionID string) *fakeAzurePIPClient {
return fAPC
}
func (fAPC *fakeAzurePIPClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress) (resp *http.Response, err error) {
func (fAPC *fakeAzurePIPClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress) *retry.Error {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
@ -165,26 +165,28 @@ func (fAPC *fakeAzurePIPClient) CreateOrUpdate(ctx context.Context, resourceGrou
fAPC.FakeStore[resourceGroupName][publicIPAddressName] = parameters
return nil, nil
return nil
}
func (fAPC *fakeAzurePIPClient) Delete(ctx context.Context, resourceGroupName string, publicIPAddressName string) (resp *http.Response, err error) {
func (fAPC *fakeAzurePIPClient) Delete(ctx context.Context, resourceGroupName string, publicIPAddressName string) *retry.Error {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
if rgPIPs, ok := fAPC.FakeStore[resourceGroupName]; ok {
if _, ok := rgPIPs[publicIPAddressName]; ok {
delete(rgPIPs, publicIPAddressName)
return nil, nil
return nil
}
}
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
return retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such PIP"))
}
func (fAPC *fakeAzurePIPClient) Get(ctx context.Context, resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
func (fAPC *fakeAzurePIPClient) Get(ctx context.Context, resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err *retry.Error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
@ -192,13 +194,14 @@ func (fAPC *fakeAzurePIPClient) Get(ctx context.Context, resourceGroupName strin
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "No such PIP",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such PIP"))
}
func (fAPC *fakeAzurePIPClient) GetVirtualMachineScaleSetPublicIPAddress(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, IPConfigurationName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
func (fAPC *fakeAzurePIPClient) GetVirtualMachineScaleSetPublicIPAddress(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, IPConfigurationName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err *retry.Error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
@ -206,13 +209,14 @@ func (fAPC *fakeAzurePIPClient) GetVirtualMachineScaleSetPublicIPAddress(ctx con
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "No such PIP",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such PIP"))
}
func (fAPC *fakeAzurePIPClient) List(ctx context.Context, resourceGroupName string) (result []network.PublicIPAddress, err error) {
func (fAPC *fakeAzurePIPClient) List(ctx context.Context, resourceGroupName string) (result []network.PublicIPAddress, err *retry.Error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
@ -246,7 +250,7 @@ func newFakeAzureInterfacesClient() *fakeAzureInterfacesClient {
return fIC
}
func (fIC *fakeAzureInterfacesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkInterfaceName string, parameters network.Interface) (resp *http.Response, err error) {
func (fIC *fakeAzureInterfacesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkInterfaceName string, parameters network.Interface) *retry.Error {
fIC.mutex.Lock()
defer fIC.mutex.Unlock()
@ -255,10 +259,10 @@ func (fIC *fakeAzureInterfacesClient) CreateOrUpdate(ctx context.Context, resour
}
fIC.FakeStore[resourceGroupName][networkInterfaceName] = parameters
return nil, nil
return nil
}
func (fIC *fakeAzureInterfacesClient) Get(ctx context.Context, resourceGroupName string, networkInterfaceName string, expand string) (result network.Interface, err error) {
func (fIC *fakeAzureInterfacesClient) Get(ctx context.Context, resourceGroupName string, networkInterfaceName string, expand string) (result network.Interface, err *retry.Error) {
fIC.mutex.Lock()
defer fIC.mutex.Unlock()
if _, ok := fIC.FakeStore[resourceGroupName]; ok {
@ -266,13 +270,14 @@ func (fIC *fakeAzureInterfacesClient) Get(ctx context.Context, resourceGroupName
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such Interface",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Interface"))
}
func (fIC *fakeAzureInterfacesClient) GetVirtualMachineScaleSetNetworkInterface(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, expand string) (result network.Interface, err error) {
func (fIC *fakeAzureInterfacesClient) GetVirtualMachineScaleSetNetworkInterface(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, expand string) (result network.Interface, err *retry.Error) {
fIC.mutex.Lock()
defer fIC.mutex.Unlock()
if _, ok := fIC.FakeStore[resourceGroupName]; ok {
@ -280,10 +285,11 @@ func (fIC *fakeAzureInterfacesClient) GetVirtualMachineScaleSetNetworkInterface(
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such Interface",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Interface"))
}
func (fIC *fakeAzureInterfacesClient) setFakeStore(store map[string]map[string]network.Interface) {
@ -305,7 +311,7 @@ func newFakeAzureVirtualMachinesClient() *fakeAzureVirtualMachinesClient {
return fVMC
}
func (fVMC *fakeAzureVirtualMachinesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachine, source string) (resp *http.Response, err error) {
func (fVMC *fakeAzureVirtualMachinesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachine, source string) *retry.Error {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@ -314,10 +320,10 @@ func (fVMC *fakeAzureVirtualMachinesClient) CreateOrUpdate(ctx context.Context,
}
fVMC.FakeStore[resourceGroupName][VMName] = parameters
return nil, nil
return nil
}
func (fVMC *fakeAzureVirtualMachinesClient) Update(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) (resp *http.Response, err error) {
func (fVMC *fakeAzureVirtualMachinesClient) Update(ctx context.Context, resourceGroupName string, VMName string, parameters compute.VirtualMachineUpdate, source string) *retry.Error {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@ -325,10 +331,10 @@ func (fVMC *fakeAzureVirtualMachinesClient) Update(ctx context.Context, resource
fVMC.FakeStore[resourceGroupName] = make(map[string]compute.VirtualMachine)
}
return nil, nil
return nil
}
func (fVMC *fakeAzureVirtualMachinesClient) Get(ctx context.Context, resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err error) {
func (fVMC *fakeAzureVirtualMachinesClient) Get(ctx context.Context, resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err *retry.Error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
if _, ok := fVMC.FakeStore[resourceGroupName]; ok {
@ -336,13 +342,14 @@ func (fVMC *fakeAzureVirtualMachinesClient) Get(ctx context.Context, resourceGro
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such VM",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such VM"))
}
func (fVMC *fakeAzureVirtualMachinesClient) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachine, err error) {
func (fVMC *fakeAzureVirtualMachinesClient) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachine, err *retry.Error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@ -375,7 +382,7 @@ func newFakeAzureSubnetsClient() *fakeAzureSubnetsClient {
return fASC
}
func (fASC *fakeAzureSubnetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, virtualNetworkName string, subnetName string, subnetParameters network.Subnet) (resp *http.Response, err error) {
func (fASC *fakeAzureSubnetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, virtualNetworkName string, subnetName string, subnetParameters network.Subnet) *retry.Error {
fASC.mutex.Lock()
defer fASC.mutex.Unlock()
@ -385,10 +392,10 @@ func (fASC *fakeAzureSubnetsClient) CreateOrUpdate(ctx context.Context, resource
}
fASC.FakeStore[rgVnet][subnetName] = subnetParameters
return nil, nil
return nil
}
func (fASC *fakeAzureSubnetsClient) Delete(ctx context.Context, resourceGroupName string, virtualNetworkName string, subnetName string) (resp *http.Response, err error) {
func (fASC *fakeAzureSubnetsClient) Delete(ctx context.Context, resourceGroupName string, virtualNetworkName string, subnetName string) *retry.Error {
fASC.mutex.Lock()
defer fASC.mutex.Unlock()
@ -396,16 +403,18 @@ func (fASC *fakeAzureSubnetsClient) Delete(ctx context.Context, resourceGroupNam
if rgSubnets, ok := fASC.FakeStore[rgVnet]; ok {
if _, ok := rgSubnets[subnetName]; ok {
delete(rgSubnets, subnetName)
return nil, nil
return nil
}
}
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
return retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Subnet"))
}
func (fASC *fakeAzureSubnetsClient) Get(ctx context.Context, resourceGroupName string, virtualNetworkName string, subnetName string, expand string) (result network.Subnet, err error) {
func (fASC *fakeAzureSubnetsClient) Get(ctx context.Context, resourceGroupName string, virtualNetworkName string, subnetName string, expand string) (result network.Subnet, err *retry.Error) {
fASC.mutex.Lock()
defer fASC.mutex.Unlock()
rgVnet := strings.Join([]string{resourceGroupName, virtualNetworkName}, "AND")
@ -414,13 +423,14 @@ func (fASC *fakeAzureSubnetsClient) Get(ctx context.Context, resourceGroupName s
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such Subnet",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Subnet"))
}
func (fASC *fakeAzureSubnetsClient) List(ctx context.Context, resourceGroupName string, virtualNetworkName string) (result []network.Subnet, err error) {
func (fASC *fakeAzureSubnetsClient) List(ctx context.Context, resourceGroupName string, virtualNetworkName string) (result []network.Subnet, err *retry.Error) {
fASC.mutex.Lock()
defer fASC.mutex.Unlock()
@ -447,7 +457,7 @@ func newFakeAzureNSGClient() *fakeAzureNSGClient {
return fNSG
}
func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) {
func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) *retry.Error {
fNSG.mutex.Lock()
defer fNSG.mutex.Unlock()
@ -457,33 +467,35 @@ func (fNSG *fakeAzureNSGClient) CreateOrUpdate(ctx context.Context, resourceGrou
if nsg, ok := fNSG.FakeStore[resourceGroupName][networkSecurityGroupName]; ok {
if etag != "" && to.String(nsg.Etag) != "" && etag != to.String(nsg.Etag) {
return &http.Response{
return retry.GetError(&http.Response{
StatusCode: http.StatusPreconditionFailed,
}, errPreconditionFailedEtagMismatch
}, errPreconditionFailedEtagMismatch)
}
}
fNSG.FakeStore[resourceGroupName][networkSecurityGroupName] = parameters
return nil, nil
return nil
}
func (fNSG *fakeAzureNSGClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) {
func (fNSG *fakeAzureNSGClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) *retry.Error {
fNSG.mutex.Lock()
defer fNSG.mutex.Unlock()
if rgSGs, ok := fNSG.FakeStore[resourceGroupName]; ok {
if _, ok := rgSGs[networkSecurityGroupName]; ok {
delete(rgSGs, networkSecurityGroupName)
return nil, nil
return nil
}
}
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
return retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such NSG"))
}
func (fNSG *fakeAzureNSGClient) Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error) {
func (fNSG *fakeAzureNSGClient) Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err *retry.Error) {
fNSG.mutex.Lock()
defer fNSG.mutex.Unlock()
if _, ok := fNSG.FakeStore[resourceGroupName]; ok {
@ -491,13 +503,14 @@ func (fNSG *fakeAzureNSGClient) Get(ctx context.Context, resourceGroupName strin
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such NSG",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such NSG"))
}
func (fNSG *fakeAzureNSGClient) List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error) {
func (fNSG *fakeAzureNSGClient) List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err *retry.Error) {
fNSG.mutex.Lock()
defer fNSG.mutex.Unlock()
@ -535,7 +548,7 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) setFakeStore(store map[string]m
fVMC.FakeStore = store
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, err error) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result []compute.VirtualMachineScaleSetVM, err *retry.Error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@ -549,7 +562,7 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) List(ctx context.Context, resou
return result, nil
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, expand compute.InstanceViewTypes) (result compute.VirtualMachineScaleSetVM, err error) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, expand compute.InstanceViewTypes) (result compute.VirtualMachineScaleSetVM, err *retry.Error) {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@ -560,13 +573,14 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) Get(ctx context.Context, resour
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "No such VirtualMachineScaleSetVM",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such VirtualMachineScaleSetVM"))
}
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) (resp *http.Response, err error) {
func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, resourceGroupName string, VMScaleSetName string, instanceID string, parameters compute.VirtualMachineScaleSetVM, source string) *retry.Error {
fVMC.mutex.Lock()
defer fVMC.mutex.Unlock()
@ -577,7 +591,7 @@ func (fVMC *fakeVirtualMachineScaleSetVMsClient) Update(ctx context.Context, res
}
}
return nil, nil
return nil
}
type fakeVirtualMachineScaleSetsClient struct {
@ -600,7 +614,7 @@ func (fVMSSC *fakeVirtualMachineScaleSetsClient) setFakeStore(store map[string]m
fVMSSC.FakeStore = store
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) (resp *http.Response, err error) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet) *retry.Error {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
@ -609,10 +623,10 @@ func (fVMSSC *fakeVirtualMachineScaleSetsClient) CreateOrUpdate(ctx context.Cont
}
fVMSSC.FakeStore[resourceGroupName][VMScaleSetName] = parameters
return nil, nil
return nil
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err error) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) Get(ctx context.Context, resourceGroupName string, VMScaleSetName string) (result compute.VirtualMachineScaleSet, err *retry.Error) {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
@ -622,13 +636,14 @@ func (fVMSSC *fakeVirtualMachineScaleSetsClient) Get(ctx context.Context, resour
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "No such ScaleSet",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such ScaleSet"))
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, err error) {
func (fVMSSC *fakeVirtualMachineScaleSetsClient) List(ctx context.Context, resourceGroupName string) (result []compute.VirtualMachineScaleSet, err *retry.Error) {
fVMSSC.mutex.Lock()
defer fVMSSC.mutex.Unlock()
@ -642,8 +657,8 @@ func (fVMSSC *fakeVirtualMachineScaleSetsClient) List(ctx context.Context, resou
return result, nil
}
func (fVMSSC *fakeVirtualMachineScaleSetsClient) UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) (resp *http.Response, err error) {
return nil, nil
func (fVMSSC *fakeVirtualMachineScaleSetsClient) UpdateInstances(ctx context.Context, resourceGroupName string, VMScaleSetName string, VMInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) *retry.Error {
return nil
}
type fakeRoutesClient struct {
@ -658,7 +673,7 @@ func newFakeRoutesClient() *fakeRoutesClient {
return fRC
}
func (fRC *fakeRoutesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, routeName string, routeParameters network.Route, etag string) (resp *http.Response, err error) {
func (fRC *fakeRoutesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, routeName string, routeParameters network.Route, etag string) *retry.Error {
fRC.mutex.Lock()
defer fRC.mutex.Unlock()
@ -667,23 +682,25 @@ func (fRC *fakeRoutesClient) CreateOrUpdate(ctx context.Context, resourceGroupNa
}
fRC.FakeStore[routeTableName][routeName] = routeParameters
return nil, nil
return nil
}
func (fRC *fakeRoutesClient) Delete(ctx context.Context, resourceGroupName string, routeTableName string, routeName string) (resp *http.Response, err error) {
func (fRC *fakeRoutesClient) Delete(ctx context.Context, resourceGroupName string, routeTableName string, routeName string) *retry.Error {
fRC.mutex.Lock()
defer fRC.mutex.Unlock()
if routes, ok := fRC.FakeStore[routeTableName]; ok {
if _, ok := routes[routeName]; ok {
delete(routes, routeName)
return nil, nil
return nil
}
}
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
return retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Route"))
}
type fakeRouteTablesClient struct {
@ -699,7 +716,7 @@ func newFakeRouteTablesClient() *fakeRouteTablesClient {
return fRTC
}
func (fRTC *fakeRouteTablesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, parameters network.RouteTable, etag string) (resp *http.Response, err error) {
func (fRTC *fakeRouteTablesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, parameters network.RouteTable, etag string) *retry.Error {
fRTC.mutex.Lock()
defer fRTC.mutex.Unlock()
@ -710,10 +727,10 @@ func (fRTC *fakeRouteTablesClient) CreateOrUpdate(ctx context.Context, resourceG
}
fRTC.FakeStore[resourceGroupName][routeTableName] = parameters
return nil, nil
return nil
}
func (fRTC *fakeRouteTablesClient) Get(ctx context.Context, resourceGroupName string, routeTableName string, expand string) (result network.RouteTable, err error) {
func (fRTC *fakeRouteTablesClient) Get(ctx context.Context, resourceGroupName string, routeTableName string, expand string) (result network.RouteTable, err *retry.Error) {
fRTC.mutex.Lock()
defer fRTC.mutex.Unlock()
@ -724,10 +741,11 @@ func (fRTC *fakeRouteTablesClient) Get(ctx context.Context, resourceGroupName st
return entity, nil
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such RouteTable",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such RouteTable"))
}
type fakeFileClient struct {
@ -760,7 +778,7 @@ func newFakeStorageAccountClient() *fakeStorageAccountClient {
return fSAC
}
func (fSAC *fakeStorageAccountClient) Create(ctx context.Context, resourceGroupName string, accountName string, parameters storage.AccountCreateParameters) (resp *http.Response, err error) {
func (fSAC *fakeStorageAccountClient) Create(ctx context.Context, resourceGroupName string, accountName string, parameters storage.AccountCreateParameters) *retry.Error {
fSAC.mutex.Lock()
defer fSAC.mutex.Unlock()
@ -777,42 +795,36 @@ func (fSAC *fakeStorageAccountClient) Create(ctx context.Context, resourceGroupN
AccountProperties: &storage.AccountProperties{},
}
return nil, nil
return nil
}
func (fSAC *fakeStorageAccountClient) Delete(ctx context.Context, resourceGroupName string, accountName string) (result autorest.Response, err error) {
func (fSAC *fakeStorageAccountClient) Delete(ctx context.Context, resourceGroupName string, accountName string) *retry.Error {
fSAC.mutex.Lock()
defer fSAC.mutex.Unlock()
if rgAccounts, ok := fSAC.FakeStore[resourceGroupName]; ok {
if _, ok := rgAccounts[accountName]; ok {
delete(rgAccounts, accountName)
result.Response = &http.Response{
StatusCode: http.StatusAccepted,
}
return result, nil
return nil
}
}
result.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
err = autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such StorageAccount",
}
return result, err
return retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such StorageAccount"))
}
func (fSAC *fakeStorageAccountClient) ListKeys(ctx context.Context, resourceGroupName string, accountName string) (result storage.AccountListKeysResult, err error) {
return fSAC.Keys, fSAC.Err
func (fSAC *fakeStorageAccountClient) ListKeys(ctx context.Context, resourceGroupName string, accountName string) (result storage.AccountListKeysResult, err *retry.Error) {
return fSAC.Keys, nil
}
func (fSAC *fakeStorageAccountClient) ListByResourceGroup(ctx context.Context, resourceGroupName string) (result storage.AccountListResult, err error) {
return fSAC.Accounts, fSAC.Err
func (fSAC *fakeStorageAccountClient) ListByResourceGroup(ctx context.Context, resourceGroupName string) (result storage.AccountListResult, err *retry.Error) {
return fSAC.Accounts, nil
}
func (fSAC *fakeStorageAccountClient) GetProperties(ctx context.Context, resourceGroupName string, accountName string) (result storage.Account, err error) {
func (fSAC *fakeStorageAccountClient) GetProperties(ctx context.Context, resourceGroupName string, accountName string) (result storage.Account, err *retry.Error) {
fSAC.mutex.Lock()
defer fSAC.mutex.Unlock()
@ -822,10 +834,11 @@ func (fSAC *fakeStorageAccountClient) GetProperties(ctx context.Context, resourc
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such StorageAccount",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such StorageAccount"))
}
type fakeDisksClient struct {
@ -840,7 +853,7 @@ func newFakeDisksClient() *fakeDisksClient {
return fDC
}
func (fDC *fakeDisksClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, diskName string, diskParameter compute.Disk) (resp *http.Response, err error) {
func (fDC *fakeDisksClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, diskName string, diskParameter compute.Disk) *retry.Error {
fDC.mutex.Lock()
defer fDC.mutex.Unlock()
@ -849,26 +862,28 @@ func (fDC *fakeDisksClient) CreateOrUpdate(ctx context.Context, resourceGroupNam
}
fDC.FakeStore[resourceGroupName][diskName] = diskParameter
return nil, nil
return nil
}
func (fDC *fakeDisksClient) Delete(ctx context.Context, resourceGroupName string, diskName string) (resp *http.Response, err error) {
func (fDC *fakeDisksClient) Delete(ctx context.Context, resourceGroupName string, diskName string) *retry.Error {
fDC.mutex.Lock()
defer fDC.mutex.Unlock()
if rgDisks, ok := fDC.FakeStore[resourceGroupName]; ok {
if _, ok := rgDisks[diskName]; ok {
delete(rgDisks, diskName)
return nil, nil
return nil
}
}
return &http.Response{
StatusCode: http.StatusAccepted,
}, nil
return retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Disk"))
}
func (fDC *fakeDisksClient) Get(ctx context.Context, resourceGroupName string, diskName string) (result compute.Disk, err error) {
func (fDC *fakeDisksClient) Get(ctx context.Context, resourceGroupName string, diskName string) (result compute.Disk, err *retry.Error) {
fDC.mutex.Lock()
defer fDC.mutex.Unlock()
@ -878,10 +893,11 @@ func (fDC *fakeDisksClient) Get(ctx context.Context, resourceGroupName string, d
}
}
return result, autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such Disk",
}
return result, retry.GetError(
&http.Response{
StatusCode: http.StatusNotFound,
},
errors.New("Not such Disk"))
}
type fakeVMSet struct {
@ -946,8 +962,8 @@ func (f *fakeVMSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
return fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error) {
return nil, fmt.Errorf("unimplemented")
func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
return fmt.Errorf("unimplemented")
}
func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) {

View File

@ -154,11 +154,13 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
lb, err := az.reconcileLoadBalancer(clusterName, service, nodes, true /* wantLb */)
if err != nil {
klog.Errorf("reconcileLoadBalancer(%s) failed: %v", serviceName, err)
return nil, err
}
lbStatus, err := az.getServiceLoadBalancerStatus(service, lb)
if err != nil {
klog.Errorf("getServiceLoadBalancerStatus(%s) failed: %v", serviceName, err)
return nil, err
}
@ -168,17 +170,20 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
}
klog.V(2).Infof("EnsureLoadBalancer: reconciling security group for service %q with IP %q, wantLb = true", serviceName, logSafe(serviceIP))
if _, err := az.reconcileSecurityGroup(clusterName, service, serviceIP, true /* wantLb */); err != nil {
klog.Errorf("reconcileSecurityGroup(%s) failed: %#v", serviceName, err)
return nil, err
}
updateService := updateServiceLoadBalancerIP(service, to.String(serviceIP))
flippedService := flipServiceInternalAnnotation(updateService)
if _, err := az.reconcileLoadBalancer(clusterName, flippedService, nil, false /* wantLb */); err != nil {
klog.Errorf("reconcileLoadBalancer(%s) failed: %#v", serviceName, err)
return nil, err
}
// lb is not reused here because the ETAG may be changed in above operations, hence reconcilePublicIP() would get lb again from cache.
if _, err := az.reconcilePublicIP(clusterName, updateService, to.String(lb.Name), true /* wantLb */); err != nil {
klog.Errorf("reconcilePublicIP(%s) failed: %#v", serviceName, err)
return nil, err
}
@ -206,42 +211,22 @@ func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName stri
serviceName := getServiceName(service)
klog.V(5).Infof("Delete service (%s): START clusterName=%q", serviceName, clusterName)
ignoreErrors := func(err error) error {
if ignoreStatusNotFoundFromError(err) == nil {
klog.V(5).Infof("EnsureLoadBalancerDeleted: ignoring StatusNotFound error because the resource doesn't exist (%v)", err)
return nil
}
if ignoreStatusForbiddenFromError(err) == nil {
klog.V(5).Infof("EnsureLoadBalancerDeleted: ignoring StatusForbidden error (%v). This may be caused by wrong configuration via service annotations", err)
return nil
}
return err
}
serviceIPToCleanup, err := az.findServiceIPAddress(ctx, clusterName, service, isInternal)
if ignoreErrors(err) != nil {
if err != nil {
return err
}
klog.V(2).Infof("EnsureLoadBalancerDeleted: reconciling security group for service %q with IP %q, wantLb = false", serviceName, serviceIPToCleanup)
if _, err := az.reconcileSecurityGroup(clusterName, service, &serviceIPToCleanup, false /* wantLb */); err != nil {
if ignoreErrors(err) != nil {
return err
}
return err
}
if _, err := az.reconcileLoadBalancer(clusterName, service, nil, false /* wantLb */); err != nil {
if ignoreErrors(err) != nil {
return err
}
return err
}
if _, err := az.reconcilePublicIP(clusterName, service, "", false /* wantLb */); err != nil {
if ignoreErrors(err) != nil {
return err
}
return err
}
klog.V(2).Infof("Delete service (%s): FINISH", serviceName)
@ -592,9 +577,9 @@ func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domai
ctx, cancel := getContextWithCancel()
defer cancel()
pip, err = az.PublicIPAddressesClient.Get(ctx, pipResourceGroup, *pip.Name, "")
if err != nil {
return nil, err
pip, rerr := az.PublicIPAddressesClient.Get(ctx, pipResourceGroup, *pip.Name, "")
if rerr != nil {
return nil, rerr.Error()
}
return &pip, nil
}
@ -1610,9 +1595,7 @@ func (az *Cloud) safeDeletePublicIP(service *v1.Service, pipResourceGroup string
klog.V(10).Infof("DeletePublicIP(%s, %q): start", pipResourceGroup, pipName)
err := az.DeletePublicIP(service, pipResourceGroup, pipName)
if err != nil {
if err = ignoreStatusNotFoundFromError(err); err != nil {
return err
}
return err
}
klog.V(10).Infof("DeletePublicIP(%s, %q): end", pipResourceGroup, pipName)

View File

@ -393,8 +393,8 @@ func TestEnsureLoadBalancerDeleted(t *testing.T) {
} else {
assert.Nil(t, err, "TestCase[%d]: %s", i, c.desc)
assert.NotNil(t, lbStatus, "TestCase[%d]: %s", i, c.desc)
result, err := az.LoadBalancerClient.List(context.TODO(), az.Config.ResourceGroup)
assert.Nil(t, err, "TestCase[%d]: %s", i, c.desc)
result, rerr := az.LoadBalancerClient.List(context.TODO(), az.Config.ResourceGroup)
assert.Nil(t, rerr, "TestCase[%d]: %s", i, c.desc)
assert.Equal(t, len(result), 1, "TestCase[%d]: %s", i, c.desc)
assert.Equal(t, len(*result[0].LoadBalancingRules), 1, "TestCase[%d]: %s", i, c.desc)
}
@ -402,8 +402,8 @@ func TestEnsureLoadBalancerDeleted(t *testing.T) {
// finally, delete it.
err = az.EnsureLoadBalancerDeleted(context.TODO(), testClusterName, &c.service)
assert.Nil(t, err, "TestCase[%d]: %s", i, c.desc)
result, err := az.LoadBalancerClient.List(context.Background(), az.Config.ResourceGroup)
assert.Nil(t, err, "TestCase[%d]: %s", i, c.desc)
result, rerr := az.LoadBalancerClient.List(context.Background(), az.Config.ResourceGroup)
assert.Nil(t, rerr, "TestCase[%d]: %s", i, c.desc)
assert.Equal(t, len(result), 0, "TestCase[%d]: %s", i, c.desc)
}
}
@ -697,7 +697,7 @@ func TestGetServiceLoadBalancer(t *testing.T) {
clusterResources := getClusterResources(az, 3, 3)
for _, existingLB := range test.existingLBs {
_, err := az.LoadBalancerClient.CreateOrUpdate(context.TODO(), "rg", *existingLB.Name, existingLB, "")
err := az.LoadBalancerClient.CreateOrUpdate(context.TODO(), "rg", *existingLB.Name, existingLB, "")
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
@ -904,22 +904,22 @@ func TestIsFrontendIPChanged(t *testing.T) {
for i, test := range testCases {
az := getTestCloud()
_, err := az.SubnetsClient.CreateOrUpdate(context.TODO(), "rg", "vnet", "testSubnet", test.exsistingSubnet)
err := az.SubnetsClient.CreateOrUpdate(context.TODO(), "rg", "vnet", "testSubnet", test.exsistingSubnet)
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
for _, existingPIP := range test.exsistingPIPs {
_, err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "pipName", existingPIP)
err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "pipName", existingPIP)
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
}
test.service.Spec.LoadBalancerIP = test.loadBalancerIP
test.service.Annotations[ServiceAnnotationLoadBalancerInternalSubnet] = test.annotations
flag, err := az.isFrontendIPChanged("testCluster", test.config,
flag, rerr := az.isFrontendIPChanged("testCluster", test.config,
&test.service, test.lbFrontendIPConfigName)
assert.Equal(t, test.expectedFlag, flag, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, rerr != nil, "TestCase[%d]: %s", i, test.desc)
}
}
@ -964,7 +964,7 @@ func TestDeterminePublicIPName(t *testing.T) {
service := getTestService("test1", v1.ProtocolTCP, nil, 80)
service.Spec.LoadBalancerIP = test.loadBalancerIP
for _, existingPIP := range test.exsistingPIPs {
_, err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "test", existingPIP)
err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "test", existingPIP)
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
@ -1558,7 +1558,7 @@ func TestReconcileLoadBalancer(t *testing.T) {
clusterResources := getClusterResources(az, 3, 3)
test.service.Spec.LoadBalancerIP = "1.2.3.4"
_, err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "pipName", network.PublicIPAddress{
err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "pipName", network.PublicIPAddress{
Name: to.StringPtr("pipName"),
PublicIPAddressPropertiesFormat: &network.PublicIPAddressPropertiesFormat{
IPAddress: to.StringPtr("1.2.3.4"),
@ -1568,13 +1568,13 @@ func TestReconcileLoadBalancer(t *testing.T) {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
_, err = az.LoadBalancerClient.CreateOrUpdate(context.TODO(), az.getLoadBalancerResourceGroup(), "lb1", test.existingLB, "")
err = az.LoadBalancerClient.CreateOrUpdate(context.TODO(), az.getLoadBalancerResourceGroup(), "lb1", test.existingLB, "")
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
lb, err := az.reconcileLoadBalancer("testCluster", &test.service, clusterResources.nodes, test.wantLb)
assert.Equal(t, test.expectedError, err, "TestCase[%d]: %s", i, test.desc)
lb, rerr := az.reconcileLoadBalancer("testCluster", &test.service, clusterResources.nodes, test.wantLb)
assert.Equal(t, test.expectedError, rerr, "TestCase[%d]: %s", i, test.desc)
if test.expectedError == nil {
assert.Equal(t, &test.expectedLB, lb, "TestCase[%d]: %s", i, test.desc)
@ -1806,7 +1806,7 @@ func TestReconcileSecurityGroup(t *testing.T) {
for i, test := range testCases {
az := getTestCloud()
for name, sg := range test.existingSgs {
_, err := az.SecurityGroupsClient.CreateOrUpdate(context.TODO(), "rg", name, sg, "")
err := az.SecurityGroupsClient.CreateOrUpdate(context.TODO(), "rg", name, sg, "")
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
@ -1853,7 +1853,7 @@ func TestSafeDeletePublicIP(t *testing.T) {
for i, test := range testCases {
az := getTestCloud()
_, err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "pip1", network.PublicIPAddress{
err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", "pip1", network.PublicIPAddress{
Name: to.StringPtr("pip1"),
PublicIPAddressPropertiesFormat: &network.PublicIPAddressPropertiesFormat{
IPConfiguration: &network.IPConfiguration{
@ -1865,10 +1865,10 @@ func TestSafeDeletePublicIP(t *testing.T) {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
service := getTestService("test1", v1.ProtocolTCP, nil, 80)
err = az.safeDeletePublicIP(&service, "rg", test.pip, test.lb)
rerr := az.safeDeletePublicIP(&service, "rg", test.pip, test.lb)
assert.Equal(t, 0, len(*test.lb.FrontendIPConfigurations), "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, 0, len(*test.lb.LoadBalancingRules), "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
assert.Equal(t, test.expectedError, rerr != nil, "TestCase[%d]: %s", i, test.desc)
}
}
@ -1975,7 +1975,7 @@ func TestReconcilePublicIP(t *testing.T) {
service := getTestService("test1", v1.ProtocolTCP, nil, 80)
service.Annotations = test.annotations
for _, pip := range test.existingPIPs {
_, err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", to.String(pip.Name), pip)
err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", to.String(pip.Name), pip)
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
@ -2041,7 +2041,7 @@ func TestEnsurePublicIPExists(t *testing.T) {
az := getTestCloud()
service := getTestService("test1", v1.ProtocolTCP, nil, 80)
for _, pip := range test.existingPIPs {
_, err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", to.String(pip.Name), pip)
err := az.PublicIPAddressesClient.CreateOrUpdate(context.TODO(), "rg", to.String(pip.Name), pip)
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}
@ -2093,7 +2093,7 @@ func TestShouldUpdateLoadBalancer(t *testing.T) {
az := getTestCloud()
service := getTestService("test1", v1.ProtocolTCP, nil, 80)
if test.lbHasDeletionTimestamp {
service.ObjectMeta.DeletionTimestamp = &metav1.Time{time.Now()}
service.ObjectMeta.DeletionTimestamp = &metav1.Time{Time: time.Now()}
}
if test.existsLb {
lb := network.LoadBalancer{
@ -2109,7 +2109,7 @@ func TestShouldUpdateLoadBalancer(t *testing.T) {
},
},
}
_, err := az.LoadBalancerClient.CreateOrUpdate(context.TODO(), "rg", *lb.Name, lb, "")
err := az.LoadBalancerClient.CreateOrUpdate(context.TODO(), "rg", *lb.Name, lb, "")
if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err)
}

View File

@ -168,9 +168,9 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (
ctx, cancel := getContextWithCancel()
defer cancel()
_, err = c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
if err != nil {
return "", err
rerr := c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
if rerr != nil {
return "", rerr.Error()
}
diskID := ""
@ -214,9 +214,9 @@ func (c *ManagedDiskController) DeleteManagedDisk(diskURI string) error {
return fmt.Errorf("failed to delete disk(%s) since it's in attaching or detaching state", diskURI)
}
_, err = c.common.cloud.DisksClient.Delete(ctx, resourceGroup, diskName)
if err != nil {
return err
rerr := c.common.cloud.DisksClient.Delete(ctx, resourceGroup, diskName)
if rerr != nil {
return rerr.Error()
}
// We don't need poll here, k8s will immediately stop referencing the disk
// the disk will be eventually deleted - cleanly - by ARM
@ -231,16 +231,16 @@ func (c *ManagedDiskController) GetDisk(resourceGroup, diskName string) (string,
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := c.common.cloud.DisksClient.Get(ctx, resourceGroup, diskName)
if err != nil {
return "", "", err
result, rerr := c.common.cloud.DisksClient.Get(ctx, resourceGroup, diskName)
if rerr != nil {
return "", "", rerr.Error()
}
if result.DiskProperties != nil && (*result.DiskProperties).ProvisioningState != nil {
return *(*result.DiskProperties).ProvisioningState, *result.ID, nil
}
return "", "", err
return "", "", nil
}
// ResizeDisk Expand the disk to new size
@ -254,9 +254,9 @@ func (c *ManagedDiskController) ResizeDisk(diskURI string, oldSize resource.Quan
return oldSize, err
}
result, err := c.common.cloud.DisksClient.Get(ctx, resourceGroup, diskName)
if err != nil {
return oldSize, err
result, rerr := c.common.cloud.DisksClient.Get(ctx, resourceGroup, diskName)
if rerr != nil {
return oldSize, rerr.Error()
}
if result.DiskProperties == nil || result.DiskProperties.DiskSizeGB == nil {
@ -277,8 +277,8 @@ func (c *ManagedDiskController) ResizeDisk(diskURI string, oldSize resource.Quan
ctx, cancel = getContextWithCancel()
defer cancel()
if _, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, resourceGroup, diskName, result); err != nil {
return oldSize, err
if rerr := c.common.cloud.DisksClient.CreateOrUpdate(ctx, resourceGroup, diskName, result); rerr != nil {
return oldSize, rerr.Error()
}
klog.V(2).Infof("azureDisk - resize disk(%s) with new size(%d) completed", diskName, requestGiB)
@ -325,10 +325,10 @@ func (c *Cloud) GetAzureDiskLabels(diskURI string) (map[string]string, error) {
// Get information of the disk.
ctx, cancel := getContextWithCancel()
defer cancel()
disk, err := c.DisksClient.Get(ctx, resourceGroup, diskName)
if err != nil {
klog.Errorf("Failed to get information for AzureDisk %q: %v", diskName, err)
return nil, err
disk, rerr := c.DisksClient.Get(ctx, resourceGroup, diskName)
if rerr != nil {
klog.Errorf("Failed to get information for AzureDisk %q: %v", diskName, rerr)
return nil, rerr.Error()
}
// Check whether availability zone is specified.

View File

@ -707,9 +707,9 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
ctx, cancel := getContextWithCancel()
defer cancel()
nic, err := as.InterfacesClient.Get(ctx, nicResourceGroup, nicName, "")
if err != nil {
return network.Interface{}, err
nic, rerr := as.InterfacesClient.Get(ctx, nicResourceGroup, nicName, "")
if rerr != nil {
return network.Interface{}, rerr.Error()
}
return nic, nil

View File

@ -36,9 +36,9 @@ type accountWithLocation struct {
func (az *Cloud) getStorageAccounts(matchingAccountType, matchingAccountKind, resourceGroup, matchingLocation string) ([]accountWithLocation, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := az.StorageAccountClient.ListByResourceGroup(ctx, resourceGroup)
if err != nil {
return nil, err
result, rerr := az.StorageAccountClient.ListByResourceGroup(ctx, resourceGroup)
if rerr != nil {
return nil, rerr.Error()
}
if result.Value == nil {
return nil, fmt.Errorf("unexpected error when listing storage accounts from resource group %s", resourceGroup)
@ -72,9 +72,9 @@ func (az *Cloud) GetStorageAccesskey(account, resourceGroup string) (string, err
ctx, cancel := getContextWithCancel()
defer cancel()
result, err := az.StorageAccountClient.ListKeys(ctx, resourceGroup, account)
if err != nil {
return "", err
result, rerr := az.StorageAccountClient.ListKeys(ctx, resourceGroup, account)
if rerr != nil {
return "", rerr.Error()
}
if result.Keys == nil {
return "", fmt.Errorf("empty keys")
@ -132,9 +132,9 @@ func (az *Cloud) EnsureStorageAccount(accountName, accountType, accountKind, res
ctx, cancel := getContextWithCancel()
defer cancel()
_, err := az.StorageAccountClient.Create(ctx, resourceGroup, accountName, cp)
if err != nil {
return "", "", fmt.Errorf(fmt.Sprintf("Failed to create storage account %s, error: %s", accountName, err))
rerr := az.StorageAccountClient.Create(ctx, resourceGroup, accountName, cp)
if rerr != nil {
return "", "", fmt.Errorf(fmt.Sprintf("Failed to create storage account %s, error: %v", accountName, rerr))
}
}
}

View File

@ -22,6 +22,7 @@ import (
"context"
"fmt"
"math"
"net/http"
"strings"
"testing"
@ -37,6 +38,7 @@ import (
"k8s.io/client-go/tools/record"
servicehelpers "k8s.io/cloud-provider/service/helpers"
"k8s.io/legacy-cloud-providers/azure/auth"
"k8s.io/legacy-cloud-providers/azure/retry"
)
var testClusterName = "testCluster"
@ -795,7 +797,11 @@ func TestReconcileSecurityGroupEtagMismatch(t *testing.T) {
newSG, err := az.reconcileSecurityGroup(testClusterName, &svc1, &lbStatus.Ingress[0].IP, true /* wantLb */)
assert.Nil(t, newSG)
assert.NotNil(t, err)
assert.Equal(t, err, errPreconditionFailedEtagMismatch)
expectedError := &retry.Error{
HTTPStatusCode: http.StatusPreconditionFailed,
RawError: errPreconditionFailedEtagMismatch,
}
assert.Equal(t, err, expectedError.Error())
}
func TestReconcilePublicIPWithNewService(t *testing.T) {
@ -1747,7 +1753,7 @@ func addTestSubnet(t *testing.T, az *Cloud, svc *v1.Service) {
ctx, cancel := getContextWithCancel()
defer cancel()
_, err := az.SubnetsClient.CreateOrUpdate(ctx, az.VnetResourceGroup, az.VnetName, subName,
err := az.SubnetsClient.CreateOrUpdate(ctx, az.VnetResourceGroup, az.VnetName, subName,
network.Subnet{
ID: &subnetID,
Name: &subName,

View File

@ -19,8 +19,6 @@ limitations under the License.
package azure
import (
"net/http"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
@ -68,7 +66,7 @@ type VMSet interface {
// AttachDisk attaches a vhd to vm. The vhd must exist, can be identified by diskName, diskURI, and lun.
AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string) error
// DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI.
DetachDisk(diskName, diskURI string, nodeName types.NodeName) (*http.Response, error)
DetachDisk(diskName, diskURI string, nodeName types.NodeName) error
// GetDataDisks gets a list of data disks attached to the node.
GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error)

View File

@ -36,7 +36,6 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
utilnet "k8s.io/utils/net"
)
@ -418,23 +417,22 @@ func (ss *scaleSet) GetIPByNodeName(nodeName string) (string, string, error) {
return internalIP, publicIP, nil
}
func (ss *scaleSet) getVMSSPublicIPAddress(resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, IPConfigurationName string, publicIPAddressName string) (pip network.PublicIPAddress, exists bool, err error) {
var realErr error
var message string
func (ss *scaleSet) getVMSSPublicIPAddress(resourceGroupName string, virtualMachineScaleSetName string, virtualmachineIndex string, networkInterfaceName string, IPConfigurationName string, publicIPAddressName string) (network.PublicIPAddress, bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
pip, err = ss.PublicIPAddressesClient.GetVirtualMachineScaleSetPublicIPAddress(ctx, resourceGroupName, virtualMachineScaleSetName, virtualmachineIndex, networkInterfaceName, IPConfigurationName, publicIPAddressName, "")
exists, message, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return pip, false, realErr
pip, err := ss.PublicIPAddressesClient.GetVirtualMachineScaleSetPublicIPAddress(ctx, resourceGroupName, virtualMachineScaleSetName, virtualmachineIndex, networkInterfaceName, IPConfigurationName, publicIPAddressName, "")
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return pip, false, rerr.Error()
}
if !exists {
klog.V(2).Infof("Public IP %q not found with message: %q", publicIPAddressName, message)
klog.V(2).Infof("Public IP %q not found", publicIPAddressName)
return pip, false, nil
}
return pip, exists, err
return pip, exists, nil
}
// returns a list of private ips assigned to node
@ -525,14 +523,13 @@ func extractResourceGroupByProviderID(providerID string) (string, error) {
// listScaleSets lists all scale sets.
func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
var err error
ctx, cancel := getContextWithCancel()
defer cancel()
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, resourceGroup)
if err != nil {
klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
return nil, err
allScaleSets, rerr := ss.VirtualMachineScaleSetsClient.List(ctx, resourceGroup)
if rerr != nil {
klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", rerr)
return nil, rerr.Error()
}
ssNames := make([]string, 0)
@ -551,14 +548,13 @@ func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
// listScaleSetVMs lists VMs belonging to the specified scale set.
func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
var err error
ctx, cancel := getContextWithCancel()
defer cancel()
allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
if err != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err)
return nil, err
allVMs, rerr := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
if rerr != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", rerr)
return nil, rerr.Error()
}
return allVMs, nil
@ -695,9 +691,9 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
ctx, cancel := getContextWithCancel()
defer cancel()
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, resourceGroup, ssName, instanceID, nicName, "")
if err != nil {
exists, _, realErr := checkResourceExistsFromError(err)
nic, rerr := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, resourceGroup, ssName, instanceID, nicName, "")
if rerr != nil {
exists, realErr := checkResourceExistsFromError(rerr)
if realErr != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, resourceGroup, ssName, nicName, realErr)
return network.Interface{}, err
@ -900,17 +896,17 @@ func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeNam
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("EnsureHostInPool begins to update vmssVM(%s) with new backendPoolID %s", vmName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("EnsureHostInPool update backing off vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if retryErr != nil {
err = retryErr
klog.Errorf("EnsureHostInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmName, backendPoolID, err)
}
return retryErr
}
return err
return rerr.Error()
}
func getVmssAndResourceGroupNameByVMProviderID(providerID string) (string, string, error) {
@ -1031,18 +1027,17 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back
defer cancel()
klog.V(2).Infof("ensureVMSSInPool begins to update vmss(%s) with new backendPoolID %s", vmssName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
rerr := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("ensureVMSSInPool update backing off vmss(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
if retryErr != nil {
err = retryErr
klog.Errorf("ensureVMSSInPool update abort backoff vmssVM(%s) with new backendPoolID %s, err: %v", vmssName, backendPoolID, err)
return retryErr
}
}
if err != nil {
return err
if rerr != nil {
return rerr.Error()
}
}
return nil
@ -1177,8 +1172,8 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromNode begins to update vmssVM(%s) with backendPoolID %s", nodeName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
rerr := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("ensureBackendPoolDeletedFromNode update backing off vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
retryErr := ss.UpdateVmssVMWithRetry(nodeResourceGroup, ssName, instanceID, newVM, "network_update")
if retryErr != nil {
@ -1186,12 +1181,12 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeNa
klog.Errorf("ensureBackendPoolDeletedFromNode update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", nodeName, backendPoolID, err)
}
}
if err != nil {
if rerr != nil {
klog.Errorf("ensureBackendPoolDeletedFromNode failed to update vmssVM(%s) with backendPoolID %s: %v", nodeName, backendPoolID, err)
} else {
klog.V(2).Infof("ensureBackendPoolDeletedFromNode update vmssVM(%s) with backendPoolID %s succeeded", nodeName, backendPoolID)
}
return err
return rerr.Error()
}
// getNodeNameByIPConfigurationID gets the node name by IP configuration ID.
@ -1314,17 +1309,17 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backen
defer cancel()
klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS begins to update vmss(%s) with backendPoolID %s", vmssName, backendPoolID)
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
rerr := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmssName, newVMSS)
if rerr != nil && rerr.Retriable && ss.CloudProviderBackoff {
klog.V(2).Infof("ensureBackendPoolDeletedFromVMSS update backing off vmss(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, err)
retryErr := ss.CreateOrUpdateVmssWithRetry(ss.ResourceGroup, vmssName, newVMSS)
if retryErr != nil {
err = retryErr
klog.Errorf("ensureBackendPoolDeletedFromVMSS update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, err)
klog.Errorf("ensureBackendPoolDeletedFromVMSS update abort backoff vmssVM(%s) with backendPoolID %s, err: %v", vmssName, backendPoolID, retryErr)
return retryErr
}
}
if err != nil {
return err
if rerr != nil {
return rerr.Error()
}
}

View File

@ -66,10 +66,10 @@ func (ss *scaleSet) newVMSSCache() (*timedCache, error) {
}
for _, resourceGroup := range allResourceGroups.List() {
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(context.Background(), resourceGroup)
if err != nil {
klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
return nil, err
allScaleSets, rerr := ss.VirtualMachineScaleSetsClient.List(context.Background(), resourceGroup)
if rerr != nil {
klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", rerr)
return nil, rerr.Error()
}
for _, scaleSet := range allScaleSets {

View File

@ -78,15 +78,15 @@ func TestVMSSVMCache(t *testing.T) {
assert.NoError(t, err)
// validate getting VMSS VM via cache.
virtualMachines, err := ss.VirtualMachineScaleSetVMsClient.List(
virtualMachines, rerr := ss.VirtualMachineScaleSetVMsClient.List(
context.Background(), "rg", "vmss", "", "", "")
assert.NoError(t, err)
assert.Nil(t, rerr)
assert.Equal(t, 3, len(virtualMachines))
for i := range virtualMachines {
vm := virtualMachines[i]
vmName := to.String(vm.OsProfile.ComputerName)
ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault)
assert.NoError(t, err)
assert.Nil(t, err)
assert.Equal(t, "vmss", ssName)
assert.Equal(t, to.String(vm.InstanceID), instanceID)
assert.Equal(t, &vm, realVM)

View File

@ -27,11 +27,11 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/go-autorest/autorest"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/azure/retry"
)
var (
@ -47,44 +47,16 @@ var (
// checkExistsFromError inspects an error and returns a true if err is nil,
// false if error is an autorest.Error with StatusCode=404 and will return the
// error back if error is another status code or another type of error.
func checkResourceExistsFromError(err error) (bool, string, error) {
func checkResourceExistsFromError(err *retry.Error) (bool, *retry.Error) {
if err == nil {
return true, "", nil
return true, nil
}
v, ok := err.(autorest.DetailedError)
if !ok {
return false, "", err
}
if v.StatusCode == http.StatusNotFound {
return false, err.Error(), nil
}
return false, "", v
}
// If it is StatusNotFound return nil,
// Otherwise, return what it is
func ignoreStatusNotFoundFromError(err error) error {
if err == nil {
return nil
if err.HTTPStatusCode == http.StatusNotFound {
return false, nil
}
v, ok := err.(autorest.DetailedError)
if ok && v.StatusCode == http.StatusNotFound {
return nil
}
return err
}
// ignoreStatusForbiddenFromError returns nil if the status code is StatusForbidden.
// This happens when AuthorizationFailed is reported from Azure API.
func ignoreStatusForbiddenFromError(err error) error {
if err == nil {
return nil
}
v, ok := err.(autorest.DetailedError)
if ok && v.StatusCode == http.StatusForbidden {
return nil
}
return err
return false, err
}
/// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache
@ -117,35 +89,30 @@ func (az *Cloud) getRouteTable(crt cacheReadType) (routeTable network.RouteTable
return *(cachedRt.(*network.RouteTable)), true, nil
}
func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (pip network.PublicIPAddress, exists bool, err error) {
func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (network.PublicIPAddress, bool, error) {
resourceGroup := az.ResourceGroup
if pipResourceGroup != "" {
resourceGroup = pipResourceGroup
}
var realErr error
var message string
ctx, cancel := getContextWithCancel()
defer cancel()
pip, err = az.PublicIPAddressesClient.Get(ctx, resourceGroup, pipName, "")
exists, message, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return pip, false, realErr
pip, err := az.PublicIPAddressesClient.Get(ctx, resourceGroup, pipName, "")
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return pip, false, rerr.Error()
}
if !exists {
klog.V(2).Infof("Public IP %q not found with message: %q", pipName, message)
klog.V(2).Infof("Public IP %q not found", pipName)
return pip, false, nil
}
return pip, exists, err
return pip, exists, nil
}
func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet network.Subnet, exists bool, err error) {
var realErr error
var message string
func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (network.Subnet, bool, error) {
var rg string
if len(az.VnetResourceGroup) > 0 {
rg = az.VnetResourceGroup
} else {
@ -154,18 +121,18 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (subnet
ctx, cancel := getContextWithCancel()
defer cancel()
subnet, err = az.SubnetsClient.Get(ctx, rg, virtualNetworkName, subnetName, "")
exists, message, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return subnet, false, realErr
subnet, err := az.SubnetsClient.Get(ctx, rg, virtualNetworkName, subnetName, "")
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return subnet, false, rerr.Error()
}
if !exists {
klog.V(2).Infof("Subnet %q not found with message: %q", subnetName, message)
klog.V(2).Infof("Subnet %q not found", subnetName)
return subnet, false, nil
}
return subnet, exists, err
return subnet, exists, nil
}
func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb network.LoadBalancer, exists bool, err error) {
@ -181,7 +148,8 @@ func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb networ
return *(cachedLB.(*network.LoadBalancer)), true, nil
}
func (az *Cloud) getSecurityGroup(crt cacheReadType) (nsg network.SecurityGroup, err error) {
func (az *Cloud) getSecurityGroup(crt cacheReadType) (network.SecurityGroup, error) {
nsg := network.SecurityGroup{}
if az.SecurityGroupName == "" {
return nsg, fmt.Errorf("securityGroupName is not configured")
}
@ -214,14 +182,14 @@ func (az *Cloud) newVMCache() (*timedCache, error) {
return nil, err
}
vm, err := az.VirtualMachinesClient.Get(ctx, resourceGroup, key, compute.InstanceView)
exists, message, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
vm, verr := az.VirtualMachinesClient.Get(ctx, resourceGroup, key, compute.InstanceView)
exists, rerr := checkResourceExistsFromError(verr)
if rerr != nil {
return nil, rerr.Error()
}
if !exists {
klog.V(2).Infof("Virtual machine %q not found with message: %q", key, message)
klog.V(2).Infof("Virtual machine %q not found", key)
return nil, nil
}
@ -240,13 +208,13 @@ func (az *Cloud) newLBCache() (*timedCache, error) {
defer cancel()
lb, err := az.LoadBalancerClient.Get(ctx, az.getLoadBalancerResourceGroup(), key, "")
exists, message, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return nil, rerr.Error()
}
if !exists {
klog.V(2).Infof("Load balancer %q not found with message: %q", key, message)
klog.V(2).Infof("Load balancer %q not found", key)
return nil, nil
}
@ -264,13 +232,13 @@ func (az *Cloud) newNSGCache() (*timedCache, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
nsg, err := az.SecurityGroupsClient.Get(ctx, az.ResourceGroup, key, "")
exists, message, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return nil, rerr.Error()
}
if !exists {
klog.V(2).Infof("Security group %q not found with message: %q", key, message)
klog.V(2).Infof("Security group %q not found", key)
return nil, nil
}
@ -288,13 +256,13 @@ func (az *Cloud) newRouteTableCache() (*timedCache, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
rt, err := az.RouteTablesClient.Get(ctx, az.RouteTableResourceGroup, key, "")
exists, message, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return nil, rerr.Error()
}
if !exists {
klog.V(2).Infof("Route table %q not found with message: %q", key, message)
klog.V(2).Infof("Route table %q not found", key)
return nil, nil
}

View File

@ -19,25 +19,23 @@ limitations under the License.
package azure
import (
"fmt"
"net/http"
"reflect"
"testing"
"github.com/Azure/go-autorest/autorest"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/legacy-cloud-providers/azure/retry"
)
func TestExtractNotFound(t *testing.T) {
notFound := autorest.DetailedError{StatusCode: http.StatusNotFound}
otherHTTP := autorest.DetailedError{StatusCode: http.StatusForbidden}
otherErr := fmt.Errorf("other error")
notFound := &retry.Error{HTTPStatusCode: http.StatusNotFound}
otherHTTP := &retry.Error{HTTPStatusCode: http.StatusForbidden}
otherErr := &retry.Error{HTTPStatusCode: http.StatusTooManyRequests}
tests := []struct {
err error
expectedErr error
err *retry.Error
expectedErr *retry.Error
exists bool
}{
{nil, nil, true},
@ -47,7 +45,7 @@ func TestExtractNotFound(t *testing.T) {
}
for _, test := range tests {
exists, _, err := checkResourceExistsFromError(test.err)
exists, err := checkResourceExistsFromError(test.err)
if test.exists != exists {
t.Errorf("expected: %v, saw: %v", test.exists, exists)
}

View File

@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["azure_error.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/legacy-cloud-providers/azure/retry",
importpath = "k8s.io/legacy-cloud-providers/azure/retry",
visibility = ["//visibility:public"],
deps = ["//vendor/k8s.io/klog:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["azure_error_test.go"],
embed = [":go_default_library"],
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,198 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package retry
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"k8s.io/klog"
)
var (
// The function to get current time.
now = time.Now
)
// Error indicates an error returned by Azure APIs.
type Error struct {
// Retriable indicates whether the request is retriable.
Retriable bool
// HTTPStatusCode indicates the response HTTP status code.
HTTPStatusCode int
// RetryAfter indicates the time when the request should retry after throttling.
// A throttled request is retriable.
RetryAfter time.Time
// RetryAfter indicates the raw error from API.
RawError error
}
// Error returns the error.
// Note that Error doesn't implement error interface because (nil *Error) != (nil error).
func (err *Error) Error() error {
if err == nil {
return nil
}
return fmt.Errorf("Retriable: %v, RetryAfter: %s, HTTPStatusCode: %d, RawError: %v",
err.Retriable, err.RetryAfter.String(), err.HTTPStatusCode, err.RawError)
}
// NewError creates a new Error.
func NewError(retriable bool, err error) *Error {
return &Error{
Retriable: retriable,
RawError: err,
}
}
// GetRetriableError gets new retriable Error.
func GetRetriableError(err error) *Error {
return &Error{
Retriable: true,
RawError: err,
}
}
// GetError gets a new Error based on resp and error.
func GetError(resp *http.Response, err error) *Error {
if err == nil && resp == nil {
return nil
}
if err == nil && resp != nil && isSuccessHTTPResponse(resp) {
// HTTP 2xx suggests a successful response
return nil
}
retryAfter := time.Time{}
if retryAfterDuration := getRetryAfter(resp); retryAfterDuration != 0 {
retryAfter = now().Add(retryAfterDuration)
}
rawError := err
if err == nil && resp != nil {
rawError = fmt.Errorf("HTTP response: %v", resp.StatusCode)
}
return &Error{
RawError: rawError,
RetryAfter: retryAfter,
Retriable: shouldRetryHTTPRequest(resp, err),
HTTPStatusCode: getHTTPStatusCode(resp),
}
}
// isSuccessHTTPResponse determines if the response from an HTTP request suggests success
func isSuccessHTTPResponse(resp *http.Response) bool {
if resp == nil {
return false
}
// HTTP 2xx suggests a successful response
if 199 < resp.StatusCode && resp.StatusCode < 300 {
return true
}
return false
}
func getHTTPStatusCode(resp *http.Response) int {
if resp == nil {
return -1
}
return resp.StatusCode
}
// shouldRetryHTTPRequest determines if the request is retriable.
func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
if resp != nil {
// HTTP 412 (StatusPreconditionFailed) means etag mismatch, hence we shouldn't retry.
if resp.StatusCode == http.StatusPreconditionFailed {
return false
}
// HTTP 4xx (except 412) or 5xx suggests we should retry.
if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true
}
}
if err != nil {
return true
}
return false
}
// getRetryAfter gets the retryAfter from http response.
// The value of Retry-After can be either the number of seconds or a date in RFC1123 format.
func getRetryAfter(resp *http.Response) time.Duration {
if resp == nil {
return 0
}
ra := resp.Header.Get("Retry-After")
if ra == "" {
return 0
}
var dur time.Duration
if retryAfter, _ := strconv.Atoi(ra); retryAfter > 0 {
dur = time.Duration(retryAfter) * time.Second
} else if t, err := time.Parse(time.RFC1123, ra); err == nil {
dur = t.Sub(now())
}
return dur
}
// GetStatusNotFoundAndForbiddenIgnoredError gets an error with StatusNotFound and StatusForbidden ignored.
// It is only used in DELETE operations.
func GetStatusNotFoundAndForbiddenIgnoredError(resp *http.Response, err error) *Error {
rerr := GetError(resp, err)
if rerr == nil {
return nil
}
// Returns nil when it is StatusNotFound error.
if rerr.HTTPStatusCode == http.StatusNotFound {
klog.V(3).Infof("Ignoring StatusNotFound error: %v", rerr)
return nil
}
// Returns nil if the status code is StatusForbidden.
// This happens when AuthorizationFailed is reported from Azure API.
if rerr.HTTPStatusCode == http.StatusForbidden {
klog.V(3).Infof("Ignoring StatusForbidden error: %v", rerr)
return nil
}
return rerr
}
// IsErrorRetriable returns true if the error is retriable.
func IsErrorRetriable(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "Retriable: true")
}

View File

@ -0,0 +1,253 @@
// +build !providerless
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package retry
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestGetError(t *testing.T) {
now = func() time.Time {
return time.Time{}
}
tests := []struct {
code int
retryAfter int
err error
expected *Error
}{
{
code: http.StatusOK,
expected: nil,
},
{
code: http.StatusOK,
err: fmt.Errorf("some error"),
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusOK,
RawError: fmt.Errorf("some error"),
},
},
{
code: http.StatusBadRequest,
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusBadRequest,
RawError: fmt.Errorf("HTTP response: 400"),
},
},
{
code: http.StatusInternalServerError,
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusInternalServerError,
RawError: fmt.Errorf("HTTP response: 500"),
},
},
{
code: http.StatusSeeOther,
err: fmt.Errorf("some error"),
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusSeeOther,
RawError: fmt.Errorf("some error"),
},
},
{
code: http.StatusTooManyRequests,
retryAfter: 100,
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusTooManyRequests,
RetryAfter: now().Add(100 * time.Second),
RawError: fmt.Errorf("HTTP response: 429"),
},
},
}
for _, test := range tests {
resp := &http.Response{
StatusCode: test.code,
Header: http.Header{},
}
if test.retryAfter != 0 {
resp.Header.Add("Retry-After", fmt.Sprintf("%d", test.retryAfter))
}
rerr := GetError(resp, test.err)
assert.Equal(t, test.expected, rerr)
}
}
func TestGetStatusNotFoundAndForbiddenIgnoredError(t *testing.T) {
now = func() time.Time {
return time.Time{}
}
tests := []struct {
code int
retryAfter int
err error
expected *Error
}{
{
code: http.StatusOK,
expected: nil,
},
{
code: http.StatusNotFound,
expected: nil,
},
{
code: http.StatusForbidden,
expected: nil,
},
{
code: http.StatusOK,
err: fmt.Errorf("some error"),
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusOK,
RawError: fmt.Errorf("some error"),
},
},
{
code: http.StatusBadRequest,
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusBadRequest,
RawError: fmt.Errorf("HTTP response: 400"),
},
},
{
code: http.StatusInternalServerError,
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusInternalServerError,
RawError: fmt.Errorf("HTTP response: 500"),
},
},
{
code: http.StatusSeeOther,
err: fmt.Errorf("some error"),
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusSeeOther,
RawError: fmt.Errorf("some error"),
},
},
{
code: http.StatusTooManyRequests,
retryAfter: 100,
expected: &Error{
Retriable: true,
HTTPStatusCode: http.StatusTooManyRequests,
RetryAfter: now().Add(100 * time.Second),
RawError: fmt.Errorf("HTTP response: 429"),
},
},
}
for _, test := range tests {
resp := &http.Response{
StatusCode: test.code,
Header: http.Header{},
}
if test.retryAfter != 0 {
resp.Header.Add("Retry-After", fmt.Sprintf("%d", test.retryAfter))
}
rerr := GetStatusNotFoundAndForbiddenIgnoredError(resp, test.err)
assert.Equal(t, test.expected, rerr)
}
}
func TestShouldRetryHTTPRequest(t *testing.T) {
tests := []struct {
code int
err error
expected bool
}{
{
code: http.StatusBadRequest,
expected: true,
},
{
code: http.StatusInternalServerError,
expected: true,
},
{
code: http.StatusOK,
err: fmt.Errorf("some error"),
expected: true,
},
{
code: http.StatusOK,
expected: false,
},
{
code: 399,
expected: false,
},
}
for _, test := range tests {
resp := &http.Response{
StatusCode: test.code,
}
res := shouldRetryHTTPRequest(resp, test.err)
if res != test.expected {
t.Errorf("expected: %v, saw: %v", test.expected, res)
}
}
}
func TestIsSuccessResponse(t *testing.T) {
tests := []struct {
code int
expected bool
}{
{
code: http.StatusNotFound,
expected: false,
},
{
code: http.StatusInternalServerError,
expected: false,
},
{
code: http.StatusOK,
expected: true,
},
}
for _, test := range tests {
resp := http.Response{
StatusCode: test.code,
}
res := isSuccessHTTPResponse(&resp)
if res != test.expected {
t.Errorf("expected: %v, saw: %v", test.expected, res)
}
}
}

1
vendor/modules.txt vendored
View File

@ -1836,6 +1836,7 @@ k8s.io/kubelet/pkg/apis/pluginregistration/v1
k8s.io/legacy-cloud-providers/aws
k8s.io/legacy-cloud-providers/azure
k8s.io/legacy-cloud-providers/azure/auth
k8s.io/legacy-cloud-providers/azure/retry
k8s.io/legacy-cloud-providers/gce
k8s.io/legacy-cloud-providers/openstack
k8s.io/legacy-cloud-providers/vsphere