Merge pull request #113262 from jsafrane/rework-reconstruction

Rework volume reconstruction
This commit is contained in:
Kubernetes Prow Robot 2022-11-07 12:42:29 -08:00 committed by GitHub
commit 1c230d519e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1720 additions and 875 deletions

View File

@ -424,7 +424,7 @@ func (plugin *TestPlugin) NewUnmounter(name string, podUID types.UID) (volume.Un
return nil, nil
}
func (plugin *TestPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *TestPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
fakeVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
@ -435,7 +435,9 @@ func (plugin *TestPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vo
},
},
}
return volume.NewSpecFromVolume(fakeVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(fakeVolume),
}, nil
}
func (plugin *TestPlugin) NewAttacher() (volume.Attacher, error) {

View File

@ -967,8 +967,8 @@ func (plugin *mockVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
return nil, nil
func (plugin *mockVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
return volume.ReconstructedVolume{}, nil
}
func (plugin *mockVolumePlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {

View File

@ -171,6 +171,12 @@ type ActualStateOfWorld interface {
// SyncReconstructedVolume check the volume.outerVolumeSpecName in asw and
// the one populated from dsw , if they do not match, update this field from the value from dsw.
SyncReconstructedVolume(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, outerVolumeSpecName string)
// UpdateReconstructedDevicePath updates devicePath of a reconstructed volume
// from Node.Status.VolumesAttached. The ASW is updated only when the volume is still
// uncertain. If the volume got mounted in the meantime, its devicePath must have
// been fixed by such an update.
UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string)
}
// MountedVolume represents a volume that has successfully been mounted to a pod.
@ -501,6 +507,24 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "", "")
}
func (asw *actualStateOfWorld) UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return
}
if volumeObj.deviceMountState != operationexecutor.DeviceMountUncertain {
// Reconciler must have updated volume state, i.e. when a pod uses the volume and
// succeeded mounting the volume. Such update has fixed the device path.
return
}
volumeObj.devicePath = devicePath
asw.attachedVolumes[volumeName] = volumeObj
}
func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState {
asw.RLock()
defer asw.RUnlock()
@ -636,7 +660,16 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
}
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
updateUncertainVolume := false
if podExists {
// Update uncertain volumes - the new markVolumeOpts may have updated information.
// Especially reconstructed volumes (marked as uncertain during reconstruction) need
// an update.
updateUncertainVolume = utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) && podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain
}
if !podExists || updateUncertainVolume {
// Add new mountedPod or update existing one.
podObj = mountedPod{
podName: podName,
podUID: podUID,

View File

@ -34,8 +34,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-helpers/storage/ephemeral"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
@ -153,7 +155,10 @@ func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady,
return done, nil
}, stopCh)
dswp.hasAddedPodsLock.Lock()
dswp.hasAddedPods = true
if !dswp.hasAddedPods {
klog.InfoS("Finished populating initial desired state of world")
dswp.hasAddedPods = true
}
dswp.hasAddedPodsLock.Unlock()
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
@ -312,8 +317,12 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
} else {
klog.V(4).InfoS("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
}
// sync reconstructed volume
dswp.actualStateOfWorld.SyncReconstructedVolume(uniqueVolumeName, uniquePodName, podVolume.Name)
if !utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
// sync reconstructed volume. This is necessary only when the old-style reconstruction is still used.
// With reconstruct_new.go, AWS.MarkVolumeAsMounted will update the outer spec name of previously
// uncertain volumes.
dswp.actualStateOfWorld.SyncReconstructedVolume(uniqueVolumeName, uniquePodName, podVolume.Name)
}
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniquePodName, mountedVolumesForPod)
}

View File

