Merge pull request #63278 from feiskyer/azure-lb-new-interface

Automatic merge from submit-queue (batch tested with PRs 62657, 63278, 62903, 63375). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Use new Azure SDK APIs for load balancer and public IP operations

**What this PR does / why we need it**:

#63063 updated Azure SDK to a stable version. After that, we should also update existing clients to use new SDK APIs.

Without this, public IP listing will be blocked forever in some case.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:

A following PR will be sent for other interfaces, e.g. routes and NSGs.

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue
2018-05-02 20:13:07 -07:00
committed by GitHub
6 changed files with 166 additions and 389 deletions

View File

@@ -127,11 +127,12 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(sg network.SecurityGroup) error {
// CreateOrUpdateLBWithRetry invokes az.LoadBalancerClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
respChan, errChan := az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil)
resp := <-respChan
err := <-errChan
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb)
glog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
done, err := processRetryResponse(resp.Response, err)
done, err := processHTTPRetryResponse(resp, err)
if done && err == nil {
// Invalidate the cache right after updating
az.lbCache.Delete(*lb.Name)
@@ -142,20 +143,20 @@ func (az *Cloud) CreateOrUpdateLBWithRetry(lb network.LoadBalancer) error {
// ListLBWithRetry invokes az.LoadBalancerClient.List with exponential backoff retry
func (az *Cloud) ListLBWithRetry() ([]network.LoadBalancer, error) {
allLBs := []network.LoadBalancer{}
var result network.LoadBalancerListResult
var resultPage LoadBalancerListResultPage
var allLBs []network.LoadBalancer
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
var retryErr error
resultPage, retryErr = az.LoadBalancerClient.List(az.ResourceGroup)
ctx, cancel := getContextWithCancel()
defer cancel()
allLBs, retryErr = az.LoadBalancerClient.List(ctx, az.ResourceGroup)
if retryErr != nil {
glog.Errorf("LoadBalancerClient.List(%v) - backoff: failure, will retry,err=%v",
az.ResourceGroup,
retryErr)
return false, retryErr
}
result = resultPage.Response()
glog.V(2).Infof("LoadBalancerClient.List(%v) - backoff: success", az.ResourceGroup)
return true, nil
})
@@ -163,52 +164,25 @@ func (az *Cloud) ListLBWithRetry() ([]network.LoadBalancer, error) {
return nil, err
}
appendResults := (result.Value != nil && len(*result.Value) > 0)
for appendResults {
allLBs = append(allLBs, *result.Value...)
appendResults = false
// follow the next link to get all the vms for resource group
if result.NextLink != nil {
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
var retryErr error
resultPage, retryErr = az.LoadBalancerClient.ListNextResults(az.ResourceGroup, resultPage)
if retryErr != nil {
glog.Errorf("LoadBalancerClient.ListNextResults(%v) - backoff: failure, will retry,err=%v",
az.ResourceGroup,
retryErr)
return false, retryErr
}
result = resultPage.Response()
glog.V(2).Infof("LoadBalancerClient.ListNextResults(%v) - backoff: success", az.ResourceGroup)
return true, nil
})
if err != nil {
return allLBs, err
}
appendResults = (result.Value != nil && len(*result.Value) > 0)
}
}
return allLBs, nil
}
// ListPIPWithRetry list the PIP resources in the given resource group
func (az *Cloud) ListPIPWithRetry(pipResourceGroup string) ([]network.PublicIPAddress, error) {
allPIPs := []network.PublicIPAddress{}
var result network.PublicIPAddressListResult
var resultPage PublicIPAddressListResultPage
var allPIPs []network.PublicIPAddress
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
var retryErr error
resultPage, retryErr = az.PublicIPAddressesClient.List(pipResourceGroup)
ctx, cancel := getContextWithCancel()
defer cancel()
allPIPs, retryErr = az.PublicIPAddressesClient.List(ctx, pipResourceGroup)
if retryErr != nil {
glog.Errorf("PublicIPAddressesClient.List(%v) - backoff: failure, will retry,err=%v",
pipResourceGroup,
retryErr)
return false, retryErr
}
result = resultPage.Response()
glog.V(2).Infof("PublicIPAddressesClient.List(%v) - backoff: success", pipResourceGroup)
return true, nil
})
@@ -216,44 +190,18 @@ func (az *Cloud) ListPIPWithRetry(pipResourceGroup string) ([]network.PublicIPAd
return nil, err
}
appendResults := (result.Value != nil && len(*result.Value) > 0)
for appendResults {
allPIPs = append(allPIPs, *result.Value...)
appendResults = false
// follow the next link to get all the pip resources for resource group
if result.NextLink != nil {
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
var retryErr error
resultPage, retryErr = az.PublicIPAddressesClient.ListNextResults(az.ResourceGroup, resultPage)
if retryErr != nil {
glog.Errorf("PublicIPAddressesClient.ListNextResults(%v) - backoff: failure, will retry,err=%v",
pipResourceGroup,
retryErr)
return false, retryErr
}
result = resultPage.Response()
glog.V(2).Infof("PublicIPAddressesClient.ListNextResults(%v) - backoff: success", pipResourceGroup)
return true, nil
})
if err != nil {
return allPIPs, err
}
appendResults = (result.Value != nil && len(*result.Value) > 0)
}
}
return allPIPs, nil
}
// CreateOrUpdatePIPWithRetry invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdatePIPWithRetry(pipResourceGroup string, pip network.PublicIPAddress) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
respChan, errChan := az.PublicIPAddressesClient.CreateOrUpdate(pipResourceGroup, *pip.Name, pip, nil)
resp := <-respChan
err := <-errChan
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, *pip.Name, pip)
glog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s, %s): end", pipResourceGroup, *pip.Name)
return processRetryResponse(resp.Response, err)
return processHTTPRetryResponse(resp, err)
})
}
@@ -271,20 +219,22 @@ func (az *Cloud) CreateOrUpdateInterfaceWithRetry(nic network.Interface) error {
// DeletePublicIPWithRetry invokes az.PublicIPAddressesClient.Delete with exponential backoff retry
func (az *Cloud) DeletePublicIPWithRetry(pipResourceGroup string, pipName string) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
respChan, errChan := az.PublicIPAddressesClient.Delete(pipResourceGroup, pipName, nil)
resp := <-respChan
err := <-errChan
return processRetryResponse(resp, err)
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.PublicIPAddressesClient.Delete(ctx, pipResourceGroup, pipName)
return processHTTPRetryResponse(resp, err)
})
}
// DeleteLBWithRetry invokes az.LoadBalancerClient.Delete with exponential backoff retry
func (az *Cloud) DeleteLBWithRetry(lbName string) error {
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
respChan, errChan := az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil)
resp := <-respChan
err := <-errChan
done, err := processRetryResponse(resp, err)
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.Delete(ctx, az.ResourceGroup, lbName)
done, err := processHTTPRetryResponse(resp, err)
if done && err == nil {
// Invalidate the cache right after deleting
az.lbCache.Delete(lbName)

View File

@@ -96,20 +96,18 @@ type InterfacesClient interface {
// LoadBalancersClient defines needed functions for azure network.LoadBalancersClient
type LoadBalancersClient interface {
CreateOrUpdate(resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, cancel <-chan struct{}) (<-chan network.LoadBalancer, <-chan error)
Delete(resourceGroupName string, loadBalancerName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error)
Get(resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error)
List(resourceGroupName string) (result LoadBalancerListResultPage, err error)
ListNextResults(resourceGroupName string, lastResult LoadBalancerListResultPage) (result LoadBalancerListResultPage, err error)
CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer) (resp *http.Response, err error)
Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) (resp *http.Response, err error)
Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error)
List(ctx context.Context, resourceGroupName string) (result []network.LoadBalancer, err error)
}
// PublicIPAddressesClient defines needed functions for azure network.PublicIPAddressesClient
type PublicIPAddressesClient interface {
CreateOrUpdate(resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress, cancel <-chan struct{}) (<-chan network.PublicIPAddress, <-chan error)
Delete(resourceGroupName string, publicIPAddressName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error)
Get(resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error)
List(resourceGroupName string) (result PublicIPAddressListResultPage, err error)
ListNextResults(resourceGroupName string, lastResults PublicIPAddressListResultPage) (result PublicIPAddressListResultPage, err error)
CreateOrUpdate(ctx context.Context, resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress) (resp *http.Response, err error)
Delete(ctx context.Context, resourceGroupName string, publicIPAddressName string) (resp *http.Response, err error)
Get(ctx context.Context, resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error)
List(ctx context.Context, resourceGroupName string) (result []network.PublicIPAddress, err error)
}
// SubnetsClient defines needed functions for azure network.SubnetsClient
@@ -392,13 +390,11 @@ func newAzLoadBalancersClient(config *azClientConfig) *azLoadBalancersClient {
}
}
func (az *azLoadBalancersClient) CreateOrUpdate(resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, cancel <-chan struct{}) (<-chan network.LoadBalancer, <-chan error) {
func (az *azLoadBalancersClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer) (resp *http.Response, err error) {
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
errChan := createARMRateLimitErrChannel(true, "LBCreateOrUpdate")
resultChan := make(chan network.LoadBalancer, 1)
resultChan <- network.LoadBalancer{}
return resultChan, errChan
err = createARMRateLimitErr(true, "LBCreateOrUpdate")
return nil, err
}
glog.V(10).Infof("azLoadBalancersClient.CreateOrUpdate(%q,%q): start", resourceGroupName, loadBalancerName)
@@ -406,37 +402,23 @@ func (az *azLoadBalancersClient) CreateOrUpdate(resourceGroupName string, loadBa
glog.V(10).Infof("azLoadBalancersClient.CreateOrUpdate(%q,%q): end", resourceGroupName, loadBalancerName)
}()
ctx := context.TODO()
errChan := make(chan error, 1)
resultChan := make(chan network.LoadBalancer, 1)
mc := newMetricContext("load_balancers", "create_or_update", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, loadBalancerName, parameters)
mc.Observe(err)
if err != nil {
mc.Observe(err)
errChan <- err
return resultChan, errChan
}
go func() {
if err := future.WaitForCompletion(ctx, az.client.Client); err != nil {
mc.Observe(err)
errChan <- err
return
}
result, err := future.Result(az.client)
mc.Observe(err)
errChan <- err
resultChan <- result
}()
return resultChan, errChan
return future.Response(), err
}
func (az *azLoadBalancersClient) Delete(resourceGroupName string, loadBalancerName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
func (az *azLoadBalancersClient) Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) (resp *http.Response, err error) {
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
errChan := createARMRateLimitErrChannel(true, "LBDelete")
resultChan := make(chan autorest.Response, 1)
resultChan <- autorest.Response{}
return resultChan, errChan
err = createARMRateLimitErr(true, "LBDelete")
return nil, err
}
glog.V(10).Infof("azLoadBalancersClient.Delete(%q,%q): start", resourceGroupName, loadBalancerName)
@@ -444,31 +426,19 @@ func (az *azLoadBalancersClient) Delete(resourceGroupName string, loadBalancerNa
glog.V(10).Infof("azLoadBalancersClient.Delete(%q,%q): end", resourceGroupName, loadBalancerName)
}()
ctx := context.TODO()
errChan := make(chan error, 1)
resultChan := make(chan autorest.Response, 1)
mc := newMetricContext("load_balancers", "delete", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.Delete(context.TODO(), resourceGroupName, loadBalancerName)
future, err := az.client.Delete(ctx, resourceGroupName, loadBalancerName)
mc.Observe(err)
if err != nil {
mc.Observe(err)
errChan <- err
return resultChan, errChan
}
go func() {
if err := future.WaitForCompletion(ctx, az.client.Client); err != nil {
mc.Observe(err)
errChan <- err
return
}
result, err := future.Result(az.client)
mc.Observe(err)
errChan <- err
resultChan <- result
}()
return resultChan, errChan
return future.Response(), err
}
func (az *azLoadBalancersClient) Get(resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) {
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
func (az *azLoadBalancersClient) Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) {
if !az.rateLimiterReader.TryAccept() {
err = createARMRateLimitErr(false, "LBGet")
return
@@ -480,15 +450,15 @@ func (az *azLoadBalancersClient) Get(resourceGroupName string, loadBalancerName
}()
mc := newMetricContext("load_balancers", "get", resourceGroupName, az.client.SubscriptionID)
result, err = az.client.Get(context.TODO(), resourceGroupName, loadBalancerName, expand)
result, err = az.client.Get(ctx, resourceGroupName, loadBalancerName, expand)
mc.Observe(err)
return
}
func (az *azLoadBalancersClient) List(resourceGroupName string) (LoadBalancerListResultPage, error) {
func (az *azLoadBalancersClient) List(ctx context.Context, resourceGroupName string) ([]network.LoadBalancer, error) {
if !az.rateLimiterReader.TryAccept() {
err := createARMRateLimitErr(false, "LBList")
return &network.LoadBalancerListResultPage{}, err
return nil, err
}
glog.V(10).Infof("azLoadBalancersClient.List(%q): start", resourceGroupName)
@@ -497,26 +467,22 @@ func (az *azLoadBalancersClient) List(resourceGroupName string) (LoadBalancerLis
}()
mc := newMetricContext("load_balancers", "list", resourceGroupName, az.client.SubscriptionID)
result, err := az.client.List(context.TODO(), resourceGroupName)
iterator, err := az.client.ListComplete(ctx, resourceGroupName)
mc.Observe(err)
return &result, err
if err != nil {
return nil, err
}
func (az *azLoadBalancersClient) ListNextResults(resourceGroupName string, lastResult LoadBalancerListResultPage) (result LoadBalancerListResultPage, err error) {
if !az.rateLimiterReader.TryAccept() {
err = createARMRateLimitErr(false, "LBListNextResults")
return
result := make([]network.LoadBalancer, 0)
for ; iterator.NotDone(); err = iterator.Next() {
if err != nil {
return nil, err
}
glog.V(10).Infof("azLoadBalancersClient.ListNextResults(%q): start", lastResult)
defer func() {
glog.V(10).Infof("azLoadBalancersClient.ListNextResults(%q): end", lastResult)
}()
result = append(result, iterator.Value())
}
mc := newMetricContext("load_balancers", "list_next_results", resourceGroupName, az.client.SubscriptionID)
err = lastResult.Next()
mc.Observe(err)
return lastResult, err
return result, nil
}
// azPublicIPAddressesClient implements PublicIPAddressesClient.
@@ -540,13 +506,11 @@ func newAzPublicIPAddressesClient(config *azClientConfig) *azPublicIPAddressesCl
}
}
func (az *azPublicIPAddressesClient) CreateOrUpdate(resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress, cancel <-chan struct{}) (<-chan network.PublicIPAddress, <-chan error) {
func (az *azPublicIPAddressesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress) (resp *http.Response, err error) {
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
errChan := createARMRateLimitErrChannel(true, "PublicIPCreateOrUpdate")
resultChan := make(chan network.PublicIPAddress, 1)
resultChan <- network.PublicIPAddress{}
return resultChan, errChan
err = createARMRateLimitErr(true, "PublicIPCreateOrUpdate")
return nil, err
}
glog.V(10).Infof("azPublicIPAddressesClient.CreateOrUpdate(%q,%q): start", resourceGroupName, publicIPAddressName)
@@ -554,37 +518,23 @@ func (az *azPublicIPAddressesClient) CreateOrUpdate(resourceGroupName string, pu
glog.V(10).Infof("azPublicIPAddressesClient.CreateOrUpdate(%q,%q): end", resourceGroupName, publicIPAddressName)
}()
ctx := context.TODO()
errChan := make(chan error, 1)
resultChan := make(chan network.PublicIPAddress, 1)
mc := newMetricContext("public_ip_addresses", "create_or_update", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.CreateOrUpdate(context.TODO(), resourceGroupName, publicIPAddressName, parameters)
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, publicIPAddressName, parameters)
mc.Observe(err)
if err != nil {
mc.Observe(err)
errChan <- err
return resultChan, errChan
}
go func() {
if err := future.WaitForCompletion(ctx, az.client.Client); err != nil {
mc.Observe(err)
errChan <- err
return
}
result, err := future.Result(az.client)
mc.Observe(err)
errChan <- err
resultChan <- result
}()
return resultChan, errChan
return future.Response(), err
}
func (az *azPublicIPAddressesClient) Delete(resourceGroupName string, publicIPAddressName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
func (az *azPublicIPAddressesClient) Delete(ctx context.Context, resourceGroupName string, publicIPAddressName string) (resp *http.Response, err error) {
/* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() {
errChan := createARMRateLimitErrChannel(true, "PublicIPDelete")
resultChan := make(chan autorest.Response, 1)
resultChan <- autorest.Response{}
return resultChan, errChan
err = createARMRateLimitErr(true, "PublicIPDelete")
return nil, err
}
glog.V(10).Infof("azPublicIPAddressesClient.Delete(%q,%q): start", resourceGroupName, publicIPAddressName)
@@ -592,31 +542,19 @@ func (az *azPublicIPAddressesClient) Delete(resourceGroupName string, publicIPAd
glog.V(10).Infof("azPublicIPAddressesClient.Delete(%q,%q): end", resourceGroupName, publicIPAddressName)
}()
ctx := context.TODO()
resultChan := make(chan autorest.Response, 1)
errChan := make(chan error, 1)
mc := newMetricContext("public_ip_addresses", "delete", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.Delete(ctx, resourceGroupName, publicIPAddressName)
mc.Observe(err)
if err != nil {
mc.Observe(err)
errChan <- err
return resultChan, errChan
}
go func() {
if err := future.WaitForCompletion(ctx, az.client.Client); err != nil {
mc.Observe(err)
errChan <- err
return
}
result, err := future.Result(az.client)
mc.Observe(err)
errChan <- err
resultChan <- result
}()
return resultChan, errChan
return future.Response(), err
}
func (az *azPublicIPAddressesClient) Get(resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
err = future.WaitForCompletion(ctx, az.client.Client)
mc.Observe(err)
return future.Response(), err
}
func (az *azPublicIPAddressesClient) Get(ctx context.Context, resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
if !az.rateLimiterReader.TryAccept() {
err = createARMRateLimitErr(false, "PublicIPGet")
return
@@ -628,13 +566,12 @@ func (az *azPublicIPAddressesClient) Get(resourceGroupName string, publicIPAddre
}()
mc := newMetricContext("public_ip_addresses", "get", resourceGroupName, az.client.SubscriptionID)
ctx := context.TODO()
result, err = az.client.Get(ctx, resourceGroupName, publicIPAddressName, expand)
mc.Observe(err)
return
}
func (az *azPublicIPAddressesClient) List(resourceGroupName string) (PublicIPAddressListResultPage, error) {
func (az *azPublicIPAddressesClient) List(ctx context.Context, resourceGroupName string) ([]network.PublicIPAddress, error) {
if !az.rateLimiterReader.TryAccept() {
return nil, createARMRateLimitErr(false, "PublicIPList")
}
@@ -645,27 +582,22 @@ func (az *azPublicIPAddressesClient) List(resourceGroupName string) (PublicIPAdd
}()
mc := newMetricContext("public_ip_addresses", "list", resourceGroupName, az.client.SubscriptionID)
ctx := context.TODO()
result, err := az.client.List(ctx, resourceGroupName)
iterator, err := az.client.ListComplete(ctx, resourceGroupName)
mc.Observe(err)
return &result, err
if err != nil {
return nil, err
}
func (az *azPublicIPAddressesClient) ListNextResults(resourceGroupName string, lastResults PublicIPAddressListResultPage) (result PublicIPAddressListResultPage, err error) {
if !az.rateLimiterReader.TryAccept() {
err = createARMRateLimitErr(false, "PublicIPListNextResults")
return
result := make([]network.PublicIPAddress, 0)
for ; iterator.NotDone(); err = iterator.Next() {
if err != nil {
return nil, err
}
glog.V(10).Infof("azPublicIPAddressesClient.ListNextResults(%q): start", lastResults)
defer func() {
glog.V(10).Infof("azPublicIPAddressesClient.ListNextResults(%q): end", lastResults)
}()
result = append(result, iterator.Value())
}
mc := newMetricContext("public_ip_addresses", "list_next_results", resourceGroupName, az.client.SubscriptionID)
err = lastResults.Next()
mc.Observe(err)
return lastResults, err
return result, nil
}
// azSubnetsClient implements SubnetsClient.

View File

@@ -48,19 +48,10 @@ func newFakeAzureLBClient() *fakeAzureLBClient {
return fLBC
}
func (fLBC *fakeAzureLBClient) CreateOrUpdate(resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, cancel <-chan struct{}) (<-chan network.LoadBalancer, <-chan error) {
func (fLBC *fakeAzureLBClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer) (resp *http.Response, err error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
resultChan := make(chan network.LoadBalancer, 1)
errChan := make(chan error, 1)
var result network.LoadBalancer
var err error
defer func() {
resultChan <- result
errChan <- err
close(resultChan)
close(errChan)
}()
if _, ok := fLBC.FakeStore[resourceGroupName]; !ok {
fLBC.FakeStore[resourceGroupName] = make(map[string]network.LoadBalancer)
}
@@ -76,48 +67,27 @@ func (fLBC *fakeAzureLBClient) CreateOrUpdate(resourceGroupName string, loadBala
}
}
fLBC.FakeStore[resourceGroupName][loadBalancerName] = parameters
result = fLBC.FakeStore[resourceGroupName][loadBalancerName]
result.Response.Response = &http.Response{
StatusCode: http.StatusOK,
}
err = nil
return resultChan, errChan
return nil, nil
}
func (fLBC *fakeAzureLBClient) Delete(resourceGroupName string, loadBalancerName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
func (fLBC *fakeAzureLBClient) Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) (resp *http.Response, err error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
respChan := make(chan autorest.Response, 1)
errChan := make(chan error, 1)
var resp autorest.Response
var err error
defer func() {
respChan <- resp
errChan <- err
close(respChan)
close(errChan)
}()
if rgLBs, ok := fLBC.FakeStore[resourceGroupName]; ok {
if _, ok := rgLBs[loadBalancerName]; ok {
delete(rgLBs, loadBalancerName)
resp.Response = &http.Response{
StatusCode: http.StatusAccepted,
return nil, nil
}
err = nil
return respChan, errChan
}
}
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
err = autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such LB",
}
return respChan, errChan
}
func (fLBC *fakeAzureLBClient) Get(resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) {
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
}
func (fLBC *fakeAzureLBClient) Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (result network.LoadBalancer, err error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
if _, ok := fLBC.FakeStore[resourceGroupName]; ok {
@@ -131,48 +101,17 @@ func (fLBC *fakeAzureLBClient) Get(resourceGroupName string, loadBalancerName st
}
}
type fakeLoadBalancerListResultPage struct {
next LoadBalancerListResultPage
value network.LoadBalancerListResult
values []network.LoadBalancer
err error
}
func (pg *fakeLoadBalancerListResultPage) Next() error {
return nil
}
func (pg *fakeLoadBalancerListResultPage) NotDone() bool {
return pg.next != nil
}
func (pg *fakeLoadBalancerListResultPage) Response() network.LoadBalancerListResult {
return pg.value
}
func (pg *fakeLoadBalancerListResultPage) Values() []network.LoadBalancer {
return pg.values
}
func (fLBC *fakeAzureLBClient) List(resourceGroupName string) (result LoadBalancerListResultPage, err error) {
func (fLBC *fakeAzureLBClient) List(ctx context.Context, resourceGroupName string) (result []network.LoadBalancer, err error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
var value []network.LoadBalancer
if _, ok := fLBC.FakeStore[resourceGroupName]; ok {
for _, v := range fLBC.FakeStore[resourceGroupName] {
value = append(value, v)
}
}
return &fakeLoadBalancerListResultPage{
value: network.LoadBalancerListResult{
Value: &value,
},
values: value,
}, nil
}
func (fLBC *fakeAzureLBClient) ListNextResults(resourceGroupName string, lastResult LoadBalancerListResultPage) (result LoadBalancerListResultPage, err error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
return &fakeLoadBalancerListResultPage{}, nil
return value, nil
}
type fakeAzurePIPClient struct {
@@ -200,19 +139,10 @@ func newFakeAzurePIPClient(subscriptionID string) *fakeAzurePIPClient {
return fAPC
}
func (fAPC *fakeAzurePIPClient) CreateOrUpdate(resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress, cancel <-chan struct{}) (<-chan network.PublicIPAddress, <-chan error) {
func (fAPC *fakeAzurePIPClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, publicIPAddressName string, parameters network.PublicIPAddress) (resp *http.Response, err error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
resultChan := make(chan network.PublicIPAddress, 1)
errChan := make(chan error, 1)
var result network.PublicIPAddress
var err error
defer func() {
resultChan <- result
errChan <- err
close(resultChan)
close(errChan)
}()
if _, ok := fAPC.FakeStore[resourceGroupName]; !ok {
fAPC.FakeStore[resourceGroupName] = make(map[string]network.PublicIPAddress)
}
@@ -229,48 +159,27 @@ func (fAPC *fakeAzurePIPClient) CreateOrUpdate(resourceGroupName string, publicI
}
fAPC.FakeStore[resourceGroupName][publicIPAddressName] = parameters
result = fAPC.FakeStore[resourceGroupName][publicIPAddressName]
result.Response.Response = &http.Response{
StatusCode: http.StatusOK,
}
err = nil
return resultChan, errChan
return nil, nil
}
func (fAPC *fakeAzurePIPClient) Delete(resourceGroupName string, publicIPAddressName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) {
func (fAPC *fakeAzurePIPClient) Delete(ctx context.Context, resourceGroupName string, publicIPAddressName string) (resp *http.Response, err error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
respChan := make(chan autorest.Response, 1)
errChan := make(chan error, 1)
var resp autorest.Response
var err error
defer func() {
respChan <- resp
errChan <- err
close(respChan)
close(errChan)
}()
if rgPIPs, ok := fAPC.FakeStore[resourceGroupName]; ok {
if _, ok := rgPIPs[publicIPAddressName]; ok {
delete(rgPIPs, publicIPAddressName)
resp.Response = &http.Response{
StatusCode: http.StatusAccepted,
return nil, nil
}
err = nil
return respChan, errChan
}
}
resp.Response = &http.Response{
StatusCode: http.StatusNotFound,
}
err = autorest.DetailedError{
StatusCode: http.StatusNotFound,
Message: "Not such PIP",
}
return respChan, errChan
}
func (fAPC *fakeAzurePIPClient) Get(resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
return &http.Response{
StatusCode: http.StatusNotFound,
}, nil
}
func (fAPC *fakeAzurePIPClient) Get(ctx context.Context, resourceGroupName string, publicIPAddressName string, expand string) (result network.PublicIPAddress, err error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
@@ -284,49 +193,18 @@ func (fAPC *fakeAzurePIPClient) Get(resourceGroupName string, publicIPAddressNam
}
}
type fakePublicIPAddressListResultPage struct {
next PublicIPAddressListResultPage
value network.PublicIPAddressListResult
values []network.PublicIPAddress
err error
}
func (pg *fakePublicIPAddressListResultPage) Next() error {
return nil
}
func (pg *fakePublicIPAddressListResultPage) NotDone() bool {
return pg.next != nil
}
func (pg *fakePublicIPAddressListResultPage) Response() network.PublicIPAddressListResult {
return pg.value
}
func (pg *fakePublicIPAddressListResultPage) Values() []network.PublicIPAddress {
return pg.values
}
func (fAPC *fakeAzurePIPClient) ListNextResults(resourceGroupName string, lastResults PublicIPAddressListResultPage) (result PublicIPAddressListResultPage, err error) {
func (fAPC *fakeAzurePIPClient) List(ctx context.Context, resourceGroupName string) (result []network.PublicIPAddress, err error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
return &fakePublicIPAddressListResultPage{}, nil
}
func (fAPC *fakeAzurePIPClient) List(resourceGroupName string) (result PublicIPAddressListResultPage, err error) {
fAPC.mutex.Lock()
defer fAPC.mutex.Unlock()
var value []network.PublicIPAddress
if _, ok := fAPC.FakeStore[resourceGroupName]; ok {
for _, v := range fAPC.FakeStore[resourceGroupName] {
value = append(value, v)
}
}
result = &fakePublicIPAddressListResultPage{
value: network.PublicIPAddressListResult{
Value: &value,
},
values: value,
}
return result, nil
return value, nil
}
type fakeAzureInterfacesClient struct {

View File

@@ -454,7 +454,9 @@ func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domai
}
glog.V(10).Infof("CreateOrUpdatePIPWithRetry(%s, %q): end", pipResourceGroup, *pip.Name)
pip, err = az.PublicIPAddressesClient.Get(pipResourceGroup, *pip.Name, "")
ctx, cancel := getContextWithCancel()
defer cancel()
pip, err = az.PublicIPAddressesClient.Get(ctx, pipResourceGroup, *pip.Name, "")
if err != nil {
return nil, err
}

View File

@@ -215,9 +215,11 @@ func testLoadBalancerServiceDefaultModeSelection(t *testing.T, isInternal bool)
expectedLBName = testClusterName + "-internal"
}
result, _ := az.LoadBalancerClient.List(az.Config.ResourceGroup)
lb := result.Values()[0]
lbCount := len(result.Values())
ctx, cancel := getContextWithCancel()
defer cancel()
result, _ := az.LoadBalancerClient.List(ctx, az.Config.ResourceGroup)
lb := result[0]
lbCount := len(result)
expectedNumOfLB := 1
if lbCount != expectedNumOfLB {
t.Errorf("Unexpected number of LB's: Expected (%d) Found (%d)", expectedNumOfLB, lbCount)
@@ -265,15 +267,17 @@ func testLoadBalancerServiceAutoModeSelection(t *testing.T, isInternal bool) {
// expected is MIN(index, availabilitySetCount)
expectedNumOfLB := int(math.Min(float64(index), float64(availabilitySetCount)))
result, _ := az.LoadBalancerClient.List(az.Config.ResourceGroup)
lbCount := len(result.Values())
ctx, cancel := getContextWithCancel()
defer cancel()
result, _ := az.LoadBalancerClient.List(ctx, az.Config.ResourceGroup)
lbCount := len(result)
if lbCount != expectedNumOfLB {
t.Errorf("Unexpected number of LB's: Expected (%d) Found (%d)", expectedNumOfLB, lbCount)
}
maxRules := 0
minRules := serviceCount
for _, lb := range result.Values() {
for _, lb := range result {
ruleCount := len(*lb.LoadBalancingRules)
if ruleCount < minRules {
minRules = ruleCount
@@ -328,8 +332,10 @@ func testLoadBalancerServicesSpecifiedSelection(t *testing.T, isInternal bool) {
// expected is MIN(index, 2)
expectedNumOfLB := int(math.Min(float64(index), float64(2)))
result, _ := az.LoadBalancerClient.List(az.Config.ResourceGroup)
lbCount := len(result.Values())
ctx, cancel := getContextWithCancel()
defer cancel()
result, _ := az.LoadBalancerClient.List(ctx, az.Config.ResourceGroup)
lbCount := len(result)
if lbCount != expectedNumOfLB {
t.Errorf("Unexpected number of LB's: Expected (%d) Found (%d)", expectedNumOfLB, lbCount)
}
@@ -366,8 +372,10 @@ func testLoadBalancerMaxRulesServices(t *testing.T, isInternal bool) {
// expected is MIN(index, az.Config.MaximumLoadBalancerRuleCount)
expectedNumOfLBRules := int(math.Min(float64(index), float64(az.Config.MaximumLoadBalancerRuleCount)))
result, _ := az.LoadBalancerClient.List(az.Config.ResourceGroup)
lbCount := len(result.Values())
ctx, cancel := getContextWithCancel()
defer cancel()
result, _ := az.LoadBalancerClient.List(ctx, az.Config.ResourceGroup)
lbCount := len(result)
if lbCount != expectedNumOfLBRules {
t.Errorf("Unexpected number of LB's: Expected (%d) Found (%d)", expectedNumOfLBRules, lbCount)
}
@@ -436,8 +444,10 @@ func testLoadBalancerServiceAutoModeDeleteSelection(t *testing.T, isInternal boo
// expected is MIN(index, availabilitySetCount)
expectedNumOfLB := int(math.Min(float64(index), float64(availabilitySetCount)))
result, _ := az.LoadBalancerClient.List(az.Config.ResourceGroup)
lbCount := len(result.Values())
ctx, cancel := getContextWithCancel()
defer cancel()
result, _ := az.LoadBalancerClient.List(ctx, az.Config.ResourceGroup)
lbCount := len(result)
if lbCount != expectedNumOfLB {
t.Errorf("Unexpected number of LB's: Expected (%d) Found (%d)", expectedNumOfLB, lbCount)
}

View File

@@ -104,7 +104,9 @@ func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (pi
}
var realErr error
pip, err = az.PublicIPAddressesClient.Get(resourceGroup, pipName, "")
ctx, cancel := getContextWithCancel()
defer cancel()
pip, err = az.PublicIPAddressesClient.Get(ctx, resourceGroup, pipName, "")
exists, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return pip, false, realErr
@@ -194,7 +196,10 @@ func (az *Cloud) newVMCache() (*timedCache, error) {
func (az *Cloud) newLBCache() (*timedCache, error) {
getter := func(key string) (interface{}, error) {
lb, err := az.LoadBalancerClient.Get(az.ResourceGroup, key, "")
ctx, cancel := getContextWithCancel()
defer cancel()
lb, err := az.LoadBalancerClient.Get(ctx, az.ResourceGroup, key, "")
exists, realErr := checkResourceExistsFromError(err)
if realErr != nil {
return nil, realErr