diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go
index dbfef5f038b..a7c1d5edc80 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -858,6 +858,10 @@ func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error)
 	return v1.ResourceList{}, nil
 }

+func (adc *attachDetachController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+	return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
 	return func(_, _ string) (*v1.Secret, error) {
 		return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")
diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go
index 1d9bbde29f1..78865c56b9f 100644
--- a/pkg/controller/volume/expand/expand_controller.go
+++ b/pkg/controller/volume/expand/expand_controller.go
@@ -456,6 +456,10 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*
 	}
 }

+func (expc *expandController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+	return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
 	return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
 		return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
diff --git a/pkg/controller/volume/persistentvolume/volume_host.go b/pkg/controller/volume/persistentvolume/volume_host.go
index 8cd7c12e8b5..c064823c9c2 100644
--- a/pkg/controller/volume/persistentvolume/volume_host.go
+++ b/pkg/controller/volume/persistentvolume/volume_host.go
@@ -95,6 +95,10 @@ func (ctrl *PersistentVolumeController) GetNodeAllocatable() (v1.ResourceList, e
 	return v1.ResourceList{}, nil
 }

+func (ctrl *PersistentVolumeController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+	return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (ctrl *PersistentVolumeController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
 	return func(_, _ string) (*v1.Secret, error) {
 		return nil, fmt.Errorf("GetSecret unsupported in PersistentVolumeController")
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 644432c31db..f257acf6c14 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -212,6 +212,12 @@ func newTestKubeletWithImageList(
 					Address: testKubeletHostIP,
 				},
 			},
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       "fake/fake-device",
+					DevicePath: "fake/path",
+				},
+			},
 		},
 	},
 },
diff --git a/pkg/kubelet/volume_host.go b/pkg/kubelet/volume_host.go
index d2e538539bd..fe880506b2b 100644
--- a/pkg/kubelet/volume_host.go
+++ b/pkg/kubelet/volume_host.go
@@ -270,6 +270,20 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
 	return node.Labels, nil
 }

+func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+	node, err := kvh.kubelet.GetNode()
+	if err != nil {
+		return nil, fmt.Errorf("error retrieving node: %v", err)
+	}
+	attachedVolumes := node.Status.VolumesAttached
+	result := map[v1.UniqueVolumeName]string{}
+	for i := range attachedVolumes {
+		attachedVolume := attachedVolumes[i]
+		result[attachedVolume.Name] = attachedVolume.DevicePath
+	}
+	return result, nil
+}
+
 func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
 	return kvh.kubelet.nodeName
 }
diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go
index 32ce780267c..c1efe7cd8dc 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go
@@ -202,6 +202,11 @@ func (rc *reconciler) mountAttachVolumes() {
 			volumeToMount.DevicePath = devicePath
 			if cache.IsVolumeNotAttachedError(err) {
 				if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
+					// Let's not spin up a goroutine and unnecessarily trigger exponential backoff if this happens.
+					if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
+						klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
+						continue
+					}
 					// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
 					// for controller to finish attaching volume.
 					klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
@@ -704,5 +709,5 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {

 // ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 func isExpectedError(err error) bool {
-	return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err)
+	return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
 }
diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
index 2c70e0d49a9..b0c1c0aaa05 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
@@ -189,7 +189,20 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
 // Verifies there are no attach/detach calls.
 func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
 	// Arrange
-	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: string(nodeName),
+		},
+		Status: v1.NodeStatus{
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       "fake-plugin/fake-device1",
+					DevicePath: "fake/path",
+				},
+			},
+		},
+	}
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 	kubeClient := createTestClient()
@@ -261,6 +274,86 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
 	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
 }

+// Populates desiredStateOfWorld cache with one volume/pod.
+// Enables controllerAttachDetachEnabled.
+// volume is not reported-in-use
+// Calls Run()
+// Verifies that there is no wait-for-mount call
+// Verifies that there is no exponential-backoff triggered
+func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
+	kubeClient := createTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	fakeHandler := volumetesting.NewBlockVolumePathHandler()
+	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+		kubeClient,
+		volumePluginMgr,
+		fakeRecorder,
+		false, /* checkNodeCapabilitiesBeforeMount */
+		fakeHandler))
+	reconciler := NewReconciler(
+		kubeClient,
+		true, /* controllerAttachDetachEnabled */
+		reconcilerLoopSleepDuration,
+		waitForAttachTimeout,
+		nodeName,
+		dsw,
+		asw,
+		hasAddedPods,
+		oex,
+		mount.NewFakeMounter(nil),
+		hostutil.NewFakeHostUtil(nil),
+		volumePluginMgr,
+		kubeletPodsDir)
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pod1",
+			UID:  "pod1uid",
+		},
+		Spec: v1.PodSpec{
+			Volumes: []v1.Volume{
+				{
+					Name: "volume-name",
+					VolumeSource: v1.VolumeSource{
+						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+							PDName: "fake-device1",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
+	podName := util.GetUniquePodName(pod)
+	generatedVolumeName, err := dsw.AddPodToVolume(
+		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
+
+	// Assert
+	if err != nil {
+		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	// Act
+	runReconciler(reconciler)
+	time.Sleep(reconcilerSyncWaitDuration)
+
+	ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName)
+	if !ok {
+		t.Errorf("operation on volume %s is not safe to retry", generatedVolumeName)
+	}
+
+	// Assert
+	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
+	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
+		0 /* expectedWaitForAttachCallCount */, fakePlugin))
+	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
+		0 /* expectedMountDeviceCallCount */, fakePlugin))
+}
+
 // Populates desiredStateOfWorld cache with one volume/pod.
 // Calls Run()
 // Verifies there is one attach/mount/etc call and no detach calls.
@@ -358,7 +451,20 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
 // Verifies there are no attach/detach calls made.
 func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
 	// Arrange
-	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: string(nodeName),
+		},
+		Status: v1.NodeStatus{
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       "fake-plugin/fake-device1",
+					DevicePath: "fake/path",
+				},
+			},
+		},
+	}
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 	kubeClient := createTestClient()
@@ -580,9 +686,22 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
 	volumeSpec := &volume.Spec{
 		PersistentVolume: gcepv,
 	}
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: string(nodeName),
+		},
+		Status: v1.NodeStatus{
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       "fake-plugin/fake-device1",
+					DevicePath: "fake/path",
+				},
+			},
+		},
+	}

 	// Arrange
-	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
@@ -790,8 +909,22 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
 		PersistentVolume: gcepv,
 	}

+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: string(nodeName),
+		},
+		Status: v1.NodeStatus{
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       "fake-plugin/fake-device1",
+					DevicePath: "/fake/path",
+				},
+			},
+		},
+	}
+
 	// Arrange
-	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
@@ -1099,7 +1232,21 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
 			// deep copy before reconciler runs to avoid data race.
 			pvWithSize := pv.DeepCopy()
-			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+			node := &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: string(nodeName),
+				},
+				Spec: v1.NodeSpec{},
+				Status: v1.NodeStatus{
+					VolumesAttached: []v1.AttachedVolume{
+						{
+							Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
+							DevicePath: "fake/path",
+						},
+					},
+				},
+			}
+			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
 			kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
@@ -1274,7 +1421,21 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
 			},
 			}

-			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+			node := &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: string(nodeName),
+				},
+				Spec: v1.NodeSpec{},
+				Status: v1.NodeStatus{
+					VolumesAttached: []v1.AttachedVolume{
+						{
+							Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
+							DevicePath: "fake/path",
+						},
+					},
+				},
+			}
+			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 			fakePlugin.SupportsRemount = tc.supportRemount

 			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
@@ -1484,7 +1645,21 @@ func Test_UncertainVolumeMountState(t *testing.T) {
 			},
 			}

-			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+			node := &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: string(nodeName),
+				},
+				Status: v1.NodeStatus{
+					VolumesAttached: []v1.AttachedVolume{
+						{
+							Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
+							DevicePath: "fake/path",
+						},
+					},
+				},
+			}
+
+			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 			fakePlugin.SupportsRemount = tc.supportRemount
 			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
@@ -1786,7 +1961,20 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume

 func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
 	// Arrange
-	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: string(nodeName),
+		},
+		Status: v1.NodeStatus{
+			VolumesAttached: []v1.AttachedVolume{
+				{
+					Name:       "fake-plugin/fake-device1",
+					DevicePath: "/fake/path",
+				},
+			},
+		},
+	}
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go
index 9ea2531924d..5698c79e2e4 100644
--- a/pkg/kubelet/volumemanager/volume_manager_test.go
+++ b/pkg/kubelet/volumemanager/volume_manager_test.go
@@ -99,7 +99,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
 			node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
 			kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

-			manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
+			manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

 			stopCh := runVolumeManager(manager)
 			defer close(stopCh)
@@ -161,7 +161,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
 	kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

-	manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
+	manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

 	stopCh := runVolumeManager(manager)
 	defer close(stopCh)
@@ -251,7 +251,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
 		}
 		kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

-		manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
+		manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

 		stopCh := runVolumeManager(manager)
 		defer close(stopCh)
@@ -292,12 +292,15 @@ func (p *fakePodStateProvider) ShouldPodContainersBeTerminating(uid kubetypes.UI
 	return ok
 }

-func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface) VolumeManager {
+func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface, node *v1.Node) VolumeManager {
 	plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
 	fakeRecorder := &record.FakeRecorder{}
 	plugMgr := &volume.VolumePluginMgr{}
 	// TODO (#51147) inject mock prober
-	plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil))
+	fakeVolumeHost := volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil)
+	fakeVolumeHost.WithNode(node)
+
+	plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, fakeVolumeHost)
 	stateProvider := &fakePodStateProvider{}
 	fakePathHandler := volumetest.NewBlockVolumePathHandler()
 	vm := NewVolumeManager(
diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go
index eedfd7eebcd..fc80f4d071f 100644
--- a/pkg/volume/plugins.go
+++ b/pkg/volume/plugins.go
@@ -449,6 +449,10 @@ type VolumeHost interface {
 	// Returns the name of the node
 	GetNodeName() types.NodeName

+	// Returns the volumes attached to the node, as reported in the node's status,
+	// keyed by unique volume name with the device path as the value.
+	GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error)
+
 	// Returns the event recorder of kubelet.
 	GetEventRecorder() record.EventRecorder
diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go
index b70d23ac6dd..346cf4d9ad7 100644
--- a/pkg/volume/testing/testing.go
+++ b/pkg/volume/testing/testing.go
@@ -1655,6 +1655,19 @@ func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumeP
 	return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
 }

+func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*VolumePluginMgr, *FakeVolumePlugin) {
+	plugins := ProbeVolumePlugins(VolumeConfig{})
+	v := NewFakeKubeletVolumeHost(
+		t,
+		"",      /* rootDir */
+		nil,     /* kubeClient */
+		plugins, /* plugins */
+	)
+	v.WithNode(node)
+
+	return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
+}
+
 // CreateTestPVC returns a provisionable PVC for tests
 func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
 	claim := v1.PersistentVolumeClaim{
diff --git a/pkg/volume/testing/volume_host.go b/pkg/volume/testing/volume_host.go
index 802d19d3e47..e422acfdcf9 100644
--- a/pkg/volume/testing/volume_host.go
+++ b/pkg/volume/testing/volume_host.go
@@ -64,6 +64,7 @@ type fakeVolumeHost struct {
 	nodeLabels             map[string]string
 	nodeName               string
 	subpather              subpath.Interface
+	node                   *v1.Node
 	csiDriverLister        storagelistersv1.CSIDriverLister
 	volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
 	informerFactory        informers.SharedInformerFactory
@@ -153,6 +154,10 @@ func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
 	return f.pluginMgr
 }

+func (f *fakeVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+	return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
 	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
 	wrapperVolumeName := "wrapped_" + volName
@@ -305,25 +310,25 @@ type fakeKubeletVolumeHost struct {
 var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
 var _ FakeVolumeHost = &fakeKubeletVolumeHost{}

-func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
+func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeKubeletVolumeHost {
 	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
 }

-func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
+func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeKubeletVolumeHost {
 	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
 }

-func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) FakeVolumeHost {
+func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeKubeletVolumeHost {
 	volHost := newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
 	volHost.nodeLabels = labels
 	return volHost
 }

-func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
+func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
 	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
 }

-func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) FakeVolumeHost {
+func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeKubeletVolumeHost {
 	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
 }

@@ -351,6 +356,11 @@ func newFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset
 	return host
 }

+func (f *fakeKubeletVolumeHost) WithNode(node *v1.Node) *fakeKubeletVolumeHost {
+	f.node = node
+	return f
+}
+
 func (f *fakeKubeletVolumeHost) SetKubeletError(err error) {
 	f.mux.Lock()
 	defer f.mux.Unlock()
@@ -362,6 +372,17 @@ func (f *fakeKubeletVolumeHost) GetInformerFactory() informers.SharedInformerFac
 	return f.informerFactory
 }

+func (f *fakeKubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+	result := map[v1.UniqueVolumeName]string{}
+	if f.node != nil {
+		for _, av := range f.node.Status.VolumesAttached {
+			result[av.Name] = av.DevicePath
+		}
+	}
+
+	return result, nil
+}
+
 func (f *fakeKubeletVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
 	return f.csiDriverLister
 }
diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go
index ca4f6ca0cf9..d7ec7989efe 100644
--- a/pkg/volume/util/operationexecutor/operation_executor.go
+++ b/pkg/volume/util/operationexecutor/operation_executor.go
@@ -21,6 +21,7 @@ limitations under the License.
 package operationexecutor

 import (
+	"errors"
 	"fmt"
 	"time"

@@ -418,6 +419,23 @@ const (
 	VolumeNotMounted VolumeMountState = "VolumeNotMounted"
 )

+type MountPreConditionFailed struct {
+	msg string
+}
+
+func (err *MountPreConditionFailed) Error() string {
+	return err.msg
+}
+
+func NewMountPreConditionFailedError(msg string) *MountPreConditionFailed {
+	return &MountPreConditionFailed{msg: msg}
+}
+
+func IsMountFailedPreconditionError(err error) bool {
+	var failedPreconditionError *MountPreConditionFailed
+	return errors.As(err, &failedPreconditionError)
+}
+
 // GenerateMsgDetailed returns detailed msgs for volumes to mount
 func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
 	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go
index d7cb5121a6c..9b7482f6f5d 100644
--- a/pkg/volume/util/operationexecutor/operation_generator.go
+++ b/pkg/volume/util/operationexecutor/operation_generator.go
@@ -48,9 +48,10 @@ import (
 )

 const (
-	unknownVolumePlugin           string = "UnknownVolumePlugin"
-	unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
-	DetachOperationName           string = "volume_detach"
+	unknownVolumePlugin                  string = "UnknownVolumePlugin"
+	unknownAttachableVolumePlugin        string = "UnknownAttachableVolumePlugin"
+	DetachOperationName                  string = "volume_detach"
+	VerifyControllerAttachedVolumeOpName string = "verify_controller_attached_volume"
 )

 // InTreeToCSITranslator contains methods required to check migratable status
@@ -1514,6 +1515,20 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
 		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
 	}

+	// For attachable volume types, let's check whether the volume is attached by reading from the node lister.
+	// This avoids unnecessary exponential backoff and goroutine creation. We still verify
+	// the attach status of the volume by reading directly from the API server later on; this is necessary
+	// to guard against race conditions caused by the informer's cached state.
+	if volumeToMount.PluginIsAttachable {
+		cachedAttachedVolumes, _ := og.volumePluginMgr.Host.GetAttachedVolumesFromNodeStatus()
+		if cachedAttachedVolumes != nil {
+			_, volumeFound := cachedAttachedVolumes[volumeToMount.VolumeName]
+			if !volumeFound {
+				return volumetypes.GeneratedOperations{}, NewMountPreConditionFailedError(fmt.Sprintf("volume %s is not yet in node's status", volumeToMount.VolumeName))
+			}
+		}
+	}
+
 	verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
 		migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
 		if !volumeToMount.PluginIsAttachable {
@@ -1579,7 +1594,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
 	}

 	return volumetypes.GeneratedOperations{
-		OperationName:     "verify_controller_attached_volume",
+		OperationName:     VerifyControllerAttachedVolumeOpName,
 		OperationFunc:     verifyControllerAttachedVolumeFunc,
 		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
 		EventRecorderFunc: nil, // nil because we do not want to generate event on error
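Reviewer note, not part of the patch: taken together, these changes give `GenerateVerifyControllerAttachedVolumeFunc` a fast-path precondition. Before the verify operation (and its goroutine) is created, the generator consults the node's cached status via the new `GetAttachedVolumesFromNodeStatus`; if an attachable volume is not listed there yet, it returns a `MountPreConditionFailed` error, which the reconciler's `isExpectedError` treats as expected, so no exponential backoff is triggered and the check simply reruns on the next sync loop. Below is a minimal, self-contained sketch of that control flow; `checkAttachPrecondition` and the `cached` map are hypothetical stand-ins, not the real kubelet types.

```go
package main

import (
	"errors"
	"fmt"
)

// mountPreConditionFailed mirrors operationexecutor.MountPreConditionFailed:
// a typed error so callers can distinguish "not attached yet" from real failures.
type mountPreConditionFailed struct{ msg string }

func (e *mountPreConditionFailed) Error() string { return e.msg }

// isMountFailedPreconditionError mirrors IsMountFailedPreconditionError,
// using errors.As so wrapped errors are matched as well.
func isMountFailedPreconditionError(err error) bool {
	var target *mountPreConditionFailed
	return errors.As(err, &target)
}

// checkAttachPrecondition is a hypothetical stand-in for the fast path added
// to GenerateVerifyControllerAttachedVolumeFunc: if the volume is missing from
// the cached node status, fail fast instead of starting the full operation.
func checkAttachPrecondition(volumeName string, cached map[string]string) error {
	if _, ok := cached[volumeName]; !ok {
		return &mountPreConditionFailed{msg: fmt.Sprintf("volume %s is not yet in node's status", volumeName)}
	}
	return nil
}

func main() {
	// Plays the role of GetAttachedVolumesFromNodeStatus(): unique volume name -> device path.
	cached := map[string]string{"fake-plugin/fake-device1": "fake/path"}

	for _, vol := range []string{"fake-plugin/fake-device1", "fake-plugin/not-attached"} {
		if err := checkAttachPrecondition(vol, cached); err != nil {
			if isMountFailedPreconditionError(err) {
				// The reconciler treats this like IsAlreadyExists / exponential-backoff
				// errors (see isExpectedError above): log and retry on the next sync
				// loop, with no backoff and no extra goroutine.
				fmt.Printf("%s: %v (expected, retry next sync)\n", vol, err)
			}
			continue
		}
		fmt.Printf("%s: attached, proceed with VerifyControllerAttachedVolume\n", vol)
	}
}
```

The design choice sketched here is to fail fast with a typed error rather than swallow the condition, so callers can still tell a benign "not attached yet" state apart from a genuine mount failure.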