@ -84,6 +84,9 @@ func prepareDswpWithVolume(t *testing.T) (*desiredStateOfWorldPopulator, kubepod
}
func TestFindAndAddNewPods_WithRescontructedVolume(t *testing.T) {
// Outer volume spec replacement is needed only when the old volume reconstruction is used
// (i.e. with SELinuxMountReadWriteOncePod disabled)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SELinuxMountReadWriteOncePod, false)()
// create dswp
dswp, fakePodManager := prepareDswpWithVolume(t)

View File

@ -20,141 +20,13 @@ limitations under the License.
package reconciler
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilpath "k8s.io/utils/path"
utilstrings "k8s.io/utils/strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// Reconciler runs a periodic loop to reconcile the desired state of the world
// with the actual state of the world by triggering attach, detach, mount, and
// unmount operations.
// Note: This is distinct from the Reconciler implemented by the attach/detach
// controller. This reconciles state for the kubelet volume manager. That
// reconciles state for the attach/detach controller.
type Reconciler interface {
// Starts running the reconciliation loop which executes periodically, checks
// if volumes that should be mounted are mounted and volumes that should
// be unmounted are unmounted. If not, it will trigger mount/unmount
// operations to rectify.
// If attach/detach management is enabled, the manager will also check if
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run(stopCh <-chan struct{})
// StatesHasBeenSynced returns true only after syncStates process starts to sync
// states at least once after kubelet starts
StatesHasBeenSynced() bool
}
// NewReconciler returns a new instance of Reconciler.
//
// controllerAttachDetachEnabled - if true, indicates that the attach/detach
//
// controller is responsible for managing the attach/detach operations for
// this node, and therefore the volume manager should not
//
// loopSleepDuration - the amount of time the reconciler loop sleeps between
//
// successive executions
//
// waitForAttachTimeout - the amount of time the Mount function will wait for
//
// the volume to be attached
//
// nodeName - the Name for this node, used by Attach and Detach methods
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
// populatorHasAddedPods - checker for whether the populator has finished
//
// adding pods to the desiredStateOfWorld cache at least once after sources
// are all ready (before sources are ready, pods are probably missing)
//
// operationExecutor - used to trigger attach/detach/mount/unmount operations
//
// safely (prevents more than one operation from being triggered on the same
// volume)
//
// mounter - mounter passed in from kubelet, passed down unmount path
// hostutil - hostutil passed in from kubelet
// volumePluginMgr - volume plugin manager passed from kubelet
func NewReconciler(
kubeClient clientset.Interface,
controllerAttachDetachEnabled bool,
loopSleepDuration time.Duration,
waitForAttachTimeout time.Duration,
nodeName types.NodeName,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
populatorHasAddedPods func() bool,
operationExecutor operationexecutor.OperationExecutor,
mounter mount.Interface,
hostutil hostutil.HostUtils,
volumePluginMgr *volumepkg.VolumePluginMgr,
kubeletPodsDir string) Reconciler {
return &reconciler{
kubeClient: kubeClient,
controllerAttachDetachEnabled: controllerAttachDetachEnabled,
loopSleepDuration: loopSleepDuration,
waitForAttachTimeout: waitForAttachTimeout,
nodeName: nodeName,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
populatorHasAddedPods: populatorHasAddedPods,
operationExecutor: operationExecutor,
mounter: mounter,
hostutil: hostutil,
skippedDuringReconstruction: map[v1.UniqueVolumeName]*globalVolumeInfo{},
volumePluginMgr: volumePluginMgr,
kubeletPodsDir: kubeletPodsDir,
timeOfLastSync: time.Time{},
}
}
type reconciler struct {
kubeClient clientset.Interface
controllerAttachDetachEnabled bool
loopSleepDuration time.Duration
waitForAttachTimeout time.Duration
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
populatorHasAddedPods func() bool
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
hostutil hostutil.HostUtils
volumePluginMgr *volumepkg.VolumePluginMgr
skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
kubeletPodsDir string
timeOfLastSync time.Time
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
func (rc *reconciler) runOld(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}
@ -196,84 +68,6 @@ func (rc *reconciler) reconcile() {
}
}
func (rc *reconciler) unmountVolumes() {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName, mountedVolume.SELinuxMountContext) {
// Volume is mounted, unmount it
klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
}
}
}
}
func (rc *reconciler) mountOrAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize, volumeToMount.SELinuxLabel)
volumeToMount.DevicePath = devicePath
if cache.IsSELinuxMountMismatchError(err) {
// The volume is mounted, but with an unexpected SELinux context.
// It will get unmounted in unmountVolumes / unmountDetachDevices and
// then removed from actualStateOfWorld.
rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
continue
} else if cache.IsVolumeNotAttachedError(err) {
rc.waitForVolumeAttach(volumeToMount)
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}
func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(podExistError)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
if remountingLogStr == "" {
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
} else {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
// processReconstructedVolumes checks volumes which were skipped during the reconstruction
// process because it was assumed that since these volumes were present in DSOW they would get
// mounted correctly and make it into ASOW.
@ -336,528 +130,3 @@ func (rc *reconciler) processReconstructedVolumes() {
}
}
}
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
return
}
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
func (rc *reconciler) unmountDetachDevices() {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName, attachedVolume.SELinuxMountContext) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
if attachedVolume.DeviceMayBeMounted() {
// Volume is globally mounted to device, unmount it
klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
}
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
} else {
// Only detach if kubelet detach is enabled
klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
}
}
}
}
}
}
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
}
func (rc *reconciler) updateLastSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) StatesHasBeenSynced() bool {
return !rc.timeOfLastSync.IsZero()
}
type podVolume struct {
podName volumetypes.UniquePodName
volumeSpecName string
volumePath string
pluginName string
volumeMode v1.PersistentVolumeMode
}
type reconstructedVolume struct {
volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *v1.Pod
volumeGidValue string
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume.
type globalVolumeInfo struct {
volumeName v1.UniqueVolumeName
volumeSpec *volumepkg.Spec
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
}
func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
if gvi.podVolumes == nil {
gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
}
gvi.podVolumes[rcv.podName] = rcv
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates(kubeletPodDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
volumeNeedReport := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// No pod needs the volume.
klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
rc.cleanupMounts(volume)
continue
}
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
}
}
if len(volumeNeedReport) > 0 {
rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
}
}
func (rc *reconciler) cleanupMounts(volume podVolume) {
klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
mountedVolume := operationexecutor.MountedVolume{
PodName: volume.podName,
VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
InnerVolumeSpecName: volume.volumeSpecName,
PluginName: volume.pluginName,
PodUID: types.UID(volume.podName),
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
return
}
}
// Reconstruct volume data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
// plugin initializations
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
// Create pod object
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(volume.podName),
},
}
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
}
volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
volume.volumeMode,
plugin,
mapperPlugin,
pod.UID,
volume.podName,
volume.volumeSpecName,
volume.volumePath,
volume.pluginName)
if err != nil {
return nil, err
}
// We have to find the plugins by volume spec (NOT by plugin name) here
// in order to correctly reconstruct ephemeral volume types.
// Searching by spec checks whether the volume is actually attachable
// (i.e. has a PV) whereas searching by plugin name can only tell whether
// the plugin supports attachable volumes.
attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
var uniqueVolumeName v1.UniqueVolumeName
if attachablePlugin != nil || deviceMountablePlugin != nil {
uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
return nil, err
}
} else {
uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
}
var volumeMapper volumepkg.BlockVolumeMapper
var volumeMounter volumepkg.Mounter
var deviceMounter volumepkg.DeviceMounter
// Path to the mount or block device to check
var checkPath string
if volume.volumeMode == v1.PersistentVolumeBlock {
var newMapperErr error
volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if newMapperErr != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
newMapperErr)
}
mapDir, linkName := volumeMapper.GetPodDeviceMapPath()
checkPath = filepath.Join(mapDir, linkName)
} else {
var err error
volumeMounter, err = plugin.NewMounter(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if err != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
checkPath = volumeMounter.GetPath()
if deviceMountablePlugin != nil {
deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
if err != nil {
return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
}
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
if checkErr != nil {
return nil, checkErr
}
// If mount or symlink doesn't exist, volume reconstruction should be failed
if !isExist {
return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName)
}
reconstructedVolume := &reconstructedVolume{
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
// for volume cleanup.
// in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information.
// See issue #103143 and its fix for details.
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
deviceMounter: deviceMounter,
volumeGidValue: "",
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
devicePath: "",
mounter: volumeMounter,
blockVolumeMapper: volumeMapper,
}
return reconstructedVolume, nil
}
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
} else {
for _, attachedVolume := range node.Status.VolumesAttached {
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath)
}
}
}
}
// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
if gvi.blockVolumeMapper != nil {
// for block gvi, we return its global map path
return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
} else if gvi.deviceMounter != nil {
// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
} else {
return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
}
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
// Get the node status to retrieve volume device path information.
// Skip reporting devicePath in node objects if kubeClient is nil.
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
if rc.kubeClient != nil {
rc.updateDevicePath(volumesNeedUpdate)
}
for _, gvl := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
for _, volume := range gvl.podVolumes {
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
// If the volume has device to mount, we mark its device as mounted.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
// TODO(jsafrane): add reconstructed SELinux context
err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
continue
}
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
}
}
return nil
}
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: volumeState,
}
err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
return err
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
podsDirInfo, err := os.ReadDir(podDir)
if err != nil {
return nil, err
}
volumes := []podVolume{}
for i := range podsDirInfo {
if !podsDirInfo[i].IsDir() {
continue
}
podName := podsDirInfo[i].Name()
podDir := path.Join(podDir, podName)
// Find filesystem volume information
// ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
volumesDirs := map[v1.PersistentVolumeMode]string{
v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
}
// Find block volume information
// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
for volumeMode, volumesDir := range volumesDirs {
var volumesDirInfo []fs.DirEntry
if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
continue
}
for _, volumeDir := range volumesDirInfo {
pluginName := volumeDir.Name()
volumePluginPath := path.Join(volumesDir, pluginName)
volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
if err != nil {
klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
continue
}
unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
for _, volumeName := range volumePluginDirs {
volumePath := path.Join(volumePluginPath, volumeName)
klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
volumes = append(volumes, podVolume{
podName: volumetypes.UniquePodName(podName),
volumeSpecName: volumeName,
volumePath: volumePath,
pluginName: unescapePluginName,
volumeMode: volumeMode,
})
}
}
}
}
klog.V(4).InfoS("Get volumes from pod directory", "path", podDir, "volumes", volumes)
return volumes, nil
}
// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
}

View File

