Remove the keymutex-based per-VM disk locks

Maintain a map of node name to lock

Move the lock map to azure_utils.go
Anish Ramasekar
2019-11-11 15:41:59 -08:00
parent 8af6906d1f
commit 3916c4a6cf
5 changed files with 88 additions and 13 deletions

View File

@@ -29,6 +29,7 @@ go_library(
         "azure_standard.go",
         "azure_storage.go",
         "azure_storageaccount.go",
+        "azure_utils.go",
         "azure_vmsets.go",
         "azure_vmss.go",
         "azure_vmss_cache.go",
@@ -76,7 +77,6 @@ go_library(
         "//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",
         "//vendor/github.com/rubiojr/go-vhd/vhd:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
-        "//vendor/k8s.io/utils/keymutex:go_default_library",
         "//vendor/k8s.io/utils/net:go_default_library",
         "//vendor/sigs.k8s.io/yaml:go_default_library",
     ],

View File

@@ -614,6 +614,7 @@ func initDiskControllers(az *Cloud) error {
 		resourceGroup: az.ResourceGroup,
 		subscriptionID: az.SubscriptionID,
 		cloud: az,
+		vmLockMap: newLockMap(),
 	}
 
 	az.BlobDiskController = &BlobDiskController{common: common}
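The vmLockMap initialized above replaces the package-level diskOpMutex that the next file deletes. The difference is lock granularity: keymutex.NewHashed(0) hashes arbitrary keys onto a fixed pool of mutexes (per its documentation, sized from the CPU count when n <= 0), so two unrelated VMs can contend on the same lock, whereas lockMap allocates a dedicated mutex per node name. A minimal sketch of the removed pattern for comparison — attachOnNode and the update callback are illustrative; only the keymutex calls come from the old code:

package main

import "k8s.io/utils/keymutex"

// Old scheme: one process-wide hashed key mutex. Distinct instance IDs can
// hash to the same underlying lock and needlessly serialize against each other.
var diskOpMutex = keymutex.NewHashed(0) // n <= 0 sizes the pool from NumCPU

func attachOnNode(instanceid string, update func()) {
	diskOpMutex.LockKey(instanceid)
	defer diskOpMutex.UnlockKey(instanceid)
	update() // the actual attach/detach VM update
}

func main() {
	attachOnNode("instance-0", func() { /* update the VM model here */ })
}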

View File

@@ -33,7 +33,6 @@ import (
 	cloudprovider "k8s.io/cloud-provider"
 	volerr "k8s.io/cloud-provider/volume/errors"
 	"k8s.io/klog"
-	"k8s.io/utils/keymutex"
 )
 
 const (
@@ -58,9 +57,6 @@ var defaultBackOff = kwait.Backoff{
 	Jitter: 0.0,
 }
 
-// acquire lock to attach/detach disk in one node
-var diskOpMutex = keymutex.NewHashed(0)
-
 type controllerCommon struct {
 	subscriptionID string
 	location       string
@@ -68,7 +64,9 @@ type controllerCommon struct {
 	resourceGroup string
 	// store disk URI when disk is in attaching or detaching process
 	diskAttachDetachMap sync.Map
-	cloud *Cloud
+	// vmLockMap is used to serialize per-VM update calls
+	vmLockMap *lockMap
+	cloud     *Cloud
 }
 
 // getNodeVMSet gets the VMSet interface based on config.VMType and the real virtual machine type.
@@ -144,8 +142,8 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 		return -1, fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
 	}
 
-	diskOpMutex.LockKey(instanceid)
-	defer diskOpMutex.UnlockKey(instanceid)
+	c.vmLockMap.LockEntry(string(nodeName))
+	defer c.vmLockMap.UnlockEntry(string(nodeName))
 
 	lun, err := c.GetNextDiskLun(nodeName)
 	if err != nil {
@@ -161,7 +159,7 @@ func (c *controllerCommon) AttachDisk(isManagedDisk bool, diskName, diskURI stri
 
 // DetachDisk detaches a disk from host. The vhd can be identified by diskName or diskURI.
 func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.NodeName) error {
-	instanceid, err := c.cloud.InstanceID(context.TODO(), nodeName)
+	_, err := c.cloud.InstanceID(context.TODO(), nodeName)
 	if err != nil {
 		if err == cloudprovider.InstanceNotFound {
 			// if host doesn't exist, no need to detach
@@ -181,20 +179,20 @@ func (c *controllerCommon) DetachDisk(diskName, diskURI string, nodeName types.N
 	klog.V(2).Infof("detach %v from node %q", diskURI, nodeName)
 
 	// make the lock here as small as possible
-	diskOpMutex.LockKey(instanceid)
+	c.vmLockMap.LockEntry(string(nodeName))
 	c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
 	resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
 	c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
-	diskOpMutex.UnlockKey(instanceid)
+	c.vmLockMap.UnlockEntry(string(nodeName))
 
 	if c.cloud.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
 		klog.V(2).Infof("azureDisk - update backing off: detach disk(%s, %s), err: %v", diskName, diskURI, err)
 		retryErr := kwait.ExponentialBackoff(c.cloud.RequestBackoff(), func() (bool, error) {
-			diskOpMutex.LockKey(instanceid)
+			c.vmLockMap.LockEntry(string(nodeName))
 			c.diskAttachDetachMap.Store(strings.ToLower(diskURI), "detaching")
 			resp, err := vmset.DetachDisk(diskName, diskURI, nodeName)
 			c.diskAttachDetachMap.Delete(strings.ToLower(diskURI))
-			diskOpMutex.UnlockKey(instanceid)
+			c.vmLockMap.UnlockEntry(string(nodeName))
 			return c.cloud.processHTTPRetryResponse(nil, "", resp, err)
 		})
 		if retryErr != nil {
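Two things are worth noting in the detach path above. First, the lock key changed from instanceid to the node name, so InstanceID is now called only to verify the node still exists — hence the _, err := assignment. Second, the per-node lock is deliberately held only around each individual VM update and re-acquired inside the backoff closure, rather than across the whole retry loop, so other disk operations on the node can interleave between attempts. A sketch of that shape, assuming the lockMap type added in azure_utils.go further down; detachWithRetry, detachOnce, and the fixed three-attempt loop are illustrative stand-ins, not the upstream helpers:

package azure

import (
	"fmt"
	"time"
)

// detachWithRetry holds the node lock only for the duration of each attempt,
// mirroring how DetachDisk re-takes vmLockMap inside the backoff closure.
func detachWithRetry(locks *lockMap, nodeName string, detachOnce func() error) error {
	var err error
	for attempt := 0; attempt < 3; attempt++ { // stand-in for kwait.ExponentialBackoff
		locks.LockEntry(nodeName)
		err = detachOnce() // the actual VM update; lock scope kept minimal
		locks.UnlockEntry(nodeName)
		if err == nil {
			return nil
		}
		time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
	}
	return fmt.Errorf("detach failed after retries: %v", err)
}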

View File

@@ -68,6 +68,7 @@ func TestCommonAttachDisk(t *testing.T) {
 		resourceGroup: testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud: testCloud,
+		vmLockMap: newLockMap(),
 	}
 	diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
 		testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -116,6 +117,7 @@ func TestCommonDetachDisk(t *testing.T) {
 		resourceGroup: testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud: testCloud,
+		vmLockMap: newLockMap(),
 	}
 	diskURI := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/disks/disk-name",
 		testCloud.SubscriptionID, testCloud.ResourceGroup)
@@ -156,6 +158,7 @@ func TestGetDiskLun(t *testing.T) {
 		resourceGroup: testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud: testCloud,
+		vmLockMap: newLockMap(),
 	}
 
 	setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
@@ -194,6 +197,7 @@ func TestGetNextDiskLun(t *testing.T) {
 		resourceGroup: testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud: testCloud,
+		vmLockMap: newLockMap(),
 	}
 
 	setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, test.isDataDisksFull)
@@ -235,6 +239,7 @@ func TestDisksAreAttached(t *testing.T) {
 		resourceGroup: testCloud.ResourceGroup,
 		subscriptionID: testCloud.SubscriptionID,
 		cloud: testCloud,
+		vmLockMap: newLockMap(),
 	}
 
 	setTestVirtualMachines(testCloud, map[string]string{"vm1": "PowerState/Running"}, false)
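Each test above constructs controllerCommon with a fresh newLockMap() so the attach/detach code paths do not dereference a nil vmLockMap. The commit adds no test for the lock map itself; a hypothetical one (not part of this change, assuming sync, sync/atomic, testing, and time are imported) could assert that two goroutines never hold the same entry at once:

func TestLockMapSerializesPerEntry(t *testing.T) {
	lm := newLockMap()
	var active int32
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lm.LockEntry("vm1")
			defer lm.UnlockEntry("vm1")
			// exactly one goroutine may be inside this section at a time
			if atomic.AddInt32(&active, 1) != 1 {
				t.Error("two goroutines held the vm1 lock at once")
			}
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt32(&active, -1)
		}()
	}
	wg.Wait()
}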

View File

@@ -0,0 +1,71 @@
+// +build !providerless
+
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package azure
+
+import (
+	"sync"
+)
+
+// lockMap holds one mutex per entry, so callers can lock individual entries.
+type lockMap struct {
+	sync.Mutex
+	mutexMap map[string]*sync.Mutex
+}
+
+// newLockMap returns a new lock map
+func newLockMap() *lockMap {
+	return &lockMap{
+		mutexMap: make(map[string]*sync.Mutex),
+	}
+}
+
+// LockEntry acquires a lock associated with the specific entry
+func (lm *lockMap) LockEntry(entry string) {
+	lm.Lock()
+	// add the entry to the map if it does not exist yet
+	if _, exists := lm.mutexMap[entry]; !exists {
+		lm.addEntry(entry)
+	}
+	lm.Unlock()
+	lm.lockEntry(entry)
+}
+
+// UnlockEntry releases the lock associated with the specific entry
+func (lm *lockMap) UnlockEntry(entry string) {
+	lm.Lock()
+	defer lm.Unlock()
+
+	if _, exists := lm.mutexMap[entry]; !exists {
+		return
+	}
+	lm.unlockEntry(entry)
+}
+
+func (lm *lockMap) addEntry(entry string) {
+	lm.mutexMap[entry] = &sync.Mutex{}
+}
+
+func (lm *lockMap) lockEntry(entry string) {
+	lm.mutexMap[entry].Lock()
+}
+
+func (lm *lockMap) unlockEntry(entry string) {
+	lm.mutexMap[entry].Unlock()
+}
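A short usage example for the type above: LockEntry calls for the same entry serialize, while different entries proceed independently. One property worth knowing: entries are added by addEntry but never removed, so the map grows with the set of keys it has seen — acceptable here since the keys are the cluster's node names. The function and node names below are illustrative:

func ExampleLockMap() {
	lm := newLockMap()
	var wg sync.WaitGroup
	// the two node-1 goroutines run one after the other; node-2 is independent
	for _, node := range []string{"node-1", "node-1", "node-2"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			lm.LockEntry(n)
			defer lm.UnlockEntry(n)
			// critical section: at most one disk operation per node at a time
		}(node)
	}
	wg.Wait()
}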