Support multiple scale sets in same cluster

Pengfei Ni 2017-12-21 10:12:46 +08:00
parent 5001198f0e
commit c07ab68005
3 changed files with 239 additions and 158 deletions


@@ -58,23 +58,6 @@ func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName) (compute.Virtua
return machine, exists, err
}
// GetScaleSetsVMWithRetry invokes ss.getScaleSetVM with exponential backoff retry
func (ss *scaleSet) GetScaleSetsVMWithRetry(name types.NodeName, scaleSetName string) (compute.VirtualMachineScaleSetVM, bool, error) {
var machine compute.VirtualMachineScaleSetVM
var exists bool
err := wait.ExponentialBackoff(ss.resourceRequestBackoff, func() (bool, error) {
var retryErr error
machine, exists, retryErr = ss.getScaleSetVM(string(name), scaleSetName)
if retryErr != nil {
glog.Errorf("GetScaleSetsVMWithRetry backoff: failure, will retry,err=%v", retryErr)
return false, nil
}
glog.V(10).Infof("GetScaleSetsVMWithRetry backoff: success")
return true, nil
})
return machine, exists, err
}
// VirtualMachineClientGetWithRetry invokes az.VirtualMachinesClient.Get with exponential backoff retry
func (az *Cloud) VirtualMachineClientGetWithRetry(resourceGroup, vmName string, types compute.InstanceViewTypes) (compute.VirtualMachine, error) {
var machine compute.VirtualMachine


@@ -753,12 +753,18 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
// Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB.
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName)
glog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): start", lbBackendPoolID, vmSetName)
err := az.vmSet.EnsureBackendPoolDeleted(lbBackendPoolID, vmSetName)
if err != nil {
glog.Errorf("EnsureBackendPoolDeleted(%s, %s) failed: %v", lbBackendPoolID, vmSetName, err)
return nil, err
}
glog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): end", lbBackendPoolID, vmSetName)
// Remove the LB.
az.operationPollRateLimiter.Accept()
glog.V(10).Infof("LoadBalancerClient.Delete(%q): start", lbName)
err := az.DeleteLBWithRetry(lbName)
err = az.DeleteLBWithRetry(lbName)
if err != nil {
glog.V(2).Infof("delete(%s) abort backoff: lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName)
return nil, err


@@ -23,6 +23,8 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/arm/compute"
"github.com/Azure/azure-sdk-for-go/arm/network"
@@ -40,9 +42,29 @@ var (
// ErrorNotVmssInstance indicates an instance does not belong to any vmss.
ErrorNotVmssInstance = errors.New("not a vmss instance")
scaleSetNameRE = regexp.MustCompile(`^/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
)
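For reference, the relaxed pattern above tolerates a prefix before /subscriptions/, so it also matches full provider IDs that carry a scheme, which the old ^/subscriptions anchor rejected. A minimal standalone sketch (the azure:// provider ID below is illustrative):

package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the one added above; the leading `.*` accepts a scheme
// prefix such as "azure://".
var scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)

func main() {
	id := "azure:///subscriptions/sub/resourceGroups/rg/providers/Microsoft.Compute/virtualMachineScaleSets/myss/virtualMachines/0"
	if matches := scaleSetNameRE.FindStringSubmatch(id); len(matches) == 2 {
		fmt.Println(matches[1]) // prints "myss"
	}
}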
// scaleSetVMInfo includes basic information about a virtual machine.
type scaleSetVMInfo struct {
// The ID of the machine.
ID string
// Instance ID of the machine (only for scale set VMs).
InstanceID string
// Node name of the machine.
NodeName string
// Set name of the machine.
ScaleSetName string
// The type of the machine.
Type string
// The region of the machine.
Region string
// Primary interface ID of the machine.
PrimaryInterfaceID string
// Fault domain of the machine.
FaultDomain string
}
// scaleSet implements VMSet interface for Azure scale set.
type scaleSet struct {
*Cloud
@@ -50,23 +72,173 @@ type scaleSet struct {
// availabilitySet is also required for scaleSet because some instances
// (e.g. master nodes) may not belong to any scale sets.
availabilitySet VMSet
cacheMutex sync.Mutex
// A local cache of scale sets. The key is scale set name and the value is a
// list of virtual machines belonging to the scale set.
cache map[string][]scaleSetVMInfo
}
// newScaleSet creates a new scaleSet.
func newScaleSet(az *Cloud) VMSet {
return &scaleSet{
ss := &scaleSet{
Cloud: az,
availabilitySet: newAvailabilitySet(az),
cache: make(map[string][]scaleSetVMInfo),
}
go wait.Until(func() {
ss.cacheMutex.Lock()
defer ss.cacheMutex.Unlock()
if err := ss.updateCache(); err != nil {
glog.Errorf("updateCache failed: %v", err)
}
}, 5*time.Minute, wait.NeverStop)
return ss
}
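The refresh loop relies on wait.Until from k8s.io/apimachinery/pkg/util/wait, which invokes the function, sleeps for the period, and repeats until the stop channel closes; wait.NeverStop keeps it running for the lifetime of the process. A standalone sketch of the same pattern:

package main

import (
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Run the refresh function immediately, then again every 5 minutes.
	go wait.Until(func() {
		log.Println("refreshing scale set cache")
	}, 5*time.Minute, wait.NeverStop)

	select {} // block forever; only needed in this sketch
}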
// updateCache updates scale sets cache. It should be called within a lock.
func (ss *scaleSet) updateCache() error {
scaleSetNames, err := ss.listScaleSetsWithRetry()
if err != nil {
return err
}
localCache := make(map[string][]scaleSetVMInfo)
for _, scaleSetName := range scaleSetNames {
if _, ok := localCache[scaleSetName]; !ok {
localCache[scaleSetName] = make([]scaleSetVMInfo, 0)
}
vms, err := ss.listScaleSetVMsWithRetry(scaleSetName)
if err != nil {
return err
}
for _, vm := range vms {
nodeName := ""
if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
nodeName = *vm.OsProfile.ComputerName
}
vmSize := ""
if vm.Sku != nil && vm.Sku.Name != nil {
vmSize = *vm.Sku.Name
}
primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
if err != nil {
glog.Errorf("getPrimaryInterfaceID for %s failed: %v", nodeName, err)
return err
}
faultDomain := ""
if vm.InstanceView != nil && vm.InstanceView.PlatformFaultDomain != nil {
faultDomain = strconv.Itoa(int(*vm.InstanceView.PlatformFaultDomain))
}
localCache[scaleSetName] = append(localCache[scaleSetName], scaleSetVMInfo{
ID: *vm.ID,
Type: vmSize,
NodeName: nodeName,
FaultDomain: faultDomain,
ScaleSetName: scaleSetName,
Region: *vm.Location,
InstanceID: *vm.InstanceID,
PrimaryInterfaceID: primaryInterfaceID,
})
}
}
// Only update the cache after all steps have succeeded.
ss.cache = localCache
return nil
}
// getCachedVirtualMachine gets virtualMachine by nodeName from cache.
func (ss *scaleSet) getCachedVirtualMachine(nodeName string) (scaleSetVMInfo, error) {
ss.cacheMutex.Lock()
defer ss.cacheMutex.Unlock()
getVMFromCache := func(nodeName string) (scaleSetVMInfo, error) {
for scaleSetName := range ss.cache {
for _, vm := range ss.cache[scaleSetName] {
if vm.NodeName == nodeName {
return vm, nil
}
}
}
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
}
vm, err := getVMFromCache(nodeName)
if err == nil {
return vm, nil
}
// Update cache and try again.
if err = ss.updateCache(); err != nil {
return scaleSetVMInfo{}, err
}
vm, err = getVMFromCache(nodeName)
if err == nil {
return vm, nil
}
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
}
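getCachedVirtualMachineByInstanceID below follows the same miss-then-refresh shape. A condensed sketch of the shared pattern, using a hypothetical helper name and reusing the package's scaleSetVMInfo type and cloudprovider.InstanceNotFound:

// lookupWithRefresh is a hypothetical helper illustrating the pattern:
// consult the cache, refresh it on a miss, retry once, then give up.
func lookupWithRefresh(find func() (scaleSetVMInfo, bool), refresh func() error) (scaleSetVMInfo, error) {
	if vm, ok := find(); ok {
		return vm, nil
	}
	if err := refresh(); err != nil {
		return scaleSetVMInfo{}, err
	}
	if vm, ok := find(); ok {
		return vm, nil
	}
	return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
}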
func (ss *scaleSet) getCachedVirtualMachineByInstanceID(scaleSetName, instanceID string) (scaleSetVMInfo, error) {
ss.cacheMutex.Lock()
defer ss.cacheMutex.Unlock()
getVMByID := func(scaleSetName, instanceID string) (scaleSetVMInfo, error) {
vms, ok := ss.cache[scaleSetName]
if !ok {
glog.V(4).Infof("scale set (%s) not found", scaleSetName)
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
}
for _, vm := range vms {
if vm.InstanceID == instanceID {
glog.V(4).Infof("getCachedVirtualMachineByInstanceID gets vm (%s) by instanceID (%s) within scale set (%s)", vm.NodeName, instanceID, scaleSetName)
return vm, nil
}
}
glog.V(4).Infof("instanceID (%s) not found in scale set (%s)", instanceID, scaleSetName)
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
}
vm, err := getVMByID(scaleSetName, instanceID)
if err == nil {
return vm, nil
}
// Update cache and try again.
if err = ss.updateCache(); err != nil {
return scaleSetVMInfo{}, err
}
vm, err = getVMByID(scaleSetName, instanceID)
if err == nil {
return vm, nil
}
return scaleSetVMInfo{}, cloudprovider.InstanceNotFound
}
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
// not exist or is no longer running.
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
instanceID, err := ss.getScaleSetInstanceIDByName(name, ss.PrimaryScaleSetName)
vm, err := ss.getCachedVirtualMachine(name)
if err != nil {
if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
if err == cloudprovider.InstanceNotFound {
glog.V(4).Infof("GetInstanceIDByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name)
// Retry with standard type because master nodes may not belong to any vmss.
// TODO: find a better way to identify the type of VM.
return ss.availabilitySet.GetInstanceIDByNodeName(name)
@@ -75,89 +247,39 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
return "", err
}
return instanceID, nil
}
func (ss *scaleSet) getScaleSetInstanceIDByName(name, scaleSetName string) (string, error) {
var machine compute.VirtualMachineScaleSetVM
var exists bool
var err error
ss.operationPollRateLimiter.Accept()
machine, exists, err = ss.getScaleSetVM(name, scaleSetName)
if err != nil {
if ss.CloudProviderBackoff {
glog.V(2).Infof("InstanceID(%s) backing off", name)
machine, exists, err = ss.GetScaleSetsVMWithRetry(types.NodeName(name), scaleSetName)
if err != nil {
glog.V(2).Infof("InstanceID(%s) abort backoff", name)
return "", err
}
} else {
return "", err
}
} else if !exists {
return "", cloudprovider.InstanceNotFound
}
return *machine.ID, nil
}
func (ss *scaleSet) getScaleSetVM(nodeName, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) {
instanceID, err := getScaleSetVMInstanceID(nodeName)
if err != nil {
return vm, false, err
}
return ss.getScaleSetVMByID(instanceID, scaleSetName)
}
func (ss *scaleSet) getScaleSetVMByID(instanceID, scaleSetName string) (vm compute.VirtualMachineScaleSetVM, exists bool, err error) {
var realErr error
// scaleSetName is required to query VM info.
if scaleSetName == "" {
scaleSetName = ss.PrimaryScaleSetName
}
ss.operationPollRateLimiter.Accept()
glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): start", instanceID)
vm, err = ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, scaleSetName, instanceID)
glog.V(10).Infof("VirtualMachineScaleSetVMsClient.Get(%s): end", instanceID)
exists, realErr = checkResourceExistsFromError(err)
if realErr != nil {
return vm, false, realErr
}
if !exists {
return vm, false, nil
}
return vm, exists, err
return vm.ID, nil
}
// GetNodeNameByProviderID gets the node name by provider ID.
func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
// NodeName is not part of providerID for vmss instances.
parts := strings.Split(providerID, "/")
instanceID := parts[len(parts)-1]
machine, exist, err := ss.getScaleSetVMByID(instanceID, ss.PrimaryScaleSetName)
if !exist {
return "", cloudprovider.InstanceNotFound
scaleSetName, err := extractScaleSetNameByVMID(providerID)
if err != nil {
glog.V(4).Infof("Can not extract scale set name from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
}
instanceID, err := getLastSegment(providerID)
if err != nil {
glog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
}
vm, err := ss.getCachedVirtualMachineByInstanceID(scaleSetName, instanceID)
if err != nil {
return "", err
}
return types.NodeName(*machine.OsProfile.ComputerName), nil
return types.NodeName(vm.NodeName), nil
}
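For vmss instances the provider ID ends in the numeric instance ID (e.g. .../virtualMachineScaleSets/myss/virtualMachines/0), so getLastSegment only needs the text after the final slash. A minimal sketch, assuming that is all the existing helper does:

// getLastSegment returns the portion of an Azure resource ID after the
// final "/", failing when the ID has no trailing segment. Sketch only.
func getLastSegment(ID string) (string, error) {
	parts := strings.Split(ID, "/")
	name := parts[len(parts)-1]
	if len(name) == 0 {
		return "", fmt.Errorf("resource name was missing from identifier")
	}
	return name, nil
}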
// GetInstanceTypeByNodeName gets the instance type by node name.
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
instanceType, err := ss.getScaleSetInstanceTypeByNodeName(name)
vm, err := ss.getCachedVirtualMachine(name)
if err != nil {
if err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
if err == cloudprovider.InstanceNotFound {
glog.V(4).Infof("GetInstanceTypeByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name)
// Retry with standard type because master nodes may not belong to any vmss.
// TODO: find a better way to identify the type of VM.
return ss.availabilitySet.GetInstanceTypeByNodeName(name)
@@ -166,30 +288,15 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
return "", err
}
return instanceType, nil
}
func (ss *scaleSet) getScaleSetInstanceTypeByNodeName(name string) (string, error) {
machine, exists, err := ss.getScaleSetVM(name, ss.PrimaryScaleSetName)
if err != nil {
glog.Errorf("error: ss.getScaleSetInstanceTypeByNodeName(%s), ss.getScaleSetVM(%s) err=%v", name, name, err)
return "", err
} else if !exists {
return "", cloudprovider.InstanceNotFound
}
if machine.Sku.Name != nil {
return *machine.Sku.Name, nil
}
return "", fmt.Errorf("instance type is not defined")
return vm.Type, nil
}
// GetZoneByNodeName gets cloudprovider.Zone by node name.
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
instanceID, err := getScaleSetVMInstanceID(name)
vm, err := ss.getCachedVirtualMachine(name)
if err != nil {
if err == ErrorNotVmssInstance {
if err == cloudprovider.InstanceNotFound {
glog.V(4).Infof("GetZoneByNodeName: node %q is not found in scale sets, assuming it is managed by availability set", name)
// Retry with standard type because master nodes may not belong to any vmss.
// TODO: find a better way to identify the type of VM.
return ss.availabilitySet.GetZoneByNodeName(name)
@@ -197,23 +304,10 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
return cloudprovider.Zone{}, err
}
vm, err := ss.VirtualMachineScaleSetVMsClient.Get(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID)
if err != nil {
return cloudprovider.Zone{}, err
}
// PlatformFaultDomain is not included in VirtualMachineScaleSetVM, so we get it from VirtualMachineScaleSetVMInstanceView.
vmView, err := ss.VirtualMachineScaleSetVMsClient.GetInstanceView(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, instanceID)
if err != nil {
return cloudprovider.Zone{}, err
}
failureDomain := strconv.Itoa(int(*vmView.PlatformFaultDomain))
zone := cloudprovider.Zone{
FailureDomain: failureDomain,
Region: *(vm.Location),
}
return zone, nil
return cloudprovider.Zone{
FailureDomain: vm.FaultDomain,
Region: vm.Region,
}, nil
}
// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
@@ -345,7 +439,7 @@ func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.Vir
backoffError := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ss.operationPollRateLimiter.Accept()
glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List start for %v", scaleSetName)
result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", "")
result, err = ss.VirtualMachineScaleSetVMsClient.List(ss.ResourceGroup, scaleSetName, "", "", string(compute.InstanceView))
glog.V(10).Infof("VirtualMachineScaleSetVMsClient.List end for %v", scaleSetName)
if err != nil {
glog.Errorf("VirtualMachineScaleSetVMsClient.List for %v failed: %v", scaleSetName, err)
@@ -388,25 +482,24 @@ func (ss *scaleSet) listScaleSetVMsWithRetry(scaleSetName string) ([]compute.Vir
return allVMs, nil
}
// getAgentPoolAvailabiliySets lists the virtual machines for for the resource group and then builds
// a list of availability sets that match the nodes available to k8s.
// getAgentPoolScaleSets lists the virtual machines for the resource group and then builds
// a list of scale sets that match the nodes available to k8s.
func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
scaleSetNames, err := ss.listScaleSetsWithRetry()
if err != nil {
return nil, err
}
vmNameToScaleSetName := make(map[string]string, len(scaleSetNames))
for _, scaleSetName := range scaleSetNames {
vms, err := ss.listScaleSetVMsWithRetry(scaleSetName)
if err != nil {
ss.cacheMutex.Lock()
defer ss.cacheMutex.Unlock()
// Always update the cache to get the latest lists of scale sets and virtual machines.
if err := ss.updateCache(); err != nil {
return nil, err
}
vmNameToScaleSetName := make(map[string]string)
for scaleSetName := range ss.cache {
vms := ss.cache[scaleSetName]
for idx := range vms {
vm := vms[idx]
if vm.OsProfile != nil || vm.OsProfile.ComputerName != nil {
vmNameToScaleSetName[*vm.OsProfile.ComputerName] = scaleSetName
if vm.NodeName != "" {
vmNameToScaleSetName[vm.NodeName] = scaleSetName
}
}
}
@@ -488,43 +581,42 @@ func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetN
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
func (ss *scaleSet) GetPrimaryInterface(nodeName, vmSetName string) (network.Interface, error) {
ss.operationPollRateLimiter.Accept()
machine, exists, err := ss.getScaleSetVM(nodeName, vmSetName)
if !exists || err == cloudprovider.InstanceNotFound || err == ErrorNotVmssInstance {
vm, err := ss.getCachedVirtualMachine(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// Retry with standard type because master nodes may not belong to any vmss.
// TODO: find a better way to identify the type of VM.
return ss.availabilitySet.GetPrimaryInterface(nodeName, "")
}
if err != nil {
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getScaleSetVM(%s), err=%v", nodeName, nodeName, err)
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getCachedVirtualMachine(%s), err=%v", nodeName, nodeName, err)
return network.Interface{}, err
}
nicID, err := ss.getPrimaryInterfaceID(machine)
if err != nil {
glog.Errorf("error: ss.GetPrimaryInterface(%s), getPrimaryInterfaceID(%v), err=%v", nodeName, machine, err)
return network.Interface{}, err
// Check scale set name.
if vmSetName != "" && !strings.EqualFold(vm.ScaleSetName, vmSetName) {
return network.Interface{}, errNotInVMSet
}
nicName, err := getLastSegment(nicID)
nicName, err := getLastSegment(vm.PrimaryInterfaceID)
if err != nil {
glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, nicID, err)
glog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, vm.PrimaryInterfaceID, err)
return network.Interface{}, err
}
ss.operationPollRateLimiter.Accept()
glog.V(10).Infof("InterfacesClient.Get(%q): start", nicName)
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, ss.Config.PrimaryScaleSetName, *machine.InstanceID, nicName, "")
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ss.ResourceGroup, vm.ScaleSetName, vm.InstanceID, nicName, "")
glog.V(10).Infof("InterfacesClient.Get(%q): end", nicName)
if err != nil {
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, nicName, "", err)
glog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, ss.ResourceGroup, vm.ScaleSetName, nicName, err)
return network.Interface{}, err
}
// Fix interface's location, which is required when updating the interface.
// TODO: is this a bug of azure SDK?
if nic.Location == nil || *nic.Location == "" {
nic.Location = &ss.Config.Location
nic.Location = &vm.Region
}
return nic, nil