@ -0,0 +1,316 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/mount-utils"
)
// Reconciler runs a periodic loop to reconcile the desired state of the world
// with the actual state of the world by triggering attach, detach, mount, and
// unmount operations.
// Note: This is distinct from the Reconciler implemented by the attach/detach
// controller. This reconciles state for the kubelet volume manager. That
// reconciles state for the attach/detach controller.
type Reconciler interface {
// Starts running the reconciliation loop which executes periodically, checks
// if volumes that should be mounted are mounted and volumes that should
// be unmounted are unmounted. If not, it will trigger mount/unmount
// operations to rectify.
// If attach/detach management is enabled, the manager will also check if
// volumes that should be attached are attached and volumes that should
// be detached are detached and trigger attach/detach operations as needed.
Run(stopCh <-chan struct{})
// StatesHasBeenSynced returns true only after syncStates process starts to sync
// states at least once after kubelet starts
StatesHasBeenSynced() bool
}
// NewReconciler returns a new instance of Reconciler.
//
// controllerAttachDetachEnabled - if true, indicates that the attach/detach
//
// controller is responsible for managing the attach/detach operations for
// this node, and therefore the volume manager should not
//
// loopSleepDuration - the amount of time the reconciler loop sleeps between
//
// successive executions
//
// waitForAttachTimeout - the amount of time the Mount function will wait for
//
// the volume to be attached
//
// nodeName - the Name for this node, used by Attach and Detach methods
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
// populatorHasAddedPods - checker for whether the populator has finished
//
// adding pods to the desiredStateOfWorld cache at least once after sources
// are all ready (before sources are ready, pods are probably missing)
//
// operationExecutor - used to trigger attach/detach/mount/unmount operations
//
// safely (prevents more than one operation from being triggered on the same
// volume)
//
// mounter - mounter passed in from kubelet, passed down unmount path
// hostutil - hostutil passed in from kubelet
// volumePluginMgr - volume plugin manager passed from kubelet
func NewReconciler(
kubeClient clientset.Interface,
controllerAttachDetachEnabled bool,
loopSleepDuration time.Duration,
waitForAttachTimeout time.Duration,
nodeName types.NodeName,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
populatorHasAddedPods func() bool,
operationExecutor operationexecutor.OperationExecutor,
mounter mount.Interface,
hostutil hostutil.HostUtils,
volumePluginMgr *volumepkg.VolumePluginMgr,
kubeletPodsDir string) Reconciler {
return &reconciler{
kubeClient: kubeClient,
controllerAttachDetachEnabled: controllerAttachDetachEnabled,
loopSleepDuration: loopSleepDuration,
waitForAttachTimeout: waitForAttachTimeout,
nodeName: nodeName,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
populatorHasAddedPods: populatorHasAddedPods,
operationExecutor: operationExecutor,
mounter: mounter,
hostutil: hostutil,
skippedDuringReconstruction: map[v1.UniqueVolumeName]*globalVolumeInfo{},
volumePluginMgr: volumePluginMgr,
kubeletPodsDir: kubeletPodsDir,
timeOfLastSync: time.Time{},
volumesFailedReconstruction: make([]podVolume, 0),
volumesNeedDevicePath: make([]v1.UniqueVolumeName, 0),
volumesNeedReportedInUse: make([]v1.UniqueVolumeName, 0),
}
}
type reconciler struct {
kubeClient clientset.Interface
controllerAttachDetachEnabled bool
loopSleepDuration time.Duration
waitForAttachTimeout time.Duration
nodeName types.NodeName
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
populatorHasAddedPods func() bool
operationExecutor operationexecutor.OperationExecutor
mounter mount.Interface
hostutil hostutil.HostUtils
volumePluginMgr *volumepkg.VolumePluginMgr
skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
kubeletPodsDir string
timeOfLastSync time.Time
volumesFailedReconstruction []podVolume
volumesNeedDevicePath []v1.UniqueVolumeName
volumesNeedReportedInUse []v1.UniqueVolumeName
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
rc.runNew(stopCh)
return
}
rc.runOld(stopCh)
}
func (rc *reconciler) unmountVolumes() {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName, mountedVolume.SELinuxMountContext) {
// Volume is mounted, unmount it
klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
}
}
}
}
func (rc *reconciler) mountOrAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize, volumeToMount.SELinuxLabel)
volumeToMount.DevicePath = devicePath
if cache.IsSELinuxMountMismatchError(err) {
// The volume is mounted, but with an unexpected SELinux context.
// It will get unmounted in unmountVolumes / unmountDetachDevices and
// then removed from actualStateOfWorld.
rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
continue
} else if cache.IsVolumeNotAttachedError(err) {
rc.waitForVolumeAttach(volumeToMount)
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}
func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(podExistError)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
if remountingLogStr == "" {
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
} else {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
return
}
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
func (rc *reconciler) unmountDetachDevices() {
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName, attachedVolume.SELinuxMountContext) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
if attachedVolume.DeviceMayBeMounted() {
// Volume is globally mounted to device, unmount it
klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
}
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
} else {
// Only detach if kubelet detach is enabled
klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
}
if err == nil {
klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
}
}
}
}
}
}
// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
// TODO: move to reconciler.go and remove old code there when SELinuxMountReadWriteOncePod is GA
// TODO: Replace Run() when SELinuxMountReadWriteOncePod is GA
func (rc *reconciler) runNew(stopCh <-chan struct{}) {
rc.reconstructVolumes()
klog.InfoS("Reconciler: start to sync state")
wait.Until(rc.reconcileNew, rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconcileNew() {
readyToUnmount := rc.readyToUnmount()
if readyToUnmount {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
}
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountOrAttachVolumes()
// Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume
// that is still needed, but it did not reach DSW yet.
if readyToUnmount {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
// Clean up any orphan volumes that failed reconstruction.
rc.cleanOrphanVolumes()
}
if len(rc.volumesNeedDevicePath) != 0 {
rc.updateReconstructedDevicePaths()
}
if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() {
// Once DSW is populated, mark all reconstructed as reported in node.status,
// so they can proceed with MountDevice / SetUp.
rc.desiredStateOfWorld.MarkVolumesReportedInUse(rc.volumesNeedReportedInUse)
rc.volumesNeedReportedInUse = nil
}
}

View File

@ -0,0 +1,189 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates(kubeletPodDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
volumeNeedReport := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// No pod needs the volume.
klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
rc.cleanupMounts(volume)
continue
}
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
}
}
if len(volumeNeedReport) > 0 {
rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
}
}
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
} else {
for _, attachedVolume := range node.Status.VolumesAttached {
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath)
}
}
}
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
// Get the node status to retrieve volume device path information.
// Skip reporting devicePath in node objects if kubeClient is nil.
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
if rc.kubeClient != nil {
rc.updateDevicePath(volumesNeedUpdate)
}
for _, gvl := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
for _, volume := range gvl.podVolumes {
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(2).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
// If the volume has device to mount, we mark its device as mounted.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
// TODO(jsafrane): add reconstructed SELinux context
err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
continue
}
klog.V(2).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
}
}
return nil
}
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: volumeState,
}
err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
return err
}

View File

@ -0,0 +1,320 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/config"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
utilpath "k8s.io/utils/path"
utilstrings "k8s.io/utils/strings"
)
type podVolume struct {
podName volumetypes.UniquePodName
volumeSpecName string
volumePath string
pluginName string
volumeMode v1.PersistentVolumeMode
}
type reconstructedVolume struct {
volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *v1.Pod
volumeGidValue string
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume.
type globalVolumeInfo struct {
volumeName v1.UniqueVolumeName
volumeSpec *volumepkg.Spec
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
}
func (rc *reconciler) updateLastSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) StatesHasBeenSynced() bool {
return !rc.timeOfLastSync.IsZero()
}
func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
if gvi.podVolumes == nil {
gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
}
gvi.podVolumes[rcv.podName] = rcv
}
func (rc *reconciler) cleanupMounts(volume podVolume) {
klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
mountedVolume := operationexecutor.MountedVolume{
PodName: volume.podName,
VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
InnerVolumeSpecName: volume.volumeSpecName,
PluginName: volume.pluginName,
PodUID: types.UID(volume.podName),
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
return
}
}
// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
if gvi.blockVolumeMapper != nil {
// for block gvi, we return its global map path
return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
} else if gvi.deviceMounter != nil {
// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
} else {
return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
}
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
podsDirInfo, err := os.ReadDir(podDir)
if err != nil {
return nil, err
}
volumes := []podVolume{}
for i := range podsDirInfo {
if !podsDirInfo[i].IsDir() {
continue
}
podName := podsDirInfo[i].Name()
podDir := path.Join(podDir, podName)
// Find filesystem volume information
// ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
volumesDirs := map[v1.PersistentVolumeMode]string{
v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
}
// Find block volume information
// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
for volumeMode, volumesDir := range volumesDirs {
var volumesDirInfo []fs.DirEntry
if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
continue
}
for _, volumeDir := range volumesDirInfo {
pluginName := volumeDir.Name()
volumePluginPath := path.Join(volumesDir, pluginName)
volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
if err != nil {
klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
continue
}
unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
for _, volumeName := range volumePluginDirs {
volumePath := path.Join(volumePluginPath, volumeName)
klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
volumes = append(volumes, podVolume{
podName: volumetypes.UniquePodName(podName),
volumeSpecName: volumeName,
volumePath: volumePath,
pluginName: unescapePluginName,
volumeMode: volumeMode,
})
}
}
}
}
klog.V(4).InfoS("Get volumes from pod directory", "path", podDir, "volumes", volumes)
return volumes, nil
}
// Reconstruct volume data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
// plugin initializations
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
// Create pod object
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(volume.podName),
},
}
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
}
reconstructed, err := rc.operationExecutor.ReconstructVolumeOperation(
volume.volumeMode,
plugin,
mapperPlugin,
pod.UID,
volume.podName,
volume.volumeSpecName,
volume.volumePath,
volume.pluginName)
if err != nil {
return nil, err
}
volumeSpec := reconstructed.Spec
// We have to find the plugins by volume spec (NOT by plugin name) here
// in order to correctly reconstruct ephemeral volume types.
// Searching by spec checks whether the volume is actually attachable
// (i.e. has a PV) whereas searching by plugin name can only tell whether
// the plugin supports attachable volumes.
attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
var uniqueVolumeName v1.UniqueVolumeName
if attachablePlugin != nil || deviceMountablePlugin != nil {
uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
return nil, err
}
} else {
uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
}
var volumeMapper volumepkg.BlockVolumeMapper
var volumeMounter volumepkg.Mounter
var deviceMounter volumepkg.DeviceMounter
// Path to the mount or block device to check
var checkPath string
if volume.volumeMode == v1.PersistentVolumeBlock {
var newMapperErr error
volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if newMapperErr != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
newMapperErr)
}
mapDir, linkName := volumeMapper.GetPodDeviceMapPath()
checkPath = filepath.Join(mapDir, linkName)
} else {
var err error
volumeMounter, err = plugin.NewMounter(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if err != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
checkPath = volumeMounter.GetPath()
if deviceMountablePlugin != nil {
deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
if err != nil {
return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
}
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
if checkErr != nil {
return nil, checkErr
}
// If mount or symlink doesn't exist, volume reconstruction should be failed
if !isExist {
return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName)
}
reconstructedVolume := &reconstructedVolume{
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
// for volume cleanup.
// in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information.
// See issue #103143 and its fix for details.
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
deviceMounter: deviceMounter,
volumeGidValue: "",
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
devicePath: "",
mounter: volumeMounter,
blockVolumeMapper: volumeMapper,
}
return reconstructedVolume, nil
}

View File

@ -0,0 +1,198 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
// TODO: move to reconstruct.go and remove old code there.
// readyToUnmount returns true when reconciler can start unmounting volumes.
func (rc *reconciler) readyToUnmount() bool {
// During kubelet startup, all volumes present on disk are added as uncertain to ASW.
// Allow unmount only when DSW is fully populated to prevent unmounting volumes that
// did not reach DSW yet.
if !rc.populatorHasAddedPods() {
return false
}
// Allow unmount only when ASW device paths were corrected from node.status to prevent
// calling unmount with a wrong devicePath.
if len(rc.volumesNeedDevicePath) != 0 {
return false
}
return true
}
// reconstructVolumes tries to reconstruct the actual state of world by scanning all pods' volume
// directories from the disk. For the volumes that cannot support or fail reconstruction, it will
// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld
// is populated.
func (rc *reconciler) reconstructVolumes() {
defer rc.updateLastSyncTime()
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
reconstructedVolumeNames := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
// We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned.
rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume)
continue
}
klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := reconstructedVolumes[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
reconstructedVolumeNames = append(reconstructedVolumeNames, reconstructedVolume.volumeName)
reconstructedVolumes[reconstructedVolume.volumeName] = gvl
}
if len(reconstructedVolumes) > 0 {
// Add the volumes to ASW
rc.updateStatesNew(reconstructedVolumes)
// The reconstructed volumes are mounted, hence a previous kubelet must have already put it into node.status.volumesInUse.
// Remember to update DSW with this information.
rc.volumesNeedReportedInUse = reconstructedVolumeNames
// Remember to update devicePath from node.status.volumesAttached
rc.volumesNeedDevicePath = reconstructedVolumeNames
}
klog.V(2).InfoS("Volume reconstruction finished")
}
func (rc *reconciler) updateStatesNew(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
for _, gvl := range reconstructedVolumes {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
for _, volume := range gvl.podVolumes {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
_, err = rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(2).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
// If the volume has device to mount, we mark its device as uncertain.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
if err != nil {
klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
continue
}
klog.V(2).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
}
}
}
// cleanOrphanVolumes tries to clean up all volumes that failed reconstruction.
func (rc *reconciler) cleanOrphanVolumes() {
if len(rc.volumesFailedReconstruction) == 0 {
return
}
for _, volume := range rc.volumesFailedReconstruction {
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
rc.cleanupMounts(volume)
}
klog.V(2).InfoS("Orphan volume cleanup finished")
// Clean the cache, cleanup is one shot operation.
rc.volumesFailedReconstruction = make([]podVolume, 0)
}
// updateReconstructedDevicePaths tries to file devicePaths of reconstructed volumes from
// node.Status.VolumesAttached. This can be done only after connection to the API
// server is established, i.e. it can't be part of reconstructVolumes().
func (rc *reconciler) updateReconstructedDevicePaths() {
klog.V(4).InfoS("Updating reconstructed devicePaths")
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
// This may repeat few times per second until kubelet is able to read its own status for the first time.
klog.V(2).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
return
}
for _, volumeID := range rc.volumesNeedDevicePath {
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeID != attachedVolume.Name {
continue
}
rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath)
klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
}
}
klog.V(2).InfoS("DevicePaths of reconstructed volumes updated")
rc.volumesNeedDevicePath = nil
}

