Merge pull request #58177 from jingxu97/Jan/reconstruct

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Redesign and implement volume reconstruction work

This PR is the first part of redesign of volume reconstruction work. The detailed design information is https://github.com/kubernetes/community/pull/1601

The changes include
1. Remove dependency on volume spec stored in actual state for volume
cleanup process (UnmountVolume and UnmountDevice)

Modify AttachedVolume struct to add DeviceMountPath so that volume
unmount operation can use this information instead of constructing from
volume spec

2. Modify reconciler's volume reconstruction process (syncState). Currently workflow
is when kubelet restarts, syncState() is only called once before
reconciler starts its loop.
a. If volume plugin supports reconstruction, it will use the
reconstructed volume spec information to update actual state as before.
b. If volume plugin cannot support reconstruction, it will use the
scanned mount path information to clean up the mounts.

In this PR, all the plugins still support reconstruction (except
glusterfs), so reconstruction of some plugins will still have issues.
The next PR will modify those plugins that cannot support reconstruction
well.

This PR addresses issue #52683
This commit is contained in:
Kubernetes Submit Queue
2018-02-08 18:21:34 -08:00
committed by GitHub
13 changed files with 329 additions and 186 deletions

View File

@@ -96,7 +96,7 @@ func (c *PodConfig) SeenAllSources(seenSources sets.String) bool {
if c.pods == nil {
return false
}
glog.V(6).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources)
glog.V(5).Infof("Looking for %v, have seen %v", c.sources.List(), seenSources)
return seenSources.HasAll(c.sources.List()...) && c.pods.seenSources(c.sources.List()...)
}

View File

@@ -73,7 +73,7 @@ type ActualStateOfWorld interface {
// must unmounted prior to detach.
// If a volume with the name volumeName does not exist in the list of
// attached volumes, an error is returned.
SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool) error
SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error
// DeletePodFromVolume removes the given pod from the given volume in the
// cache indicating the volume has been successfully unmounted from the pod.
@@ -109,6 +109,13 @@ type ActualStateOfWorld interface {
// volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error)
// VolumeExistsWithSpecName returns true if the given volume specified with the
// volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
// volumes that should be attached to this node.
// If a pod with the same name does not exist under the specified
// volume, false is returned.
VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool
// VolumeExists returns true if the given volume exists in the list of
// attached volumes in the cache, indicating the volume is attached to this
// node.
@@ -240,6 +247,10 @@ type attachedVolume struct {
// devicePath contains the path on the node where the volume is attached for
// attachable volumes
devicePath string
// deviceMountPath contains the path on the node where the device should
// be mounted after it is attached.
deviceMountPath string
}
// The mountedPod object represents a pod for which the kubelet volume manager
@@ -318,13 +329,13 @@ func (asw *actualStateOfWorld) MarkVolumeAsUnmounted(
}
func (asw *actualStateOfWorld) MarkDeviceAsMounted(
volumeName v1.UniqueVolumeName) error {
return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */)
volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error {
return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */, devicePath, deviceMountPath)
}
func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
volumeName v1.UniqueVolumeName) error {
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */)
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */, "", "")
}
// addVolume adds the given volume to the cache indicating the specified
@@ -454,7 +465,7 @@ func (asw *actualStateOfWorld) MarkRemountRequired(
}
func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
volumeName v1.UniqueVolumeName, globallyMounted bool) error {
volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error {
asw.Lock()
defer asw.Unlock()
@@ -466,6 +477,8 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
}
volumeObj.globallyMounted = globallyMounted
volumeObj.deviceMountPath = deviceMountPath
volumeObj.devicePath = devicePath
asw.attachedVolumes[volumeName] = volumeObj
return nil
}
@@ -529,6 +542,19 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
return podExists, volumeObj.devicePath, nil
}
func (asw *actualStateOfWorld) VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool {
asw.RLock()
defer asw.RUnlock()
for _, volumeObj := range asw.attachedVolumes {
for name := range volumeObj.mountedPods {
if podName == name && volumeObj.spec.Name() == volumeSpecName {
return true
}
}
}
return false
}
func (asw *actualStateOfWorld) VolumeExists(
volumeName v1.UniqueVolumeName) bool {
asw.RLock()
@@ -625,8 +651,11 @@ func (asw *actualStateOfWorld) newAttachedVolume(
VolumeSpec: attachedVolume.spec,
NodeName: asw.nodeName,
PluginIsAttachable: attachedVolume.pluginIsAttachable,
DevicePath: attachedVolume.devicePath},
GloballyMounted: attachedVolume.globallyMounted}
DevicePath: attachedVolume.devicePath,
DeviceMountPath: attachedVolume.deviceMountPath,
PluginName: attachedVolume.pluginName},
GloballyMounted: attachedVolume.globallyMounted,
}
}
// Compile-time check to ensure volumeNotAttachedError implements the error interface
@@ -691,5 +720,6 @@ func getMountedVolume(
Mounter: mountedPod.mounter,
BlockVolumeMapper: mountedPod.blockVolumeMapper,
VolumeGidValue: mountedPod.volumeGidValue,
VolumeSpec: attachedVolume.spec}}
VolumeSpec: attachedVolume.spec,
DeviceMountPath: attachedVolume.deviceMountPath}}
}

