Merge pull request #67604 from feiskyer/cross-rg-nodes
Automatic merge from submit-queue (batch tested with PRs 66980, 67604, 67741, 67715). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add support of Azure cross resource group nodes **What this PR does / why we need it**: Part of feature [Cross resource group nodes](https://github.com/kubernetes/features/issues/604). This PR adds support of Azure cross resource group nodes that are labeled with `kubernetes.azure.com/resource-group=<rg-name>` and `alpha.service-controller.kubernetes.io/exclude-balancer=true` **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes # **Special notes for your reviewer**: See designs [here](https://github.com/kubernetes/community/pull/2479). **Release note**: ```release-note Azure cloud provider now supports cross resource group nodes that are labeled with `kubernetes.azure.com/resource-group=<rg-name>` and `alpha.service-controller.kubernetes.io/exclude-balancer=true` ``` /sig azure /kind feature
This commit is contained in:
commit
82924e71cd
@ -58,6 +58,9 @@ const (
|
|||||||
|
|
||||||
loadBalancerSkuBasic = "basic"
|
loadBalancerSkuBasic = "basic"
|
||||||
loadBalancerSkuStandard = "standard"
|
loadBalancerSkuStandard = "standard"
|
||||||
|
|
||||||
|
externalResourceGroupLabel = "kubernetes.azure.com/resource-group"
|
||||||
|
managedByAzureLabel = "kubernetes.azure.com/managed"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -156,11 +159,16 @@ type Cloud struct {
|
|||||||
metadata *InstanceMetadata
|
metadata *InstanceMetadata
|
||||||
vmSet VMSet
|
vmSet VMSet
|
||||||
|
|
||||||
// Lock for access to nodeZones
|
// Lock for access to node caches
|
||||||
nodeZonesLock sync.Mutex
|
nodeCachesLock sync.Mutex
|
||||||
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
|
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
|
||||||
// it is updated by the nodeInformer
|
// it is updated by the nodeInformer
|
||||||
nodeZones map[string]sets.String
|
nodeZones map[string]sets.String
|
||||||
|
// nodeResourceGroups holds nodes external resource groups
|
||||||
|
nodeResourceGroups map[string]string
|
||||||
|
// unmanagedNodes holds a list of nodes not managed by Azure cloud provider.
|
||||||
|
unmanagedNodes sets.String
|
||||||
|
// nodeInformerSynced is for determining if the informer has synced.
|
||||||
nodeInformerSynced cache.InformerSynced
|
nodeInformerSynced cache.InformerSynced
|
||||||
|
|
||||||
// Clients for vmss.
|
// Clients for vmss.
|
||||||
@ -254,9 +262,11 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
|
|||||||
rateLimiterWriter: operationPollRateLimiterWrite,
|
rateLimiterWriter: operationPollRateLimiterWrite,
|
||||||
}
|
}
|
||||||
az := Cloud{
|
az := Cloud{
|
||||||
Config: *config,
|
Config: *config,
|
||||||
Environment: *env,
|
Environment: *env,
|
||||||
nodeZones: map[string]sets.String{},
|
nodeZones: map[string]sets.String{},
|
||||||
|
nodeResourceGroups: map[string]string{},
|
||||||
|
unmanagedNodes: sets.NewString(),
|
||||||
|
|
||||||
DisksClient: newAzDisksClient(azClientConfig),
|
DisksClient: newAzDisksClient(azClientConfig),
|
||||||
RoutesClient: newAzRoutesClient(azClientConfig),
|
RoutesClient: newAzRoutesClient(azClientConfig),
|
||||||
@ -294,7 +304,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
|
|||||||
Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second,
|
Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second,
|
||||||
Jitter: az.CloudProviderBackoffJitter,
|
Jitter: az.CloudProviderBackoffJitter,
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f",
|
glog.V(2).Infof("Azure cloudprovider using try backoff: retries=%d, exponent=%f, duration=%d, jitter=%f",
|
||||||
az.CloudProviderBackoffRetries,
|
az.CloudProviderBackoffRetries,
|
||||||
az.CloudProviderBackoffExponent,
|
az.CloudProviderBackoffExponent,
|
||||||
az.CloudProviderBackoffDuration,
|
az.CloudProviderBackoffDuration,
|
||||||
@ -449,7 +459,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
|
|||||||
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
node := obj.(*v1.Node)
|
node := obj.(*v1.Node)
|
||||||
az.updateNodeZones(nil, node)
|
az.updateNodeCaches(nil, node)
|
||||||
},
|
},
|
||||||
UpdateFunc: func(prev, obj interface{}) {
|
UpdateFunc: func(prev, obj interface{}) {
|
||||||
prevNode := prev.(*v1.Node)
|
prevNode := prev.(*v1.Node)
|
||||||
@ -458,7 +468,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
|
|||||||
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
|
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
az.updateNodeZones(prevNode, newNode)
|
az.updateNodeCaches(prevNode, newNode)
|
||||||
},
|
},
|
||||||
DeleteFunc: func(obj interface{}) {
|
DeleteFunc: func(obj interface{}) {
|
||||||
node, isNode := obj.(*v1.Node)
|
node, isNode := obj.(*v1.Node)
|
||||||
@ -476,16 +486,19 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
az.updateNodeZones(node, nil)
|
az.updateNodeCaches(node, nil)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
az.nodeInformerSynced = nodeInformer.HasSynced
|
az.nodeInformerSynced = nodeInformer.HasSynced
|
||||||
}
|
}
|
||||||
|
|
||||||
func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
|
// updateNodeCaches updates local cache for node's zones and external resource groups.
|
||||||
az.nodeZonesLock.Lock()
|
func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
|
||||||
defer az.nodeZonesLock.Unlock()
|
az.nodeCachesLock.Lock()
|
||||||
|
defer az.nodeCachesLock.Unlock()
|
||||||
|
|
||||||
if prevNode != nil {
|
if prevNode != nil {
|
||||||
|
// Remove from nodeZones cache.
|
||||||
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
|
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
|
||||||
if ok && az.isAvailabilityZone(prevZone) {
|
if ok && az.isAvailabilityZone(prevZone) {
|
||||||
az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
|
az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
|
||||||
@ -493,8 +506,22 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
|
|||||||
az.nodeZones[prevZone] = nil
|
az.nodeZones[prevZone] = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove from nodeResourceGroups cache.
|
||||||
|
_, ok = prevNode.ObjectMeta.Labels[externalResourceGroupLabel]
|
||||||
|
if ok {
|
||||||
|
delete(az.nodeResourceGroups, prevNode.ObjectMeta.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from unmanagedNodes cache.
|
||||||
|
managed, ok := prevNode.ObjectMeta.Labels[managedByAzureLabel]
|
||||||
|
if ok && managed == "false" {
|
||||||
|
az.unmanagedNodes.Delete(prevNode.ObjectMeta.Name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if newNode != nil {
|
if newNode != nil {
|
||||||
|
// Add to nodeZones cache.
|
||||||
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
|
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
|
||||||
if ok && az.isAvailabilityZone(newZone) {
|
if ok && az.isAvailabilityZone(newZone) {
|
||||||
if az.nodeZones[newZone] == nil {
|
if az.nodeZones[newZone] == nil {
|
||||||
@ -502,6 +529,18 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
|
|||||||
}
|
}
|
||||||
az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
|
az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add to nodeResourceGroups cache.
|
||||||
|
newRG, ok := newNode.ObjectMeta.Labels[externalResourceGroupLabel]
|
||||||
|
if ok && len(newRG) > 0 {
|
||||||
|
az.nodeResourceGroups[newNode.ObjectMeta.Name] = newRG
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to unmanagedNodes cache.
|
||||||
|
managed, ok := newNode.ObjectMeta.Labels[managedByAzureLabel]
|
||||||
|
if ok && managed == "false" {
|
||||||
|
az.unmanagedNodes.Insert(newNode.ObjectMeta.Name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,8 +550,8 @@ func (az *Cloud) GetActiveZones() (sets.String, error) {
|
|||||||
return nil, fmt.Errorf("Azure cloud provider doesn't have informers set")
|
return nil, fmt.Errorf("Azure cloud provider doesn't have informers set")
|
||||||
}
|
}
|
||||||
|
|
||||||
az.nodeZonesLock.Lock()
|
az.nodeCachesLock.Lock()
|
||||||
defer az.nodeZonesLock.Unlock()
|
defer az.nodeCachesLock.Unlock()
|
||||||
if !az.nodeInformerSynced() {
|
if !az.nodeInformerSynced() {
|
||||||
return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones")
|
return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones")
|
||||||
}
|
}
|
||||||
@ -530,3 +569,76 @@ func (az *Cloud) GetActiveZones() (sets.String, error) {
|
|||||||
func (az *Cloud) GetLocation() string {
|
func (az *Cloud) GetLocation() string {
|
||||||
return az.Location
|
return az.Location
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetNodeResourceGroup gets resource group for given node.
|
||||||
|
func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) {
|
||||||
|
// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
|
||||||
|
if az.nodeInformerSynced == nil {
|
||||||
|
return az.ResourceGroup, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
az.nodeCachesLock.Lock()
|
||||||
|
defer az.nodeCachesLock.Unlock()
|
||||||
|
if !az.nodeInformerSynced() {
|
||||||
|
return "", fmt.Errorf("node informer is not synced when trying to GetNodeResourceGroup")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return external resource group if it has been cached.
|
||||||
|
if cachedRG, ok := az.nodeResourceGroups[nodeName]; ok {
|
||||||
|
return cachedRG, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return resource group from cloud provider options.
|
||||||
|
return az.ResourceGroup, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetResourceGroups returns a set of resource groups that all nodes are running on.
|
||||||
|
func (az *Cloud) GetResourceGroups() (sets.String, error) {
|
||||||
|
// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
|
||||||
|
if az.nodeInformerSynced == nil {
|
||||||
|
return sets.NewString(az.ResourceGroup), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
az.nodeCachesLock.Lock()
|
||||||
|
defer az.nodeCachesLock.Unlock()
|
||||||
|
if !az.nodeInformerSynced() {
|
||||||
|
return nil, fmt.Errorf("node informer is not synced when trying to GetResourceGroups")
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceGroups := sets.NewString(az.ResourceGroup)
|
||||||
|
for _, rg := range az.nodeResourceGroups {
|
||||||
|
resourceGroups.Insert(rg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resourceGroups, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetUnmanagedNodes returns a list of nodes not managed by Azure cloud provider (e.g. on-prem nodes).
|
||||||
|
func (az *Cloud) GetUnmanagedNodes() (sets.String, error) {
|
||||||
|
// Kubelet won't set az.nodeInformerSynced, always return nil.
|
||||||
|
if az.nodeInformerSynced == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
az.nodeCachesLock.Lock()
|
||||||
|
defer az.nodeCachesLock.Unlock()
|
||||||
|
if !az.nodeInformerSynced() {
|
||||||
|
return nil, fmt.Errorf("node informer is not synced when trying to GetUnmanagedNodes")
|
||||||
|
}
|
||||||
|
|
||||||
|
return sets.NewString(az.unmanagedNodes.List()...), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ShouldNodeExcludedFromLoadBalancer returns true if node is unmanaged or in external resource group.
|
||||||
|
func (az *Cloud) ShouldNodeExcludedFromLoadBalancer(node *v1.Node) bool {
|
||||||
|
labels := node.ObjectMeta.Labels
|
||||||
|
if rg, ok := labels[externalResourceGroupLabel]; ok && rg != az.ResourceGroup {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if managed, ok := labels[managedByAzureLabel]; ok && managed == "false" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -69,20 +69,20 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua
|
|||||||
}
|
}
|
||||||
|
|
||||||
// VirtualMachineClientListWithRetry invokes az.VirtualMachinesClient.List with exponential backoff retry
|
// VirtualMachineClientListWithRetry invokes az.VirtualMachinesClient.List with exponential backoff retry
|
||||||
func (az *Cloud) VirtualMachineClientListWithRetry() ([]compute.VirtualMachine, error) {
|
func (az *Cloud) VirtualMachineClientListWithRetry(resourceGroup string) ([]compute.VirtualMachine, error) {
|
||||||
allNodes := []compute.VirtualMachine{}
|
allNodes := []compute.VirtualMachine{}
|
||||||
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
|
err := wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
|
||||||
var retryErr error
|
var retryErr error
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
allNodes, retryErr = az.VirtualMachinesClient.List(ctx, az.ResourceGroup)
|
allNodes, retryErr = az.VirtualMachinesClient.List(ctx, resourceGroup)
|
||||||
if retryErr != nil {
|
if retryErr != nil {
|
||||||
glog.Errorf("VirtualMachinesClient.List(%v) - backoff: failure, will retry,err=%v",
|
glog.Errorf("VirtualMachinesClient.List(%v) - backoff: failure, will retry,err=%v",
|
||||||
az.ResourceGroup,
|
resourceGroup,
|
||||||
retryErr)
|
retryErr)
|
||||||
return false, retryErr
|
return false, retryErr
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("VirtualMachinesClient.List(%v) - backoff: success", az.ResourceGroup)
|
glog.V(2).Infof("VirtualMachinesClient.List(%v) - backoff: success", resourceGroup)
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -281,12 +281,12 @@ func (az *Cloud) DeleteRouteWithRetry(routeName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry
|
// CreateOrUpdateVMWithRetry invokes az.VirtualMachinesClient.CreateOrUpdate with exponential backoff retry
|
||||||
func (az *Cloud) CreateOrUpdateVMWithRetry(vmName string, newVM compute.VirtualMachine) error {
|
func (az *Cloud) CreateOrUpdateVMWithRetry(resourceGroup, vmName string, newVM compute.VirtualMachine) error {
|
||||||
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
|
return wait.ExponentialBackoff(az.requestBackoff(), func() (bool, error) {
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
resp, err := az.VirtualMachinesClient.CreateOrUpdate(ctx, az.ResourceGroup, vmName, newVM)
|
resp, err := az.VirtualMachinesClient.CreateOrUpdate(ctx, resourceGroup, vmName, newVM)
|
||||||
glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s): end", vmName)
|
glog.V(10).Infof("VirtualMachinesClient.CreateOrUpdate(%s): end", vmName)
|
||||||
return processHTTPRetryResponse(resp, err)
|
return processHTTPRetryResponse(resp, err)
|
||||||
})
|
})
|
||||||
|
@ -34,6 +34,12 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vmName := mapNodeNameToVMName(nodeName)
|
||||||
|
nodeResourceGroup, err := as.GetNodeResourceGroup(vmName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
disks := *vm.StorageProfile.DataDisks
|
disks := *vm.StorageProfile.DataDisks
|
||||||
if isManagedDisk {
|
if isManagedDisk {
|
||||||
disks = append(disks,
|
disks = append(disks,
|
||||||
@ -67,17 +73,16 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
vmName := mapNodeNameToVMName(nodeName)
|
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", nodeResourceGroup, vmName)
|
||||||
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", as.resourceGroup, vmName)
|
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, as.resourceGroup, vmName, newVM)
|
resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, nodeResourceGroup, vmName, newVM)
|
||||||
if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
||||||
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", as.resourceGroup, vmName)
|
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, vmName)
|
||||||
retryErr := as.CreateOrUpdateVMWithRetry(vmName, newVM)
|
retryErr := as.CreateOrUpdateVMWithRetry(nodeResourceGroup, vmName, newVM)
|
||||||
if retryErr != nil {
|
if retryErr != nil {
|
||||||
err = retryErr
|
err = retryErr
|
||||||
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", as.resourceGroup, vmName)
|
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, vmName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -106,6 +111,12 @@ func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName t
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vmName := mapNodeNameToVMName(nodeName)
|
||||||
|
nodeResourceGroup, err := as.GetNodeResourceGroup(vmName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
disks := *vm.StorageProfile.DataDisks
|
disks := *vm.StorageProfile.DataDisks
|
||||||
bFoundDisk := false
|
bFoundDisk := false
|
||||||
for i, disk := range disks {
|
for i, disk := range disks {
|
||||||
@ -132,17 +143,16 @@ func (as *availabilitySet) DetachDiskByName(diskName, diskURI string, nodeName t
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
vmName := mapNodeNameToVMName(nodeName)
|
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", nodeResourceGroup, vmName)
|
||||||
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", as.resourceGroup, vmName)
|
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, as.resourceGroup, vmName, newVM)
|
resp, err := as.VirtualMachinesClient.CreateOrUpdate(ctx, nodeResourceGroup, vmName, newVM)
|
||||||
if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
if as.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
||||||
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", as.resourceGroup, vmName)
|
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, vmName)
|
||||||
retryErr := as.CreateOrUpdateVMWithRetry(vmName, newVM)
|
retryErr := as.CreateOrUpdateVMWithRetry(nodeResourceGroup, vmName, newVM)
|
||||||
if retryErr != nil {
|
if retryErr != nil {
|
||||||
err = retryErr
|
err = retryErr
|
||||||
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", as.ResourceGroup, vmName)
|
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, vmName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -29,7 +29,13 @@ import (
|
|||||||
// AttachDisk attaches a vhd to vm
|
// AttachDisk attaches a vhd to vm
|
||||||
// the vhd must exist, can be identified by diskName, diskURI, and lun.
|
// the vhd must exist, can be identified by diskName, diskURI, and lun.
|
||||||
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
|
func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes) error {
|
||||||
ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName))
|
vmName := mapNodeNameToVMName(nodeName)
|
||||||
|
ssName, instanceID, vm, err := ss.getVmssVM(vmName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -65,14 +71,14 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
|
|||||||
|
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", ss.resourceGroup, nodeName)
|
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - attach disk", nodeResourceGroup, nodeName)
|
||||||
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.resourceGroup, ssName, instanceID, vm)
|
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, vm)
|
||||||
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
||||||
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", ss.resourceGroup, nodeName)
|
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, nodeName)
|
||||||
retryErr := ss.UpdateVmssVMWithRetry(ctx, ss.resourceGroup, ssName, instanceID, vm)
|
retryErr := ss.UpdateVmssVMWithRetry(ctx, nodeResourceGroup, ssName, instanceID, vm)
|
||||||
if retryErr != nil {
|
if retryErr != nil {
|
||||||
err = retryErr
|
err = retryErr
|
||||||
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", ss.resourceGroup, nodeName)
|
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, nodeName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -85,7 +91,8 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
|
|||||||
} else {
|
} else {
|
||||||
glog.V(4).Info("azureDisk - azure attach succeeded")
|
glog.V(4).Info("azureDisk - azure attach succeeded")
|
||||||
// Invalidate the cache right after updating
|
// Invalidate the cache right after updating
|
||||||
ss.vmssVMCache.Delete(ss.makeVmssVMName(ssName, instanceID))
|
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
|
||||||
|
ss.vmssVMCache.Delete(key)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -93,7 +100,13 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod
|
|||||||
// DetachDiskByName detaches a vhd from host
|
// DetachDiskByName detaches a vhd from host
|
||||||
// the vhd can be identified by diskName or diskURI
|
// the vhd can be identified by diskName or diskURI
|
||||||
func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
|
func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.NodeName) error {
|
||||||
ssName, instanceID, vm, err := ss.getVmssVM(string(nodeName))
|
vmName := mapNodeNameToVMName(nodeName)
|
||||||
|
ssName, instanceID, vm, err := ss.getVmssVM(vmName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeResourceGroup, err := ss.GetNodeResourceGroup(vmName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -122,14 +135,14 @@ func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.No
|
|||||||
vm.StorageProfile.DataDisks = &disks
|
vm.StorageProfile.DataDisks = &disks
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", ss.resourceGroup, nodeName)
|
glog.V(2).Infof("azureDisk - update(%s): vm(%s) - detach disk", nodeResourceGroup, nodeName)
|
||||||
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, ss.resourceGroup, ssName, instanceID, vm)
|
resp, err := ss.VirtualMachineScaleSetVMsClient.Update(ctx, nodeResourceGroup, ssName, instanceID, vm)
|
||||||
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
|
||||||
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", ss.resourceGroup, nodeName)
|
glog.V(2).Infof("azureDisk - update(%s) backing off: vm(%s)", nodeResourceGroup, nodeName)
|
||||||
retryErr := ss.UpdateVmssVMWithRetry(ctx, ss.resourceGroup, ssName, instanceID, vm)
|
retryErr := ss.UpdateVmssVMWithRetry(ctx, nodeResourceGroup, ssName, instanceID, vm)
|
||||||
if retryErr != nil {
|
if retryErr != nil {
|
||||||
err = retryErr
|
err = retryErr
|
||||||
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", ss.resourceGroup, nodeName)
|
glog.V(2).Infof("azureDisk - update(%s) abort backoff: vm(%s)", nodeResourceGroup, nodeName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -137,7 +150,8 @@ func (ss *scaleSet) DetachDiskByName(diskName, diskURI string, nodeName types.No
|
|||||||
} else {
|
} else {
|
||||||
glog.V(4).Info("azureDisk - azure detach succeeded")
|
glog.V(4).Info("azureDisk - azure detach succeeded")
|
||||||
// Invalidate the cache right after updating
|
// Invalidate the cache right after updating
|
||||||
ss.vmssVMCache.Delete(ss.makeVmssVMName(ssName, instanceID))
|
key := buildVmssCacheKey(nodeResourceGroup, ss.makeVmssVMName(ssName, instanceID))
|
||||||
|
ss.vmssVMCache.Delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
@ -171,9 +171,15 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
|
|||||||
return az.vmSet.GetInstanceIDByNodeName(nodeName)
|
return az.vmSet.GetInstanceIDByNodeName(nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get resource group name.
|
||||||
|
resourceGroup, err := az.metadata.Text("instance/compute/resourceGroupName")
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
// Compose instanceID based on nodeName for standard instance.
|
// Compose instanceID based on nodeName for standard instance.
|
||||||
if az.VMType == vmTypeStandard {
|
if az.VMType == vmTypeStandard {
|
||||||
return az.getStandardMachineID(nodeName), nil
|
return az.getStandardMachineID(resourceGroup, nodeName), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get scale set name and instanceID from vmName for vmss.
|
// Get scale set name and instanceID from vmName for vmss.
|
||||||
@ -181,12 +187,12 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ErrorNotVmssInstance {
|
if err == ErrorNotVmssInstance {
|
||||||
// Compose machineID for standard Node.
|
// Compose machineID for standard Node.
|
||||||
return az.getStandardMachineID(nodeName), nil
|
return az.getStandardMachineID(resourceGroup, nodeName), nil
|
||||||
}
|
}
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
// Compose instanceID based on ssName and instanceID for vmss instance.
|
// Compose instanceID based on ssName and instanceID for vmss instance.
|
||||||
return az.getVmssMachineID(ssName, instanceID), nil
|
return az.getVmssMachineID(resourceGroup, ssName, instanceID), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return az.vmSet.GetInstanceIDByNodeName(nodeName)
|
return az.vmSet.GetInstanceIDByNodeName(nodeName)
|
||||||
|
@ -62,22 +62,24 @@ const (
|
|||||||
var errNotInVMSet = errors.New("vm is not in the vmset")
|
var errNotInVMSet = errors.New("vm is not in the vmset")
|
||||||
var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`)
|
var providerIDRE = regexp.MustCompile(`^` + CloudProviderName + `://(?:.*)/Microsoft.Compute/virtualMachines/(.+)$`)
|
||||||
var backendPoolIDRE = regexp.MustCompile(`^/subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Network/loadBalancers/(.+)/backendAddressPools/(?:.*)`)
|
var backendPoolIDRE = regexp.MustCompile(`^/subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Network/loadBalancers/(.+)/backendAddressPools/(?:.*)`)
|
||||||
|
var nicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Network/networkInterfaces/(?:.*)`)
|
||||||
|
var publicIPResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Network/publicIPAddresses/(?:.*)`)
|
||||||
|
|
||||||
// getStandardMachineID returns the full identifier of a virtual machine.
|
// getStandardMachineID returns the full identifier of a virtual machine.
|
||||||
func (az *Cloud) getStandardMachineID(machineName string) string {
|
func (az *Cloud) getStandardMachineID(resourceGroup, machineName string) string {
|
||||||
return fmt.Sprintf(
|
return fmt.Sprintf(
|
||||||
machineIDTemplate,
|
machineIDTemplate,
|
||||||
az.SubscriptionID,
|
az.SubscriptionID,
|
||||||
az.ResourceGroup,
|
resourceGroup,
|
||||||
machineName)
|
machineName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns the full identifier of an availabilitySet
|
// returns the full identifier of an availabilitySet
|
||||||
func (az *Cloud) getAvailabilitySetID(availabilitySetName string) string {
|
func (az *Cloud) getAvailabilitySetID(resourceGroup, availabilitySetName string) string {
|
||||||
return fmt.Sprintf(
|
return fmt.Sprintf(
|
||||||
availabilitySetIDTemplate,
|
availabilitySetIDTemplate,
|
||||||
az.SubscriptionID,
|
az.SubscriptionID,
|
||||||
az.ResourceGroup,
|
resourceGroup,
|
||||||
availabilitySetName)
|
availabilitySetName)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -306,51 +308,6 @@ func MakeCRC32(str string) string {
|
|||||||
return strconv.FormatUint(uint64(hash), 10)
|
return strconv.FormatUint(uint64(hash), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
//ExtractVMData : extract dataDisks, storageProfile from a map struct
|
|
||||||
func ExtractVMData(vmData map[string]interface{}) (dataDisks []interface{},
|
|
||||||
storageProfile map[string]interface{},
|
|
||||||
hardwareProfile map[string]interface{}, err error) {
|
|
||||||
props, ok := vmData["properties"].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, nil, nil, fmt.Errorf("convert vmData(properties) to map error")
|
|
||||||
}
|
|
||||||
|
|
||||||
storageProfile, ok = props["storageProfile"].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, nil, nil, fmt.Errorf("convert vmData(storageProfile) to map error")
|
|
||||||
}
|
|
||||||
|
|
||||||
hardwareProfile, ok = props["hardwareProfile"].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, nil, nil, fmt.Errorf("convert vmData(hardwareProfile) to map error")
|
|
||||||
}
|
|
||||||
|
|
||||||
dataDisks, ok = storageProfile["dataDisks"].([]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, nil, nil, fmt.Errorf("convert vmData(dataDisks) to map error")
|
|
||||||
}
|
|
||||||
return dataDisks, storageProfile, hardwareProfile, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//ExtractDiskData : extract provisioningState, diskState from a map struct
|
|
||||||
func ExtractDiskData(diskData interface{}) (provisioningState string, diskState string, err error) {
|
|
||||||
fragment, ok := diskData.(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
return "", "", fmt.Errorf("convert diskData to map error")
|
|
||||||
}
|
|
||||||
|
|
||||||
properties, ok := fragment["properties"].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
return "", "", fmt.Errorf("convert diskData(properties) to map error")
|
|
||||||
}
|
|
||||||
|
|
||||||
provisioningState, ok = properties["provisioningState"].(string) // if there is a disk, provisioningState property will be there
|
|
||||||
if ref, ok := properties["diskState"]; ok {
|
|
||||||
diskState = ref.(string)
|
|
||||||
}
|
|
||||||
return provisioningState, diskState, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// availabilitySet implements VMSet interface for Azure availability sets.
|
// availabilitySet implements VMSet interface for Azure availability sets.
|
||||||
type availabilitySet struct {
|
type availabilitySet struct {
|
||||||
*Cloud
|
*Cloud
|
||||||
@ -483,7 +440,7 @@ func (as *availabilitySet) GetIPByNodeName(name string) (string, string, error)
|
|||||||
// getAgentPoolAvailabiliySets lists the virtual machines for the resource group and then builds
|
// getAgentPoolAvailabiliySets lists the virtual machines for the resource group and then builds
|
||||||
// a list of availability sets that match the nodes available to k8s.
|
// a list of availability sets that match the nodes available to k8s.
|
||||||
func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) {
|
func (as *availabilitySet) getAgentPoolAvailabiliySets(nodes []*v1.Node) (agentPoolAvailabilitySets *[]string, err error) {
|
||||||
vms, err := as.VirtualMachineClientListWithRetry()
|
vms, err := as.VirtualMachineClientListWithRetry(as.ResourceGroup)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err)
|
glog.Errorf("as.getNodeAvailabilitySet - VirtualMachineClientListWithRetry failed, err=%v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -578,6 +535,26 @@ func (as *availabilitySet) GetPrimaryInterface(nodeName string) (network.Interfa
|
|||||||
return as.getPrimaryInterfaceWithVMSet(nodeName, "")
|
return as.getPrimaryInterfaceWithVMSet(nodeName, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extractResourceGroupByNicID extracts the resource group name by nicID.
|
||||||
|
func extractResourceGroupByNicID(nicID string) (string, error) {
|
||||||
|
matches := nicResourceGroupRE.FindStringSubmatch(nicID)
|
||||||
|
if len(matches) != 2 {
|
||||||
|
return "", fmt.Errorf("error of extracting resourceGroup from nicID %q", nicID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return matches[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractResourceGroupByPipID extracts the resource group name by publicIP ID.
|
||||||
|
func extractResourceGroupByPipID(pipID string) (string, error) {
|
||||||
|
matches := publicIPResourceGroupRE.FindStringSubmatch(pipID)
|
||||||
|
if len(matches) != 2 {
|
||||||
|
return "", fmt.Errorf("error of extracting resourceGroup from pipID %q", pipID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return matches[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
// getPrimaryInterfaceWithVMSet gets machine primary network interface by node name and vmSet.
|
// getPrimaryInterfaceWithVMSet gets machine primary network interface by node name and vmSet.
|
||||||
func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) {
|
func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) {
|
||||||
var machine compute.VirtualMachine
|
var machine compute.VirtualMachine
|
||||||
@ -596,6 +573,10 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
|
nodeResourceGroup, err := as.GetNodeResourceGroup(nodeName)
|
||||||
|
if err != nil {
|
||||||
|
return network.Interface{}, err
|
||||||
|
}
|
||||||
|
|
||||||
// Check availability set name. Note that vmSetName is empty string when getting
|
// Check availability set name. Note that vmSetName is empty string when getting
|
||||||
// the Node's IP address. While vmSetName is not empty, it should be checked with
|
// the Node's IP address. While vmSetName is not empty, it should be checked with
|
||||||
@ -605,7 +586,7 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
|
|||||||
// - For standard SKU load balancer, backend could belong to multiple VMAS, so we
|
// - For standard SKU load balancer, backend could belong to multiple VMAS, so we
|
||||||
// don't check vmSet for it.
|
// don't check vmSet for it.
|
||||||
if vmSetName != "" && !as.useStandardLoadBalancer() {
|
if vmSetName != "" && !as.useStandardLoadBalancer() {
|
||||||
expectedAvailabilitySetName := as.getAvailabilitySetID(vmSetName)
|
expectedAvailabilitySetName := as.getAvailabilitySetID(nodeResourceGroup, vmSetName)
|
||||||
if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
|
if machine.AvailabilitySet == nil || !strings.EqualFold(*machine.AvailabilitySet.ID, expectedAvailabilitySetName) {
|
||||||
glog.V(3).Infof(
|
glog.V(3).Infof(
|
||||||
"GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName)
|
"GetPrimaryInterface: nic (%s) is not in the availabilitySet(%s)", nicName, vmSetName)
|
||||||
@ -613,9 +594,14 @@ func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nicResourceGroup, err := extractResourceGroupByNicID(primaryNicID)
|
||||||
|
if err != nil {
|
||||||
|
return network.Interface{}, err
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
nic, err := as.InterfacesClient.Get(ctx, as.ResourceGroup, nicName, "")
|
nic, err := as.InterfacesClient.Get(ctx, nicResourceGroup, nicName, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
@ -718,6 +704,11 @@ func (as *availabilitySet) EnsureHostsInPool(serviceName string, nodes []*v1.Nod
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if as.ShouldNodeExcludedFromLoadBalancer(node) {
|
||||||
|
glog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", localNodeName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
f := func() error {
|
f := func() error {
|
||||||
err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
|
err := as.ensureHostInPool(serviceName, types.NodeName(localNodeName), backendPoolID, vmSetName, isInternal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -956,6 +956,8 @@ func getTestCloud() (az *Cloud) {
|
|||||||
},
|
},
|
||||||
nodeZones: map[string]sets.String{},
|
nodeZones: map[string]sets.String{},
|
||||||
nodeInformerSynced: func() bool { return true },
|
nodeInformerSynced: func() bool { return true },
|
||||||
|
nodeResourceGroups: map[string]string{},
|
||||||
|
unmanagedNodes: sets.NewString(),
|
||||||
}
|
}
|
||||||
az.DisksClient = newFakeDisksClient()
|
az.DisksClient = newFakeDisksClient()
|
||||||
az.InterfacesClient = newFakeAzureInterfacesClient()
|
az.InterfacesClient = newFakeAzureInterfacesClient()
|
||||||
@ -1065,7 +1067,7 @@ func getClusterResources(az *Cloud, vmCount int, availabilitySetCount int) (clus
|
|||||||
az.InterfacesClient.CreateOrUpdate(ctx, az.Config.ResourceGroup, nicName, newNIC)
|
az.InterfacesClient.CreateOrUpdate(ctx, az.Config.ResourceGroup, nicName, newNIC)
|
||||||
|
|
||||||
// create vm
|
// create vm
|
||||||
asID := az.getAvailabilitySetID(asName)
|
asID := az.getAvailabilitySetID(az.Config.ResourceGroup, asName)
|
||||||
newVM := compute.VirtualMachine{
|
newVM := compute.VirtualMachine{
|
||||||
Name: &vmName,
|
Name: &vmName,
|
||||||
Location: &az.Config.Location,
|
Location: &az.Config.Location,
|
||||||
@ -2771,3 +2773,100 @@ func TestGetResourceGroupFromDiskURI(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetResourceGroups(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
nodeResourceGroups map[string]string
|
||||||
|
expected sets.String
|
||||||
|
informerSynced bool
|
||||||
|
expectError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "cloud provider configured RG should be returned by default",
|
||||||
|
nodeResourceGroups: map[string]string{},
|
||||||
|
informerSynced: true,
|
||||||
|
expected: sets.NewString("rg"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cloud provider configured RG and node RGs should be returned",
|
||||||
|
nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"},
|
||||||
|
informerSynced: true,
|
||||||
|
expected: sets.NewString("rg", "rg1", "rg2"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error should be returned if informer hasn't synced yet",
|
||||||
|
nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"},
|
||||||
|
informerSynced: false,
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
az := getTestCloud()
|
||||||
|
for _, test := range tests {
|
||||||
|
az.nodeResourceGroups = test.nodeResourceGroups
|
||||||
|
if test.informerSynced {
|
||||||
|
az.nodeInformerSynced = func() bool { return true }
|
||||||
|
} else {
|
||||||
|
az.nodeInformerSynced = func() bool { return false }
|
||||||
|
}
|
||||||
|
actual, err := az.GetResourceGroups()
|
||||||
|
if test.expectError {
|
||||||
|
assert.NotNil(t, err, test.name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Nil(t, err, test.name)
|
||||||
|
assert.Equal(t, test.expected, actual, test.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetNodeResourceGroup(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
nodeResourceGroups map[string]string
|
||||||
|
node string
|
||||||
|
expected string
|
||||||
|
informerSynced bool
|
||||||
|
expectError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "cloud provider configured RG should be returned by default",
|
||||||
|
nodeResourceGroups: map[string]string{},
|
||||||
|
informerSynced: true,
|
||||||
|
node: "node1",
|
||||||
|
expected: "rg",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "node RGs should be returned",
|
||||||
|
nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"},
|
||||||
|
informerSynced: true,
|
||||||
|
node: "node1",
|
||||||
|
expected: "rg1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error should be returned if informer hasn't synced yet",
|
||||||
|
nodeResourceGroups: map[string]string{"node1": "rg1", "node2": "rg2"},
|
||||||
|
informerSynced: false,
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
az := getTestCloud()
|
||||||
|
for _, test := range tests {
|
||||||
|
az.nodeResourceGroups = test.nodeResourceGroups
|
||||||
|
if test.informerSynced {
|
||||||
|
az.nodeInformerSynced = func() bool { return true }
|
||||||
|
} else {
|
||||||
|
az.nodeInformerSynced = func() bool { return false }
|
||||||
|
}
|
||||||
|
actual, err := az.GetNodeResourceGroup(test.node)
|
||||||
|
if test.expectError {
|
||||||
|
assert.NotNil(t, err, test.name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Nil(t, err, test.name)
|
||||||
|
assert.Equal(t, test.expected, actual, test.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -40,8 +40,10 @@ var (
|
|||||||
// ErrorNotVmssInstance indicates an instance is not belongint to any vmss.
|
// ErrorNotVmssInstance indicates an instance is not belongint to any vmss.
|
||||||
ErrorNotVmssInstance = errors.New("not a vmss instance")
|
ErrorNotVmssInstance = errors.New("not a vmss instance")
|
||||||
|
|
||||||
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
|
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
|
||||||
vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s"
|
resourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines(?:.*)`)
|
||||||
|
vmssNicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines/(?:.*)/networkInterfaces/(?:.*)`)
|
||||||
|
vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s"
|
||||||
)
|
)
|
||||||
|
|
||||||
// scaleSet implements VMSet interface for Azure scale set.
|
// scaleSet implements VMSet interface for Azure scale set.
|
||||||
@ -106,8 +108,14 @@ func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm co
|
|||||||
return "", "", vm, cloudprovider.InstanceNotFound
|
return "", "", vm, cloudprovider.InstanceNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
|
||||||
|
if err != nil {
|
||||||
|
return "", "", vm, err
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
|
glog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
|
||||||
cachedVM, err := ss.vmssVMCache.Get(ss.makeVmssVMName(ssName, instanceID))
|
key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID))
|
||||||
|
cachedVM, err := ss.vmssVMCache.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ssName, instanceID, vm, err
|
return ssName, instanceID, vm, err
|
||||||
}
|
}
|
||||||
@ -122,9 +130,10 @@ func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm co
|
|||||||
|
|
||||||
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
|
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
|
||||||
// The node must belong to one of scale sets.
|
// The node must belong to one of scale sets.
|
||||||
func (ss *scaleSet) getVmssVMByInstanceID(scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
|
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
|
||||||
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
|
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
|
||||||
cachedVM, err := ss.vmssVMCache.Get(vmName)
|
key := buildVmssCacheKey(resourceGroup, vmName)
|
||||||
|
cachedVM, err := ss.vmssVMCache.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return vm, err
|
return vm, err
|
||||||
}
|
}
|
||||||
@ -168,13 +177,18 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName,
|
|||||||
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resourceGroup, err := extractResourceGroupByProviderID(providerID)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("error of extracting resource group for node %q", providerID)
|
||||||
|
}
|
||||||
|
|
||||||
instanceID, err := getLastSegment(providerID)
|
instanceID, err := getLastSegment(providerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
|
glog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
|
||||||
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
vm, err := ss.getVmssVMByInstanceID(scaleSetName, instanceID)
|
vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -308,7 +322,7 @@ func getScaleSetVMInstanceID(machineName string) (string, error) {
|
|||||||
return fmt.Sprintf("%d", instanceID), nil
|
return fmt.Sprintf("%d", instanceID), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractScaleSetNameByProviderID extracts the scaleset name by node's ProviderID.
|
// extractScaleSetNameByProviderID extracts the scaleset name by vmss node's ProviderID.
|
||||||
func extractScaleSetNameByProviderID(providerID string) (string, error) {
|
func extractScaleSetNameByProviderID(providerID string) (string, error) {
|
||||||
matches := scaleSetNameRE.FindStringSubmatch(providerID)
|
matches := scaleSetNameRE.FindStringSubmatch(providerID)
|
||||||
if len(matches) != 2 {
|
if len(matches) != 2 {
|
||||||
@ -318,13 +332,23 @@ func extractScaleSetNameByProviderID(providerID string) (string, error) {
|
|||||||
return matches[1], nil
|
return matches[1], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extractResourceGroupByProviderID extracts the resource group name by vmss node's ProviderID.
|
||||||
|
func extractResourceGroupByProviderID(providerID string) (string, error) {
|
||||||
|
matches := resourceGroupRE.FindStringSubmatch(providerID)
|
||||||
|
if len(matches) != 2 {
|
||||||
|
return "", ErrorNotVmssInstance
|
||||||
|
}
|
||||||
|
|
||||||
|
return matches[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
// listScaleSets lists all scale sets.
|
// listScaleSets lists all scale sets.
|
||||||
func (ss *scaleSet) listScaleSets() ([]string, error) {
|
func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
|
||||||
var err error
|
var err error
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, ss.ResourceGroup)
|
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, resourceGroup)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
|
glog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -339,12 +363,12 @@ func (ss *scaleSet) listScaleSets() ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// listScaleSetVMs lists VMs belonging to the specified scale set.
|
// listScaleSetVMs lists VMs belonging to the specified scale set.
|
||||||
func (ss *scaleSet) listScaleSetVMs(scaleSetName string) ([]compute.VirtualMachineScaleSetVM, error) {
|
func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
|
||||||
var err error
|
var err error
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, ss.ResourceGroup, scaleSetName, "", "", string(compute.InstanceView))
|
allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err)
|
glog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -362,6 +386,10 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ss.ShouldNodeExcludedFromLoadBalancer(nodes[nx]) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
nodeName := nodes[nx].Name
|
nodeName := nodes[nx].Name
|
||||||
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
|
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -429,6 +457,16 @@ func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetN
|
|||||||
return vmSetNames, nil
|
return vmSetNames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extractResourceGroupByVMSSNicID extracts the resource group name by vmss nicID.
|
||||||
|
func extractResourceGroupByVMSSNicID(nicID string) (string, error) {
|
||||||
|
matches := vmssNicResourceGroupRE.FindStringSubmatch(nicID)
|
||||||
|
if len(matches) != 2 {
|
||||||
|
return "", fmt.Errorf("error of extracting resourceGroup from nicID %q", nicID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return matches[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
|
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
|
||||||
func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
|
func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
|
||||||
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
|
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
|
||||||
@ -443,6 +481,11 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
|
|||||||
|
|
||||||
ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
|
ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// VM is availability set, but not cached yet in availabilitySetNodesCache.
|
||||||
|
if err == ErrorNotVmssInstance {
|
||||||
|
return ss.availabilitySet.GetPrimaryInterface(nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getVmssVM(%s), err=%v", nodeName, nodeName, err)
|
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getVmssVM(%s), err=%v", nodeName, nodeName, err)
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
@ -458,12 +501,16 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err
|
|||||||
glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, primaryInterfaceID, err)
|
glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, primaryInterfaceID, err)
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
|
resourceGroup, err := extractResourceGroupByVMSSNicID(primaryInterfaceID)
|
||||||
|
if err != nil {
|
||||||
|
return network.Interface{}, err
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, ss.ResourceGroup, ssName, instanceID, nicName, "")
|
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, resourceGroup, ssName, instanceID, nicName, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, ssName, nicName, err)
|
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, resourceGroup, ssName, nicName, err)
|
||||||
return network.Interface{}, err
|
return network.Interface{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -566,6 +613,11 @@ func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ss.ShouldNodeExcludedFromLoadBalancer(curNode) {
|
||||||
|
glog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", curNode.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
|
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name)
|
glog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name)
|
||||||
@ -884,11 +936,11 @@ func (ss *scaleSet) EnsureBackendPoolDeleted(poolID, vmSetName string, backendAd
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getVmssMachineID returns the full identifier of a vmss virtual machine.
|
// getVmssMachineID returns the full identifier of a vmss virtual machine.
|
||||||
func (az *Cloud) getVmssMachineID(scaleSetName, instanceID string) string {
|
func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string {
|
||||||
return fmt.Sprintf(
|
return fmt.Sprintf(
|
||||||
vmssMachineIDTemplate,
|
vmssMachineIDTemplate,
|
||||||
az.SubscriptionID,
|
az.SubscriptionID,
|
||||||
az.ResourceGroup,
|
resourceGroup,
|
||||||
scaleSetName,
|
scaleSetName,
|
||||||
instanceID)
|
instanceID)
|
||||||
}
|
}
|
||||||
|
@ -27,15 +27,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
vmssNameSeparator = "_"
|
vmssNameSeparator = "_"
|
||||||
|
vmssCacheSeparator = "#"
|
||||||
|
|
||||||
nodeNameToScaleSetMappingKey = "k8sNodeNameToScaleSetMappingKey"
|
nodeNameToScaleSetMappingKey = "k8sNodeNameToScaleSetMappingKey"
|
||||||
availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
|
availabilitySetNodesKey = "k8sAvailabilitySetNodesKey"
|
||||||
|
|
||||||
vmssCacheTTL = time.Minute
|
vmssCacheTTL = time.Minute
|
||||||
vmssVMCacheTTL = time.Minute
|
vmssVMCacheTTL = time.Minute
|
||||||
availabilitySetNodesCacheTTL = 15 * time.Minute
|
availabilitySetNodesCacheTTL = 5 * time.Minute
|
||||||
nodeNameToScaleSetMappingCacheTTL = 15 * time.Minute
|
nodeNameToScaleSetMappingCacheTTL = 5 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// nodeNameToScaleSetMapping maps nodeName to scaleSet name.
|
// nodeNameToScaleSetMapping maps nodeName to scaleSet name.
|
||||||
@ -49,19 +50,19 @@ func (ss *scaleSet) makeVmssVMName(scaleSetName, instanceID string) string {
|
|||||||
func extractVmssVMName(name string) (string, string, error) {
|
func extractVmssVMName(name string) (string, string, error) {
|
||||||
split := strings.SplitAfter(name, vmssNameSeparator)
|
split := strings.SplitAfter(name, vmssNameSeparator)
|
||||||
if len(split) < 2 {
|
if len(split) < 2 {
|
||||||
glog.Errorf("Failed to extract vmssVMName %q", name)
|
glog.V(3).Infof("Failed to extract vmssVMName %q", name)
|
||||||
return "", "", ErrorNotVmssInstance
|
return "", "", ErrorNotVmssInstance
|
||||||
}
|
}
|
||||||
|
|
||||||
ssName := strings.Join(split[0:len(split)-1], "")
|
ssName := strings.Join(split[0:len(split)-1], "")
|
||||||
// removing the trailing `vmssNameSeparator` since we used SplitAfter
|
// removing the trailing `vmssNameSeparator` since we used SplitAfter
|
||||||
ssName = ssName[:len(ssName)-1]
|
ssName = ssName[:len(ssName)-1]
|
||||||
|
|
||||||
instanceID := split[len(split)-1]
|
instanceID := split[len(split)-1]
|
||||||
|
|
||||||
return ssName, instanceID, nil
|
return ssName, instanceID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// vmssCache only holds vmss from ss.ResourceGroup because nodes from other resourceGroups
|
||||||
|
// will be excluded from LB backends.
|
||||||
func (ss *scaleSet) newVmssCache() (*timedCache, error) {
|
func (ss *scaleSet) newVmssCache() (*timedCache, error) {
|
||||||
getter := func(key string) (interface{}, error) {
|
getter := func(key string) (interface{}, error) {
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
@ -85,26 +86,34 @@ func (ss *scaleSet) newVmssCache() (*timedCache, error) {
|
|||||||
|
|
||||||
func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
|
func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
|
||||||
getter := func(key string) (interface{}, error) {
|
getter := func(key string) (interface{}, error) {
|
||||||
scaleSetNames, err := ss.listScaleSets()
|
localCache := make(nodeNameToScaleSetMapping)
|
||||||
|
|
||||||
|
allResourceGroups, err := ss.GetResourceGroups()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
localCache := make(nodeNameToScaleSetMapping)
|
for _, resourceGroup := range allResourceGroups.List() {
|
||||||
for _, ssName := range scaleSetNames {
|
scaleSetNames, err := ss.listScaleSets(resourceGroup)
|
||||||
vms, err := ss.listScaleSetVMs(ssName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, vm := range vms {
|
for _, ssName := range scaleSetNames {
|
||||||
if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
|
vms, err := ss.listScaleSetVMs(ssName, resourceGroup)
|
||||||
glog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
|
if err != nil {
|
||||||
continue
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
computerName := strings.ToLower(*vm.OsProfile.ComputerName)
|
for _, vm := range vms {
|
||||||
localCache[computerName] = ssName
|
if vm.OsProfile == nil || vm.OsProfile.ComputerName == nil {
|
||||||
|
glog.Warningf("failed to get computerName for vmssVM (%q)", ssName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
computerName := strings.ToLower(*vm.OsProfile.ComputerName)
|
||||||
|
localCache[computerName] = ssName
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,14 +125,23 @@ func (ss *scaleSet) newNodeNameToScaleSetMappingCache() (*timedCache, error) {
|
|||||||
|
|
||||||
func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
|
func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
|
||||||
getter := func(key string) (interface{}, error) {
|
getter := func(key string) (interface{}, error) {
|
||||||
vmList, err := ss.Cloud.VirtualMachineClientListWithRetry()
|
localCache := sets.NewString()
|
||||||
|
resourceGroups, err := ss.GetResourceGroups()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
localCache := sets.NewString()
|
for _, resourceGroup := range resourceGroups.List() {
|
||||||
for _, vm := range vmList {
|
vmList, err := ss.Cloud.VirtualMachineClientListWithRetry(resourceGroup)
|
||||||
localCache.Insert(*vm.Name)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, vm := range vmList {
|
||||||
|
if vm.Name != nil {
|
||||||
|
localCache.Insert(*vm.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return localCache, nil
|
return localCache, nil
|
||||||
@ -132,10 +150,33 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) {
|
|||||||
return newTimedcache(availabilitySetNodesCacheTTL, getter)
|
return newTimedcache(availabilitySetNodesCacheTTL, getter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildVmssCacheKey(resourceGroup, name string) string {
|
||||||
|
// key is composed of <resourceGroup>#<vmName>
|
||||||
|
return fmt.Sprintf("%s%s%s", resourceGroup, vmssCacheSeparator, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func extractVmssCacheKey(key string) (string, string, error) {
|
||||||
|
// key is composed of <resourceGroup>#<vmName>
|
||||||
|
keyItems := strings.Split(key, vmssCacheSeparator)
|
||||||
|
if len(keyItems) != 2 {
|
||||||
|
return "", "", fmt.Errorf("key %q is not in format '<resouceGroup>#<vmName>'", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceGroup := keyItems[0]
|
||||||
|
vmName := keyItems[1]
|
||||||
|
return resourceGroup, vmName, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
|
func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
|
||||||
getter := func(key string) (interface{}, error) {
|
getter := func(key string) (interface{}, error) {
|
||||||
// vmssVM name's format is 'scaleSetName_instanceID'
|
// key is composed of <resourceGroup>#<vmName>
|
||||||
ssName, instanceID, err := extractVmssVMName(key)
|
resourceGroup, vmName, err := extractVmssCacheKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// vmName's format is 'scaleSetName_instanceID'
|
||||||
|
ssName, instanceID, err := extractVmssVMName(vmName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -147,7 +188,7 @@ func (ss *scaleSet) newVmssVMCache() (*timedCache, error) {
|
|||||||
|
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, ss.ResourceGroup, ssName, instanceID)
|
result, err := ss.VirtualMachineScaleSetVMsClient.Get(ctx, resourceGroup, ssName, instanceID)
|
||||||
exists, message, realErr := checkResourceExistsFromError(err)
|
exists, message, realErr := checkResourceExistsFromError(err)
|
||||||
if realErr != nil {
|
if realErr != nil {
|
||||||
return nil, realErr
|
return nil, realErr
|
||||||
|
@ -189,7 +189,13 @@ func (az *Cloud) newVMCache() (*timedCache, error) {
|
|||||||
// Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed
|
// Consider adding separate parameter for controlling 'InstanceView' once node update issue #56276 is fixed
|
||||||
ctx, cancel := getContextWithCancel()
|
ctx, cancel := getContextWithCancel()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
vm, err := az.VirtualMachinesClient.Get(ctx, az.ResourceGroup, key, compute.InstanceView)
|
|
||||||
|
resourceGroup, err := az.GetNodeResourceGroup(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
vm, err := az.VirtualMachinesClient.Get(ctx, resourceGroup, key, compute.InstanceView)
|
||||||
exists, message, realErr := checkResourceExistsFromError(err)
|
exists, message, realErr := checkResourceExistsFromError(err)
|
||||||
if realErr != nil {
|
if realErr != nil {
|
||||||
return nil, realErr
|
return nil, realErr
|
||||||
|
@ -304,14 +304,16 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
|
|||||||
|
|
||||||
if zoned {
|
if zoned {
|
||||||
// Set node affinity labels based on availability zone labels.
|
// Set node affinity labels based on availability zone labels.
|
||||||
requirements := make([]v1.NodeSelectorRequirement, 0)
|
if len(labels) > 0 {
|
||||||
for k, v := range labels {
|
requirements := make([]v1.NodeSelectorRequirement, 0)
|
||||||
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: []string{v}})
|
for k, v := range labels {
|
||||||
}
|
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: []string{v}})
|
||||||
|
}
|
||||||
|
|
||||||
nodeSelectorTerms = append(nodeSelectorTerms, v1.NodeSelectorTerm{
|
nodeSelectorTerms = append(nodeSelectorTerms, v1.NodeSelectorTerm{
|
||||||
MatchExpressions: requirements,
|
MatchExpressions: requirements,
|
||||||
})
|
})
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Set node affinity labels based on fault domains.
|
// Set node affinity labels based on fault domains.
|
||||||
// This is required because unzoned AzureDisk can't be attached to zoned nodes.
|
// This is required because unzoned AzureDisk can't be attached to zoned nodes.
|
||||||
@ -336,10 +338,12 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
|
if len(nodeSelectorTerms) > 0 {
|
||||||
Required: &v1.NodeSelector{
|
pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
|
||||||
NodeSelectorTerms: nodeSelectorTerms,
|
Required: &v1.NodeSelector{
|
||||||
},
|
NodeSelectorTerms: nodeSelectorTerms,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user