View File

@ -0,0 +1,385 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"fmt"
"os"
"path"
"path/filepath"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
)
func TestReconstructVolumes(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SELinuxMountReadWriteOncePod, true)()
tests := []struct {
name string
volumePaths []string
expectedVolumesNeedReportedInUse []string
expectedVolumesNeedDevicePath []string
expectedVolumesFailedReconstruction []string
verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
}{
{
name: "when two pods are using same volume and both are deleted",
volumePaths: []string{
path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
},
expectedVolumesNeedReportedInUse: []string{"fake-plugin/pvc-abcdef", "fake-plugin/pvc-abcdef"},
expectedVolumesNeedDevicePath: []string{"fake-plugin/pvc-abcdef", "fake-plugin/pvc-abcdef"},
expectedVolumesFailedReconstruction: []string{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 0 {
return fmt.Errorf("expected 0 certain pods in asw got %d", len(mountedPods))
}
allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
if len(allPods) != 2 {
return fmt.Errorf("expected 2 uncertain pods in asw got %d", len(allPods))
}
volumes := rcInstance.actualStateOfWorld.GetPossiblyMountedVolumesForPod("pod1")
if len(volumes) != 1 {
return fmt.Errorf("expected 1 uncertain volume in asw got %d", len(volumes))
}
// The volume should be marked as reconstructed in ASW
if reconstructed := rcInstance.actualStateOfWorld.IsVolumeReconstructed("fake-plugin/pvc-abcdef", "pod1"); !reconstructed {
t.Errorf("expected volume to be marked as reconstructed, got %v", reconstructed)
}
return nil
},
},
{
name: "when reconstruction fails for a volume, volumes should be cleaned up",
volumePaths: []string{
path.Join("pod1", "volumes", "missing-plugin", "pvc-abcdef"),
},
expectedVolumesNeedReportedInUse: []string{},
expectedVolumesNeedDevicePath: []string{},
expectedVolumesFailedReconstruction: []string{"pvc-abcdef"},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tmpKubeletDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
}
defer os.RemoveAll(tmpKubeletDir)
// create kubelet pod directory
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
os.MkdirAll(tmpKubeletPodDir, 0755)
mountPaths := []string{}
// create pod and volume directories so as reconciler can find them.
for _, volumePath := range tc.volumePaths {
vp := filepath.Join(tmpKubeletPodDir, volumePath)
mountPaths = append(mountPaths, vp)
os.MkdirAll(vp, 0755)
}
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
rcInstance, _ := rc.(*reconciler)
// Act
rcInstance.reconstructVolumes()
// Assert
// Convert to []UniqueVolumeName
expectedVolumes := make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedDevicePath))
for i := range tc.expectedVolumesNeedDevicePath {
expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedDevicePath[i])
}
if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedDevicePath) {
t.Errorf("Expected expectedVolumesNeedDevicePath:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedDevicePath)
}
expectedVolumes = make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedReportedInUse))
for i := range tc.expectedVolumesNeedReportedInUse {
expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedReportedInUse[i])
}
if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedReportedInUse) {
t.Errorf("Expected volumesNeedReportedInUse:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedReportedInUse)
}
volumesFailedReconstruction := sets.NewString()
for _, vol := range rcInstance.volumesFailedReconstruction {
volumesFailedReconstruction.Insert(vol.volumeSpecName)
}
if !reflect.DeepEqual(volumesFailedReconstruction.List(), tc.expectedVolumesFailedReconstruction) {
t.Errorf("Expected volumesFailedReconstruction:\n%v\n got:\n%v", tc.expectedVolumesFailedReconstruction, volumesFailedReconstruction.List())
}
if tc.verifyFunc != nil {
if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
t.Errorf("Test %s failed: %v", tc.name, err)
}
}
})
}
}
func TestCleanOrphanVolumes(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SELinuxMountReadWriteOncePod, true)()
type podInfo struct {
podName string
podUID string
outerVolumeName string
innerVolumeName string
}
defaultPodInfo := podInfo{
podName: "pod1",
podUID: "pod1uid",
outerVolumeName: "volume-name",
innerVolumeName: "volume-name",
}
defaultVolume := podVolume{
podName: "pod1uid",
volumeSpecName: "volume-name",
volumePath: "",
pluginName: "fake-plugin",
volumeMode: v1.PersistentVolumeFilesystem,
}
tests := []struct {
name string
podInfos []podInfo
volumesFailedReconstruction []podVolume
expectedUnmounts int
}{
{
name: "volume is in DSW and is not cleaned",
podInfos: []podInfo{defaultPodInfo},
volumesFailedReconstruction: []podVolume{defaultVolume},
expectedUnmounts: 0,
},
{
name: "volume is not in DSW and is cleaned",
podInfos: []podInfo{},
volumesFailedReconstruction: []podVolume{defaultVolume},
expectedUnmounts: 1,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Arrange
tmpKubeletDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
}
defer os.RemoveAll(tmpKubeletDir)
// create kubelet pod directory
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
os.MkdirAll(tmpKubeletPodDir, 0755)
mountPaths := []string{}
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
rcInstance, _ := rc.(*reconciler)
rcInstance.volumesFailedReconstruction = tc.volumesFailedReconstruction
for _, tpodInfo := range tc.podInfos {
pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* SELinuxContext */)
if err != nil {
t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
}
rcInstance.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, "")
}
// Act
rcInstance.cleanOrphanVolumes()
// Assert
if len(rcInstance.volumesFailedReconstruction) != 0 {
t.Errorf("Expected volumesFailedReconstruction to be empty, got %+v", rcInstance.volumesFailedReconstruction)
}
// Unmount runs in a go routine, wait for its finish
var lastErr error
err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
if err := verifyTearDownCalls(fakePlugin, tc.expectedUnmounts); err != nil {
lastErr = err
return false, nil
}
return true, nil
})
if err != nil {
t.Errorf("Error waiting for volumes to get unmounted: %s: %s", err, lastErr)
}
})
}
}
func verifyTearDownCalls(plugin *volumetesting.FakeVolumePlugin, expected int) error {
unmounters := plugin.GetUnmounters()
if len(unmounters) == 0 && (expected == 0) {
return nil
}
actualCallCount := 0
for _, unmounter := range unmounters {
actualCallCount = unmounter.GetTearDownCallCount()
if actualCallCount == expected {
return nil
}
}
return fmt.Errorf("expected TearDown calls %d, got %d", expected, actualCallCount)
}
func TestReconstructVolumesMount(t *testing.T) {
// This test checks volume reconstruction + subsequent failed mount.
// Since the volume is reconstructed, it must be marked as uncertain
// even after a final SetUp error, see https://github.com/kubernetes/kubernetes/issues/96635
// and https://github.com/kubernetes/kubernetes/pull/110670.
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SELinuxMountReadWriteOncePod, true)()
tests := []struct {
name string
volumePath string
expectMount bool
}{
{
name: "reconstructed volume is mounted",
volumePath: path.Join("pod1uid", "volumes", "fake-plugin", "volumename"),
expectMount: true,
},
{
name: "reconstructed volume fails to mount",
// FailOnSetupVolumeName: MountDevice succeeds, SetUp fails
volumePath: path.Join("pod1uid", "volumes", "fake-plugin", volumetesting.FailOnSetupVolumeName),
expectMount: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tmpKubeletDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
}
defer os.RemoveAll(tmpKubeletDir)
// create kubelet pod directory
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
os.MkdirAll(tmpKubeletPodDir, 0755)
// create pod and volume directories so as reconciler can find them.
vp := filepath.Join(tmpKubeletPodDir, tc.volumePath)
mountPaths := []string{vp}
os.MkdirAll(vp, 0755)
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
rcInstance, _ := rc.(*reconciler)
// Act 1 - reconstruction
rcInstance.reconstructVolumes()
// Assert 1 - the volume is Uncertain
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 0 {
t.Errorf("expected 0 mounted volumes, got %+v", mountedPods)
}
allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
if len(allPods) != 1 {
t.Errorf("expected 1 uncertain volume in asw, got %+v", allPods)
}
// Arrange 2 - populate DSW
outerName := filepath.Base(tc.volumePath)
pod := getInlineFakePod("pod1", "pod1uid", outerName, outerName)
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* SELinuxContext */)
if err != nil {
t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
}
rcInstance.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, "")
rcInstance.populatorHasAddedPods = func() bool {
// Mark DSW populated to allow unmounting of volumes.
return true
}
// Mark devices paths as reconciled to allow unmounting of volumes.
rcInstance.volumesNeedDevicePath = nil
// Act 2 - reconcile once
rcInstance.reconcileNew()
// Assert 2
// MountDevice was attempted
var lastErr error
err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
// MountDevice should always be called and succeed
if err := volumetesting.VerifyMountDeviceCallCount(1, fakePlugin); err != nil {
lastErr = err
return false, nil
}
return true, nil
})
if err != nil {
t.Errorf("Error waiting for volumes to get mounted: %s: %s", err, lastErr)
}
if tc.expectMount {
// The volume should be fully mounted
waitForMount(t, fakePlugin, volumeName, rcInstance.actualStateOfWorld)
// SetUp was called and succeeded
if err := volumetesting.VerifySetUpCallCount(1, fakePlugin); err != nil {
t.Errorf("Expected SetUp() to be called, got %s", err)
}
} else {
// The test does not expect any change in ASW, yet it needs to wait for volume operations to finish
err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
return !rcInstance.operationExecutor.IsOperationPending(volumeName, "pod1uid", nodeName), nil
})
if err != nil {
t.Errorf("Error waiting for operation to get finished: %s", err)
}
// The volume is uncertain
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 0 {
t.Errorf("expected 0 mounted volumes after reconcile, got %+v", mountedPods)
}
allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
if len(allPods) != 1 {
t.Errorf("expected 1 mounted or uncertain volumes after reconcile, got %+v", allPods)
}
}
// Unmount was *not* attempted in any case
verifyTearDownCalls(fakePlugin, 0)
})
}
}

