diff --git a/pkg/cloudprovider/providers/azure/azure_backoff.go b/pkg/cloudprovider/providers/azure/azure_backoff.go index 33fc9064095..0494201b6b2 100644 --- a/pkg/cloudprovider/providers/azure/azure_backoff.go +++ b/pkg/cloudprovider/providers/azure/azure_backoff.go @@ -58,23 +58,6 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua return machine, exists, err } -// GetScaleSetsVMWithRetry invokes ss.getScaleSetVM with exponential backoff retry -func (ss *scaleSet) GetScaleSetsVMWithRetry(name types.NodeName, scaleSetName string) (compute.VirtualMachineScaleSetVM, bool, error) { - var machine compute.VirtualMachineScaleSetVM - var exists bool - err := wait.ExponentialBackoff(ss.resourceRequestBackoff, func() (bool, error) { - var retryErr error - machine, exists, retryErr = ss.getScaleSetVM(string(name), scaleSetName) - if retryErr != nil { - glog.Errorf("GetScaleSetsVMWithRetry backoff: failure, will retry,err=%v", retryErr) - return false, nil - } - glog.V(10).Infof("GetScaleSetsVMWithRetry backoff: success") - return true, nil - }) - return machine, exists, err -} - // VirtualMachineClientGetWithRetry invokes az.VirtualMachinesClient.Get with exponential backoff retry func (az *Cloud) VirtualMachineClientGetWithRetry(resourceGroup, vmName string, types compute.InstanceViewTypes) (compute.VirtualMachine, error) { var machine compute.VirtualMachine diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go index 54e108fcc84..d6f4bdfac38 100644 --- a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -753,12 +753,18 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, // Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB. 
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName) - az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName) + glog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): start", lbBackendPoolID, vmSetName) + err := az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName) + if err != nil { + glog.Errorf("EnsureBackendPoolDeleted(%s, %s) failed: %v", lbBackendPoolID, vmSetName, err) + return nil, err + } + glog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): end", lbBackendPoolID, vmSetName) // Remove the LB. az.operationPollRateLimiter.Accept() glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName) - err := az.DeleteLBWithRetry(lbName) + err = az.DeleteLBWithRetry(lbName) if err != nil { glog.V(2).Infof("delete(%s) abort backoff: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) return nil, err diff --git a/pkg/cloudprovider/providers/azure/azure_util_vmss.go b/pkg/cloudprovider/providers/azure/azure_util_vmss.go index 45f631af951..71a881c9fef 100644 --- a/pkg/cloudprovider/providers/azure/azure_util_vmss.go +++ b/pkg/cloudprovider/providers/azure/azure_util_vmss.go @@ -23,6 +23,8 @@ import ( "sort" "strconv" "strings" + "sync" + "time" "github.com/Azure/azure-sdk-for-go/arm/compute" "github.com/Azure/azure-sdk-for-go/arm/network" @@ -40,9 +42,29 @@ var ( // ErrorNotVmssInstance indicates an instance is not belongint to any vmss. ErrorNotVmssInstance = errors.New("not a vmss instance") - scaleSetNameRE = regexp.MustCompile(`^/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`) + scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`) ) +// scaleSetVMInfo includes basic information of a virtual machine. +type scaleSetVMInfo struct { + // The ID of the machine. + ID string + // Instance ID of the machine (only for scale sets vm). + InstanceID string + // Node name of the machine. 
+ NodeName string + // Set name of the machine. + ScaleSetName string + // The type of the machine. + Type string + // The region of the machine. + Region string + // Primary interface ID of the machine. + PrimaryInterfaceID string + // Fault domain of the machine. + FaultDomain string +} + // scaleSet implements VMSet interface for Azure scale set. type scaleSet struct { *Cloud @@ -50,23 +72,173 @@ type scaleSet struct { // availabilitySet is also required for scaleSet because some instances // (e.g. master nodes) may not belong to any scale sets. availabilitySet VMSet + + cacheMutex sync.Mutex + // A local cache of scale sets. The key is scale set name and the value is a + // list of virtual machines belonging to the scale set. + cache map[string][]scaleSetVMInfo } // newScaleSet creates a new scaleSet. func newScaleSet(az *Cloud) VMSet { - return &scaleSet{ + ss := &scaleSet{ Cloud: az, availabilitySet: newAvailabilitySet(az), + cache: make(map[string][]scaleSetVMInfo), } + + go wait.Until(func() { + ss.cacheMutex.Lock() + defer ss.cacheMutex.Unlock() + + if err := ss.updateCache(); err != nil { + glog.Errorf("updateCache failed: %v", err) + } + }, 5*time.Minute, wait.NeverStop) + + return ss +} + +// updateCache updates scale sets cache. It should be called within a lock. 
+func (ss *scaleSet) updateCache() error { + scaleSetNames, err := ss.listScaleSetsWithRetry() + if err != nil { + return err + } + + localCache := make(map[string][]scaleSetVMInfo) + for _, scaleSetName := range scaleSetNames { + if _, ok := localCache[scaleSetName]; !ok { + localCache[scaleSetName] = make([]scaleSetVMInfo, 0) + } + vms, err := ss.listScaleSetVMsWithRetry(scaleSetName) + if err != nil { + return err + } + + for _, vm := range vms { + nodeName := "" + if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil { + nodeName = *vm.OsProfile.ComputerName + } + + vmSize := "" + if vm.Sku != nil && vm.Sku.Name != nil { + vmSize = *vm.Sku.Name + } + + primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm) + if err != nil { + glog.Errorf("getPrimaryInterfaceID for %s failed: %v", nodeName, err) + return err + } + + faultDomain := "" + if vm.InstanceView != nil && vm.InstanceView.PlatformFaultDomain != nil { + faultDomain = strconv.Itoa(int(*vm.InstanceView.PlatformFaultDomain)) + } + + localCache[scaleSetName] = append(localCache[scaleSetName], scaleSetVMInfo{ + ID: *vm.ID, + Type: vmSize, + NodeName: nodeName, + FaultDomain: faultDomain, + ScaleSetName: scaleSetName, + Region: *vm.Location, + InstanceID: *vm.InstanceID, + PrimaryInterfaceID: primaryInterfaceID, + }) + } + } + + // Only update cache after all steps have succeeded. + ss.cache = localCache + + return nil +} + +// getCachedVirtualMachine gets virtualMachine by nodeName from cache. 
+func (ss *scaleSet) getCachedVirtualMachine(nodeName string) (scaleSetVMInfo, error) { + ss.cacheMutex.Lock() + defer ss.cacheMutex.Unlock() + + getVMFromCache := func(nodeName string) (scaleSetVMInfo, error) { + for scaleSetName := range ss.cache { + for _, vm := range ss.cache[scaleSetName] { + if vm.NodeName == nodeName { + return vm, nil + } + } + } + + return scaleSetVMInfo{}, cloudprovider.InstanceNotFound + } + + vm, err := getVMFromCache(nodeName) + if err == nil { + return vm, nil + } + + // Update cache and try again. + if err = ss.updateCache(); err != nil { + return scaleSetVMInfo{}, err + } + vm, err = getVMFromCache(nodeName) + if err == nil { + return vm, nil + } + + return scaleSetVMInfo{}, cloudprovider.InstanceNotFound +} + +func (ss *scaleSet) getCachedVirtualMachineByInstanceID(scaleSetName, instanceID string) (scaleSetVMInfo, error) { + ss.cacheMutex.Lock() + defer ss.cacheMutex.Unlock() + + getVMByID := func(scaleSetName, instanceID string) (scaleSetVMInfo, error) { + vms, ok := ss.cache[scaleSetName] + if !ok { + glog.V(4).Infof("scale set (%s) not found", scaleSetName) + return scaleSetVMInfo{}, cloudprovider.InstanceNotFound + } + + for _, vm := range vms { + if vm.InstanceID == instanceID { + glog.V(4).Infof("getCachedVirtualMachineByInstanceID gets vm (%s) by instanceID (%s) within scale set (%s)", vm.NodeName, instanceID, scaleSetName) + return vm, nil + } + } + + glog.V(4).Infof("instanceID (%s) not found in scale set (%s)", instanceID, scaleSetName) + return scaleSetVMInfo{}, cloudprovider.InstanceNotFound + } + + vm, err := getVMByID(scaleSetName, instanceID) + if err == nil { + return vm, nil + } + + // Update cache and try again. + if err = ss.updateCache(); err != nil { + return scaleSetVMInfo{}, err + } + vm, err = getVMByID(scaleSetName, instanceID) + if err == nil { + return vm, nil + } + + return scaleSetVMInfo{}, cloudprovider.InstanceNotFound } // GetInstanceIDByNodeName gets the cloud provider ID by node name. 
// It must return ("", cloudprovider.InstanceNotFound) if the instance does // not exist or is no longer running. func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { - instanceID, err := ss.getScaleSetInstanceIDByName(name, ss.PrimaryScaleSetName) + vm, err := ss.getCachedVirtualMachine(name) if err != nil { - if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { + if err == cloudprovider.InstanceNotFound { + glog.V(4).Infof("GetInstanceIDByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name) + // Retry with standard type because master nodes may not belong to any vmss. // TODO: find a better way to identify the type of VM. return ss.availabilitySet.GetInstanceIDByNodeName(name) @@ -75,89 +247,39 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { return "", err } - return instanceID, nil -} - -func (ss *scaleSet) getScaleSetInstanceIDByName(name, scaleSetName string) (string, error) { - var machine compute.VirtualMachineScaleSetVM - var exists bool - var err error - - ss.operationPollRateLimiter.Accept() - machine, exists, err = ss.getScaleSetVM(name, scaleSetName) - if err != nil { - if ss.CloudProviderBackoff { - glog.V(2).Infof("InstanceID(%s) backing off", name) - machine, exists, err = ss.GetScaleSetsVMWithRetry(types.NodeName(name), scaleSetName) - if err != nil { - glog.V(2).Infof("InstanceID(%s) abort backoff", name) - return "", err - } - } else { - return "", err - } - } else if !exists { - return "", cloudprovider.InstanceNotFound - } - - return *machine.ID, nil -} - -func (ss *scaleSet) getScaleSetVM(nodeName, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) { - instanceID, err := getScaleSetVMInstanceID(nodeName) - if err != nil { - return vm, false, err - } - - return ss.getScaleSetVMByID(instanceID, scaleSetName) -} - -func (ss *scaleSet) getScaleSetVMByID(instanceID, scaleSetName string) (vm 
compute.VirtualMachineScaleSetVM, exists bool, err error) { - var realErr error - - // scaleSetName is required to query VM info. - if scaleSetName == "" { - scaleSetName = ss.PrimaryScaleSetName - } - - ss.operationPollRateLimiter.Accept() - glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): start", instanceID) - vm, err = ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, scaleSetName, instanceID) - glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): end", instanceID) - - exists, realErr = checkResourceExistsFromError(err) - if realErr != nil { - return vm, false, realErr - } - - if !exists { - return vm, false, nil - } - - return vm, exists, err + return vm.ID, nil } // GetNodeNameByProviderID gets the node name by provider ID. func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) { // NodeName is not part of providerID for vmss instances. - parts := strings.Split(providerID, "/") - instanceID := parts[len(parts)-1] - machine, exist, err := ss.getScaleSetVMByID(instanceID, ss.PrimaryScaleSetName) - if !exist { - return "", cloudprovider.InstanceNotFound + scaleSetName, err := extractScaleSetNameByVMID(providerID) + if err != nil { + glog.V(4).Infof("Can not extract scale set name from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err) + return ss.availabilitySet.GetNodeNameByProviderID(providerID) } + + instanceID, err := getLastSegment(providerID) + if err != nil { + glog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err) + return ss.availabilitySet.GetNodeNameByProviderID(providerID) + } + + vm, err := ss.getCachedVirtualMachineByInstanceID(scaleSetName, instanceID) if err != nil { return "", err } - return types.NodeName(*machine.OsProfile.ComputerName), nil + return types.NodeName(vm.NodeName), nil } // GetInstanceTypeByNodeName gets the instance type by node name. 
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { - instanceType, err := ss.getScaleSetInstanceTypeByNodeName(name) + vm, err := ss.getCachedVirtualMachine(name) if err != nil { - if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { + if err == cloudprovider.InstanceNotFound { + glog.V(4).Infof("GetInstanceTypeByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name) + // Retry with standard type because master nodes may not belong to any vmss. // TODO: find a better way to identify the type of VM. return ss.availabilitySet.GetInstanceTypeByNodeName(name) @@ -166,30 +288,15 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { return "", err } - return instanceType, nil -} - -func (ss *scaleSet) getScaleSetInstanceTypeByNodeName(name string) (string, error) { - machine, exists, err := ss.getScaleSetVM(name, ss.PrimaryScaleSetName) - if err != nil { - glog.Errorf("error: ss.getScaleSetInstanceTypeByNodeName(%s), ss.getScaleSetVM(%s) err=%v", name, name, err) - return "", err - } else if !exists { - return "", cloudprovider.InstanceNotFound - } - - if machine.Sku.Name != nil { - return *machine.Sku.Name, nil - } - - return "", fmt.Errorf("instance type is not defined") + return vm.Type, nil } // GetZoneByNodeName gets cloudprovider.Zone by node name. func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { - instanceID, err := getScaleSetVMInstanceID(name) + vm, err := ss.getCachedVirtualMachine(name) if err != nil { - if err == ErrorNotVmssInstance { + if err == cloudprovider.InstanceNotFound { + glog.V(4).Infof("GetZoneByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name) // Retry with standard type because master nodes may not belong to any vmss. // TODO: find a better way to identify the type of VM. 
return ss.availabilitySet.GetZoneByNodeName(name) @@ -197,23 +304,10 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { return cloudprovider.Zone{}, err } - vm, err := ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID) - if err != nil { - return cloudprovider.Zone{}, err - } - - // PlatformFaultDomain is not included in VirtualMachineScaleSetVM, so we get it from VirtualMachineScaleSetVMInstanceView. - vmView, err := ss.VirtualMachineScaleSetVMsClient.GetInstanceView(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID) - if err != nil { - return cloudprovider.Zone{}, err - } - - failureDomain := strconv.Itoa(int(*vmView.PlatformFaultDomain)) - zone := cloudprovider.Zone{ - FailureDomain: failureDomain, - Region: *(vm.Location), - } - return zone, nil + return cloudprovider.Zone{ + FailureDomain: vm.FaultDomain, + Region: vm.Region, + }, nil } // GetPrimaryVMSetName returns the VM set name depending on the configured vmType. 
@@ -345,7 +439,7 @@ func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.Vir backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) { ss.operationPollRateLimiter.Accept() glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List start for %v", scaleSetName) - result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", "") + result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", string(compute.InstanceView)) glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List end for %v", scaleSetName) if err != nil { glog.Errorf("VirtualMachineScaleSetVMsClient.List for %v failed: %v", scaleSetName, err) @@ -388,25 +482,24 @@ func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.Vir return allVMs, nil } -// getAgentPoolAvailabiliySets lists the virtual machines for for the resource group and then builds -// a list of availability sets that match the nodes available to k8s. +// getAgentPoolScaleSets lists the virtual machines for the resource group and then builds +// a list of scale sets that match the nodes available to k8s. func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) { - scaleSetNames, err := ss.listScaleSetsWithRetry() - if err != nil { + ss.cacheMutex.Lock() + defer ss.cacheMutex.Unlock() + + // Always update cache to get latest lists of scale sets and virtual machines. 
+ if err := ss.updateCache(); err != nil { return nil, err } - vmNameToScaleSetName := make(map[string]string, len(scaleSetNames)) - for _, scaleSetName := range scaleSetNames { - vms, err := ss.listScaleSetVMsWithRetry(scaleSetName) - if err != nil { - return nil, err - } - + vmNameToScaleSetName := make(map[string]string) + for scaleSetName := range ss.cache { + vms := ss.cache[scaleSetName] for idx := range vms { vm := vms[idx] - if vm.OsProfile != nil || vm.OsProfile.ComputerName != nil { - vmNameToScaleSetName[*vm.OsProfile.ComputerName] = scaleSetName + if vm.NodeName != "" { + vmNameToScaleSetName[vm.NodeName] = scaleSetName } } } @@ -488,43 +581,42 @@ func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetN // GetPrimaryInterface gets machine primary network interface by node name and vmSet. func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) { - ss.operationPollRateLimiter.Accept() - machine, exists, err := ss.getScaleSetVM(nodeName, vmSetName) - if !exists || err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance { - // Retry with standard type because master nodes may not belong to any vmss. - // TODO: find a better way to identify the type of VM. - return ss.availabilitySet.GetPrimaryInterface(nodeName, "") - } + vm, err := ss.getCachedVirtualMachine(nodeName) if err != nil { - glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getScaleSetVM(%s), err=%v", nodeName, nodeName, err) + if err == cloudprovider.InstanceNotFound { + // Retry with standard type because master nodes may not belong to any vmss. + // TODO: find a better way to identify the type of VM. 
+ return ss.availabilitySet.GetPrimaryInterface(nodeName, "") + } + + glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getCachedVirtualMachine(%s), err=%v", nodeName, nodeName, err) return network.Interface{}, err } - nicID, err := ss.getPrimaryInterfaceID(machine) - if err != nil { - glog.Errorf("error: ss.GetPrimaryInterface(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err) - return network.Interface{}, err + // Check scale set name. + if vmSetName != "" && !strings.EqualFold(vm.ScaleSetName, vmSetName) { + return network.Interface{}, errNotInVMSet } - nicName, err := getLastSegment(nicID) + nicName, err := getLastSegment(vm.PrimaryInterfaceID) if err != nil { - glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, nicID, err) + glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, vm.PrimaryInterfaceID, err) return network.Interface{}, err } ss.operationPollRateLimiter.Accept() glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName) - nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, *machine.InstanceID, nicName, "") + nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, vm.ScaleSetName, vm.InstanceID, nicName, "") glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName) if err != nil { - glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, nicName, "", err) + glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, vm.ScaleSetName, nicName, err) return network.Interface{}, err } // Fix interface's location, which is required when updating the interface. // TODO: is this a bug of azure SDK? 
if nic.Location == nil || *nic.Location == "" { - nic.Location = &ss.Config.Location + nic.Location = &vm.Region } return nic, nil