View File

@@ -222,6 +222,7 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName, volumeSpec.Name(), asw)
}
// Populates data struct with a volume
@@ -292,6 +293,7 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
verifyPodExistsInVolumeAsw(t, podName, generatedVolumeName, "fake/device/path" /* expectedDevicePath */, asw)
verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName, volumeSpec.Name(), asw)
}
// Calls AddPodToVolume() to add pod to empty data stuct
@@ -370,6 +372,7 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
volumeName,
false, /* expectVolumeToExist */
asw)
verifyVolumeDoesntExistWithSpecNameInVolumeAsw(t, podName, volumeSpec.Name(), asw)
}
// Calls MarkVolumeAsAttached() once to add volume
@@ -400,6 +403,7 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
deviceMountPath := "fake/device/mount/path"
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
@@ -408,7 +412,7 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
}
// Act
err = asw.MarkDeviceAsMounted(generatedVolumeName)
err = asw.MarkDeviceAsMounted(generatedVolumeName, devicePath, deviceMountPath)
// Assert
if err != nil {
@@ -546,3 +550,33 @@ func verifyPodDoesntExistInVolumeAsw(
devicePath)
}
}
func verifyVolumeExistsWithSpecNameInVolumeAsw(
t *testing.T,
expectedPodName volumetypes.UniquePodName,
expectedVolumeName string,
asw ActualStateOfWorld) {
podExistsInVolume :=
asw.VolumeExistsWithSpecName(expectedPodName, expectedVolumeName)
if !podExistsInVolume {
t.Fatalf(
"ASW VolumeExistsWithSpecName result invalid. Expected: <true> Actual: <%v>",
podExistsInVolume)
}
}
func verifyVolumeDoesntExistWithSpecNameInVolumeAsw(
t *testing.T,
podToCheck volumetypes.UniquePodName,
volumeToCheck string,
asw ActualStateOfWorld) {
podExistsInVolume :=
asw.VolumeExistsWithSpecName(podToCheck, volumeToCheck)
if podExistsInVolume {
t.Fatalf(
"ASW VolumeExistsWithSpecName result invalid. Expected: <false> Actual: <%v>",
podExistsInVolume)
}
}

View File