View File

@ -249,25 +249,27 @@ func getVolumeSource(
return nil, false, fmt.Errorf("Spec does not reference an AWS EBS volume type")
}
func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (volume.ReconstructedVolume, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
return volume.ReconstructedVolume{}, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
volumeID, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
volumeID, err = formatVolumeID(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to get AWS volume id from mount path %q: %v", mountPath, err)
return volume.ReconstructedVolume{}, fmt.Errorf("failed to get AWS volume id from mount path %q: %v", mountPath, err)
}
file := v1.PersistentVolumeFilesystem
return newAWSVolumeSpec(volName, volumeID, file), nil
return volume.ReconstructedVolume{
Spec: newAWSVolumeSpec(volName, volumeID, file),
}, nil
}
func (plugin *awsElasticBlockStorePlugin) RequiresFSResize() bool {

View File

@ -202,7 +202,7 @@ func (plugin *azureFilePlugin) ExpandVolumeDevice(
return newSize, nil
}
func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (volume.ReconstructedVolume, error) {
azureVolume := &v1.Volume{
Name: volName,
VolumeSource: v1.VolumeSource{
@ -212,7 +212,9 @@ func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*
},
},
}
return volume.NewSpecFromVolume(azureVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(azureVolume),
}, nil
}
// azureFile volumes represent mount of an AzureFile share.

View File

@ -316,18 +316,18 @@ func (plugin *azureDataDiskPlugin) NodeExpand(resizeOptions volume.NodeResizeOpt
var _ volume.NodeExpandableVolumePlugin = &azureDataDiskPlugin{}
func (plugin *azureDataDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *azureDataDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
return volume.ReconstructedVolume{}, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
azureVolume := &v1.Volume{
@ -338,7 +338,9 @@ func (plugin *azureDataDiskPlugin) ConstructVolumeSpec(volumeName, mountPath str
},
},
}
return volume.NewSpecFromVolume(azureVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(azureVolume),
}, nil
}
func (plugin *azureDataDiskPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {

View File

@ -173,7 +173,7 @@ func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UI
}, nil
}
func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
cephfsVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
@ -183,7 +183,9 @@ func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*
},
},
}
return volume.NewSpecFromVolume(cephfsVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(cephfsVolume),
}, nil
}
// CephFS volumes represent a bare host file or directory mount of an CephFS export.

View File

@ -128,13 +128,13 @@ func TestConstructVolumeSpec(t *testing.T) {
t.Errorf("can't find cephfs plugin by name")
}
cephfsSpec, err := plug.(*cephfsPlugin).ConstructVolumeSpec("cephfsVolume", "/cephfsVolume/")
cephfsVol, err := plug.(*cephfsPlugin).ConstructVolumeSpec("cephfsVolume", "/cephfsVolume/")
if err != nil {
t.Errorf("ConstructVolumeSpec() failed: %v", err)
}
if cephfsSpec.Name() != "cephfsVolume" {
t.Errorf("Get wrong cephfs spec name, got: %s", cephfsSpec.Name())
if cephfsVol.Spec.Name() != "cephfsVolume" {
t.Errorf("Get wrong cephfs spec name, got: %s", cephfsVol.Spec.Name())
}
}

View File

