Fix collisions issues / timeouts for mounts

For non-attachable volumes, do not call GetVolumeName on the plugin and instead
generate a unique name based on the identity of the pod and the name of the volume
within the pod.
This commit is contained in:
Paul Morie
2016-07-27 17:53:40 -04:00
parent 03fe6b962c
commit c884297990
7 changed files with 144 additions and 70 deletions

View File

@@ -224,7 +224,7 @@ type nodeToUpdateStatusFor struct {
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_ api.UniqueVolumeName, volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
return err
}

View File

@@ -51,14 +51,6 @@ type ActualStateOfWorld interface {
// operationexecutor to interact with it.
operationexecutor.ActualStateOfWorldAttacherUpdater
// AddVolume adds the given volume to the cache indicating the specified
// volume is attached to this node. A unique volume name is generated from
// the volumeSpec and returned on success.
// If a volume with the same generated name already exists, this is a noop.
// If no volume plugin can support the given volumeSpec or more than one
// plugin can support it, an error is returned.
AddVolume(volumeSpec *volume.Spec, devicePath string) (api.UniqueVolumeName, error)
// AddPodToVolume adds the given pod to the given volume in the cache
// indicating the specified volume has been successfully mounted to the
// specified pod.
@@ -274,9 +266,8 @@ type mountedPod struct {
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolume(volumeSpec, devicePath)
return err
volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, _, devicePath string) error {
return asw.addVolume(volumeName, volumeSpec, devicePath)
}
func (asw *actualStateOfWorld) MarkVolumeAsDetached(
@@ -315,28 +306,35 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */)
}
func (asw *actualStateOfWorld) AddVolume(
volumeSpec *volume.Spec, devicePath string) (api.UniqueVolumeName, error) {
// addVolume adds the given volume to the cache indicating the specified
// volume is attached to this node. If no volume name is supplied, a unique
// volume name is generated from the volumeSpec and returned on success. If a
// volume with the same generated name already exists, this is a noop. If no
// volume plugin can support the given volumeSpec or more than one plugin can
// support it, an error is returned.
func (asw *actualStateOfWorld) addVolume(
volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string) error {
asw.Lock()
defer asw.Unlock()
volumePlugin, err := asw.volumePluginMgr.FindPluginBySpec(volumeSpec)
if err != nil || volumePlugin == nil {
return "", fmt.Errorf(
return fmt.Errorf(
"failed to get Plugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
volumeName, err :=
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if len(volumeName) == 0 {
volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
return fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
}
}
pluginIsAttachable := false
if _, ok := volumePlugin.(volume.AttachableVolumePlugin); ok {
@@ -357,7 +355,7 @@ func (asw *actualStateOfWorld) AddVolume(
asw.attachedVolumes[volumeName] = volumeObj
}
return volumeObj.volumeName, nil
return nil
}
func (asw *actualStateOfWorld) AddPodToVolume(

View File

@@ -26,12 +26,14 @@ import (
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// Calls AddVolume() once to add volume
var emptyVolumeName = api.UniqueVolumeName("")
// Calls MarkVolumeAsAttached() once to add volume
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_NewVolume(t *testing.T) {
func Test_MarkVolumeAsAttached_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@@ -53,13 +55,14 @@ func Test_AddVolume_Positive_NewVolume(t *testing.T) {
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
generatedVolumeName, _ := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
// Act
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
err := asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
// Assert
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
@@ -67,13 +70,57 @@ func Test_AddVolume_Positive_NewVolume(t *testing.T) {
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
}
// Calls AddVolume() twice to add the same volume
// Calls MarkVolumeAsAttached() once to add volume, specifying a name --
// establishes that the supplied volume name is used to register the volume
// rather than the generated one.
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_MarkVolumeAsAttached_SuppliedVolumeName_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
volumeName := api.UniqueVolumeName("this-would-never-be-a-volume-name")
// Act
err := asw.MarkVolumeAsAttached(volumeName, volumeSpec, "" /* nodeName */, devicePath)
// Assert
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsAsw(t, volumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, volumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, volumeName, asw)
}
// Calls MarkVolumeAsAttached() twice to add the same volume
// Verifies second call doesn't fail
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
func Test_MarkVolumeAsAttached_Positive_ExistingVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
devicePath := "fake/device/path"
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
@@ -94,19 +141,20 @@ func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
generatedVolumeName, _ := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
err := asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
// Act
generatedVolumeName, err = asw.AddVolume(volumeSpec, devicePath)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
// Assert
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
@@ -141,14 +189,12 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec)
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName := volumehelper.GetUniquePodName(pod)
@@ -159,7 +205,7 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
// Act
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
podName, pod.UID, generatedVolumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
@@ -202,12 +248,12 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec)
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName := volumehelper.GetUniquePodName(pod)
@@ -217,14 +263,14 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
}
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
podName, pod.UID, generatedVolumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
podName, pod.UID, generatedVolumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
// Assert
if err != nil {
@@ -301,13 +347,13 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
asw)
}
// Calls AddVolume() once to add volume
// Calls MarkVolumeAsAttached() once to add volume
// Calls MarkDeviceAsMounted() to mark volume as globally mounted.
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume exists in GetGloballyMountedVolumes()
func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
@@ -329,9 +375,11 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
// Act

View File

@@ -183,7 +183,15 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
err)
}
volumeName, err :=
var volumeName api.UniqueVolumeName
// The unique volume name used depends on whether the volume is attachable
// or not.
attachable := dsw.isAttachableVolume(volumeSpec)
if attachable {
// For attachable volumes, use the unique volume name as reported by
// the plugin.
volumeName, err =
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
@@ -192,13 +200,18 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
volumePlugin.GetPluginName(),
err)
}
} else {
// For non-attachable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName)
}
volumeObj, volumeExists := dsw.volumesToMount[volumeName]
if !volumeExists {
volumeObj = volumeToMount{
volumeName: volumeName,
podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: dsw.isAttachableVolume(volumeSpec),
pluginIsAttachable: attachable,
volumeGidValue: volumeGidValue,
reportedInUse: false,
}

View File

@@ -61,7 +61,7 @@ func (plugin *gitRepoPlugin) GetPluginName() string {
func (plugin *gitRepoPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference a GCE volume type")
return "", fmt.Errorf("Spec does not reference a Git repo volume type")
}
return fmt.Sprintf(

View File

@@ -15,8 +15,9 @@ limitations under the License.
*/
// Package operationexecutor implements interfaces that enable execution of
// attach, detach, mount, and unmount operations with a nestedpendingoperations
// so that more than one operation is never triggered on the same volume.
// attach, detach, mount, and unmount operations with a
// nestedpendingoperations so that more than one operation is never triggered
// on the same volume for the same pod.
package operationexecutor
import (
@@ -131,8 +132,14 @@ type ActualStateOfWorldMounterUpdater interface {
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the
// actual state of the world cache after successful attach/detach/mount/unmount.
type ActualStateOfWorldAttacherUpdater interface {
// Marks the specified volume as attached to the specified node
MarkVolumeAsAttached(volumeSpec *volume.Spec, nodeName string, devicePath string) error
// Marks the specified volume as attached to the specified node. If the
// volume name is supplied, that volume name will be used. If not, the
// volume name is computed using the result from querying the plugin.
//
// TODO: in the future, we should be able to remove the volumeName
// argument to this method -- since it is used only for attachable
// volumes. See issue 29695.
MarkVolumeAsAttached(volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, nodeName, devicePath string) error
// Marks the specified volume as detached from the specified node
MarkVolumeAsDetached(volumeName api.UniqueVolumeName, nodeName string)
@@ -370,6 +377,7 @@ func (oe *operationExecutor) MountVolume(
}
podName := volumetypes.UniquePodName("")
// TODO: remove this -- not necessary
if !volumeToMount.PluginIsAttachable {
// Non-attachable volume plugins can execute mount for multiple pods
// referencing the same volume in parallel
@@ -473,7 +481,7 @@ func (oe *operationExecutor) generateAttachVolumeFunc(
// Update actual state of world
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
api.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
@@ -931,12 +939,13 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
// If the volume does not implement the attacher interface, it is
// assumed to be attached and the the actual state of the world is
// updated accordingly.
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeSpec, nodeName, volumeToMount.DevicePath)
volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return fmt.Errorf(
"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v.",
"VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v.",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),
volumeToMount.PodName,
@@ -987,7 +996,7 @@ func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc(
for _, attachedVolume := range node.Status.VolumesAttached {
if attachedVolume.Name == volumeToMount.VolumeName {
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
api.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
glog.Infof("Controller successfully attached volume %q (spec.Name: %q) pod %q (UID: %q) devicePath: %q",
volumeToMount.VolumeName,
volumeToMount.VolumeSpec.Name(),

View File

@@ -52,6 +52,12 @@ func GetUniqueVolumeName(pluginName, volumeName string) api.UniqueVolumeName {
return api.UniqueVolumeName(fmt.Sprintf("%s/%s", pluginName, volumeName))
}
// GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name
// for a non-attachable volume.
func GetUniqueVolumeNameForNonAttachableVolume(podName types.UniquePodName, volumePlugin volume.VolumePlugin, podSpecName string) api.UniqueVolumeName {
return api.UniqueVolumeName(fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, podSpecName))
}
// GetUniqueVolumeNameFromSpec uses the given VolumePlugin to generate a unique
// name representing the volume defined in the specified volume spec.
// This returned name can be used to uniquely reference the actual backing