@@ -98,6 +98,13 @@ type DesiredStateOfWorld interface {
// with pod's unique name. This map can be used to determine which pod is currently
// in desired state of world.
GetPods() map[types.UniquePodName]bool
// VolumeExistsWithSpecName returns true if the given volume specified with the
// volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
// volumes that should be attached to this node.
// If a pod with the same name does not exist under the specified
// volume, false is returned.
VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool
}
// VolumeToMount represents a volume that is attached to this node and needs to
@@ -234,7 +241,6 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
spec: volumeSpec,
outerVolumeSpecName: outerVolumeSpecName,
}
return volumeName, nil
}
@@ -303,6 +309,19 @@ func (dsw *desiredStateOfWorld) PodExistsInVolume(
return podExists
}
func (dsw *desiredStateOfWorld) VolumeExistsWithSpecName(podName types.UniquePodName, volumeSpecName string) bool {
dsw.RLock()
defer dsw.RUnlock()
for _, volumeObj := range dsw.volumesToMount {
for name, podObj := range volumeObj.podsToMount {
if podName == name && podObj.spec.Name() == volumeSpecName {
return true
}
}
}
return false
}
func (dsw *desiredStateOfWorld) GetPods() map[types.UniquePodName]bool {
dsw.RLock()
defer dsw.RUnlock()

View File

@@ -69,6 +69,7 @@ func Test_AddPodToVolume_Positive_NewPodNewVolume(t *testing.T) {
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
verifyVolumeExistsWithSpecNameInVolumeDsw(t, podName, volumeSpec.Name(), dsw)
}
// Calls AddPodToVolume() twice to add the same pod to the same volume
@@ -113,6 +114,7 @@ func Test_AddPodToVolume_Positive_ExistingPodExistingVolume(t *testing.T) {
verifyVolumeExistsInVolumesToMount(
t, generatedVolumeName, false /* expectReportedInUse */, dsw)
verifyPodExistsInVolumeDsw(t, podName, generatedVolumeName, dsw)
verifyVolumeExistsWithSpecNameInVolumeDsw(t, podName, volumeSpec.Name(), dsw)
}
// Populates data struct with a new volume/pod
@@ -160,6 +162,7 @@ func Test_DeletePodFromVolume_Positive_PodExistsVolumeExists(t *testing.T) {
verifyVolumeDoesntExist(t, generatedVolumeName, dsw)
verifyVolumeDoesntExistInVolumesToMount(t, generatedVolumeName, dsw)
verifyPodDoesntExistInVolumeDsw(t, podName, generatedVolumeName, dsw)
verifyVolumeDoesntExistWithSpecNameInVolumeDsw(t, podName, volumeSpec.Name(), dsw)
}
// Calls AddPodToVolume() to add three new volumes to data struct
@@ -380,3 +383,29 @@ func verifyPodDoesntExistInVolumeDsw(
podExistsInVolume)
}
}
func verifyVolumeExistsWithSpecNameInVolumeDsw(
t *testing.T,
expectedPodName volumetypes.UniquePodName,
expectedVolumeSpecName string,
dsw DesiredStateOfWorld) {
if podExistsInVolume := dsw.VolumeExistsWithSpecName(
expectedPodName, expectedVolumeSpecName); !podExistsInVolume {
t.Fatalf(
"DSW VolumeExistsWithSpecNam returned incorrect value. Expected: <true> Actual: <%v>",
podExistsInVolume)
}
}
func verifyVolumeDoesntExistWithSpecNameInVolumeDsw(
t *testing.T,
expectedPodName volumetypes.UniquePodName,
expectedVolumeSpecName string,
dsw DesiredStateOfWorld) {
if podExistsInVolume := dsw.VolumeExistsWithSpecName(
expectedPodName, expectedVolumeSpecName); podExistsInVolume {
t.Fatalf(
"DSW VolumeExistsWithSpecNam returned incorrect value. Expected: <true> Actual: <%v>",
podExistsInVolume)
}
}

View File