@ -122,14 +122,16 @@ func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (v
}, nil
}
func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
configMapVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{},
},
}
return volume.NewSpecFromVolume(configMapVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(configMapVolume),
}, nil
}
type configMapVolume struct {

View File

@ -448,12 +448,12 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo
return unmounter, nil
}
func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
volData, err := loadVolumeData(mountPath, volDataFileName)
if err != nil {
return nil, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
return volume.ReconstructedVolume{}, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
}
klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
@ -464,11 +464,13 @@ func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.S
// use constructPVSourceSpec to construct volume construct pv source spec.
if storage.VolumeLifecycleMode(volData[volDataKey.volumeLifecycleMode]) == storage.VolumeLifecycleEphemeral {
spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
return spec, nil
return volume.ReconstructedVolume{Spec: spec}, nil
}
spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
return spec, nil
spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
return volume.ReconstructedVolume{
Spec: spec,
}, nil
}
// constructVolSourceSpec constructs volume.Spec with CSIVolumeSource

View File

@ -362,38 +362,38 @@ func TestPluginConstructVolumeSpec(t *testing.T) {
}
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", filepath.Dir(csiMounter.GetPath()))
rec, err := plug.ConstructVolumeSpec("test-pv", filepath.Dir(csiMounter.GetPath()))
if err != nil {
t.Fatal(err)
}
if spec == nil {
if rec.Spec == nil {
t.Fatal("nil volume.Spec constructed")
}
// inspect spec
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.CSI == nil {
if rec.Spec.PersistentVolume == nil || rec.Spec.PersistentVolume.Spec.CSI == nil {
t.Fatal("CSIPersistentVolume not found in constructed spec ")
}
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
volHandle := rec.Spec.PersistentVolume.Spec.CSI.VolumeHandle
if volHandle != tc.originSpec.PersistentVolume.Spec.CSI.VolumeHandle {
t.Error("unexpected volumeHandle constructed:", volHandle)
}
driverName := spec.PersistentVolume.Spec.CSI.Driver
driverName := rec.Spec.PersistentVolume.Spec.CSI.Driver
if driverName != tc.originSpec.PersistentVolume.Spec.CSI.Driver {
t.Error("unexpected driverName constructed:", driverName)
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
if rec.Spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
if *rec.Spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *rec.Spec.PersistentVolume.Spec.VolumeMode)
}
if spec.Name() != tc.specVolID {
t.Errorf("Unexpected spec name constructed %s", spec.Name())
if rec.Spec.Name() != tc.specVolID {
t.Errorf("Unexpected spec name constructed %s", rec.Spec.Name())
}
})
}
@ -496,44 +496,44 @@ func TestPluginConstructVolumeSpecWithInline(t *testing.T) {
}
// rebuild spec
spec, err := plug.ConstructVolumeSpec("test-pv", filepath.Dir(csiMounter.GetPath()))
rec, err := plug.ConstructVolumeSpec("test-pv", filepath.Dir(csiMounter.GetPath()))
if err != nil {
t.Fatal(err)
}
if spec == nil {
if rec.Spec == nil {
t.Fatal("nil volume.Spec constructed")
}
if spec.Name() != tc.specVolID {
t.Errorf("unexpected spec name constructed volume.Spec: %s", spec.Name())
if rec.Spec.Name() != tc.specVolID {
t.Errorf("unexpected spec name constructed volume.Spec: %s", rec.Spec.Name())
}
switch {
case spec.Volume != nil:
if spec.Volume.CSI == nil {
case rec.Spec.Volume != nil:
if rec.Spec.Volume.CSI == nil {
t.Error("missing CSIVolumeSource in constructed volume.Spec")
}
if spec.Volume.CSI.Driver != tc.originSpec.Volume.CSI.Driver {
t.Error("unexpected driver in constructed volume source:", spec.Volume.CSI.Driver)
if rec.Spec.Volume.CSI.Driver != tc.originSpec.Volume.CSI.Driver {
t.Error("unexpected driver in constructed volume source:", rec.Spec.Volume.CSI.Driver)
}
case spec.PersistentVolume != nil:
if spec.PersistentVolume.Spec.CSI == nil {
case rec.Spec.PersistentVolume != nil:
if rec.Spec.PersistentVolume.Spec.CSI == nil {
t.Fatal("missing CSIPersistentVolumeSource in constructed volume.spec")
}
volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle
volHandle := rec.Spec.PersistentVolume.Spec.CSI.VolumeHandle
if volHandle != tc.originSpec.PersistentVolume.Spec.CSI.VolumeHandle {
t.Error("unexpected volumeHandle constructed in persistent volume source:", volHandle)
}
driverName := spec.PersistentVolume.Spec.CSI.Driver
driverName := rec.Spec.PersistentVolume.Spec.CSI.Driver
if driverName != tc.originSpec.PersistentVolume.Spec.CSI.Driver {
t.Error("unexpected driverName constructed in persistent volume source:", driverName)
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
if rec.Spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
if *rec.Spec.PersistentVolume.Spec.VolumeMode != api.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *rec.Spec.PersistentVolume.Spec.VolumeMode)
}
default:
t.Fatal("invalid volume.Spec constructed")

View File

@ -443,14 +443,14 @@ func TestCSI_VolumeAll(t *testing.T) {
// ******** Volume Reconstruction ************* //
volPath := filepath.Dir(csiMounter.GetPath())
t.Log("csiTest.VolumeAll entering plugin.ConstructVolumeSpec for path", volPath)
spec, err := volPlug.ConstructVolumeSpec(test.volName, volPath)
rec, err := volPlug.ConstructVolumeSpec(test.volName, volPath)
if err != nil {
t.Fatalf("csiTest.VolumeAll plugin.ConstructVolumeSpec failed: %s", err)
} else {
if spec == nil {
if rec.Spec == nil {
t.Fatalf("csiTest.VolumeAll plugin.ConstructVolumeSpec returned nil spec")
} else {
volSpec = spec
volSpec = rec.Spec
if test.isInline {
if volSpec.Volume == nil || volSpec.Volume.CSI == nil {

View File

@ -123,14 +123,16 @@ func (plugin *downwardAPIPlugin) NewUnmounter(volName string, podUID types.UID)
}, nil
}
func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
downwardAPIVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
DownwardAPI: &v1.DownwardAPIVolumeSource{},
},
}
return volume.NewSpecFromVolume(downwardAPIVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(downwardAPIVolume),
}, nil
}
// downwardAPIVolume retrieves downward API data and placing them into the volume on the host.

View File

@ -188,14 +188,16 @@ func (plugin *emptyDirPlugin) newUnmounterInternal(volName string, podUID types.
return ed, nil
}
func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (volume.ReconstructedVolume, error) {
emptyDirVolume := &v1.Volume{
Name: volName,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}
return volume.NewSpecFromVolume(emptyDirVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(emptyDirVolume),
}, nil
}
// mountDetector abstracts how to find what kind of mount a path is backed by.

View File

@ -240,7 +240,7 @@ func (plugin *fcPlugin) newUnmapperInternal(volName string, podUID types.UID, ma
}, nil
}
func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
// Find globalPDPath from pod volume directory(mountPath)
// examples:
// mountPath: pods/{podUid}/volumes/kubernetes.io~fc/{volumeName}
@ -256,10 +256,10 @@ func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volu
if io.IsInconsistentReadError(err) {
klog.Errorf("Failed to read mount refs from /proc/mounts for %s: %s", mountPath, err)
klog.Errorf("Kubelet cannot unmount volume at %s, please unmount it manually", mountPath)
return nil, err
return volume.ReconstructedVolume{}, err
}
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
for _, path := range paths {
if strings.Contains(path, plugin.host.GetPluginDir(fcPluginName)) {
@ -269,12 +269,12 @@ func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volu
}
// Couldn't fetch globalPDPath
if len(globalPDPath) == 0 {
return nil, fmt.Errorf("couldn't fetch globalPDPath. failed to obtain volume spec")
return volume.ReconstructedVolume{}, fmt.Errorf("couldn't fetch globalPDPath. failed to obtain volume spec")
}
wwns, lun, wwids, err := parsePDName(globalPDPath)
if err != nil {
return nil, fmt.Errorf("failed to retrieve volume plugin information from globalPDPath: %s", err)
return volume.ReconstructedVolume{}, fmt.Errorf("failed to retrieve volume plugin information from globalPDPath: %s", err)
}
// Create volume from wwn+lun or wwid
fcVolume := &v1.Volume{
@ -285,7 +285,9 @@ func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volu
}
klog.V(5).Infof("ConstructVolumeSpec: TargetWWNs: %v, Lun: %v, WWIDs: %v",
fcVolume.VolumeSource.FC.TargetWWNs, *fcVolume.VolumeSource.FC.Lun, fcVolume.VolumeSource.FC.WWIDs)
return volume.NewSpecFromVolume(fcVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(fcVolume),
}, nil
}
// ConstructBlockVolumeSpec creates a new volume.Spec with following steps.

View File

@ -260,7 +260,7 @@ func (plugin *flexVolumeAttachablePlugin) CanDeviceMount(spec *volume.Spec) (boo
}
// ConstructVolumeSpec is part of the volume.AttachableVolumePlugin interface.
func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
flexVolume := &api.Volume{
Name: volumeName,
VolumeSource: api.VolumeSource{
@ -269,7 +269,9 @@ func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string
},
},
}
return volume.NewSpecFromVolume(flexVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(flexVolume),
}, nil
}
func (plugin *flexVolumePlugin) SupportsMountOption() bool {

View File

@ -299,17 +299,17 @@ func (plugin *gcePersistentDiskPlugin) NodeExpand(resizeOptions volume.NodeResiz
var _ volume.NodeExpandableVolumePlugin = &gcePersistentDiskPlugin{}
func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
return volume.ReconstructedVolume{}, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
gceVolume := &v1.Volume{
Name: volumeName,
@ -319,7 +319,9 @@ func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath
},
},
}
return volume.NewSpecFromVolume(gceVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(gceVolume),
}, nil
}
// Abstract interface to PD operations.

View File

@ -123,14 +123,16 @@ func (plugin *gitRepoPlugin) NewUnmounter(volName string, podUID types.UID) (vol
}, nil
}
func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
gitVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
GitRepo: &v1.GitRepoVolumeSource{},
},
}
return volume.NewSpecFromVolume(gitVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(gitVolume),
}, nil
}
// gitRepo volumes are directories which are pre-filled from a git repository.

View File

@ -181,7 +181,7 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu
return newProvisioner(options, plugin.host, plugin)
}
func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
hostPathVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
@ -190,7 +190,9 @@ func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string)
},
},
}
return volume.NewSpecFromVolume(hostPathVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(hostPathVolume),
}, nil
}
func newDeleter(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error) {

View File

@ -221,7 +221,7 @@ func (plugin *iscsiPlugin) newUnmapperInternal(volName string, podUID types.UID,
}, nil
}
func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
// Find globalPDPath from pod volume directory(mountPath)
var globalPDPath string
mounter := plugin.host.GetMounter(plugin.GetPluginName())
@ -233,10 +233,10 @@ func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*v
if io.IsInconsistentReadError(err) {
klog.Errorf("Failed to read mount refs from /proc/mounts for %s: %s", mountPath, err)
klog.Errorf("Kubelet cannot unmount volume at %s, please unmount it and all mounts of the same device manually.", mountPath)
return nil, err
return volume.ReconstructedVolume{}, err
}
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
for _, path := range paths {
@ -247,25 +247,25 @@ func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*v
}
// Couldn't fetch globalPDPath
if len(globalPDPath) == 0 {
return nil, fmt.Errorf("couldn't fetch globalPDPath. failed to obtain volume spec")
return volume.ReconstructedVolume{}, fmt.Errorf("couldn't fetch globalPDPath. failed to obtain volume spec")
}
// Obtain iscsi disk configurations from globalPDPath
device, _, err := extractDeviceAndPrefix(globalPDPath)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
bkpPortal, iqn, err := extractPortalAndIqn(device)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
arr := strings.Split(device, "-lun-")
if len(arr) < 2 {
return nil, fmt.Errorf("failed to retrieve lun from globalPDPath: %v", globalPDPath)
return volume.ReconstructedVolume{}, fmt.Errorf("failed to retrieve lun from globalPDPath: %v", globalPDPath)
}
lun, err := strconv.Atoi(arr[1])
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
iface, _ := extractIface(globalPDPath)
iscsiVolume := &v1.Volume{
@ -279,7 +279,9 @@ func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*v
},
},
}
return volume.NewSpecFromVolume(iscsiVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(iscsiVolume),
}, nil
}
func (plugin *iscsiPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) {

View File

@ -197,7 +197,7 @@ func (plugin *localVolumePlugin) NewBlockVolumeUnmapper(volName string,
}
// TODO: check if no path and no topology constraints are ok
func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
fs := v1.PersistentVolumeFilesystem
// The main purpose of reconstructed volume is to clean unused mount points
// and directories.
@ -209,7 +209,7 @@ func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath strin
mounter := plugin.host.GetMounter(plugin.GetPluginName())
refs, err := mounter.GetMountRefs(mountPath)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
baseMountPath := plugin.generateBlockDeviceBaseGlobalPath()
for _, ref := range refs {
@ -221,7 +221,7 @@ func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath strin
// source and can be used in reconstructed volume.
path, _, err = mount.GetDeviceNameFromMount(mounter, ref)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
klog.V(4).Infof("local: reconstructing volume %q (pod volume mount: %q) with device %q", volumeName, mountPath, path)
break
@ -240,7 +240,9 @@ func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath strin
VolumeMode: &fs,
},
}
return volume.NewSpecFromPersistentVolume(localVolume, false), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromPersistentVolume(localVolume, false),
}, nil
}
func (plugin *localVolumePlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName,

View File

@ -543,34 +543,34 @@ func TestConstructVolumeSpec(t *testing.T) {
}
mounter.(*mount.FakeMounter).MountPoints = fakeMountPoints
volPath := filepath.Join(tmpDir, testMountPath)
spec, err := plug.ConstructVolumeSpec(testPVName, volPath)
rec, err := plug.ConstructVolumeSpec(testPVName, volPath)
if err != nil {
t.Errorf("ConstructVolumeSpec() failed: %v", err)
}
if spec == nil {
if rec.Spec == nil {
t.Fatalf("ConstructVolumeSpec() returned nil")
}
volName := spec.Name()
volName := rec.Spec.Name()
if volName != testPVName {
t.Errorf("Expected volume name %q, got %q", testPVName, volName)
}
if spec.Volume != nil {
if rec.Spec.Volume != nil {
t.Errorf("Volume object returned, expected nil")
}
pv := spec.PersistentVolume
pv := rec.Spec.PersistentVolume
if pv == nil {
t.Fatalf("PersistentVolume object nil")
}
if spec.PersistentVolume.Spec.VolumeMode == nil {
if rec.Spec.PersistentVolume.Spec.VolumeMode == nil {
t.Fatalf("Volume mode has not been set.")
}
if *spec.PersistentVolume.Spec.VolumeMode != v1.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
if *rec.Spec.PersistentVolume.Spec.VolumeMode != v1.PersistentVolumeFilesystem {
t.Errorf("Unexpected volume mode %q", *rec.Spec.PersistentVolume.Spec.VolumeMode)
}
ls := pv.Spec.PersistentVolumeSource.Local

View File

@ -176,7 +176,7 @@ func (plugin *nfsPlugin) Recycle(pvName string, spec *volume.Spec, eventRecorder
return recyclerclient.RecycleVolumeByWatchingPodUntilCompletion(pvName, pod, plugin.host.GetKubeClient(), eventRecorder)
}
func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
nfsVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
@ -185,7 +185,9 @@ func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol
},
},
}
return volume.NewSpecFromVolume(nfsVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(nfsVolume),
}, nil
}
// NFS volumes represent a bare host file or directory mount of an NFS export.

View File

