Add caches for cross node resource groups and unmanaged nodes.

This commit is contained in:
Pengfei Ni 2018-08-16 17:10:37 +08:00
parent 59fdc02b13
commit d499855ba2

View File

@ -58,6 +58,9 @@ const (
loadBalancerSkuBasic = "basic"
loadBalancerSkuStandard = "standard"
externalResourceGroupLabel = "kubernetes.azure.com/resource-group"
managedByAzureLabel = "kubernetes.azure.com/managed"
)
var (
@ -156,11 +159,16 @@ type Cloud struct {
metadata *InstanceMetadata
vmSet VMSet
// Lock for access to nodeZones
nodeZonesLock sync.Mutex
// Lock for access to node caches
nodeCachesLock sync.Mutex
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
// it is updated by the nodeInformer
nodeZones map[string]sets.String
// nodeResourceGroups holds nodes external resource groups
nodeResourceGroups map[string]string
// unmanagedNodes holds a list of nodes not managed by Azure cloud provider.
unmanagedNodes sets.String
// nodeInformerSynced is for determining if the informer has synced.
nodeInformerSynced cache.InformerSynced
// Clients for vmss.
@ -257,6 +265,8 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
Config: *config,
Environment: *env,
nodeZones: map[string]sets.String{},
nodeResourceGroups: map[string]string{},
unmanagedNodes: sets.NewString(),
DisksClient: newAzDisksClient(azClientConfig),
RoutesClient: newAzRoutesClient(azClientConfig),
@ -294,7 +304,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
Duration: time.Duration(az.CloudProviderBackoffDuration) * time.Second,
Jitter: az.CloudProviderBackoffJitter,
}
glog.V(2).Infof("Azure cloudprovider using retry backoff: retries=%d, exponent=%f, duration=%d, jitter=%f",
glog.V(2).Infof("Azure cloudprovider using try backoff: retries=%d, exponent=%f, duration=%d, jitter=%f",
az.CloudProviderBackoffRetries,
az.CloudProviderBackoffExponent,
az.CloudProviderBackoffDuration,
@ -449,7 +459,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
az.updateNodeZones(nil, node)
az.updateNodeCaches(nil, node)
},
UpdateFunc: func(prev, obj interface{}) {
prevNode := prev.(*v1.Node)
@ -458,7 +468,7 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
return
}
az.updateNodeZones(prevNode, newNode)
az.updateNodeCaches(prevNode, newNode)
},
DeleteFunc: func(obj interface{}) {
node, isNode := obj.(*v1.Node)
@ -476,16 +486,19 @@ func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
return
}
}
az.updateNodeZones(node, nil)
az.updateNodeCaches(node, nil)
},
})
az.nodeInformerSynced = nodeInformer.HasSynced
}
func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
az.nodeZonesLock.Lock()
defer az.nodeZonesLock.Unlock()
// updateNodeCaches updates local cache for node's zones and external resource groups.
func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
az.nodeCachesLock.Lock()
defer az.nodeCachesLock.Unlock()
if prevNode != nil {
// Remove from nodeZones cache.
prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok && az.isAvailabilityZone(prevZone) {
az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
@ -493,8 +506,22 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
az.nodeZones[prevZone] = nil
}
}
// Remove from nodeResourceGroups cache.
_, ok = prevNode.ObjectMeta.Labels[externalResourceGroupLabel]
if ok {
delete(az.nodeResourceGroups, prevNode.ObjectMeta.Name)
}
// Remove from unmanagedNodes cache.
managed, ok := prevNode.ObjectMeta.Labels[managedByAzureLabel]
if ok && managed == "false" {
az.unmanagedNodes.Delete(prevNode.ObjectMeta.Name)
}
}
if newNode != nil {
// Add to nodeZones cache.
newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
if ok && az.isAvailabilityZone(newZone) {
if az.nodeZones[newZone] == nil {
@ -502,6 +529,18 @@ func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
}
az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
}
// Add to nodeResourceGroups cache.
newRG, ok := newNode.ObjectMeta.Labels[externalResourceGroupLabel]
if ok && len(newRG) > 0 {
az.nodeResourceGroups[newNode.ObjectMeta.Name] = newRG
}
// Add to unmanagedNodes cache.
managed, ok := newNode.ObjectMeta.Labels[managedByAzureLabel]
if ok && managed == "false" {
az.unmanagedNodes.Insert(newNode.ObjectMeta.Name)
}
}
}
@ -511,8 +550,8 @@ func (az *Cloud) GetActiveZones() (sets.String, error) {
return nil, fmt.Errorf("Azure cloud provider doesn't have informers set")
}
az.nodeZonesLock.Lock()
defer az.nodeZonesLock.Unlock()
az.nodeCachesLock.Lock()
defer az.nodeCachesLock.Unlock()
if !az.nodeInformerSynced() {
return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones")
}
@ -530,3 +569,76 @@ func (az *Cloud) GetActiveZones() (sets.String, error) {
func (az *Cloud) GetLocation() string {
return az.Location
}
// GetNodeResourceGroup gets resource group for given node.
func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) {
// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
if az.nodeInformerSynced == nil {
return az.ResourceGroup, nil
}
az.nodeCachesLock.Lock()
defer az.nodeCachesLock.Unlock()
if !az.nodeInformerSynced() {
return "", fmt.Errorf("node informer is not synced when trying to GetNodeResourceGroup")
}
// Return external resource group if it has been cached.
if cachedRG, ok := az.nodeResourceGroups[nodeName]; ok {
return cachedRG, nil
}
// Return resource group from cloud provider options.
return az.ResourceGroup, nil
}
// GetResourceGroups returns a set of resource groups that all nodes are running on.
func (az *Cloud) GetResourceGroups() (sets.String, error) {
// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
if az.nodeInformerSynced == nil {
return sets.NewString(az.ResourceGroup), nil
}
az.nodeCachesLock.Lock()
defer az.nodeCachesLock.Unlock()
if !az.nodeInformerSynced() {
return nil, fmt.Errorf("node informer is not synced when trying to GetResourceGroups")
}
resourceGroups := sets.NewString(az.ResourceGroup)
for _, rg := range az.nodeResourceGroups {
resourceGroups.Insert(rg)
}
return resourceGroups, nil
}
// GetUnmanagedNodes returns a list of nodes not managed by Azure cloud provider (e.g. on-prem nodes).
func (az *Cloud) GetUnmanagedNodes() (sets.String, error) {
// Kubelet won't set az.nodeInformerSynced, always return nil.
if az.nodeInformerSynced == nil {
return nil, nil
}
az.nodeCachesLock.Lock()
defer az.nodeCachesLock.Unlock()
if !az.nodeInformerSynced() {
return nil, fmt.Errorf("node informer is not synced when trying to GetUnmanagedNodes")
}
return sets.NewString(az.unmanagedNodes.List()...), nil
}
// ShouldNodeExcludedFromLoadBalancer returns true if node is unmanaged or in external resource group.
func (az *Cloud) ShouldNodeExcludedFromLoadBalancer(node *v1.Node) bool {
labels := node.ObjectMeta.Labels
if rg, ok := labels[externalResourceGroupLabel]; ok && rg != az.ResourceGroup {
return true
}
if managed, ok := labels[managedByAzureLabel]; ok && managed == "false" {
return true
}
return false
}