@@ -123,6 +123,7 @@ type processedPods struct {
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
glog.Infof("Desired state populator starts to run")
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
done := sourcesReady.AllReady()
dswp.populatorLoopFunc()()

View File

@@ -140,26 +140,18 @@ type reconciler struct {
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
// Wait for the populator to indicate that it has actually populated the desired state of world, meaning it has
// completed a populate loop that started after sources are all ready. After, there's no need to keep checking.
wait.PollUntil(rc.loopSleepDuration, func() (bool, error) {
rc.reconciliationLoopFunc(rc.populatorHasAddedPods())()
return rc.populatorHasAddedPods(), nil
}, stopCh)
wait.Until(rc.reconciliationLoopFunc(true), rc.loopSleepDuration, stopCh)
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc(populatorHasAddedPods bool) func() {
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// Add a check that the populator has added pods so that reconciler's reconstruct process will start
// after desired state of world is populated with pod volume information. Otherwise, reconciler's
// reconstruct process may add incomplete volume information and cause confusion. In addition, if the
// desired state of world has not been populated yet, the reconstruct process may clean up pods' volumes
// that are still in use because desired state of world does not contain a complete list of pods.
if populatorHasAddedPods && time.Since(rc.timeOfLastSync) > rc.syncDuration {
glog.V(5).Infof("Desired state of world has been populated with pods, starting reconstruct state function")
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
// desired state of world does not contain a complete list of pods.
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
glog.Infof("Reconciler: start to sync state")
rc.sync()
}
}
@@ -319,11 +311,11 @@ func (rc *reconciler) reconcile() {
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and udpate the actual and desired states. In the following reconciler loop, those volumes will
// be cleaned up.
// the volumes and udpate the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
rc.syncStates()
}
func (rc *reconciler) updateLastSyncTime() {
@@ -348,7 +340,7 @@ type reconstructedVolume struct {
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *v1.Pod
pluginIsAttachable bool
attachablePlugin volumepkg.AttachableVolumePlugin
volumeGidValue string
devicePath string
reportedInUse bool
@@ -356,66 +348,41 @@ type reconstructedVolume struct {
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// reconstructFromDisk scans the volume directories under the given pod directory. If the volume is not
// in either actual or desired state of world, or pending operation, this function will reconstruct
// the volume spec and put it in both the actual and desired state of worlds. If no running
// container is mounting the volume, the volume will be removed by desired state of world's populator and
// cleaned up by the reconciler.
func (rc *reconciler) syncStates(podsDir string) {
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates() {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(podsDir)
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
if err != nil {
glog.Errorf("Cannot get volumes from disk %v", err)
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
for _, volume := range podVolumes {
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
glog.Errorf("Could not construct volume information: %v", err)
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
glog.V(4).Info("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
continue
}
// Check if there is an pending operation for the given pod and volume.
// Need to check pending operation before checking the actual and desired
// states to avoid race condition during checking. For example, the following
// might happen if pending operation is checked after checking actual and desired states.
// 1. Checking the pod and it does not exist in either actual or desired state.
// 2. An operation for the given pod finishes and the actual state is updated.
// 3. Checking and there is no pending operation for the given pod.
// During state reconstruction period, no new volume operations could be issued. If the
// mounted path is not in either pending operation, or actual or desired states, this
// volume needs to be reconstructed back to the states.
pending := rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, reconstructedVolume.podName)
dswExist := rc.desiredStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
aswExist, _, _ := rc.actualStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName)
if !rc.StatesHasBeenSynced() {
// In case this is the first time to reconstruct state after kubelet starts, for a persistant volume, it must have
// been mounted before kubelet restarts because no mount operations could be started at this time (node
// status has not yet been updated before this very first syncStates finishes, so that VerifyControllerAttachedVolume will fail),
// In this case, the volume state should be put back to actual state now no matter desired state has it or not.
// This is to prevent node status from being updated to empty for attachable volumes. This might happen because
// in the case that a volume is discovered on disk, and it is part of desired state, but is then quickly deleted
// from the desired state. If in such situation, the volume is not added to the actual state, the node status updater will
// not get this volume from either actual or desired state. In turn, this might cause master controller
// detaching while the volume is still mounted.
if aswExist || !reconstructedVolume.pluginIsAttachable {
continue
}
} else {
// Check pending first since no new operations could be started at this point.
// Otherwise there might a race condition in checking actual states and pending operations
if pending || dswExist || aswExist {
continue
}
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
glog.V(4).Info("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
continue
}
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
glog.Warning("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
rc.cleanupMounts(volume)
continue
}
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
glog.Warning("Volume is in pending operation, skip cleaning up mounts")
}
glog.V(2).Infof(
"Reconciler sync states: could not find pod information in desired or actual states or pending operation, update it in both states: %+v",
"Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
}
if len(volumesNeedUpdate) > 0 {
@@ -426,7 +393,31 @@ func (rc *reconciler) syncStates(podsDir string) {
}
// Reconstruct Volume object and reconstructedVolume data structure by reading the pod's volume directories
func (rc *reconciler) cleanupMounts(volume podVolume) {
glog.V(2).Infof("Reconciler sync states: could not find information (PID: %s) (Volume SpecName: %s) in desired state, clean up the mount points",
volume.podName, volume.volumeSpecName)
mountedVolume := operationexecutor.MountedVolume{
PodName: volume.podName,
VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
InnerVolumeSpecName: volume.volumeSpecName,
PluginName: volume.pluginName,
PodUID: types.UID(volume.podName),
}
volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor)
if err != nil {
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.NewVolumeHandler for UnmountVolume failed"), err).Error())
return
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err = volumeHandler.UnmountVolumeHandler(mountedVolume, rc.actualStateOfWorld)
if err != nil {
glog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("volumeHandler.UnmountVolumeHandler for UnmountVolume failed"), err).Error())
return
}
}
// Reconstruct volume data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
// plugin initializations
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
@@ -438,23 +429,17 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
return nil, err
}
// Create volumeSpec
// Create pod object
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(volume.podName),
},
}
// TODO: remove feature gate check after no longer needed
var mapperPlugin volumepkg.BlockVolumePlugin
tmpSpec := &volumepkg.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{}}}
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
mapperPlugin, err = rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
tmpSpec = &volumepkg.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volume.volumeMode}}}
volumeHandler, err := operationexecutor.NewVolumeHandlerWithMode(volume.volumeMode, rc.operationExecutor)
if err != nil {
return nil, err
}
volumeHandler, err := operationexecutor.NewVolumeHandler(tmpSpec, rc.operationExecutor)
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
@@ -480,7 +465,6 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
} else {
uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := volumeHandler.CheckVolumeExistence(volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
if checkErr != nil {
@@ -507,7 +491,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
// TODO: remove feature gate check after no longer needed
var volumeMapper volumepkg.BlockVolumeMapper
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) && volume.volumeMode == v1.PersistentVolumeBlock {
var newMapperErr error
if mapperPlugin != nil {
volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
@@ -530,23 +514,24 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
// volume.volumeSpecName is actually InnerVolumeSpecName. But this information will likely to be updated in updateStates()
// by checking the desired state volumeToMount list and getting the real OuterVolumeSpecName.
// In case the pod is deleted during this period and desired state does not have this information, it will not be used
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
// for volume cleanup.
// TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
pluginIsAttachable: attachablePlugin != nil,
attachablePlugin: attachablePlugin,
volumeGidValue: "",
devicePath: "",
mounter: volumeMounter,
blockVolumeMapper: volumeMapper,
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
devicePath: "",
mounter: volumeMounter,
blockVolumeMapper: volumeMapper,
}
return reconstructedVolume, nil
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
// Get the node status to retrieve volume device path information.
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
glog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr)
@@ -559,26 +544,42 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
}
}
}
}
// Get the list of volumes from desired state and update OuterVolumeSpecName if the information is available
volumesToMount := rc.desiredStateOfWorld.GetVolumesToMount()
for _, volumeToMount := range volumesToMount {
if volume, exists := volumesNeedUpdate[volumeToMount.VolumeName]; exists {
volume.outerVolumeSpecName = volumeToMount.OuterVolumeSpecName
volumesNeedUpdate[volumeToMount.VolumeName] = volume
glog.V(4).Infof("Update OuterVolumeSpecName from desired state for volume (%q): %q",
volumeToMount.VolumeName, volume.outerVolumeSpecName)
func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
volumeAttacher, err := volume.attachablePlugin.NewAttacher()
if volumeAttacher == nil || err != nil {
return "", err
}
deviceMountPath, err :=
volumeAttacher.GetDeviceMountPath(volume.volumeSpec)
if err != nil {
return "", err
}
if volume.blockVolumeMapper != nil {
deviceMountPath, err =
volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
if err != nil {
return "", err
}
}
return deviceMountPath, nil
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
// Get the node status to retrieve volume device path information.
rc.updateDevicePath(volumesNeedUpdate)
for _, volume := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath)
if err != nil {
glog.Errorf("Could not add volume information to actual state of world: %v", err)
continue
}
err = rc.actualStateOfWorld.AddPodToVolume(
err = rc.actualStateOfWorld.MarkVolumeAsMounted(
volume.podName,
types.UID(volume.podName),
volume.volumeName,
@@ -590,22 +591,19 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
glog.Errorf("Could not add pod to volume information to actual state of world: %v", err)
continue
}
if volume.pluginIsAttachable {
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName)
glog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
if volume.attachablePlugin != nil {
deviceMountPath, err := getDeviceMountPath(volume)
if err != nil {
glog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
continue
}
err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath)
if err != nil {
glog.Errorf("Could not mark device is mounted to actual state of world: %v", err)
continue
}
glog.Infof("Volume: %v is mounted", volume.volumeName)
}
_, err = rc.desiredStateOfWorld.AddPodToVolume(volume.podName,
volume.pod,
volume.volumeSpec,
volume.outerVolumeSpecName,
volume.volumeGidValue)
if err != nil {
glog.Errorf("Could not add pod to volume information to desired state of world: %v", err)
glog.V(4).Infof("Volume: %s (pod UID %s) is marked device as mounted and added into the actual state", volume.volumeName, volume.podName)
}
}
return nil
@@ -667,6 +665,6 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
}
}
}
glog.V(10).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
glog.V(4).Infof("Get volumes from pod directory %q %+v", podDir, volumes)
return volumes, nil
}

