Merge pull request #117804 from jsafrane/fix-csi-attachable-reconstruction
Fix reconstruction of CSI volumes
This commit is contained in:
		@@ -169,14 +169,20 @@ type ActualStateOfWorld interface {
 | 
			
		||||
	GetAttachedVolumes() []AttachedVolume
 | 
			
		||||
 | 
			
		||||
	// SyncReconstructedVolume check the volume.outerVolumeSpecName in asw and
 | 
			
		||||
	// the one populated from dsw , if they do not match, update this field from the value from dsw.
 | 
			
		||||
	// the one populated from dsw, if they do not match, update this field from the value from dsw.
 | 
			
		||||
	SyncReconstructedVolume(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, outerVolumeSpecName string)
 | 
			
		||||
 | 
			
		||||
	// Add the specified volume to ASW as uncertainly attached.
 | 
			
		||||
	AddAttachUncertainReconstructedVolume(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error
 | 
			
		||||
 | 
			
		||||
	// UpdateReconstructedDevicePath updates devicePath of a reconstructed volume
 | 
			
		||||
	// from Node.Status.VolumesAttached. The ASW is updated only when the volume is still
 | 
			
		||||
	// uncertain. If the volume got mounted in the meantime, its devicePath must have
 | 
			
		||||
	// been fixed by such an update.
 | 
			
		||||
	UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string)
 | 
			
		||||
 | 
			
		||||
	// UpdateReconstructedVolumeAttachability updates volume attachability from the API server.
 | 
			
		||||
	UpdateReconstructedVolumeAttachability(volumeName v1.UniqueVolumeName, volumeAttachable bool)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MountedVolume represents a volume that has successfully been mounted to a pod.
 | 
			
		||||
@@ -251,6 +257,14 @@ type actualStateOfWorld struct {
 | 
			
		||||
	sync.RWMutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type volumeAttachability string
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	volumeAttachabilityTrue      volumeAttachability = "True"
 | 
			
		||||
	volumeAttachabilityFalse     volumeAttachability = "False"
 | 
			
		||||
	volumeAttachabilityUncertain volumeAttachability = "Uncertain"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// attachedVolume represents a volume the kubelet volume manager believes to be
 | 
			
		||||
// successfully attached to a node it is managing. Volume types that do not
 | 
			
		||||
// implement an attacher are assumed to be in this state.
 | 
			
		||||
@@ -280,7 +294,7 @@ type attachedVolume struct {
 | 
			
		||||
 | 
			
		||||
	// pluginIsAttachable indicates the volume plugin used to attach and mount
 | 
			
		||||
	// this volume implements the volume.Attacher interface
 | 
			
		||||
	pluginIsAttachable bool
 | 
			
		||||
	pluginIsAttachable volumeAttachability
 | 
			
		||||
 | 
			
		||||
	// deviceMountState stores information that tells us if device is mounted
 | 
			
		||||
	// globally or not
 | 
			
		||||
@@ -361,7 +375,19 @@ type mountedPod struct {
 | 
			
		||||
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
 | 
			
		||||
	logger klog.Logger,
 | 
			
		||||
	volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, _ types.NodeName, devicePath string) error {
 | 
			
		||||
	return asw.addVolume(volumeName, volumeSpec, devicePath)
 | 
			
		||||
 | 
			
		||||
	pluginIsAttachable := volumeAttachabilityFalse
 | 
			
		||||
	if attachablePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec); err == nil && attachablePlugin != nil {
 | 
			
		||||
		pluginIsAttachable = volumeAttachabilityTrue
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return asw.addVolume(volumeName, volumeSpec, devicePath, pluginIsAttachable)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (asw *actualStateOfWorld) AddAttachUncertainReconstructedVolume(
 | 
			
		||||
	volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, _ types.NodeName, devicePath string) error {
 | 
			
		||||
 | 
			
		||||
	return asw.addVolume(volumeName, volumeSpec, devicePath, volumeAttachabilityUncertain)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (asw *actualStateOfWorld) MarkVolumeAsUncertain(
 | 
			
		||||
@@ -526,6 +552,28 @@ func (asw *actualStateOfWorld) UpdateReconstructedDevicePath(volumeName v1.Uniqu
 | 
			
		||||
	asw.attachedVolumes[volumeName] = volumeObj
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (asw *actualStateOfWorld) UpdateReconstructedVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool) {
 | 
			
		||||
	asw.Lock()
 | 
			
		||||
	defer asw.Unlock()
 | 
			
		||||
 | 
			
		||||
	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
 | 
			
		||||
	if !volumeExists {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if volumeObj.pluginIsAttachable != volumeAttachabilityUncertain {
 | 
			
		||||
		// Reconciler must have updated volume state, i.e. when a pod uses the volume and
 | 
			
		||||
		// succeeded mounting the volume. Such update has fixed the device path.
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if attachable {
 | 
			
		||||
		volumeObj.pluginIsAttachable = volumeAttachabilityTrue
 | 
			
		||||
	} else {
 | 
			
		||||
		volumeObj.pluginIsAttachable = volumeAttachabilityFalse
 | 
			
		||||
	}
 | 
			
		||||
	asw.attachedVolumes[volumeName] = volumeObj
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState {
 | 
			
		||||
	asw.RLock()
 | 
			
		||||
	defer asw.RUnlock()
 | 
			
		||||
@@ -592,7 +640,7 @@ func (asw *actualStateOfWorld) IsVolumeMountedElsewhere(volumeName v1.UniqueVolu
 | 
			
		||||
// volume plugin can support the given volumeSpec or more than one plugin can
 | 
			
		||||
// support it, an error is returned.
 | 
			
		||||
func (asw *actualStateOfWorld) addVolume(
 | 
			
		||||
	volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string) error {
 | 
			
		||||
	volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string, attachability volumeAttachability) error {
 | 
			
		||||
	asw.Lock()
 | 
			
		||||
	defer asw.Unlock()
 | 
			
		||||
 | 
			
		||||
@@ -615,11 +663,6 @@ func (asw *actualStateOfWorld) addVolume(
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pluginIsAttachable := false
 | 
			
		||||
	if attachablePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec); err == nil && attachablePlugin != nil {
 | 
			
		||||
		pluginIsAttachable = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
 | 
			
		||||
	if !volumeExists {
 | 
			
		||||
		volumeObj = attachedVolume{
 | 
			
		||||
@@ -627,7 +670,7 @@ func (asw *actualStateOfWorld) addVolume(
 | 
			
		||||
			spec:               volumeSpec,
 | 
			
		||||
			mountedPods:        make(map[volumetypes.UniquePodName]mountedPod),
 | 
			
		||||
			pluginName:         volumePlugin.GetPluginName(),
 | 
			
		||||
			pluginIsAttachable: pluginIsAttachable,
 | 
			
		||||
			pluginIsAttachable: attachability,
 | 
			
		||||
			deviceMountState:   operationexecutor.DeviceNotMounted,
 | 
			
		||||
			devicePath:         devicePath,
 | 
			
		||||
		}
 | 
			
		||||
@@ -1094,7 +1137,7 @@ func (asw *actualStateOfWorld) newAttachedVolume(
 | 
			
		||||
			VolumeName:          attachedVolume.volumeName,
 | 
			
		||||
			VolumeSpec:          attachedVolume.spec,
 | 
			
		||||
			NodeName:            asw.nodeName,
 | 
			
		||||
			PluginIsAttachable:  attachedVolume.pluginIsAttachable,
 | 
			
		||||
			PluginIsAttachable:  attachedVolume.pluginIsAttachable == volumeAttachabilityTrue,
 | 
			
		||||
			DevicePath:          attachedVolume.devicePath,
 | 
			
		||||
			DeviceMountPath:     attachedVolume.deviceMountPath,
 | 
			
		||||
			PluginName:          attachedVolume.pluginName,
 | 
			
		||||
 
 | 
			
		||||
@@ -525,6 +525,54 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "uncertain attachability is resolved to attachable",
 | 
			
		||||
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				asw.UpdateReconstructedVolumeAttachability(volumeOpts.VolumeName, true)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				verifyVolumeAttachability(t, volumeOpts.VolumeName, asw, volumeAttachabilityTrue)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "uncertain attachability is resolved to non-attachable",
 | 
			
		||||
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				asw.UpdateReconstructedVolumeAttachability(volumeOpts.VolumeName, false)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				verifyVolumeAttachability(t, volumeOpts.VolumeName, asw, volumeAttachabilityFalse)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "certain (false) attachability cannot be changed",
 | 
			
		||||
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				asw.UpdateReconstructedVolumeAttachability(volumeOpts.VolumeName, false)
 | 
			
		||||
				// This function should be NOOP:
 | 
			
		||||
				asw.UpdateReconstructedVolumeAttachability(volumeOpts.VolumeName, true)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				verifyVolumeAttachability(t, volumeOpts.VolumeName, asw, volumeAttachabilityFalse)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "certain (true) attachability cannot be changed",
 | 
			
		||||
			opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				asw.UpdateReconstructedVolumeAttachability(volumeOpts.VolumeName, true)
 | 
			
		||||
				// This function should be NOOP:
 | 
			
		||||
				asw.UpdateReconstructedVolumeAttachability(volumeOpts.VolumeName, false)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error {
 | 
			
		||||
				verifyVolumeAttachability(t, volumeOpts.VolumeName, asw, volumeAttachabilityTrue)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tc := range tests {
 | 
			
		||||
		t.Run(tc.name, func(t *testing.T) {
 | 
			
		||||
@@ -537,8 +585,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
 | 
			
		||||
			generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec(
 | 
			
		||||
				plugin, volumeSpec1)
 | 
			
		||||
			require.NoError(t, err)
 | 
			
		||||
			logger, _ := ktesting.NewTestContext(t)
 | 
			
		||||
			err = asw.MarkVolumeAsAttached(logger, generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
 | 
			
		||||
			err = asw.AddAttachUncertainReconstructedVolume(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
 | 
			
		||||
			}
 | 
			
		||||
@@ -575,6 +622,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) {
 | 
			
		||||
			verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName1, volumeSpec1.Name(), asw)
 | 
			
		||||
			verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw)
 | 
			
		||||
			verifyVolumeFoundInReconstruction(t, podName1, generatedVolumeName1, asw)
 | 
			
		||||
			verifyVolumeAttachability(t, generatedVolumeName1, asw, volumeAttachabilityUncertain)
 | 
			
		||||
 | 
			
		||||
			if tc.opCallback != nil {
 | 
			
		||||
				err = tc.opCallback(asw, markVolumeOpts1)
 | 
			
		||||
@@ -1307,3 +1355,28 @@ func verifyVolumeFoundInReconstruction(t *testing.T, podToCheck volumetypes.Uniq
 | 
			
		||||
		t.Fatalf("ASW IsVolumeReconstructed result invalid. expected <true> Actual <false>")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func verifyVolumeAttachability(t *testing.T, volumeToCheck v1.UniqueVolumeName, asw ActualStateOfWorld, expected volumeAttachability) {
 | 
			
		||||
	attached := asw.GetAttachedVolumes()
 | 
			
		||||
	attachable := false
 | 
			
		||||
 | 
			
		||||
	for _, volume := range attached {
 | 
			
		||||
		if volume.VolumeName == volumeToCheck {
 | 
			
		||||
			attachable = volume.PluginIsAttachable
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch expected {
 | 
			
		||||
	case volumeAttachabilityTrue:
 | 
			
		||||
		if !attachable {
 | 
			
		||||
			t.Errorf("ASW reports %s as not-attachable, when %s was expected", volumeToCheck, expected)
 | 
			
		||||
		}
 | 
			
		||||
	// ASW does not have any special difference between False and Uncertain.
 | 
			
		||||
	// Uncertain only allows to be changed to True / False.
 | 
			
		||||
	case volumeAttachabilityUncertain, volumeAttachabilityFalse:
 | 
			
		||||
		if attachable {
 | 
			
		||||
			t.Errorf("ASW reports %s as attachable, when %s was expected", volumeToCheck, expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -104,24 +104,24 @@ func NewReconciler(
 | 
			
		||||
	volumePluginMgr *volumepkg.VolumePluginMgr,
 | 
			
		||||
	kubeletPodsDir string) Reconciler {
 | 
			
		||||
	return &reconciler{
 | 
			
		||||
		kubeClient:                    kubeClient,
 | 
			
		||||
		controllerAttachDetachEnabled: controllerAttachDetachEnabled,
 | 
			
		||||
		loopSleepDuration:             loopSleepDuration,
 | 
			
		||||
		waitForAttachTimeout:          waitForAttachTimeout,
 | 
			
		||||
		nodeName:                      nodeName,
 | 
			
		||||
		desiredStateOfWorld:           desiredStateOfWorld,
 | 
			
		||||
		actualStateOfWorld:            actualStateOfWorld,
 | 
			
		||||
		populatorHasAddedPods:         populatorHasAddedPods,
 | 
			
		||||
		operationExecutor:             operationExecutor,
 | 
			
		||||
		mounter:                       mounter,
 | 
			
		||||
		hostutil:                      hostutil,
 | 
			
		||||
		skippedDuringReconstruction:   map[v1.UniqueVolumeName]*globalVolumeInfo{},
 | 
			
		||||
		volumePluginMgr:               volumePluginMgr,
 | 
			
		||||
		kubeletPodsDir:                kubeletPodsDir,
 | 
			
		||||
		timeOfLastSync:                time.Time{},
 | 
			
		||||
		volumesFailedReconstruction:   make([]podVolume, 0),
 | 
			
		||||
		volumesNeedDevicePath:         make([]v1.UniqueVolumeName, 0),
 | 
			
		||||
		volumesNeedReportedInUse:      make([]v1.UniqueVolumeName, 0),
 | 
			
		||||
		kubeClient:                      kubeClient,
 | 
			
		||||
		controllerAttachDetachEnabled:   controllerAttachDetachEnabled,
 | 
			
		||||
		loopSleepDuration:               loopSleepDuration,
 | 
			
		||||
		waitForAttachTimeout:            waitForAttachTimeout,
 | 
			
		||||
		nodeName:                        nodeName,
 | 
			
		||||
		desiredStateOfWorld:             desiredStateOfWorld,
 | 
			
		||||
		actualStateOfWorld:              actualStateOfWorld,
 | 
			
		||||
		populatorHasAddedPods:           populatorHasAddedPods,
 | 
			
		||||
		operationExecutor:               operationExecutor,
 | 
			
		||||
		mounter:                         mounter,
 | 
			
		||||
		hostutil:                        hostutil,
 | 
			
		||||
		skippedDuringReconstruction:     map[v1.UniqueVolumeName]*globalVolumeInfo{},
 | 
			
		||||
		volumePluginMgr:                 volumePluginMgr,
 | 
			
		||||
		kubeletPodsDir:                  kubeletPodsDir,
 | 
			
		||||
		timeOfLastSync:                  time.Time{},
 | 
			
		||||
		volumesFailedReconstruction:     make([]podVolume, 0),
 | 
			
		||||
		volumesNeedUpdateFromNodeStatus: make([]v1.UniqueVolumeName, 0),
 | 
			
		||||
		volumesNeedReportedInUse:        make([]v1.UniqueVolumeName, 0),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -141,11 +141,11 @@ type reconciler struct {
 | 
			
		||||
	skippedDuringReconstruction   map[v1.UniqueVolumeName]*globalVolumeInfo
 | 
			
		||||
	kubeletPodsDir                string
 | 
			
		||||
	// lock protects timeOfLastSync for updating and checking
 | 
			
		||||
	timeOfLastSyncLock          sync.Mutex
 | 
			
		||||
	timeOfLastSync              time.Time
 | 
			
		||||
	volumesFailedReconstruction []podVolume
 | 
			
		||||
	volumesNeedDevicePath       []v1.UniqueVolumeName
 | 
			
		||||
	volumesNeedReportedInUse    []v1.UniqueVolumeName
 | 
			
		||||
	timeOfLastSyncLock              sync.Mutex
 | 
			
		||||
	timeOfLastSync                  time.Time
 | 
			
		||||
	volumesFailedReconstruction     []podVolume
 | 
			
		||||
	volumesNeedUpdateFromNodeStatus []v1.UniqueVolumeName
 | 
			
		||||
	volumesNeedReportedInUse        []v1.UniqueVolumeName
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (rc *reconciler) Run(stopCh <-chan struct{}) {
 | 
			
		||||
 
 | 
			
		||||
@@ -56,8 +56,14 @@ func (rc *reconciler) reconcileNew() {
 | 
			
		||||
		rc.cleanOrphanVolumes()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(rc.volumesNeedDevicePath) != 0 {
 | 
			
		||||
		rc.updateReconstructedDevicePaths()
 | 
			
		||||
	if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
 | 
			
		||||
		rc.updateReconstructedFromNodeStatus()
 | 
			
		||||
	}
 | 
			
		||||
	if len(rc.volumesNeedUpdateFromNodeStatus) == 0 {
 | 
			
		||||
		// ASW is fully populated only after both devicePaths and uncertain volume attach-ability
 | 
			
		||||
		// were reconstructed from the API server.
 | 
			
		||||
		// This will start reconciliation of node.status.volumesInUse.
 | 
			
		||||
		rc.updateLastSyncTime()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										144
									
								
								pkg/kubelet/volumemanager/reconciler/reconciler_new_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										144
									
								
								pkg/kubelet/volumemanager/reconciler/reconciler_new_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,144 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package reconciler
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/client-go/tools/record"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume"
 | 
			
		||||
	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
 | 
			
		||||
	"k8s.io/mount-utils"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
 | 
			
		||||
	// Calls Run() with two reconstructed volumes.
 | 
			
		||||
	// Verifies the devicePaths + volume attachability are reconstructed from node.status.
 | 
			
		||||
 | 
			
		||||
	// Arrange
 | 
			
		||||
	node := &v1.Node{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: string(nodeName),
 | 
			
		||||
		},
 | 
			
		||||
		Status: v1.NodeStatus{
 | 
			
		||||
			VolumesAttached: []v1.AttachedVolume{
 | 
			
		||||
				{
 | 
			
		||||
					Name:       "fake-plugin/fake-device1",
 | 
			
		||||
					DevicePath: "fake/path",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 | 
			
		||||
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
 | 
			
		||||
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
 | 
			
		||||
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 | 
			
		||||
	kubeClient := createTestClient()
 | 
			
		||||
	fakeRecorder := &record.FakeRecorder{}
 | 
			
		||||
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
 | 
			
		||||
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
 | 
			
		||||
		kubeClient,
 | 
			
		||||
		volumePluginMgr,
 | 
			
		||||
		fakeRecorder,
 | 
			
		||||
		fakeHandler))
 | 
			
		||||
	rc := NewReconciler(
 | 
			
		||||
		kubeClient,
 | 
			
		||||
		true, /* controllerAttachDetachEnabled */
 | 
			
		||||
		reconcilerLoopSleepDuration,
 | 
			
		||||
		waitForAttachTimeout,
 | 
			
		||||
		nodeName,
 | 
			
		||||
		dsw,
 | 
			
		||||
		asw,
 | 
			
		||||
		hasAddedPods,
 | 
			
		||||
		oex,
 | 
			
		||||
		mount.NewFakeMounter(nil),
 | 
			
		||||
		hostutil.NewFakeHostUtil(nil),
 | 
			
		||||
		volumePluginMgr,
 | 
			
		||||
		kubeletPodsDir)
 | 
			
		||||
	reconciler := rc.(*reconciler)
 | 
			
		||||
 | 
			
		||||
	// The pod has two volumes, fake-device1 is attachable, fake-device2 is not.
 | 
			
		||||
	pod := &v1.Pod{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "pod1",
 | 
			
		||||
			UID:  "pod1uid",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: v1.PodSpec{
 | 
			
		||||
			Volumes: []v1.Volume{
 | 
			
		||||
				{
 | 
			
		||||
					Name: "volume-name",
 | 
			
		||||
					VolumeSource: v1.VolumeSource{
 | 
			
		||||
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
 | 
			
		||||
							PDName: "fake-device1",
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					Name: "volume-name2",
 | 
			
		||||
					VolumeSource: v1.VolumeSource{
 | 
			
		||||
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
 | 
			
		||||
							PDName: "fake-device2",
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	volumeSpec1 := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
 | 
			
		||||
	volumeName1 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device1")
 | 
			
		||||
	volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]}
 | 
			
		||||
	volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2")
 | 
			
		||||
 | 
			
		||||
	assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, ""))
 | 
			
		||||
	assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", ""))
 | 
			
		||||
	assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, ""))
 | 
			
		||||
	assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", ""))
 | 
			
		||||
 | 
			
		||||
	assert.False(t, reconciler.StatesHasBeenSynced())
 | 
			
		||||
 | 
			
		||||
	reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2)
 | 
			
		||||
	// Act - run reconcile loop just once.
 | 
			
		||||
	// "volumesNeedUpdateFromNodeStatus" is not empty, so no unmount will be triggered.
 | 
			
		||||
	reconciler.reconcileNew()
 | 
			
		||||
 | 
			
		||||
	// Assert
 | 
			
		||||
	assert.True(t, reconciler.StatesHasBeenSynced())
 | 
			
		||||
	assert.Empty(t, reconciler.volumesNeedUpdateFromNodeStatus)
 | 
			
		||||
 | 
			
		||||
	attachedVolumes := asw.GetAttachedVolumes()
 | 
			
		||||
	assert.Equalf(t, len(attachedVolumes), 2, "two volumes in ASW expected")
 | 
			
		||||
	for _, vol := range attachedVolumes {
 | 
			
		||||
		if vol.VolumeName == volumeName1 {
 | 
			
		||||
			// devicePath + attachability must have been updated from node.status
 | 
			
		||||
			assert.True(t, vol.PluginIsAttachable)
 | 
			
		||||
			assert.Equal(t, vol.DevicePath, "fake/path")
 | 
			
		||||
		}
 | 
			
		||||
		if vol.VolumeName == volumeName2 {
 | 
			
		||||
			// only attachability was updated from node.status
 | 
			
		||||
			assert.False(t, vol.PluginIsAttachable)
 | 
			
		||||
			assert.Equal(t, vol.DevicePath, "/dev/reconstructed")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -236,17 +236,28 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (rvolume *reconstructe
 | 
			
		||||
	// Searching by spec checks whether the volume is actually attachable
 | 
			
		||||
	// (i.e. has a PV) whereas searching by plugin name can only tell whether
 | 
			
		||||
	// the plugin supports attachable volumes.
 | 
			
		||||
	attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// The unique volume name used depends on whether the volume is attachable/device-mountable
 | 
			
		||||
	// (needsNameFromSpec = true) or not.
 | 
			
		||||
	needsNameFromSpec := deviceMountablePlugin != nil
 | 
			
		||||
	if !needsNameFromSpec {
 | 
			
		||||
		// Check attach-ability of a volume only as a fallback to avoid calling
 | 
			
		||||
		// FindAttachablePluginBySpec for CSI volumes - it needs a connection to the API server,
 | 
			
		||||
		// but it may not be available at this stage of kubelet startup.
 | 
			
		||||
		// All CSI volumes are device-mountable, so they won't reach this code.
 | 
			
		||||
		attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		needsNameFromSpec = attachablePlugin != nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var uniqueVolumeName v1.UniqueVolumeName
 | 
			
		||||
	if attachablePlugin != nil || deviceMountablePlugin != nil {
 | 
			
		||||
	if needsNameFromSpec {
 | 
			
		||||
		uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
 
 | 
			
		||||
@@ -39,7 +39,7 @@ func (rc *reconciler) readyToUnmount() bool {
 | 
			
		||||
 | 
			
		||||
	// Allow unmount only when ASW device paths were corrected from node.status to prevent
 | 
			
		||||
	// calling unmount with a wrong devicePath.
 | 
			
		||||
	if len(rc.volumesNeedDevicePath) != 0 {
 | 
			
		||||
	if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
@@ -50,7 +50,6 @@ func (rc *reconciler) readyToUnmount() bool {
 | 
			
		||||
// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld
 | 
			
		||||
// is populated.
 | 
			
		||||
func (rc *reconciler) reconstructVolumes() {
 | 
			
		||||
	defer rc.updateLastSyncTime()
 | 
			
		||||
	// Get volumes information by reading the pod's directory
 | 
			
		||||
	podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -98,16 +97,16 @@ func (rc *reconciler) reconstructVolumes() {
 | 
			
		||||
		// Remember to update DSW with this information.
 | 
			
		||||
		rc.volumesNeedReportedInUse = reconstructedVolumeNames
 | 
			
		||||
		// Remember to update devicePath from node.status.volumesAttached
 | 
			
		||||
		rc.volumesNeedDevicePath = reconstructedVolumeNames
 | 
			
		||||
		rc.volumesNeedUpdateFromNodeStatus = reconstructedVolumeNames
 | 
			
		||||
	}
 | 
			
		||||
	klog.V(2).InfoS("Volume reconstruction finished")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (rc *reconciler) updateStatesNew(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
 | 
			
		||||
	for _, gvl := range reconstructedVolumes {
 | 
			
		||||
		err := rc.actualStateOfWorld.MarkVolumeAsAttached(
 | 
			
		||||
		err := rc.actualStateOfWorld.AddAttachUncertainReconstructedVolume(
 | 
			
		||||
			//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
 | 
			
		||||
			klog.TODO(), gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
 | 
			
		||||
			gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
 | 
			
		||||
			continue
 | 
			
		||||
@@ -174,36 +173,40 @@ func (rc *reconciler) cleanOrphanVolumes() {
 | 
			
		||||
	rc.volumesFailedReconstruction = make([]podVolume, 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// updateReconstructedDevicePaths tries to file devicePaths of reconstructed volumes from
 | 
			
		||||
// updateReconstructedFromNodeStatus tries to file devicePaths of reconstructed volumes from
 | 
			
		||||
// node.Status.VolumesAttached. This can be done only after connection to the API
 | 
			
		||||
// server is established, i.e. it can't be part of reconstructVolumes().
 | 
			
		||||
func (rc *reconciler) updateReconstructedDevicePaths() {
 | 
			
		||||
func (rc *reconciler) updateReconstructedFromNodeStatus() {
 | 
			
		||||
	klog.V(4).InfoS("Updating reconstructed devicePaths")
 | 
			
		||||
 | 
			
		||||
	if rc.kubeClient == nil {
 | 
			
		||||
		// Skip reconstructing devicePath from node objects if kubelet is in standalone mode.
 | 
			
		||||
		// Such kubelet is not expected to mount any attachable volume or Secrets / ConfigMap.
 | 
			
		||||
		klog.V(2).InfoS("Skipped reconstruction of DevicePaths from node.status in standalone mode")
 | 
			
		||||
		rc.volumesNeedDevicePath = nil
 | 
			
		||||
		rc.volumesNeedUpdateFromNodeStatus = nil
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
 | 
			
		||||
	if fetchErr != nil {
 | 
			
		||||
		// This may repeat few times per second until kubelet is able to read its own status for the first time.
 | 
			
		||||
		klog.V(2).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
 | 
			
		||||
		klog.V(4).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, volumeID := range rc.volumesNeedDevicePath {
 | 
			
		||||
	for _, volumeID := range rc.volumesNeedUpdateFromNodeStatus {
 | 
			
		||||
		attachable := false
 | 
			
		||||
		for _, attachedVolume := range node.Status.VolumesAttached {
 | 
			
		||||
			if volumeID != attachedVolume.Name {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath)
 | 
			
		||||
			attachable = true
 | 
			
		||||
			klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
 | 
			
		||||
		}
 | 
			
		||||
		rc.actualStateOfWorld.UpdateReconstructedVolumeAttachability(volumeID, attachable)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	klog.V(2).InfoS("DevicePaths of reconstructed volumes updated")
 | 
			
		||||
	rc.volumesNeedDevicePath = nil
 | 
			
		||||
	rc.volumesNeedUpdateFromNodeStatus = nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -117,8 +117,8 @@ func TestReconstructVolumes(t *testing.T) {
 | 
			
		||||
			for i := range tc.expectedVolumesNeedDevicePath {
 | 
			
		||||
				expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedDevicePath[i])
 | 
			
		||||
			}
 | 
			
		||||
			if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedDevicePath) {
 | 
			
		||||
				t.Errorf("Expected expectedVolumesNeedDevicePath:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedDevicePath)
 | 
			
		||||
			if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedUpdateFromNodeStatus) {
 | 
			
		||||
				t.Errorf("Expected expectedVolumesNeedDevicePath:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedUpdateFromNodeStatus)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			expectedVolumes = make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedReportedInUse))
 | 
			
		||||
@@ -333,7 +333,7 @@ func TestReconstructVolumesMount(t *testing.T) {
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
			// Mark devices paths as reconciled to allow unmounting of volumes.
 | 
			
		||||
			rcInstance.volumesNeedDevicePath = nil
 | 
			
		||||
			rcInstance.volumesNeedUpdateFromNodeStatus = nil
 | 
			
		||||
 | 
			
		||||
			// Act 2 - reconcile once
 | 
			
		||||
			rcInstance.reconcileNew()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user