@ -60,8 +60,8 @@ func (n *noopExpandableVolumePluginInstance) NewUnmounter(name string, podUID ty
return nil, nil
}
func (n *noopExpandableVolumePluginInstance) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
return n.spec, nil
func (n *noopExpandableVolumePluginInstance) ConstructVolumeSpec(volumeName, mountPath string) (ReconstructedVolume, error) {
return ReconstructedVolume{Spec: n.spec}, nil
}
func (n *noopExpandableVolumePluginInstance) SupportsMountOption() bool {

View File

@ -166,7 +166,7 @@ type VolumePlugin interface {
// and volumePath. The spec may have incomplete information due to limited
// information from input. This function is used by volume manager to reconstruct
// volume spec by reading the volume directories from disk
ConstructVolumeSpec(volumeName, volumePath string) (*Spec, error)
ConstructVolumeSpec(volumeName, volumePath string) (ReconstructedVolume, error)
// SupportsMountOption returns true if volume plugins supports Mount options
// Specifying mount options in a volume plugin that doesn't support
@ -570,6 +570,12 @@ type VolumeConfig struct {
ProvisioningEnabled bool
}
// ReconstructedVolume contains information about a volume reconstructed by
// ConstructVolumeSpec().
type ReconstructedVolume struct {
Spec *Spec
}
// NewSpecFromVolume creates an Spec from an v1.Volume
func NewSpecFromVolume(vs *v1.Volume) *Spec {
return &Spec{

View File

@ -99,8 +99,8 @@ func (plugin *testPlugins) NewUnmounter(name string, podUID types.UID) (Unmounte
return nil, nil
}
func (plugin *testPlugins) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
return nil, nil
func (plugin *testPlugins) ConstructVolumeSpec(volumeName, mountPath string) (ReconstructedVolume, error) {
return ReconstructedVolume{}, nil
}
func newTestPlugin() []VolumePlugin {

View File

@ -210,7 +210,7 @@ func (plugin *portworxVolumePlugin) ExpandVolumeDevice(
return newSize, nil
}
func (plugin *portworxVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *portworxVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
portworxVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
@ -219,7 +219,9 @@ func (plugin *portworxVolumePlugin) ConstructVolumeSpec(volumeName, mountPath st
},
},
}
return volume.NewSpecFromVolume(portworxVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(portworxVolume),
}, nil
}
func (plugin *portworxVolumePlugin) SupportsMountOption() bool {

View File

@ -135,7 +135,7 @@ func (plugin *projectedPlugin) NewUnmounter(volName string, podUID types.UID) (v
}, nil
}
func (plugin *projectedPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *projectedPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
projectedVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
@ -143,7 +143,9 @@ func (plugin *projectedPlugin) ConstructVolumeSpec(volumeName, mountPath string)
},
}
return volume.NewSpecFromVolume(projectedVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(projectedVolume),
}, nil
}
type projectedVolume struct {

View File

@ -386,17 +386,17 @@ func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID,
}, nil
}
func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
return volume.ReconstructedVolume{}, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := volutil.GetPluginMountDir(plugin.host, plugin.GetPluginName())
sourceName, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
s := dstrings.Split(sourceName, "-image-")
if len(s) != 2 {
@ -414,11 +414,11 @@ func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol
klog.V(3).Infof("SourceName %s wrong, fallback to old format", sourceName)
sourceName, err = plugin.getDeviceNameFromOldMountPath(mounter, mountPath)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
s = dstrings.Split(sourceName, "-image-")
if len(s) != 2 {
return nil, fmt.Errorf("sourceName %s wrong, should be pool+\"-image-\"+imageName", sourceName)
return volume.ReconstructedVolume{}, fmt.Errorf("sourceName %s wrong, should be pool+\"-image-\"+imageName", sourceName)
}
}
rbdVolume := &v1.Volume{
@ -430,7 +430,9 @@ func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*vol
},
},
}
return volume.NewSpecFromVolume(rbdVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(rbdVolume),
}, nil
}
func (plugin *rbdPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeName, mapPath string) (*volume.Spec, error) {

View File

@ -631,15 +631,15 @@ func TestConstructVolumeSpec(t *testing.T) {
if err = fakeMounter.Mount(c.targetPath, podMountPath, "fake", []string{"bind"}); err != nil {
t.Fatalf("Mount %s to %s failed: %v", c.targetPath, podMountPath, err)
}
spec, err := plug.ConstructVolumeSpec(c.volumeName, podMountPath)
rec, err := plug.ConstructVolumeSpec(c.volumeName, podMountPath)
if err != nil {
t.Errorf("ConstructVolumeSpec failed: %v", err)
} else {
if spec.Volume.RBD.RBDPool != pool {
t.Errorf("Mismatch rbd pool: wanted %s, got %s", pool, spec.Volume.RBD.RBDPool)
if rec.Spec.Volume.RBD.RBDPool != pool {
t.Errorf("Mismatch rbd pool: wanted %s, got %s", pool, rec.Spec.Volume.RBD.RBDPool)
}
if spec.Volume.RBD.RBDImage != image {
t.Fatalf("Mismatch rbd image: wanted %s, got %s", image, spec.Volume.RBD.RBDImage)
if rec.Spec.Volume.RBD.RBDImage != image {
t.Fatalf("Mismatch rbd image: wanted %s, got %s", image, rec.Spec.Volume.RBD.RBDImage)
}
}
if err = fakeMounter.Unmount(podMountPath); err != nil {

View File

@ -125,7 +125,7 @@ func (plugin *secretPlugin) NewUnmounter(volName string, podUID types.UID) (volu
}, nil
}
func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (volume.ReconstructedVolume, error) {
secretVolume := &v1.Volume{
Name: volName,
VolumeSource: v1.VolumeSource{
@ -134,7 +134,9 @@ func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*vol
},
},
}
return volume.NewSpecFromVolume(secretVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(secretVolume),
}, nil
}
type secretVolume struct {

View File

@ -451,10 +451,12 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode
return []v1.PersistentVolumeAccessMode{}
}
func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
return &volume.Spec{
Volume: &v1.Volume{
Name: volumeName,
func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
return volume.ReconstructedVolume{
Spec: &volume.Spec{
Volume: &v1.Volume{
Name: volumeName,
},
},
}, nil
}
@ -526,7 +528,7 @@ func (f *FakeBasicVolumePlugin) CanSupport(spec *volume.Spec) bool {
return strings.HasPrefix(spec.Name(), f.GetPluginName())
}
func (f *FakeBasicVolumePlugin) ConstructVolumeSpec(ame, mountPath string) (*volume.Spec, error) {
func (f *FakeBasicVolumePlugin) ConstructVolumeSpec(ame, mountPath string) (volume.ReconstructedVolume, error) {
return f.Plugin.ConstructVolumeSpec(ame, mountPath)
}
@ -647,8 +649,8 @@ func (plugin *FakeFileVolumePlugin) NewUnmounter(name string, podUID types.UID)
return nil, nil
}
func (plugin *FakeFileVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
return nil, nil
func (plugin *FakeFileVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
return volume.ReconstructedVolume{}, nil
}
func NewFakeFileVolumePlugin() []volume.VolumePlugin {

View File

@ -150,7 +150,7 @@ type OperationExecutor interface {
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (volume.ReconstructedVolume, error)
// CheckVolumeExistenceOperation checks volume existence
CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
}
@ -1061,17 +1061,17 @@ func (oe *operationExecutor) ReconstructVolumeOperation(
podName volumetypes.UniquePodName,
volumeSpecName string,
volumePath string,
pluginName string) (*volume.Spec, error) {
pluginName string) (volume.ReconstructedVolume, error) {
// Filesystem Volume case
if volumeMode == v1.PersistentVolumeFilesystem {
// Create volumeSpec from mount path
klog.V(5).Infof("Starting operationExecutor.ReconstructVolume for file volume on pod %q", podName)
volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
reconstructed, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
return volumeSpec, nil
return reconstructed, nil
}
// Block Volume case
@ -1083,9 +1083,11 @@ func (oe *operationExecutor) ReconstructVolumeOperation(
// ex. volumePath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
return volumeSpec, nil
return volume.ReconstructedVolume{
Spec: volumeSpec,
}, nil
}
// CheckVolumeExistenceOperation checks mount path directory if volume still exists

View File

@ -153,17 +153,17 @@ func (plugin *vsphereVolumePlugin) newUnmounterInternal(volName string, podUID t
}}, nil
}
func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
kvh, ok := plugin.host.(volume.KubeletVolumeHost)
if !ok {
return nil, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
return volume.ReconstructedVolume{}, fmt.Errorf("plugin volume host does not implement KubeletVolumeHost interface")
}
hu := kvh.GetHostUtil()
pluginMntDir := util.GetPluginMountDir(plugin.host, plugin.GetPluginName())
volumePath, err := hu.GetDeviceNameFromMount(mounter, mountPath, pluginMntDir)
if err != nil {
return nil, err
return volume.ReconstructedVolume{}, err
}
volumePath = strings.Replace(volumePath, "\\040", " ", -1)
klog.V(5).Infof("vSphere volume path is %q", volumePath)
@ -175,7 +175,9 @@ func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath str
},
},
}
return volume.NewSpecFromVolume(vsphereVolume), nil
return volume.ReconstructedVolume{
Spec: volume.NewSpecFromVolume(vsphereVolume),
}, nil
}
// Abstract interface to disk operations.