View File

@@ -933,7 +933,8 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
false, /* checkNodeCapabilitiesBeforeMount */
nil))
var mounter mount.Interface
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: &volume.Spec{}}
plugins := volumetesting.NewFakeFileVolumePlugin()
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: &volume.Spec{}, PluginName: plugins[0].GetPluginName()}
err := oex.UnmapDevice(deviceToDetach, asw, mounter)
// Assert
if assert.Error(t, err) {
@@ -949,7 +950,7 @@ func waitForMount(
volumeName v1.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
time.Duration(500*time.Millisecond),
func() (bool, error) {
mountedVolumes := asw.GetMountedVolumes()
for _, mountedVolume := range mountedVolumes {
@@ -973,7 +974,7 @@ func waitForDetach(
volumeName v1.UniqueVolumeName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(5*time.Millisecond),
time.Duration(500*time.Millisecond),
func() (bool, error) {
if asw.VolumeExists(volumeName) {
return false, nil

View File

@@ -21,6 +21,7 @@ package mount
import (
"os"
"path/filepath"
"strings"
)
type FileType string
@@ -261,3 +262,21 @@ func isBind(options []string) (bool, []string) {
return bind, bindRemountOpts
}
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
// In GCI cluster, if gci mounter is used for mounting, the container started by mounter
// script will cause additional mounts created in the container. Since these mounts are
// irrelavant to the original mounts, they should be not considered when checking the
// mount references. Current solution is to filter out those mount paths that contain
// the string of original mount path.
// Plan to work on better approach to solve this issue.
func HasMountRefs(mountPath string, mountRefs []string) bool {
count := 0
for _, ref := range mountRefs {
if !strings.Contains(ref, mountPath) {
count = count + 1
}
}
return count > 0
}

View File

@@ -22,7 +22,6 @@ package operationexecutor
import (
"fmt"
"strings"
"time"
"github.com/golang/glog"
@@ -169,7 +168,7 @@ type ActualStateOfWorldMounterUpdater interface {
MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
// Marks the specified volume as having been globally mounted.
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName) error
MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error
// Marks the specified volume as having its global mount unmounted.
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
@@ -386,6 +385,14 @@ type AttachedVolume struct {
// DevicePath contains the path on the node where the volume is attached.
// For non-attachable volumes this is empty.
DevicePath string
// DeviceMountPath contains the path on the node where the device should
// be mounted after it is attached.
DeviceMountPath string
// PluginName is the Unescaped Qualified name of the volume plugin used to
// attach and mount this volume.
PluginName string
}
// GenerateMsgDetailed returns detailed msgs for attached volumes
@@ -529,6 +536,10 @@ type MountedVolume struct {
// VolumeSpec is a volume spec containing the specification for the volume
// that should be mounted.
VolumeSpec *volume.Spec
// DeviceMountPath contains the path on the node where the device should
// be mounted after it is attached.
DeviceMountPath string
}
// GenerateMsgDetailed returns detailed msgs for mounted volumes
@@ -866,6 +877,21 @@ func NewVolumeHandler(volumeSpec *volume.Spec, oe OperationExecutor) (VolumeStat
return volumeHandler, nil
}
// NewVolumeHandlerWithMode return a new instance of volumeHandler depens on a volumeMode
func NewVolumeHandlerWithMode(volumeMode v1.PersistentVolumeMode, oe OperationExecutor) (VolumeStateHandler, error) {
var volumeHandler VolumeStateHandler
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
if volumeMode == v1.PersistentVolumeFilesystem {
volumeHandler = NewFilesystemVolumeHandler(oe)
} else {
volumeHandler = NewBlockVolumeHandler(oe)
}
} else {
volumeHandler = NewFilesystemVolumeHandler(oe)
}
return volumeHandler, nil
}
// NewFilesystemVolumeHandler returns a new instance of FilesystemVolumeHandler.
func NewFilesystemVolumeHandler(operationExecutor OperationExecutor) FilesystemVolumeHandler {
return FilesystemVolumeHandler{
@@ -924,7 +950,7 @@ func (f FilesystemVolumeHandler) UnmountDeviceHandler(attachedVolume AttachedVol
// ReconstructVolumeHandler create volumeSpec from mount path
// This method is handler for filesystem volume
func (f FilesystemVolumeHandler) ReconstructVolumeHandler(plugin volume.VolumePlugin, _ volume.BlockVolumePlugin, _ types.UID, _ volumetypes.UniquePodName, volumeSpecName string, mountPath string, _ string) (*volume.Spec, error) {
glog.V(12).Infof("Starting operationExecutor.ReconstructVolumepodName")
glog.V(4).Infof("Starting operationExecutor.ReconstructVolumepodName volume spec name %s, mount path %s", volumeSpecName, mountPath)
volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, mountPath)
if err != nil {
return nil, err
@@ -1024,21 +1050,3 @@ func (b BlockVolumeHandler) CheckVolumeExistence(mountPath, volumeName string, m
}
return islinkExist, nil
}
// TODO: this is a workaround for the unmount device issue caused by gci mounter.
// In GCI cluster, if gci mounter is used for mounting, the container started by mounter
// script will cause additional mounts created in the container. Since these mounts are
// irrelavant to the original mounts, they should be not considered when checking the
// mount references. Current solution is to filter out those mount paths that contain
// the string of original mount path.
// Plan to work on better approach to solve this issue.
func hasMountRefs(mountPath string, mountRefs []string) bool {
count := 0
for _, ref := range mountRefs {
if !strings.Contains(ref, mountPath) {
count = count + 1
}
}
return count > 0
}

View File

@@ -514,7 +514,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
// Update actual state of world to reflect volume is globally mounted
markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
volumeToMount.VolumeName)
volumeToMount.VolumeName, devicePath, deviceMountPath)
if markDeviceMountedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
@@ -707,7 +707,7 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
// Get attacher plugin
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec)
og.volumePluginMgr.FindAttachablePluginByName(deviceToDetach.PluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err)
}
@@ -716,22 +716,11 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
if err != nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err)
}
volumeAttacher, err := attachableVolumePlugin.NewAttacher()
if err != nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err)
}
unmountDeviceFunc := func() (error, error) {
deviceMountPath, err :=
volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec)
if err != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
}
deviceMountPath := deviceToDetach.DeviceMountPath
refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
if err != nil || hasMountRefs(deviceMountPath, refs) {
if err != nil || mount.HasMountRefs(deviceMountPath, refs) {
if err == nil {
err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs)
}
@@ -828,6 +817,13 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
mapVolumeFunc := func() (error, error) {
var devicePath string
// Set up global map path under the given plugin directory using symbolic link
globalMapPath, err :=
blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err)
}
if volumeAttacher != nil {
// Wait for attachable volumes to finish attaching
glog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)))
@@ -843,7 +839,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
// Update actual state of world to reflect volume is globally mounted
markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
volumeToMount.VolumeName)
volumeToMount.VolumeName, devicePath, globalMapPath)
if markDeviceMappedErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
@@ -863,18 +859,13 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty"))
}
}
// Set up global map path under the given plugin directory using symbolic link
globalMapPath, err :=
blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err)
}
mapErr = og.blkUtil.MapDevice(devicePath, globalMapPath, string(volumeToMount.Pod.UID))
if mapErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr)
}
// Device mapping for global map path succeeded
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath))
verbosity := glog.Level(4)
@@ -969,8 +960,7 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
}
// Try to unmap podUID symlink under global map path dir
// plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID}
globalUnmapPath, err :=
blockVolumeUnmapper.GetGlobalMapPath(volumeToUnmount.VolumeSpec)
globalUnmapPath := volumeToUnmount.DeviceMountPath
if err != nil {
// On failure, return error. Caller will log and retry.
return volumeToUnmount.GenerateError("UnmapVolume.GetGlobalUnmapPath failed", err)
@@ -1024,23 +1014,14 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
actualStateOfWorld ActualStateOfWorldMounterUpdater,
mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
// Get block volume mapper plugin
var blockVolumeMapper volume.BlockVolumeMapper
blockVolumePlugin, err :=
og.volumePluginMgr.FindMapperPluginBySpec(deviceToDetach.VolumeSpec)
og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName)
if err != nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err)
}
if blockVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
}
blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
deviceToDetach.VolumeSpec,
nil, /* Pod */
volume.VolumeOptions{})
if newMapperErr != nil {
return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewBlockVolumeMapper initialization failed", newMapperErr)
}
blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
string(deviceToDetach.VolumeName),
@@ -1052,8 +1033,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
unmapDeviceFunc := func() (error, error) {
// Search under globalMapPath dir if all symbolic links from pods have been removed already.
// If symbolick links are there, pods may still refer the volume.
globalMapPath, err :=
blockVolumeMapper.GetGlobalMapPath(deviceToDetach.VolumeSpec)
globalMapPath := deviceToDetach.DeviceMountPath
if err != nil {
// On failure, return error. Caller will log and retry.
return deviceToDetach.GenerateError("UnmapDevice.GetGlobalMapPath failed", err)

View File

@@ -223,6 +223,10 @@ var _ = utils.SIGDescribe("PersistentVolumes[Disruptive][Flaky]", func() {
testItStmt: "Should test that a volume mounted to a pod that is deleted while the kubelet is down unmounts when the kubelet returns.",
runTest: utils.TestVolumeUnmountsFromDeletedPod,
},
{
testItStmt: "Should test that a volume mounted to a pod that is force deleted while the kubelet is down unmounts when the kubelet returns.",
runTest: utils.TestVolumeUnmountsFromForceDeletedPod,
},
}
// Test loop executes each disruptiveTest iteratively.

View File

@@ -156,7 +156,8 @@ func TestKubeletRestartsAndRestoresMount(c clientset.Interface, f *framework.Fra
}
// TestVolumeUnmountsFromDeletedPod tests that a volume unmounts if the client pod was deleted while the kubelet was down.
func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
// forceDelete is true indicating whether the pod is forcelly deleted.
func TestVolumeUnmountsFromDeletedPodWithForceOption(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, forceDelete bool) {
nodeIP, err := framework.GetHostExternalAddress(c, clientPod)
Expect(err).NotTo(HaveOccurred())
nodeIP = nodeIP + ":22"
@@ -175,7 +176,11 @@ func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
}
}()
By(fmt.Sprintf("Deleting Pod %q", clientPod.Name))
err = c.CoreV1().Pods(clientPod.Namespace).Delete(clientPod.Name, &metav1.DeleteOptions{})
if forceDelete {
err = c.CoreV1().Pods(clientPod.Namespace).Delete(clientPod.Name, metav1.NewDeleteOptions(0))
} else {
err = c.CoreV1().Pods(clientPod.Namespace).Delete(clientPod.Name, &metav1.DeleteOptions{})
}
Expect(err).NotTo(HaveOccurred())
By("Starting the kubelet and waiting for pod to delete.")
KubeletCommand(KStart, c, clientPod)
@@ -184,6 +189,11 @@ func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
Expect(err).NotTo(HaveOccurred(), "Expected pod to terminate.")
}
if forceDelete {
// With forceDelete, since pods are immediately deleted from API server, there is no way to be sure when volumes are torn down
// so wait some time to finish
time.Sleep(30 * time.Second)
}
By("Expecting the volume mount not to be found.")
result, err = framework.SSH(fmt.Sprintf("mount | grep %s", clientPod.UID), nodeIP, framework.TestContext.Provider)
framework.LogSSHResult(result)
@@ -192,6 +202,16 @@ func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
framework.Logf("Volume unmounted on node %s", clientPod.Spec.NodeName)
}
// TestVolumeUnmountsFromDeletedPod tests that a volume unmounts if the client pod was deleted while the kubelet was down.
func TestVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
TestVolumeUnmountsFromDeletedPodWithForceOption(c, f, clientPod, pvc, pv, false)
}
// TestVolumeUnmountsFromFoceDeletedPod tests that a volume unmounts if the client pod was forcelly deleted while the kubelet was down.
func TestVolumeUnmountsFromForceDeletedPod(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
TestVolumeUnmountsFromDeletedPodWithForceOption(c, f, clientPod, pvc, pv, true)
}
// RunInPodWithVolume runs a command in a pod with given claim mounted to /mnt directory.
func RunInPodWithVolume(c clientset.Interface, ns, claimName, command string) {
pod := &v1.Pod{