Merge pull request #88259 from feiskyer/cache-cleanup
chore: move Azure caches to a separate package
This commit is contained in:
		| @@ -12,7 +12,6 @@ go_library( | |||||||
|         "azure.go", |         "azure.go", | ||||||
|         "azure_backoff.go", |         "azure_backoff.go", | ||||||
|         "azure_blobDiskController.go", |         "azure_blobDiskController.go", | ||||||
|         "azure_cache.go", |  | ||||||
|         "azure_config.go", |         "azure_config.go", | ||||||
|         "azure_controller_common.go", |         "azure_controller_common.go", | ||||||
|         "azure_controller_standard.go", |         "azure_controller_standard.go", | ||||||
| @@ -62,6 +61,7 @@ go_library( | |||||||
|         "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", |         "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", | ||||||
|         "//staging/src/k8s.io/component-base/featuregate:go_default_library", |         "//staging/src/k8s.io/component-base/featuregate:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library", | ||||||
|  |         "//staging/src/k8s.io/legacy-cloud-providers/azure/cache:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/diskclient:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/diskclient:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/interfaceclient:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients/interfaceclient:go_default_library", | ||||||
| @@ -95,7 +95,6 @@ go_library( | |||||||
| go_test( | go_test( | ||||||
|     name = "go_default_test", |     name = "go_default_test", | ||||||
|     srcs = [ |     srcs = [ | ||||||
|         "azure_cache_test.go", |  | ||||||
|         "azure_config_test.go", |         "azure_config_test.go", | ||||||
|         "azure_controller_common_test.go", |         "azure_controller_common_test.go", | ||||||
|         "azure_controller_standard_test.go", |         "azure_controller_standard_test.go", | ||||||
| @@ -125,6 +124,7 @@ go_test( | |||||||
|         "//staging/src/k8s.io/cloud-provider:go_default_library", |         "//staging/src/k8s.io/cloud-provider:go_default_library", | ||||||
|         "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", |         "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:go_default_library", | ||||||
|  |         "//staging/src/k8s.io/legacy-cloud-providers/azure/cache:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:go_default_library", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:go_default_library", | ||||||
|         "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library", |         "//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute:go_default_library", | ||||||
| @@ -149,6 +149,7 @@ filegroup( | |||||||
|     srcs = [ |     srcs = [ | ||||||
|         ":package-srcs", |         ":package-srcs", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:all-srcs", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/auth:all-srcs", | ||||||
|  |         "//staging/src/k8s.io/legacy-cloud-providers/azure/cache:all-srcs", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:all-srcs", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/clients:all-srcs", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/metrics:all-srcs", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/metrics:all-srcs", | ||||||
|         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:all-srcs", |         "//staging/src/k8s.io/legacy-cloud-providers/azure/retry:all-srcs", | ||||||
|   | |||||||
| @@ -43,6 +43,7 @@ import ( | |||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/auth" | 	"k8s.io/legacy-cloud-providers/azure/auth" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	azclients "k8s.io/legacy-cloud-providers/azure/clients" | 	azclients "k8s.io/legacy-cloud-providers/azure/clients" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/clients/diskclient" | 	"k8s.io/legacy-cloud-providers/azure/clients/diskclient" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/clients/interfaceclient" | 	"k8s.io/legacy-cloud-providers/azure/clients/interfaceclient" | ||||||
| @@ -274,10 +275,10 @@ type Cloud struct { | |||||||
| 	eventRecorder    record.EventRecorder | 	eventRecorder    record.EventRecorder | ||||||
| 	routeUpdater     *delayedRouteUpdater | 	routeUpdater     *delayedRouteUpdater | ||||||
|  |  | ||||||
| 	vmCache  *timedCache | 	vmCache  *azcache.TimedCache | ||||||
| 	lbCache  *timedCache | 	lbCache  *azcache.TimedCache | ||||||
| 	nsgCache *timedCache | 	nsgCache *azcache.TimedCache | ||||||
| 	rtCache  *timedCache | 	rtCache  *azcache.TimedCache | ||||||
|  |  | ||||||
| 	*BlobDiskController | 	*BlobDiskController | ||||||
| 	*ManagedDiskController | 	*ManagedDiskController | ||||||
|   | |||||||
| @@ -32,6 +32,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/util/wait" | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/retry" | 	"k8s.io/legacy-cloud-providers/azure/retry" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -65,7 +66,7 @@ func (az *Cloud) Event(obj runtime.Object, eventtype, reason, message string) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry | // GetVirtualMachineWithRetry invokes az.getVirtualMachine with exponential backoff retry | ||||||
| func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt cacheReadType) (compute.VirtualMachine, error) { | func (az *Cloud) GetVirtualMachineWithRetry(name types.NodeName, crt azcache.AzureCacheReadType) (compute.VirtualMachine, error) { | ||||||
| 	var machine compute.VirtualMachine | 	var machine compute.VirtualMachine | ||||||
| 	var retryErr error | 	var retryErr error | ||||||
| 	err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) { | 	err := wait.ExponentialBackoff(az.RequestBackoff(), func() (bool, error) { | ||||||
|   | |||||||
| @@ -34,6 +34,7 @@ import ( | |||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	volerr "k8s.io/cloud-provider/volume/errors" | 	volerr "k8s.io/cloud-provider/volume/errors" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/retry" | 	"k8s.io/legacy-cloud-providers/azure/retry" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -87,7 +88,7 @@ type controllerCommon struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type. | // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type. | ||||||
| func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt cacheReadType) (VMSet, error) { | func (c *controllerCommon) getNodeVMSet(nodeName types.NodeName, crt azcache.AzureCacheReadType) (VMSet, error) { | ||||||
| 	// 1. vmType is standard, return cloud.vmSet directly. | 	// 1. vmType is standard, return cloud.vmSet directly. | ||||||
| 	if c.cloud.VMType == vmTypeStandard { | 	if c.cloud.VMType == vmTypeStandard { | ||||||
| 		return c.cloud.vmSet, nil | 		return c.cloud.vmSet, nil | ||||||
| @@ -155,7 +156,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	vmset, err := c.getNodeVMSet(nodeName, cacheReadTypeUnsafe) | 	vmset, err := c.getNodeVMSet(nodeName, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return -1, err | 		return -1, err | ||||||
| 	} | 	} | ||||||
| @@ -195,7 +196,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N | |||||||
| 		return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) | 		return fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	vmset, err := c.getNodeVMSet(nodeName, cacheReadTypeUnsafe) | 	vmset, err := c.getNodeVMSet(nodeName, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -239,7 +240,7 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N | |||||||
| } | } | ||||||
|  |  | ||||||
| // getNodeDataDisks invokes vmSet interfaces to get data disks for the node. | // getNodeDataDisks invokes vmSet interfaces to get data disks for the node. | ||||||
| func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { | func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) { | ||||||
| 	vmset, err := c.getNodeVMSet(nodeName, crt) | 	vmset, err := c.getNodeVMSet(nodeName, crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @@ -252,7 +253,7 @@ func (c *controllerCommon) getNodeDataDisks(nodeName types.NodeName, crt cacheRe | |||||||
| func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) { | func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.NodeName) (int32, error) { | ||||||
| 	// getNodeDataDisks need to fetch the cached data/fresh data if cache expired here | 	// getNodeDataDisks need to fetch the cached data/fresh data if cache expired here | ||||||
| 	// to ensure we get LUN based on latest entry. | 	// to ensure we get LUN based on latest entry. | ||||||
| 	disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeDefault) | 	disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) | 		klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) | ||||||
| 		return -1, err | 		return -1, err | ||||||
| @@ -276,7 +277,7 @@ func (c *controllerCommon) GetDiskLun(diskName, diskURI string, nodeName types.N | |||||||
|  |  | ||||||
| // GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used. | // GetNextDiskLun searches all vhd attachment on the host and find unused lun. Return -1 if all luns are used. | ||||||
| func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) { | func (c *controllerCommon) GetNextDiskLun(nodeName types.NodeName) (int32, error) { | ||||||
| 	disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeDefault) | 	disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) | 		klog.Errorf("error of getting data disks for node %q: %v", nodeName, err) | ||||||
| 		return -1, err | 		return -1, err | ||||||
| @@ -307,7 +308,7 @@ func (c *controllerCommon) DisksAreAttached(diskNames []string, nodeName types.N | |||||||
| 	// for every reconcile call. The cache is invalidated after Attach/Detach | 	// for every reconcile call. The cache is invalidated after Attach/Detach | ||||||
| 	// disk. So the new entry will be fetched and cached the first time reconcile | 	// disk. So the new entry will be fetched and cached the first time reconcile | ||||||
| 	// loop runs after the Attach/Disk OP which will reflect the latest model. | 	// loop runs after the Attach/Disk OP which will reflect the latest model. | ||||||
| 	disks, err := c.getNodeDataDisks(nodeName, cacheReadTypeUnsafe) | 	disks, err := c.getNodeDataDisks(nodeName, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if err == cloudprovider.InstanceNotFound { | 		if err == cloudprovider.InstanceNotFound { | ||||||
| 			// if host doesn't exist, no need to detach | 			// if host doesn't exist, no need to detach | ||||||
|   | |||||||
| @@ -26,12 +26,13 @@ import ( | |||||||
|  |  | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // AttachDisk attaches a vhd to vm | // AttachDisk attaches a vhd to vm | ||||||
| // the vhd must exist, can be identified by diskName, diskURI, and lun. | // the vhd must exist, can be identified by diskName, diskURI, and lun. | ||||||
| func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error { | func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error { | ||||||
| 	vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault) | 	vm, err := as.getVirtualMachine(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -115,7 +116,7 @@ func (as *availabilitySet) AttachDisk(isManagedDisk bool, diskName, diskURI stri | |||||||
| // DetachDisk detaches a disk from host | // DetachDisk detaches a disk from host | ||||||
| // the vhd can be identified by diskName or diskURI | // the vhd can be identified by diskName or diskURI | ||||||
| func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error { | func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error { | ||||||
| 	vm, err := as.getVirtualMachine(nodeName, cacheReadTypeDefault) | 	vm, err := as.getVirtualMachine(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		// if host doesn't exist, no need to detach | 		// if host doesn't exist, no need to detach | ||||||
| 		klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI) | 		klog.Warningf("azureDisk - cannot find node %s, skip detaching disk(%s, %s)", nodeName, diskName, diskURI) | ||||||
| @@ -172,7 +173,7 @@ func (as *availabilitySet) DetachDisk(diskName, diskURI string, nodeName types.N | |||||||
| } | } | ||||||
|  |  | ||||||
| // GetDataDisks gets a list of data disks attached to the node. | // GetDataDisks gets a list of data disks attached to the node. | ||||||
| func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { | func (as *availabilitySet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) { | ||||||
| 	vm, err := as.getVirtualMachine(nodeName, crt) | 	vm, err := as.getVirtualMachine(nodeName, crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
|   | |||||||
| @@ -27,6 +27,11 @@ import ( | |||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
|  |  | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	fakeCacheTTL = 2 * time.Second | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestStandardAttachDisk(t *testing.T) { | func TestStandardAttachDisk(t *testing.T) { | ||||||
| @@ -100,14 +105,14 @@ func TestGetDataDisks(t *testing.T) { | |||||||
| 		nodeName          types.NodeName | 		nodeName          types.NodeName | ||||||
| 		expectedDataDisks []compute.DataDisk | 		expectedDataDisks []compute.DataDisk | ||||||
| 		expectedError     bool | 		expectedError     bool | ||||||
| 		crt               cacheReadType | 		crt               azcache.AzureCacheReadType | ||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			desc:              "an error shall be returned if there's no corresponding vm", | 			desc:              "an error shall be returned if there's no corresponding vm", | ||||||
| 			nodeName:          "vm2", | 			nodeName:          "vm2", | ||||||
| 			expectedDataDisks: nil, | 			expectedDataDisks: nil, | ||||||
| 			expectedError:     true, | 			expectedError:     true, | ||||||
| 			crt:               cacheReadTypeDefault, | 			crt:               azcache.CacheReadTypeDefault, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			desc:     "correct list of data disks shall be returned if everything is good", | 			desc:     "correct list of data disks shall be returned if everything is good", | ||||||
| @@ -119,7 +124,7 @@ func TestGetDataDisks(t *testing.T) { | |||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			expectedError: false, | 			expectedError: false, | ||||||
| 			crt:           cacheReadTypeDefault, | 			crt:           azcache.CacheReadTypeDefault, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			desc:     "correct list of data disks shall be returned if everything is good", | 			desc:     "correct list of data disks shall be returned if everything is good", | ||||||
| @@ -131,7 +136,7 @@ func TestGetDataDisks(t *testing.T) { | |||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			expectedError: false, | 			expectedError: false, | ||||||
| 			crt:           cacheReadTypeUnsafe, | 			crt:           azcache.CacheReadTypeUnsafe, | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	for i, test := range testCases { | 	for i, test := range testCases { | ||||||
| @@ -143,7 +148,7 @@ func TestGetDataDisks(t *testing.T) { | |||||||
| 		assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) | 		assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) | ||||||
| 		assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc) | 		assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc) | ||||||
|  |  | ||||||
| 		if test.crt == cacheReadTypeUnsafe { | 		if test.crt == azcache.CacheReadTypeUnsafe { | ||||||
| 			time.Sleep(fakeCacheTTL) | 			time.Sleep(fakeCacheTTL) | ||||||
| 			dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt) | 			dataDisks, err := vmSet.GetDataDisks(test.nodeName, test.crt) | ||||||
| 			assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) | 			assert.Equal(t, test.expectedDataDisks, dataDisks, "TestCase[%d]: %s", i, test.desc) | ||||||
|   | |||||||
| @@ -26,13 +26,14 @@ import ( | |||||||
|  |  | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // AttachDisk attaches a vhd to vm | // AttachDisk attaches a vhd to vm | ||||||
| // the vhd must exist, can be identified by diskName, diskURI, and lun. | // the vhd must exist, can be identified by diskName, diskURI, and lun. | ||||||
| func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error { | func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nodeName types.NodeName, lun int32, cachingMode compute.CachingTypes, diskEncryptionSetID string, writeAcceleratorEnabled bool) error { | ||||||
| 	vmName := mapNodeNameToVMName(nodeName) | 	vmName := mapNodeNameToVMName(nodeName) | ||||||
| 	ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault) | 	ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -120,7 +121,7 @@ func (ss *scaleSet) AttachDisk(isManagedDisk bool, diskName, diskURI string, nod | |||||||
| // the vhd can be identified by diskName or diskURI | // the vhd can be identified by diskName or diskURI | ||||||
| func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error { | func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error { | ||||||
| 	vmName := mapNodeNameToVMName(nodeName) | 	vmName := mapNodeNameToVMName(nodeName) | ||||||
| 	ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault) | 	ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -180,7 +181,7 @@ func (ss *scaleSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName | |||||||
| } | } | ||||||
|  |  | ||||||
| // GetDataDisks gets a list of data disks attached to the node. | // GetDataDisks gets a list of data disks attached to the node. | ||||||
| func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { | func (ss *scaleSet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) { | ||||||
| 	_, _, vm, err := ss.getVmssVM(string(nodeName), crt) | 	_, _, vm, err := ss.getVmssVM(string(nodeName), crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
|   | |||||||
| @@ -35,6 +35,7 @@ import ( | |||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/retry" | 	"k8s.io/legacy-cloud-providers/azure/retry" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -983,7 +984,7 @@ func (f *fakeVMSet) DetachDisk(diskName, diskURI string, nodeName types.NodeName | |||||||
| 	return fmt.Errorf("unimplemented") | 	return fmt.Errorf("unimplemented") | ||||||
| } | } | ||||||
|  |  | ||||||
| func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) { | func (f *fakeVMSet) GetDataDisks(nodeName types.NodeName, crt azcache.AzureCacheReadType) ([]compute.DataDisk, error) { | ||||||
| 	return nil, fmt.Errorf("unimplemented") | 	return nil, fmt.Errorf("unimplemented") | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -24,6 +24,8 @@ import ( | |||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -87,7 +89,7 @@ type InstanceMetadata struct { | |||||||
| // InstanceMetadataService knows how to query the Azure instance metadata server. | // InstanceMetadataService knows how to query the Azure instance metadata server. | ||||||
| type InstanceMetadataService struct { | type InstanceMetadataService struct { | ||||||
| 	metadataURL string | 	metadataURL string | ||||||
| 	imsCache    *timedCache | 	imsCache    *azcache.TimedCache | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object. | // NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object. | ||||||
| @@ -96,7 +98,7 @@ func NewInstanceMetadataService(metadataURL string) (*InstanceMetadataService, e | |||||||
| 		metadataURL: metadataURL, | 		metadataURL: metadataURL, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	imsCache, err := newTimedcache(metadataCacheTTL, ims.getInstanceMetadata) | 	imsCache, err := azcache.NewTimedcache(metadataCacheTTL, ims.getInstanceMetadata) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @@ -145,7 +147,7 @@ func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{} | |||||||
|  |  | ||||||
| // GetMetadata gets instance metadata from cache. | // GetMetadata gets instance metadata from cache. | ||||||
| // crt determines if we can get data from stalled cache/need fresh if cache expired. | // crt determines if we can get data from stalled cache/need fresh if cache expired. | ||||||
| func (ims *InstanceMetadataService) GetMetadata(crt cacheReadType) (*InstanceMetadata, error) { | func (ims *InstanceMetadataService) GetMetadata(crt azcache.AzureCacheReadType) (*InstanceMetadata, error) { | ||||||
| 	cache, err := ims.imsCache.Get(metadataCacheKey, crt) | 	cache, err := ims.imsCache.Get(metadataCacheKey, crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
|   | |||||||
| @@ -28,6 +28,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -73,7 +74,7 @@ func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.N | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if az.UseInstanceMetadata { | 	if az.UseInstanceMetadata { | ||||||
| 		metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe) | 		metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| @@ -259,7 +260,7 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if az.UseInstanceMetadata { | 	if az.UseInstanceMetadata { | ||||||
| 		metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe) | 		metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return "", err | 			return "", err | ||||||
| 		} | 		} | ||||||
| @@ -346,7 +347,7 @@ func (az *Cloud) InstanceType(ctx context.Context, name types.NodeName) (string, | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if az.UseInstanceMetadata { | 	if az.UseInstanceMetadata { | ||||||
| 		metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe) | 		metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return "", err | 			return "", err | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -35,6 +35,7 @@ import ( | |||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	servicehelpers "k8s.io/cloud-provider/service/helpers" | 	servicehelpers "k8s.io/cloud-provider/service/helpers" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	utilnet "k8s.io/utils/net" | 	utilnet "k8s.io/utils/net" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -961,7 +962,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, | |||||||
|  |  | ||||||
| 			if isInternal { | 			if isInternal { | ||||||
| 				// Refresh updated lb which will be used later in other places. | 				// Refresh updated lb which will be used later in other places. | ||||||
| 				newLB, exist, err := az.getAzureLoadBalancer(lbName, cacheReadTypeDefault) | 				newLB, exist, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err) | 					klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err) | ||||||
| 					return nil, err | 					return nil, err | ||||||
| @@ -1125,7 +1126,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, | |||||||
| 		ports = []v1.ServicePort{} | 		ports = []v1.ServicePort{} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sg, err := az.getSecurityGroup(cacheReadTypeDefault) | 	sg, err := az.getSecurityGroup(azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @@ -1466,7 +1467,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if lbName != "" { | 	if lbName != "" { | ||||||
| 		loadBalancer, _, err := az.getAzureLoadBalancer(lbName, cacheReadTypeDefault) | 		loadBalancer, _, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -31,6 +31,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	utilnet "k8s.io/utils/net" | 	utilnet "k8s.io/utils/net" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -117,7 +118,7 @@ func (d *delayedRouteUpdater) updateRoutes() { | |||||||
|  |  | ||||||
| 	var routeTable network.RouteTable | 	var routeTable network.RouteTable | ||||||
| 	var existsRouteTable bool | 	var existsRouteTable bool | ||||||
| 	routeTable, existsRouteTable, err = d.az.getRouteTable(cacheReadTypeDefault) | 	routeTable, existsRouteTable, err = d.az.getRouteTable(azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("getRouteTable() failed with error: %v", err) | 		klog.Errorf("getRouteTable() failed with error: %v", err) | ||||||
| 		return | 		return | ||||||
| @@ -131,7 +132,7 @@ func (d *delayedRouteUpdater) updateRoutes() { | |||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		routeTable, _, err = d.az.getRouteTable(cacheReadTypeDefault) | 		routeTable, _, err = d.az.getRouteTable(azcache.CacheReadTypeDefault) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			klog.Errorf("getRouteTable() failed with error: %v", err) | 			klog.Errorf("getRouteTable() failed with error: %v", err) | ||||||
| 			return | 			return | ||||||
| @@ -200,7 +201,7 @@ func (d *delayedRouteUpdater) addRouteOperation(operation routeOperation, route | |||||||
| // ListRoutes lists all managed routes that belong to the specified clusterName | // ListRoutes lists all managed routes that belong to the specified clusterName | ||||||
| func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { | func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { | ||||||
| 	klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName) | 	klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName) | ||||||
| 	routeTable, existsRouteTable, err := az.getRouteTable(cacheReadTypeDefault) | 	routeTable, existsRouteTable, err := az.getRouteTable(azcache.CacheReadTypeDefault) | ||||||
| 	routes, err := processRoutes(az.ipv6DualStackEnabled, routeTable, existsRouteTable, err) | 	routes, err := processRoutes(az.ipv6DualStackEnabled, routeTable, existsRouteTable, err) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
|   | |||||||
| @@ -39,6 +39,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/util/uuid" | 	"k8s.io/apimachinery/pkg/util/uuid" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
|  |  | ||||||
| 	"k8s.io/component-base/featuregate" | 	"k8s.io/component-base/featuregate" | ||||||
| 	utilnet "k8s.io/utils/net" | 	utilnet "k8s.io/utils/net" | ||||||
| @@ -391,14 +392,14 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) | |||||||
| 	var machine compute.VirtualMachine | 	var machine compute.VirtualMachine | ||||||
| 	var err error | 	var err error | ||||||
|  |  | ||||||
| 	machine, err = as.getVirtualMachine(types.NodeName(name), cacheReadTypeUnsafe) | 	machine, err = as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeUnsafe) | ||||||
| 	if err == cloudprovider.InstanceNotFound { | 	if err == cloudprovider.InstanceNotFound { | ||||||
| 		return "", cloudprovider.InstanceNotFound | 		return "", cloudprovider.InstanceNotFound | ||||||
| 	} | 	} | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		if as.CloudProviderBackoff { | 		if as.CloudProviderBackoff { | ||||||
| 			klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name) | 			klog.V(2).Infof("GetInstanceIDByNodeName(%s) backing off", name) | ||||||
| 			machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), cacheReadTypeUnsafe) | 			machine, err = as.GetVirtualMachineWithRetry(types.NodeName(name), azcache.CacheReadTypeUnsafe) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name) | 				klog.V(2).Infof("GetInstanceIDByNodeName(%s) abort backoff", name) | ||||||
| 				return "", err | 				return "", err | ||||||
| @@ -419,7 +420,7 @@ func (as *availabilitySet) GetInstanceIDByNodeName(name string) (string, error) | |||||||
|  |  | ||||||
| // GetPowerStatusByNodeName returns the power state of the specified node. | // GetPowerStatusByNodeName returns the power state of the specified node. | ||||||
| func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) { | func (as *availabilitySet) GetPowerStatusByNodeName(name string) (powerState string, err error) { | ||||||
| 	vm, err := as.getVirtualMachine(types.NodeName(name), cacheReadTypeDefault) | 	vm, err := as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return powerState, err | 		return powerState, err | ||||||
| 	} | 	} | ||||||
| @@ -452,7 +453,7 @@ func (as *availabilitySet) GetNodeNameByProviderID(providerID string) (types.Nod | |||||||
|  |  | ||||||
| // GetInstanceTypeByNodeName gets the instance type by node name. | // GetInstanceTypeByNodeName gets the instance type by node name. | ||||||
| func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) { | func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error) { | ||||||
| 	machine, err := as.getVirtualMachine(types.NodeName(name), cacheReadTypeUnsafe) | 	machine, err := as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err) | 		klog.Errorf("as.GetInstanceTypeByNodeName(%s) failed: as.getVirtualMachine(%s) err=%v", name, name, err) | ||||||
| 		return "", err | 		return "", err | ||||||
| @@ -464,7 +465,7 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error | |||||||
| // GetZoneByNodeName gets availability zone for the specified node. If the node is not running | // GetZoneByNodeName gets availability zone for the specified node. If the node is not running | ||||||
| // with availability zone, then it returns fault domain. | // with availability zone, then it returns fault domain. | ||||||
| func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { | func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { | ||||||
| 	vm, err := as.getVirtualMachine(types.NodeName(name), cacheReadTypeUnsafe) | 	vm, err := as.getVirtualMachine(types.NodeName(name), azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return cloudprovider.Zone{}, err | 		return cloudprovider.Zone{}, err | ||||||
| 	} | 	} | ||||||
| @@ -665,7 +666,7 @@ func extractResourceGroupByNicID(nicID string) (string, error) { | |||||||
| func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) { | func (as *availabilitySet) getPrimaryInterfaceWithVMSet(nodeName, vmSetName string) (network.Interface, error) { | ||||||
| 	var machine compute.VirtualMachine | 	var machine compute.VirtualMachine | ||||||
|  |  | ||||||
| 	machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), cacheReadTypeDefault) | 	machine, err := as.GetVirtualMachineWithRetry(types.NodeName(nodeName), azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName) | 		klog.V(2).Infof("GetPrimaryInterface(%s, %s) abort backoff", nodeName, vmSetName) | ||||||
| 		return network.Interface{}, err | 		return network.Interface{}, err | ||||||
|   | |||||||
| @@ -25,6 +25,7 @@ import ( | |||||||
| 	v1 "k8s.io/api/core/v1" | 	v1 "k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // VMSet defines functions all vmsets (including scale set and availability | // VMSet defines functions all vmsets (including scale set and availability | ||||||
| @@ -68,7 +69,7 @@ type VMSet interface { | |||||||
| 	// DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI. | 	// DetachDisk detaches a vhd from host. The vhd can be identified by diskName or diskURI. | ||||||
| 	DetachDisk(diskName, diskURI string, nodeName types.NodeName) error | 	DetachDisk(diskName, diskURI string, nodeName types.NodeName) error | ||||||
| 	// GetDataDisks gets a list of data disks attached to the node. | 	// GetDataDisks gets a list of data disks attached to the node. | ||||||
| 	GetDataDisks(nodeName types.NodeName, crt cacheReadType) ([]compute.DataDisk, error) | 	GetDataDisks(nodeName types.NodeName, string azcache.AzureCacheReadType) ([]compute.DataDisk, error) | ||||||
|  |  | ||||||
| 	// GetPowerStatusByNodeName returns the power state of the specified node. | 	// GetPowerStatusByNodeName returns the power state of the specified node. | ||||||
| 	GetPowerStatusByNodeName(name string) (string, error) | 	GetPowerStatusByNodeName(name string) (string, error) | ||||||
|   | |||||||
| @@ -36,6 +36,7 @@ import ( | |||||||
| 	utilerrors "k8s.io/apimachinery/pkg/util/errors" | 	utilerrors "k8s.io/apimachinery/pkg/util/errors" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	utilnet "k8s.io/utils/net" | 	utilnet "k8s.io/utils/net" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -62,9 +63,9 @@ type scaleSet struct { | |||||||
| 	// (e.g. master nodes) may not belong to any scale sets. | 	// (e.g. master nodes) may not belong to any scale sets. | ||||||
| 	availabilitySet VMSet | 	availabilitySet VMSet | ||||||
|  |  | ||||||
| 	vmssCache                 *timedCache | 	vmssCache                 *azcache.TimedCache | ||||||
| 	vmssVMCache               *timedCache | 	vmssVMCache               *azcache.TimedCache | ||||||
| 	availabilitySetNodesCache *timedCache | 	availabilitySetNodesCache *azcache.TimedCache | ||||||
| } | } | ||||||
|  |  | ||||||
| // newScaleSet creates a new scaleSet. | // newScaleSet creates a new scaleSet. | ||||||
| @@ -95,7 +96,7 @@ func newScaleSet(az *Cloud) (VMSet, error) { | |||||||
| 	return ss, nil | 	return ss, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.VirtualMachineScaleSet, error) { | func (ss *scaleSet) getVMSS(vmssName string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSet, error) { | ||||||
| 	getter := func(vmssName string) (*compute.VirtualMachineScaleSet, error) { | 	getter := func(vmssName string) (*compute.VirtualMachineScaleSet, error) { | ||||||
| 		cached, err := ss.vmssCache.Get(vmssKey, crt) | 		cached, err := ss.vmssCache.Get(vmssKey, crt) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| @@ -134,8 +135,8 @@ func (ss *scaleSet) getVMSS(vmssName string, crt cacheReadType) (*compute.Virtua | |||||||
|  |  | ||||||
| // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache. | // getVmssVM gets virtualMachineScaleSetVM by nodeName from cache. | ||||||
| // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets. | // It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets. | ||||||
| func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) { | func (ss *scaleSet) getVmssVM(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, error) { | ||||||
| 	getter := func(nodeName string, crt cacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) { | 	getter := func(nodeName string, crt azcache.AzureCacheReadType) (string, string, *compute.VirtualMachineScaleSetVM, bool, error) { | ||||||
| 		var found bool | 		var found bool | ||||||
| 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) | 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| @@ -164,7 +165,7 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin | |||||||
|  |  | ||||||
| 	if !found { | 	if !found { | ||||||
| 		klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName) | 		klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache", nodeName) | ||||||
| 		vmssName, instanceID, vm, found, err = getter(nodeName, cacheReadTypeForceRefresh) | 		vmssName, instanceID, vm, found, err = getter(nodeName, azcache.CacheReadTypeForceRefresh) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return "", "", nil, err | 			return "", "", nil, err | ||||||
| 		} | 		} | ||||||
| @@ -182,7 +183,7 @@ func (ss *scaleSet) getVmssVM(nodeName string, crt cacheReadType) (string, strin | |||||||
|  |  | ||||||
| // GetPowerStatusByNodeName returns the power state of the specified node. | // GetPowerStatusByNodeName returns the power state of the specified node. | ||||||
| func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) { | func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) { | ||||||
| 	_, _, vm, err := ss.getVmssVM(name, cacheReadTypeDefault) | 	_, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return powerState, err | 		return powerState, err | ||||||
| 	} | 	} | ||||||
| @@ -204,8 +205,8 @@ func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, er | |||||||
|  |  | ||||||
| // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. | // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. | ||||||
| // The node must belong to one of scale sets. | // The node must belong to one of scale sets. | ||||||
| func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt cacheReadType) (*compute.VirtualMachineScaleSetVM, error) { | func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSetVM, error) { | ||||||
| 	getter := func(crt cacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) { | 	getter := func(crt azcache.AzureCacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) { | ||||||
| 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) | 		cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, crt) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, false, err | 			return nil, false, err | ||||||
| @@ -234,7 +235,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI | |||||||
| 	} | 	} | ||||||
| 	if !found { | 	if !found { | ||||||
| 		klog.V(2).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID) | 		klog.V(2).Infof("Couldn't find VMSS VM with scaleSetName %q and instanceID %q, refreshing the cache", scaleSetName, instanceID) | ||||||
| 		vm, found, err = getter(cacheReadTypeForceRefresh) | 		vm, found, err = getter(azcache.CacheReadTypeForceRefresh) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| @@ -253,7 +254,7 @@ func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceI | |||||||
| // It must return ("", cloudprovider.InstanceNotFound) if the instance does | // It must return ("", cloudprovider.InstanceNotFound) if the instance does | ||||||
| // not exist or is no longer running. | // not exist or is no longer running. | ||||||
| func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { | func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { | ||||||
| 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe) | 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | ||||||
| 		return "", err | 		return "", err | ||||||
| @@ -263,7 +264,7 @@ func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) { | |||||||
| 		return ss.availabilitySet.GetInstanceIDByNodeName(name) | 		return ss.availabilitySet.GetInstanceIDByNodeName(name) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	_, _, vm, err := ss.getVmssVM(name, cacheReadTypeUnsafe) | 	_, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", err | 		return "", err | ||||||
| 	} | 	} | ||||||
| @@ -297,7 +298,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, | |||||||
| 		return ss.availabilitySet.GetNodeNameByProviderID(providerID) | 		return ss.availabilitySet.GetNodeNameByProviderID(providerID) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, cacheReadTypeUnsafe) | 	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", err | 		return "", err | ||||||
| 	} | 	} | ||||||
| @@ -312,7 +313,7 @@ func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, | |||||||
|  |  | ||||||
| // GetInstanceTypeByNodeName gets the instance type by node name. | // GetInstanceTypeByNodeName gets the instance type by node name. | ||||||
| func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { | func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { | ||||||
| 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe) | 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | ||||||
| 		return "", err | 		return "", err | ||||||
| @@ -322,7 +323,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { | |||||||
| 		return ss.availabilitySet.GetInstanceTypeByNodeName(name) | 		return ss.availabilitySet.GetInstanceTypeByNodeName(name) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	_, _, vm, err := ss.getVmssVM(name, cacheReadTypeUnsafe) | 	_, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", err | 		return "", err | ||||||
| 	} | 	} | ||||||
| @@ -337,7 +338,7 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) { | |||||||
| // GetZoneByNodeName gets availability zone for the specified node. If the node is not running | // GetZoneByNodeName gets availability zone for the specified node. If the node is not running | ||||||
| // with availability zone, then it returns fault domain. | // with availability zone, then it returns fault domain. | ||||||
| func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { | func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { | ||||||
| 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, cacheReadTypeUnsafe) | 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(name, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | ||||||
| 		return cloudprovider.Zone{}, err | 		return cloudprovider.Zone{}, err | ||||||
| @@ -347,7 +348,7 @@ func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) { | |||||||
| 		return ss.availabilitySet.GetZoneByNodeName(name) | 		return ss.availabilitySet.GetZoneByNodeName(name) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	_, _, vm, err := ss.getVmssVM(name, cacheReadTypeUnsafe) | 	_, _, vm, err := ss.getVmssVM(name, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return cloudprovider.Zone{}, err | 		return cloudprovider.Zone{}, err | ||||||
| 	} | 	} | ||||||
| @@ -583,7 +584,7 @@ func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) { | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		nodeName := nodes[nx].Name | 		nodeName := nodes[nx].Name | ||||||
| 		ssName, _, _, err := ss.getVmssVM(nodeName, cacheReadTypeDefault) | 		ssName, _, _, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| @@ -661,7 +662,7 @@ func extractResourceGroupByVMSSNicID(nicID string) (string, error) { | |||||||
|  |  | ||||||
| // GetPrimaryInterface gets machine primary network interface by node name and vmSet. | // GetPrimaryInterface gets machine primary network interface by node name and vmSet. | ||||||
| func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) { | func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) { | ||||||
| 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, cacheReadTypeDefault) | 	managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | 		klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err) | ||||||
| 		return network.Interface{}, err | 		return network.Interface{}, err | ||||||
| @@ -671,7 +672,7 @@ func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, err | |||||||
| 		return ss.availabilitySet.GetPrimaryInterface(nodeName) | 		return ss.availabilitySet.GetPrimaryInterface(nodeName) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cacheReadTypeDefault) | 	ssName, instanceID, vm, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		// VM is availability set, but not cached yet in availabilitySetNodesCache. | 		// VM is availability set, but not cached yet in availabilitySetNodesCache. | ||||||
| 		if err == ErrorNotVmssInstance { | 		if err == ErrorNotVmssInstance { | ||||||
| @@ -794,7 +795,7 @@ func (ss *scaleSet) getConfigForScaleSetByIPFamily(config *compute.VirtualMachin | |||||||
| func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { | func (ss *scaleSet) EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string, isInternal bool) error { | ||||||
| 	klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID) | 	klog.V(3).Infof("ensuring node %q of scaleset %q in LB backendpool %q", nodeName, vmSetName, backendPoolID) | ||||||
| 	vmName := mapNodeNameToVMName(nodeName) | 	vmName := mapNodeNameToVMName(nodeName) | ||||||
| 	ssName, instanceID, vm, err := ss.getVmssVM(vmName, cacheReadTypeDefault) | 	ssName, instanceID, vm, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -947,7 +948,7 @@ func (ss *scaleSet) ensureVMSSInPool(service *v1.Service, nodes []*v1.Node, back | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for vmssName := range vmssNamesMap { | 	for vmssName := range vmssNamesMap { | ||||||
| 		vmss, err := ss.getVMSS(vmssName, cacheReadTypeDefault) | 		vmss, err := ss.getVMSS(vmssName, azcache.CacheReadTypeDefault) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| @@ -1056,7 +1057,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac | |||||||
|  |  | ||||||
| 		f := func() error { | 		f := func() error { | ||||||
| 			// Check whether the node is VMAS virtual machine. | 			// Check whether the node is VMAS virtual machine. | ||||||
| 			managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, cacheReadTypeDefault) | 			managedByAS, err := ss.isNodeManagedByAvailabilitySet(localNodeName, azcache.CacheReadTypeDefault) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err) | 				klog.Errorf("Failed to check isNodeManagedByAvailabilitySet(%s): %v", localNodeName, err) | ||||||
| 				return err | 				return err | ||||||
| @@ -1097,7 +1098,7 @@ func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, bac | |||||||
|  |  | ||||||
| // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node. | // ensureBackendPoolDeletedFromNode ensures the loadBalancer backendAddressPools deleted from the specified node. | ||||||
| func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error { | func (ss *scaleSet) ensureBackendPoolDeletedFromNode(service *v1.Service, nodeName, backendPoolID string) error { | ||||||
| 	ssName, instanceID, vm, err := ss.getVmssVM(nodeName, cacheReadTypeDefault) | 	ssName, instanceID, vm, err := ss.getVmssVM(nodeName, azcache.CacheReadTypeDefault) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -1186,7 +1187,7 @@ func (ss *scaleSet) getNodeNameByIPConfigurationID(ipConfigurationID string) (st | |||||||
| 	resourceGroup := matches[1] | 	resourceGroup := matches[1] | ||||||
| 	scaleSetName := matches[2] | 	scaleSetName := matches[2] | ||||||
| 	instanceID := matches[3] | 	instanceID := matches[3] | ||||||
| 	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, cacheReadTypeUnsafe) | 	vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", err | 		return "", err | ||||||
| 	} | 	} | ||||||
| @@ -1232,7 +1233,7 @@ func (ss *scaleSet) ensureBackendPoolDeletedFromVMSS(service *v1.Service, backen | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for vmssName := range vmssNamesMap { | 	for vmssName := range vmssNamesMap { | ||||||
| 		vmss, err := ss.getVMSS(vmssName, cacheReadTypeDefault) | 		vmss, err := ss.getVMSS(vmssName, azcache.CacheReadTypeDefault) | ||||||
|  |  | ||||||
| 		// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error. | 		// When vmss is being deleted, CreateOrUpdate API would report "the vmss is being deleted" error. | ||||||
| 		// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it. | 		// Since it is being deleted, we shouldn't send more CreateOrUpdate requests for it. | ||||||
|   | |||||||
| @@ -29,6 +29,7 @@ import ( | |||||||
|  |  | ||||||
| 	"k8s.io/apimachinery/pkg/util/sets" | 	"k8s.io/apimachinery/pkg/util/sets" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| @@ -56,7 +57,7 @@ type vmssEntry struct { | |||||||
| 	lastUpdate time.Time | 	lastUpdate time.Time | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ss *scaleSet) newVMSSCache() (*timedCache, error) { | func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		localCache := &sync.Map{} // [vmssName]*vmssEntry | 		localCache := &sync.Map{} // [vmssName]*vmssEntry | ||||||
|  |  | ||||||
| @@ -90,7 +91,7 @@ func (ss *scaleSet) newVMSSCache() (*timedCache, error) { | |||||||
| 	if ss.Config.VmssCacheTTLInSeconds == 0 { | 	if ss.Config.VmssCacheTTLInSeconds == 0 { | ||||||
| 		ss.Config.VmssCacheTTLInSeconds = vmssCacheTTLDefaultInSeconds | 		ss.Config.VmssCacheTTLInSeconds = vmssCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(ss.Config.VmssCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(ss.Config.VmssCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func extractVmssVMName(name string) (string, string, error) { | func extractVmssVMName(name string) (string, string, error) { | ||||||
| @@ -107,7 +108,7 @@ func extractVmssVMName(name string) (string, string, error) { | |||||||
| 	return ssName, instanceID, nil | 	return ssName, instanceID, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) { | func (ss *scaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry | 		localCache := &sync.Map{} // [nodeName]*vmssVirtualMachinesEntry | ||||||
|  |  | ||||||
| @@ -115,12 +116,12 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) { | |||||||
|  |  | ||||||
| 		if ss.vmssVMCache != nil { | 		if ss.vmssVMCache != nil { | ||||||
| 			// get old cache before refreshing the cache | 			// get old cache before refreshing the cache | ||||||
| 			entry, exists, err := ss.vmssVMCache.store.GetByKey(vmssVirtualMachinesKey) | 			entry, exists, err := ss.vmssVMCache.Store.GetByKey(vmssVirtualMachinesKey) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return nil, err | 				return nil, err | ||||||
| 			} | 			} | ||||||
| 			if exists { | 			if exists { | ||||||
| 				cached := entry.(*cacheEntry).data | 				cached := entry.(*azcache.AzureCacheEntry).Data | ||||||
| 				if cached != nil { | 				if cached != nil { | ||||||
| 					virtualMachines := cached.(*sync.Map) | 					virtualMachines := cached.(*sync.Map) | ||||||
| 					virtualMachines.Range(func(key, value interface{}) bool { | 					virtualMachines.Range(func(key, value interface{}) bool { | ||||||
| @@ -210,11 +211,11 @@ func (ss *scaleSet) newVMSSVirtualMachinesCache() (*timedCache, error) { | |||||||
| 	if ss.Config.VmssVirtualMachinesCacheTTLInSeconds == 0 { | 	if ss.Config.VmssVirtualMachinesCacheTTLInSeconds == 0 { | ||||||
| 		ss.Config.VmssVirtualMachinesCacheTTLInSeconds = vmssVirtualMachinesCacheTTLDefaultInSeconds | 		ss.Config.VmssVirtualMachinesCacheTTLInSeconds = vmssVirtualMachinesCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ss *scaleSet) deleteCacheForNode(nodeName string) error { | func (ss *scaleSet) deleteCacheForNode(nodeName string) error { | ||||||
| 	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cacheReadTypeUnsafe) | 	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, azcache.CacheReadTypeUnsafe) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err) | 		klog.Errorf("deleteCacheForNode(%s) failed with error: %v", nodeName, err) | ||||||
| 		return err | 		return err | ||||||
| @@ -225,7 +226,7 @@ func (ss *scaleSet) deleteCacheForNode(nodeName string) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) { | func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		localCache := sets.NewString() | 		localCache := sets.NewString() | ||||||
| 		resourceGroups, err := ss.GetResourceGroups() | 		resourceGroups, err := ss.GetResourceGroups() | ||||||
| @@ -252,10 +253,10 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*timedCache, error) { | |||||||
| 	if ss.Config.AvailabilitySetNodesCacheTTLInSeconds == 0 { | 	if ss.Config.AvailabilitySetNodesCacheTTLInSeconds == 0 { | ||||||
| 		ss.Config.AvailabilitySetNodesCacheTTLInSeconds = availabilitySetNodesCacheTTLDefaultInSeconds | 		ss.Config.AvailabilitySetNodesCacheTTLInSeconds = availabilitySetNodesCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(ss.Config.AvailabilitySetNodesCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(ss.Config.AvailabilitySetNodesCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt cacheReadType) (bool, error) { | func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt azcache.AzureCacheReadType) (bool, error) { | ||||||
| 	// Assume all nodes are managed by VMSS when DisableAvailabilitySetNodes is enabled. | 	// Assume all nodes are managed by VMSS when DisableAvailabilitySetNodes is enabled. | ||||||
| 	if ss.DisableAvailabilitySetNodes { | 	if ss.DisableAvailabilitySetNodes { | ||||||
| 		klog.V(2).Infof("Assuming node %q is managed by VMSS since DisableAvailabilitySetNodes is set to true", nodeName) | 		klog.V(2).Infof("Assuming node %q is managed by VMSS since DisableAvailabilitySetNodes is set to true", nodeName) | ||||||
|   | |||||||
| @@ -27,6 +27,7 @@ import ( | |||||||
| 	"github.com/Azure/go-autorest/autorest/to" | 	"github.com/Azure/go-autorest/autorest/to" | ||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestExtractVmssVMName(t *testing.T) { | func TestExtractVmssVMName(t *testing.T) { | ||||||
| @@ -87,7 +88,7 @@ func TestVMSSVMCache(t *testing.T) { | |||||||
| 	for i := range virtualMachines { | 	for i := range virtualMachines { | ||||||
| 		vm := virtualMachines[i] | 		vm := virtualMachines[i] | ||||||
| 		vmName := to.String(vm.OsProfile.ComputerName) | 		vmName := to.String(vm.OsProfile.ComputerName) | ||||||
| 		ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault) | 		ssName, instanceID, realVM, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) | ||||||
| 		assert.Nil(t, err) | 		assert.Nil(t, err) | ||||||
| 		assert.Equal(t, "vmss", ssName) | 		assert.Equal(t, "vmss", ssName) | ||||||
| 		assert.Equal(t, to.String(vm.InstanceID), instanceID) | 		assert.Equal(t, to.String(vm.InstanceID), instanceID) | ||||||
| @@ -101,14 +102,14 @@ func TestVMSSVMCache(t *testing.T) { | |||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
|  |  | ||||||
| 	// the VM should be removed from cache after deleteCacheForNode(). | 	// the VM should be removed from cache after deleteCacheForNode(). | ||||||
| 	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, cacheReadTypeDefault) | 	cached, err := ss.vmssVMCache.Get(vmssVirtualMachinesKey, azcache.CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	cachedVirtualMachines := cached.(*sync.Map) | 	cachedVirtualMachines := cached.(*sync.Map) | ||||||
| 	_, ok := cachedVirtualMachines.Load(vmName) | 	_, ok := cachedVirtualMachines.Load(vmName) | ||||||
| 	assert.Equal(t, false, ok) | 	assert.Equal(t, false, ok) | ||||||
|  |  | ||||||
| 	// the VM should be get back after another cache refresh. | 	// the VM should be get back after another cache refresh. | ||||||
| 	ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault) | 	ssName, instanceID, realVM, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, "vmss", ssName) | 	assert.Equal(t, "vmss", ssName) | ||||||
| 	assert.Equal(t, to.String(vm.InstanceID), instanceID) | 	assert.Equal(t, to.String(vm.InstanceID), instanceID) | ||||||
| @@ -130,7 +131,7 @@ func TestVMSSVMCacheWithDeletingNodes(t *testing.T) { | |||||||
| 		vmName := to.String(vm.OsProfile.ComputerName) | 		vmName := to.String(vm.OsProfile.ComputerName) | ||||||
| 		assert.Equal(t, vm.ProvisioningState, to.StringPtr(string(compute.ProvisioningStateDeleting))) | 		assert.Equal(t, vm.ProvisioningState, to.StringPtr(string(compute.ProvisioningStateDeleting))) | ||||||
|  |  | ||||||
| 		ssName, instanceID, realVM, err := ss.getVmssVM(vmName, cacheReadTypeDefault) | 		ssName, instanceID, realVM, err := ss.getVmssVM(vmName, azcache.CacheReadTypeDefault) | ||||||
| 		assert.Nil(t, realVM) | 		assert.Nil(t, realVM) | ||||||
| 		assert.Equal(t, "", ssName) | 		assert.Equal(t, "", ssName) | ||||||
| 		assert.Equal(t, instanceID, ssName) | 		assert.Equal(t, instanceID, ssName) | ||||||
|   | |||||||
| @@ -32,6 +32,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| 	"k8s.io/legacy-cloud-providers/azure/retry" | 	"k8s.io/legacy-cloud-providers/azure/retry" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -63,7 +64,7 @@ func checkResourceExistsFromError(err *retry.Error) (bool, *retry.Error) { | |||||||
| /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache | /// getVirtualMachine calls 'VirtualMachinesClient.Get' with a timed cache | ||||||
| /// The service side has throttling control that delays responses if there're multiple requests onto certain vm | /// The service side has throttling control that delays responses if there're multiple requests onto certain vm | ||||||
| /// resource request in short period. | /// resource request in short period. | ||||||
| func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) (vm compute.VirtualMachine, err error) { | func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt azcache.AzureCacheReadType) (vm compute.VirtualMachine, err error) { | ||||||
| 	vmName := string(nodeName) | 	vmName := string(nodeName) | ||||||
| 	cachedVM, err := az.vmCache.Get(vmName, crt) | 	cachedVM, err := az.vmCache.Get(vmName, crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -77,7 +78,7 @@ func (az *Cloud) getVirtualMachine(nodeName types.NodeName, crt cacheReadType) ( | |||||||
| 	return *(cachedVM.(*compute.VirtualMachine)), nil | 	return *(cachedVM.(*compute.VirtualMachine)), nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) getRouteTable(crt cacheReadType) (routeTable network.RouteTable, exists bool, err error) { | func (az *Cloud) getRouteTable(crt azcache.AzureCacheReadType) (routeTable network.RouteTable, exists bool, err error) { | ||||||
| 	cachedRt, err := az.rtCache.Get(az.RouteTableName, crt) | 	cachedRt, err := az.rtCache.Get(az.RouteTableName, crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return routeTable, false, err | 		return routeTable, false, err | ||||||
| @@ -136,7 +137,7 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (networ | |||||||
| 	return subnet, exists, nil | 	return subnet, exists, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb network.LoadBalancer, exists bool, err error) { | func (az *Cloud) getAzureLoadBalancer(name string, crt azcache.AzureCacheReadType) (lb network.LoadBalancer, exists bool, err error) { | ||||||
| 	cachedLB, err := az.lbCache.Get(name, crt) | 	cachedLB, err := az.lbCache.Get(name, crt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return lb, false, err | 		return lb, false, err | ||||||
| @@ -149,7 +150,7 @@ func (az *Cloud) getAzureLoadBalancer(name string, crt cacheReadType) (lb networ | |||||||
| 	return *(cachedLB.(*network.LoadBalancer)), true, nil | 	return *(cachedLB.(*network.LoadBalancer)), true, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) getSecurityGroup(crt cacheReadType) (network.SecurityGroup, error) { | func (az *Cloud) getSecurityGroup(crt azcache.AzureCacheReadType) (network.SecurityGroup, error) { | ||||||
| 	nsg := network.SecurityGroup{} | 	nsg := network.SecurityGroup{} | ||||||
| 	if az.SecurityGroupName == "" { | 	if az.SecurityGroupName == "" { | ||||||
| 		return nsg, fmt.Errorf("securityGroupName is not configured") | 		return nsg, fmt.Errorf("securityGroupName is not configured") | ||||||
| @@ -167,7 +168,7 @@ func (az *Cloud) getSecurityGroup(crt cacheReadType) (network.SecurityGroup, err | |||||||
| 	return *(securityGroup.(*network.SecurityGroup)), nil | 	return *(securityGroup.(*network.SecurityGroup)), nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) newVMCache() (*timedCache, error) { | func (az *Cloud) newVMCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		// Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView | 		// Currently InstanceView request are used by azure_zones, while the calls come after non-InstanceView | ||||||
| 		// request. If we first send an InstanceView request and then a non InstanceView request, the second | 		// request. If we first send an InstanceView request and then a non InstanceView request, the second | ||||||
| @@ -206,10 +207,10 @@ func (az *Cloud) newVMCache() (*timedCache, error) { | |||||||
| 	if az.VMCacheTTLInSeconds == 0 { | 	if az.VMCacheTTLInSeconds == 0 { | ||||||
| 		az.VMCacheTTLInSeconds = vmCacheTTLDefaultInSeconds | 		az.VMCacheTTLInSeconds = vmCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(az.VMCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(az.VMCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) newLBCache() (*timedCache, error) { | func (az *Cloud) newLBCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		ctx, cancel := getContextWithCancel() | 		ctx, cancel := getContextWithCancel() | ||||||
| 		defer cancel() | 		defer cancel() | ||||||
| @@ -231,10 +232,10 @@ func (az *Cloud) newLBCache() (*timedCache, error) { | |||||||
| 	if az.LoadBalancerCacheTTLInSeconds == 0 { | 	if az.LoadBalancerCacheTTLInSeconds == 0 { | ||||||
| 		az.LoadBalancerCacheTTLInSeconds = loadBalancerCacheTTLDefaultInSeconds | 		az.LoadBalancerCacheTTLInSeconds = loadBalancerCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(az.LoadBalancerCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(az.LoadBalancerCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) newNSGCache() (*timedCache, error) { | func (az *Cloud) newNSGCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		ctx, cancel := getContextWithCancel() | 		ctx, cancel := getContextWithCancel() | ||||||
| 		defer cancel() | 		defer cancel() | ||||||
| @@ -255,10 +256,10 @@ func (az *Cloud) newNSGCache() (*timedCache, error) { | |||||||
| 	if az.NsgCacheTTLInSeconds == 0 { | 	if az.NsgCacheTTLInSeconds == 0 { | ||||||
| 		az.NsgCacheTTLInSeconds = nsgCacheTTLDefaultInSeconds | 		az.NsgCacheTTLInSeconds = nsgCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(az.NsgCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(az.NsgCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) newRouteTableCache() (*timedCache, error) { | func (az *Cloud) newRouteTableCache() (*azcache.TimedCache, error) { | ||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		ctx, cancel := getContextWithCancel() | 		ctx, cancel := getContextWithCancel() | ||||||
| 		defer cancel() | 		defer cancel() | ||||||
| @@ -279,7 +280,7 @@ func (az *Cloud) newRouteTableCache() (*timedCache, error) { | |||||||
| 	if az.RouteTableCacheTTLInSeconds == 0 { | 	if az.RouteTableCacheTTLInSeconds == 0 { | ||||||
| 		az.RouteTableCacheTTLInSeconds = routeTableCacheTTLDefaultInSeconds | 		az.RouteTableCacheTTLInSeconds = routeTableCacheTTLDefaultInSeconds | ||||||
| 	} | 	} | ||||||
| 	return newTimedcache(time.Duration(az.RouteTableCacheTTLInSeconds)*time.Second, getter) | 	return azcache.NewTimedcache(time.Duration(az.RouteTableCacheTTLInSeconds)*time.Second, getter) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (az *Cloud) useStandardLoadBalancer() bool { | func (az *Cloud) useStandardLoadBalancer() bool { | ||||||
|   | |||||||
| @@ -28,6 +28,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	cloudprovider "k8s.io/cloud-provider" | 	cloudprovider "k8s.io/cloud-provider" | ||||||
| 	"k8s.io/klog" | 	"k8s.io/klog" | ||||||
|  | 	azcache "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // makeZone returns the zone value in format of <region>-<zone-id>. | // makeZone returns the zone value in format of <region>-<zone-id>. | ||||||
| @@ -53,7 +54,7 @@ func (az *Cloud) GetZoneID(zoneLabel string) string { | |||||||
| // If the node is not running with availability zones, then it will fall back to fault domain. | // If the node is not running with availability zones, then it will fall back to fault domain. | ||||||
| func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) { | func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) { | ||||||
| 	if az.UseInstanceMetadata { | 	if az.UseInstanceMetadata { | ||||||
| 		metadata, err := az.metadata.GetMetadata(cacheReadTypeUnsafe) | 		metadata, err := az.metadata.GetMetadata(azcache.CacheReadTypeUnsafe) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return cloudprovider.Zone{}, err | 			return cloudprovider.Zone{}, err | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										34
									
								
								staging/src/k8s.io/legacy-cloud-providers/azure/cache/BUILD
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								staging/src/k8s.io/legacy-cloud-providers/azure/cache/BUILD
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,34 @@ | |||||||
|  | load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||||||
|  |  | ||||||
|  | go_library( | ||||||
|  |     name = "go_default_library", | ||||||
|  |     srcs = [ | ||||||
|  |         "azure_cache.go", | ||||||
|  |         "doc.go", | ||||||
|  |     ], | ||||||
|  |     importmap = "k8s.io/kubernetes/vendor/k8s.io/legacy-cloud-providers/azure/cache", | ||||||
|  |     importpath = "k8s.io/legacy-cloud-providers/azure/cache", | ||||||
|  |     visibility = ["//visibility:public"], | ||||||
|  |     deps = ["//staging/src/k8s.io/client-go/tools/cache:go_default_library"], | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | go_test( | ||||||
|  |     name = "go_default_test", | ||||||
|  |     srcs = ["azure_cache_test.go"], | ||||||
|  |     embed = [":go_default_library"], | ||||||
|  |     deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"], | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | filegroup( | ||||||
|  |     name = "package-srcs", | ||||||
|  |     srcs = glob(["**"]), | ||||||
|  |     tags = ["automanaged"], | ||||||
|  |     visibility = ["//visibility:private"], | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | filegroup( | ||||||
|  |     name = "all-srcs", | ||||||
|  |     srcs = [":package-srcs"], | ||||||
|  |     tags = ["automanaged"], | ||||||
|  |     visibility = ["//visibility:public"], | ||||||
|  | ) | ||||||
| @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and | |||||||
| limitations under the License. | limitations under the License. | ||||||
| */ | */ | ||||||
| 
 | 
 | ||||||
| package azure | package cache | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| @@ -26,143 +26,143 @@ import ( | |||||||
| 	"k8s.io/client-go/tools/cache" | 	"k8s.io/client-go/tools/cache" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // cacheReadType defines the read type for cache data | // AzureCacheReadType defines the read type for cache data | ||||||
| type cacheReadType int | type AzureCacheReadType int | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| 	// cacheReadTypeDefault returns data from cache if cache entry not expired | 	// CacheReadTypeDefault returns data from cache if cache entry not expired | ||||||
| 	// if cache entry expired, then it will refetch the data using getter | 	// if cache entry expired, then it will refetch the data using getter | ||||||
| 	// save the entry in cache and then return | 	// save the entry in cache and then return | ||||||
| 	cacheReadTypeDefault cacheReadType = iota | 	CacheReadTypeDefault AzureCacheReadType = iota | ||||||
| 	// cacheReadTypeUnsafe returns data from cache even if the cache entry is | 	// CacheReadTypeUnsafe returns data from cache even if the cache entry is | ||||||
| 	// active/expired. If entry doesn't exist in cache, then data is fetched | 	// active/expired. If entry doesn't exist in cache, then data is fetched | ||||||
| 	// using getter, saved in cache and returned | 	// using getter, saved in cache and returned | ||||||
| 	cacheReadTypeUnsafe | 	CacheReadTypeUnsafe | ||||||
| 	// cacheReadTypeForceRefresh force refreshes the cache even if the cache entry | 	// CacheReadTypeForceRefresh force refreshes the cache even if the cache entry | ||||||
| 	// is not expired | 	// is not expired | ||||||
| 	cacheReadTypeForceRefresh | 	CacheReadTypeForceRefresh | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // getFunc defines a getter function for timedCache. | // GetFunc defines a getter function for timedCache. | ||||||
| type getFunc func(key string) (interface{}, error) | type GetFunc func(key string) (interface{}, error) | ||||||
| 
 | 
 | ||||||
| // cacheEntry is the internal structure stores inside TTLStore. | // AzureCacheEntry is the internal structure stores inside TTLStore. | ||||||
| type cacheEntry struct { | type AzureCacheEntry struct { | ||||||
| 	key  string | 	Key  string | ||||||
| 	data interface{} | 	Data interface{} | ||||||
| 
 | 
 | ||||||
| 	// The lock to ensure not updating same entry simultaneously. | 	// The lock to ensure not updating same entry simultaneously. | ||||||
| 	lock sync.Mutex | 	Lock sync.Mutex | ||||||
| 	// time when entry was fetched and created | 	// time when entry was fetched and created | ||||||
| 	createdOn time.Time | 	CreatedOn time.Time | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // cacheKeyFunc defines the key function required in TTLStore. | // cacheKeyFunc defines the key function required in TTLStore. | ||||||
| func cacheKeyFunc(obj interface{}) (string, error) { | func cacheKeyFunc(obj interface{}) (string, error) { | ||||||
| 	return obj.(*cacheEntry).key, nil | 	return obj.(*AzureCacheEntry).Key, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // timedCache is a cache with TTL. | // TimedCache is a cache with TTL. | ||||||
| type timedCache struct { | type TimedCache struct { | ||||||
| 	store  cache.Store | 	Store  cache.Store | ||||||
| 	lock   sync.Mutex | 	Lock   sync.Mutex | ||||||
| 	getter getFunc | 	Getter GetFunc | ||||||
| 	ttl    time.Duration | 	TTL    time.Duration | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // newTimedcache creates a new timedCache. | // NewTimedcache creates a new TimedCache. | ||||||
| func newTimedcache(ttl time.Duration, getter getFunc) (*timedCache, error) { | func NewTimedcache(ttl time.Duration, getter GetFunc) (*TimedCache, error) { | ||||||
| 	if getter == nil { | 	if getter == nil { | ||||||
| 		return nil, fmt.Errorf("getter is not provided") | 		return nil, fmt.Errorf("getter is not provided") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return &timedCache{ | 	return &TimedCache{ | ||||||
| 		getter: getter, | 		Getter: getter, | ||||||
| 		// switch to using NewStore instead of NewTTLStore so that we can | 		// switch to using NewStore instead of NewTTLStore so that we can | ||||||
| 		// reuse entries for calls that are fine with reading expired/stalled data. | 		// reuse entries for calls that are fine with reading expired/stalled data. | ||||||
| 		// with NewTTLStore, entries are not returned if they have already expired. | 		// with NewTTLStore, entries are not returned if they have already expired. | ||||||
| 		store: cache.NewStore(cacheKeyFunc), | 		Store: cache.NewStore(cacheKeyFunc), | ||||||
| 		ttl:   ttl, | 		TTL:   ttl, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // getInternal returns cacheEntry by key. If the key is not cached yet, | // getInternal returns AzureCacheEntry by key. If the key is not cached yet, | ||||||
| // it returns a cacheEntry with nil data. | // it returns a AzureCacheEntry with nil data. | ||||||
| func (t *timedCache) getInternal(key string) (*cacheEntry, error) { | func (t *TimedCache) getInternal(key string) (*AzureCacheEntry, error) { | ||||||
| 	entry, exists, err := t.store.GetByKey(key) | 	entry, exists, err := t.Store.GetByKey(key) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	// if entry exists, return the entry | 	// if entry exists, return the entry | ||||||
| 	if exists { | 	if exists { | ||||||
| 		return entry.(*cacheEntry), nil | 		return entry.(*AzureCacheEntry), nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// lock here to ensure if entry doesn't exist, we add a new entry | 	// lock here to ensure if entry doesn't exist, we add a new entry | ||||||
| 	// avoiding overwrites | 	// avoiding overwrites | ||||||
| 	t.lock.Lock() | 	t.Lock.Lock() | ||||||
| 	defer t.lock.Unlock() | 	defer t.Lock.Unlock() | ||||||
| 
 | 
 | ||||||
| 	// Still not found, add new entry with nil data. | 	// Still not found, add new entry with nil data. | ||||||
| 	// Note the data will be filled later by getter. | 	// Note the data will be filled later by getter. | ||||||
| 	newEntry := &cacheEntry{ | 	newEntry := &AzureCacheEntry{ | ||||||
| 		key:  key, | 		Key:  key, | ||||||
| 		data: nil, | 		Data: nil, | ||||||
| 	} | 	} | ||||||
| 	t.store.Add(newEntry) | 	t.Store.Add(newEntry) | ||||||
| 	return newEntry, nil | 	return newEntry, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Get returns the requested item by key. | // Get returns the requested item by key. | ||||||
| func (t *timedCache) Get(key string, crt cacheReadType) (interface{}, error) { | func (t *TimedCache) Get(key string, crt AzureCacheReadType) (interface{}, error) { | ||||||
| 	entry, err := t.getInternal(key) | 	entry, err := t.getInternal(key) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	entry.lock.Lock() | 	entry.Lock.Lock() | ||||||
| 	defer entry.lock.Unlock() | 	defer entry.Lock.Unlock() | ||||||
| 
 | 
 | ||||||
| 	// entry exists and if cache is not force refreshed | 	// entry exists and if cache is not force refreshed | ||||||
| 	if entry.data != nil && crt != cacheReadTypeForceRefresh { | 	if entry.Data != nil && crt != CacheReadTypeForceRefresh { | ||||||
| 		// allow unsafe read, so return data even if expired | 		// allow unsafe read, so return data even if expired | ||||||
| 		if crt == cacheReadTypeUnsafe { | 		if crt == CacheReadTypeUnsafe { | ||||||
| 			return entry.data, nil | 			return entry.Data, nil | ||||||
| 		} | 		} | ||||||
| 		// if cached data is not expired, return cached data | 		// if cached data is not expired, return cached data | ||||||
| 		if crt == cacheReadTypeDefault && time.Since(entry.createdOn) < t.ttl { | 		if crt == CacheReadTypeDefault && time.Since(entry.CreatedOn) < t.TTL { | ||||||
| 			return entry.data, nil | 			return entry.Data, nil | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	// Data is not cached yet, cache data is expired or requested force refresh | 	// Data is not cached yet, cache data is expired or requested force refresh | ||||||
| 	// cache it by getter. entry is locked before getting to ensure concurrent | 	// cache it by getter. entry is locked before getting to ensure concurrent | ||||||
| 	// gets don't result in multiple ARM calls. | 	// gets don't result in multiple ARM calls. | ||||||
| 	data, err := t.getter(key) | 	data, err := t.Getter(key) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// set the data in cache and also set the last update time | 	// set the data in cache and also set the last update time | ||||||
| 	// to now as the data was recently fetched | 	// to now as the data was recently fetched | ||||||
| 	entry.data = data | 	entry.Data = data | ||||||
| 	entry.createdOn = time.Now().UTC() | 	entry.CreatedOn = time.Now().UTC() | ||||||
| 
 | 
 | ||||||
| 	return entry.data, nil | 	return entry.Data, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Delete removes an item from the cache. | // Delete removes an item from the cache. | ||||||
| func (t *timedCache) Delete(key string) error { | func (t *TimedCache) Delete(key string) error { | ||||||
| 	return t.store.Delete(&cacheEntry{ | 	return t.Store.Delete(&AzureCacheEntry{ | ||||||
| 		key: key, | 		Key: key, | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Set sets the data cache for the key. | // Set sets the data cache for the key. | ||||||
| // It is only used for testing. | // It is only used for testing. | ||||||
| func (t *timedCache) Set(key string, data interface{}) { | func (t *TimedCache) Set(key string, data interface{}) { | ||||||
| 	t.store.Add(&cacheEntry{ | 	t.Store.Add(&AzureCacheEntry{ | ||||||
| 		key:       key, | 		Key:       key, | ||||||
| 		data:      data, | 		Data:      data, | ||||||
| 		createdOn: time.Now().UTC(), | 		CreatedOn: time.Now().UTC(), | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and | |||||||
| limitations under the License. | limitations under the License. | ||||||
| */ | */ | ||||||
| 
 | 
 | ||||||
| package azure | package cache | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| @@ -59,12 +59,12 @@ func (fake *fakeDataSource) set(data map[string]*fakeDataObj) { | |||||||
| 	fake.called = 0 | 	fake.called = 0 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func newFakeCache(t *testing.T) (*fakeDataSource, *timedCache) { | func newFakeCache(t *testing.T) (*fakeDataSource, *TimedCache) { | ||||||
| 	dataSource := &fakeDataSource{ | 	dataSource := &fakeDataSource{ | ||||||
| 		data: make(map[string]*fakeDataObj), | 		data: make(map[string]*fakeDataObj), | ||||||
| 	} | 	} | ||||||
| 	getter := dataSource.get | 	getter := dataSource.get | ||||||
| 	cache, err := newTimedcache(fakeCacheTTL, getter) | 	cache, err := NewTimedcache(fakeCacheTTL, getter) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	return dataSource, cache | 	return dataSource, cache | ||||||
| } | } | ||||||
| @@ -99,7 +99,7 @@ func TestCacheGet(t *testing.T) { | |||||||
| 	for _, c := range cases { | 	for _, c := range cases { | ||||||
| 		dataSource, cache := newFakeCache(t) | 		dataSource, cache := newFakeCache(t) | ||||||
| 		dataSource.set(c.data) | 		dataSource.set(c.data) | ||||||
| 		val, err := cache.Get(c.key, cacheReadTypeDefault) | 		val, err := cache.Get(c.key, CacheReadTypeDefault) | ||||||
| 		assert.NoError(t, err, c.name) | 		assert.NoError(t, err, c.name) | ||||||
| 		assert.Equal(t, c.expected, val, c.name) | 		assert.Equal(t, c.expected, val, c.name) | ||||||
| 	} | 	} | ||||||
| @@ -110,10 +110,10 @@ func TestCacheGetError(t *testing.T) { | |||||||
| 	getter := func(key string) (interface{}, error) { | 	getter := func(key string) (interface{}, error) { | ||||||
| 		return nil, getError | 		return nil, getError | ||||||
| 	} | 	} | ||||||
| 	cache, err := newTimedcache(fakeCacheTTL, getter) | 	cache, err := NewTimedcache(fakeCacheTTL, getter) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 
 | 
 | ||||||
| 	val, err := cache.Get("key", cacheReadTypeDefault) | 	val, err := cache.Get("key", CacheReadTypeDefault) | ||||||
| 	assert.Error(t, err) | 	assert.Error(t, err) | ||||||
| 	assert.Equal(t, getError, err) | 	assert.Equal(t, getError, err) | ||||||
| 	assert.Nil(t, val) | 	assert.Nil(t, val) | ||||||
| @@ -128,13 +128,13 @@ func TestCacheDelete(t *testing.T) { | |||||||
| 	dataSource, cache := newFakeCache(t) | 	dataSource, cache := newFakeCache(t) | ||||||
| 	dataSource.set(data) | 	dataSource.set(data) | ||||||
| 
 | 
 | ||||||
| 	v, err := cache.Get(key, cacheReadTypeDefault) | 	v, err := cache.Get(key, CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, val, v, "cache should get correct data") | 	assert.Equal(t, val, v, "cache should get correct data") | ||||||
| 
 | 
 | ||||||
| 	dataSource.set(nil) | 	dataSource.set(nil) | ||||||
| 	cache.Delete(key) | 	cache.Delete(key) | ||||||
| 	v, err = cache.Get(key, cacheReadTypeDefault) | 	v, err = cache.Get(key, CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 1, dataSource.called) | 	assert.Equal(t, 1, dataSource.called) | ||||||
| 	assert.Equal(t, nil, v, "cache should get nil after data is removed") | 	assert.Equal(t, nil, v, "cache should get nil after data is removed") | ||||||
| @@ -149,13 +149,13 @@ func TestCacheExpired(t *testing.T) { | |||||||
| 	dataSource, cache := newFakeCache(t) | 	dataSource, cache := newFakeCache(t) | ||||||
| 	dataSource.set(data) | 	dataSource.set(data) | ||||||
| 
 | 
 | ||||||
| 	v, err := cache.Get(key, cacheReadTypeDefault) | 	v, err := cache.Get(key, CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 1, dataSource.called) | 	assert.Equal(t, 1, dataSource.called) | ||||||
| 	assert.Equal(t, val, v, "cache should get correct data") | 	assert.Equal(t, val, v, "cache should get correct data") | ||||||
| 
 | 
 | ||||||
| 	time.Sleep(fakeCacheTTL) | 	time.Sleep(fakeCacheTTL) | ||||||
| 	v, err = cache.Get(key, cacheReadTypeDefault) | 	v, err = cache.Get(key, CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 2, dataSource.called) | 	assert.Equal(t, 2, dataSource.called) | ||||||
| 	assert.Equal(t, val, v, "cache should get correct data even after expired") | 	assert.Equal(t, val, v, "cache should get correct data even after expired") | ||||||
| @@ -170,13 +170,13 @@ func TestCacheAllowUnsafeRead(t *testing.T) { | |||||||
| 	dataSource, cache := newFakeCache(t) | 	dataSource, cache := newFakeCache(t) | ||||||
| 	dataSource.set(data) | 	dataSource.set(data) | ||||||
| 
 | 
 | ||||||
| 	v, err := cache.Get(key, cacheReadTypeDefault) | 	v, err := cache.Get(key, CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 1, dataSource.called) | 	assert.Equal(t, 1, dataSource.called) | ||||||
| 	assert.Equal(t, val, v, "cache should get correct data") | 	assert.Equal(t, val, v, "cache should get correct data") | ||||||
| 
 | 
 | ||||||
| 	time.Sleep(fakeCacheTTL) | 	time.Sleep(fakeCacheTTL) | ||||||
| 	v, err = cache.Get(key, cacheReadTypeUnsafe) | 	v, err = cache.Get(key, CacheReadTypeUnsafe) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 1, dataSource.called) | 	assert.Equal(t, 1, dataSource.called) | ||||||
| 	assert.Equal(t, val, v, "cache should return expired as allow unsafe read is allowed") | 	assert.Equal(t, val, v, "cache should return expired as allow unsafe read is allowed") | ||||||
| @@ -195,10 +195,10 @@ func TestCacheNoConcurrentGet(t *testing.T) { | |||||||
| 	var wg sync.WaitGroup | 	var wg sync.WaitGroup | ||||||
| 	for i := 0; i < 5; i++ { | 	for i := 0; i < 5; i++ { | ||||||
| 		wg.Add(1) | 		wg.Add(1) | ||||||
| 		go cache.Get(key, cacheReadTypeDefault) | 		go cache.Get(key, CacheReadTypeDefault) | ||||||
| 		wg.Done() | 		wg.Done() | ||||||
| 	} | 	} | ||||||
| 	v, err := cache.Get(key, cacheReadTypeDefault) | 	v, err := cache.Get(key, CacheReadTypeDefault) | ||||||
| 	wg.Wait() | 	wg.Wait() | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 1, dataSource.called) | 	assert.Equal(t, 1, dataSource.called) | ||||||
| @@ -214,12 +214,12 @@ func TestCacheForceRefresh(t *testing.T) { | |||||||
| 	dataSource, cache := newFakeCache(t) | 	dataSource, cache := newFakeCache(t) | ||||||
| 	dataSource.set(data) | 	dataSource.set(data) | ||||||
| 
 | 
 | ||||||
| 	v, err := cache.Get(key, cacheReadTypeDefault) | 	v, err := cache.Get(key, CacheReadTypeDefault) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 1, dataSource.called) | 	assert.Equal(t, 1, dataSource.called) | ||||||
| 	assert.Equal(t, val, v, "cache should get correct data") | 	assert.Equal(t, val, v, "cache should get correct data") | ||||||
| 
 | 
 | ||||||
| 	v, err = cache.Get(key, cacheReadTypeForceRefresh) | 	v, err = cache.Get(key, CacheReadTypeForceRefresh) | ||||||
| 	assert.NoError(t, err) | 	assert.NoError(t, err) | ||||||
| 	assert.Equal(t, 2, dataSource.called) | 	assert.Equal(t, 2, dataSource.called) | ||||||
| 	assert.Equal(t, val, v, "should refetch unexpired data as forced refresh") | 	assert.Equal(t, val, v, "should refetch unexpired data as forced refresh") | ||||||
							
								
								
									
										20
									
								
								staging/src/k8s.io/legacy-cloud-providers/azure/cache/doc.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								staging/src/k8s.io/legacy-cloud-providers/azure/cache/doc.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | |||||||
|  | // +build !providerless | ||||||
|  |  | ||||||
|  | /* | ||||||
|  | Copyright 2019 The Kubernetes Authors. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | // Package cache is an implementation of Azure caches. | ||||||
|  | package cache // import "k8s.io/legacy-cloud-providers/azure/cache" | ||||||
							
								
								
									
										1
									
								
								vendor/modules.txt
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/modules.txt
									
									
									
									
										vendored
									
									
								
							| @@ -1862,6 +1862,7 @@ k8s.io/kubelet/pkg/apis/pluginregistration/v1 | |||||||
| k8s.io/legacy-cloud-providers/aws | k8s.io/legacy-cloud-providers/aws | ||||||
| k8s.io/legacy-cloud-providers/azure | k8s.io/legacy-cloud-providers/azure | ||||||
| k8s.io/legacy-cloud-providers/azure/auth | k8s.io/legacy-cloud-providers/azure/auth | ||||||
|  | k8s.io/legacy-cloud-providers/azure/cache | ||||||
| k8s.io/legacy-cloud-providers/azure/clients | k8s.io/legacy-cloud-providers/azure/clients | ||||||
| k8s.io/legacy-cloud-providers/azure/clients/armclient | k8s.io/legacy-cloud-providers/azure/clients/armclient | ||||||
| k8s.io/legacy-cloud-providers/azure/clients/diskclient | k8s.io/legacy-cloud-providers/azure/clients/diskclient | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot