commit f0dbc32ed9
Merge pull request #106853 from gnufied/disable-exp-backoff-volume-not-inuse

    When volume is not marked in-use, do not backoff
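The diff below spans the kubelet volume reconciler, the operation executor/generator, the VolumeHost implementations (and their test fakes), and the associated tests. As orientation before reading it, the gating this PR introduces can be summed up in a small, self-contained Go sketch. The struct and helper here are illustrative stand-ins, not the upstream types; they only mirror the two checks visible in the diff:

```go
package main

import "fmt"

// volumeToMount mirrors just the fields the reconciler consults in the diff
// below; the real struct lives in the operationexecutor package.
type volumeToMount struct {
    Name               string
    PluginIsAttachable bool
    ReportedInUse      bool
}

// shouldStartVerifyAttach is a hypothetical helper that condenses the two new
// preconditions: skip the VerifyControllerAttachedVolume operation when the
// volume is not yet reported in-use, or not yet listed in the node's
// status.volumesAttached, so no failing operation enters exponential backoff.
func shouldStartVerifyAttach(v volumeToMount, attachedOnNode map[string]string) bool {
    // Reconciler-level check: attachable volume not reported in-use yet.
    if v.PluginIsAttachable && !v.ReportedInUse {
        return false
    }
    // Operation-generator-level check: volume missing from node.Status.VolumesAttached.
    if v.PluginIsAttachable {
        if _, ok := attachedOnNode[v.Name]; !ok {
            return false
        }
    }
    return true
}

func main() {
    vol := volumeToMount{Name: "fake-plugin/pd-1", PluginIsAttachable: true, ReportedInUse: false}
    // Prints "false": the operation is skipped instead of failing and backing off.
    fmt.Println(shouldStartVerifyAttach(vol, map[string]string{}))
}
```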
@@ -858,6 +858,10 @@ func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error)
     return v1.ResourceList{}, nil
 }
 
+func (adc *attachDetachController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+    return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
     return func(_, _ string) (*v1.Secret, error) {
         return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")
@@ -456,6 +456,10 @@ func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*
     }
 }
 
+func (expc *expandController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+    return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
     return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
         return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
@@ -95,6 +95,10 @@ func (ctrl *PersistentVolumeController) GetNodeAllocatable() (v1.ResourceList, e
     return v1.ResourceList{}, nil
 }
 
+func (ctrl *PersistentVolumeController) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+    return map[v1.UniqueVolumeName]string{}, nil
+}
+
 func (ctrl *PersistentVolumeController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
     return func(_, _ string) (*v1.Secret, error) {
         return nil, fmt.Errorf("GetSecret unsupported in PersistentVolumeController")
@@ -212,6 +212,12 @@ func newTestKubeletWithImageList(
                     Address: testKubeletHostIP,
                 },
             },
+            VolumesAttached: []v1.AttachedVolume{
+                {
+                    Name: "fake/fake-device",
+                    DevicePath: "fake/path",
+                },
+            },
         },
     },
 },
@@ -270,6 +270,20 @@ func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
     return node.Labels, nil
 }
 
+func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
+    node, err := kvh.kubelet.GetNode()
+    if err != nil {
+        return nil, fmt.Errorf("error retrieving node: %v", err)
+    }
+    attachedVolumes := node.Status.VolumesAttached
+    result := map[v1.UniqueVolumeName]string{}
+    for i := range attachedVolumes {
+        attachedVolume := attachedVolumes[i]
+        result[attachedVolume.Name] = attachedVolume.DevicePath
+    }
+    return result, nil
+}
+
 func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
     return kvh.kubelet.nodeName
 }
@@ -202,6 +202,11 @@ func (rc *reconciler) mountAttachVolumes() {
         volumeToMount.DevicePath = devicePath
         if cache.IsVolumeNotAttachedError(err) {
             if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
+                // lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
+                if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
+                    klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
+                    continue
+                }
                 // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
                 // for controller to finish attaching volume.
                 klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
@@ -704,5 +709,5 @@ func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
 
 // ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
 func isExpectedError(err error) bool {
-    return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err)
+    return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
 }
@@ -189,7 +189,20 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
 // Verifies there are no attach/detach calls.
 func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
     // Arrange
-    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+    node := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: string(nodeName),
+        },
+        Status: v1.NodeStatus{
+            VolumesAttached: []v1.AttachedVolume{
+                {
+                    Name: "fake-plugin/fake-device1",
+                    DevicePath: "fake/path",
+                },
+            },
+        },
+    }
+    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
     dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
     asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
     kubeClient := createTestClient()
@@ -261,6 +274,86 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
     assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
 }
 
+// Populates desiredStateOfWorld cache with one volume/pod.
+// Enables controllerAttachDetachEnabled.
+// volume is not repored-in-use
+// Calls Run()
+// Verifies that there is not wait-for-mount call
+// Verifies that there is no exponential-backoff triggered
+func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
+    // Arrange
+    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+    dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+    asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
+    kubeClient := createTestClient()
+    fakeRecorder := &record.FakeRecorder{}
+    fakeHandler := volumetesting.NewBlockVolumePathHandler()
+    oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+        kubeClient,
+        volumePluginMgr,
+        fakeRecorder,
+        false, /* checkNodeCapabilitiesBeforeMount */
+        fakeHandler))
+    reconciler := NewReconciler(
+        kubeClient,
+        true, /* controllerAttachDetachEnabled */
+        reconcilerLoopSleepDuration,
+        waitForAttachTimeout,
+        nodeName,
+        dsw,
+        asw,
+        hasAddedPods,
+        oex,
+        mount.NewFakeMounter(nil),
+        hostutil.NewFakeHostUtil(nil),
+        volumePluginMgr,
+        kubeletPodsDir)
+    pod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "pod1",
+            UID: "pod1uid",
+        },
+        Spec: v1.PodSpec{
+            Volumes: []v1.Volume{
+                {
+                    Name: "volume-name",
+                    VolumeSource: v1.VolumeSource{
+                        GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+                            PDName: "fake-device1",
+                        },
+                    },
+                },
+            },
+        },
+    }
+
+    volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
+    podName := util.GetUniquePodName(pod)
+    generatedVolumeName, err := dsw.AddPodToVolume(
+        podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
+
+    // Assert
+    if err != nil {
+        t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+    }
+
+    // Act
+    runReconciler(reconciler)
+    time.Sleep(reconcilerSyncWaitDuration)
+
+    ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName)
+    if !ok {
+        t.Errorf("operation on volume %s is not safe to retry", generatedVolumeName)
+    }
+
+    // Assert
+    assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
+    assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
+        0 /* expectedWaitForAttachCallCount */, fakePlugin))
+    assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
+        0 /* expectedMountDeviceCallCount */, fakePlugin))
+}
+
 // Populates desiredStateOfWorld cache with one volume/pod.
 // Calls Run()
 // Verifies there is one attach/mount/etc call and no detach calls.
@@ -358,7 +451,20 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
 // Verifies there are no attach/detach calls made.
 func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
     // Arrange
-    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+    node := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: string(nodeName),
+        },
+        Status: v1.NodeStatus{
+            VolumesAttached: []v1.AttachedVolume{
+                {
+                    Name: "fake-plugin/fake-device1",
+                    DevicePath: "fake/path",
+                },
+            },
+        },
+    }
+    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
     dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
     asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
     kubeClient := createTestClient()
@@ -580,9 +686,22 @@ func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
     volumeSpec := &volume.Spec{
         PersistentVolume: gcepv,
     }
+    node := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: string(nodeName),
+        },
+        Status: v1.NodeStatus{
+            VolumesAttached: []v1.AttachedVolume{
+                {
+                    Name: "fake-plugin/fake-device1",
+                    DevicePath: "fake/path",
+                },
+            },
+        },
+    }
 
     // Arrange
-    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
     dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
     asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
     kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
@@ -790,8 +909,22 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
     PersistentVolume: gcepv,
     }
 
+    node := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: string(nodeName),
+        },
+        Status: v1.NodeStatus{
+            VolumesAttached: []v1.AttachedVolume{
+                {
+                    Name: "fake-plugin/fake-device1",
+                    DevicePath: "/fake/path",
+                },
+            },
+        },
+    }
+
     // Arrange
-    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+    volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
     dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
     asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
     kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
|
|||||||
|
|
||||||
// deep copy before reconciler runs to avoid data race.
|
// deep copy before reconciler runs to avoid data race.
|
||||||
pvWithSize := pv.DeepCopy()
|
pvWithSize := pv.DeepCopy()
|
||||||
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
node := &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: string(nodeName),
|
||||||
|
},
|
||||||
|
Spec: v1.NodeSpec{},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesAttached: []v1.AttachedVolume{
|
||||||
|
{
|
||||||
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
|
||||||
|
DevicePath: "fake/path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
||||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||||
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
||||||
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
|
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
|
||||||
@ -1274,7 +1421,21 @@ func Test_UncertainDeviceGlobalMounts(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
node := &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: string(nodeName),
|
||||||
|
},
|
||||||
|
Spec: v1.NodeSpec{},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesAttached: []v1.AttachedVolume{
|
||||||
|
{
|
||||||
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
|
||||||
|
DevicePath: "fake/path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
||||||
fakePlugin.SupportsRemount = tc.supportRemount
|
fakePlugin.SupportsRemount = tc.supportRemount
|
||||||
|
|
||||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||||
@ -1484,7 +1645,21 @@ func Test_UncertainVolumeMountState(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
node := &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: string(nodeName),
|
||||||
|
},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesAttached: []v1.AttachedVolume{
|
||||||
|
{
|
||||||
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
|
||||||
|
DevicePath: "fake/path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
||||||
fakePlugin.SupportsRemount = tc.supportRemount
|
fakePlugin.SupportsRemount = tc.supportRemount
|
||||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||||
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
||||||
@ -1786,7 +1961,20 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume
|
|||||||
|
|
||||||
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
|
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
|
||||||
// Arrange
|
// Arrange
|
||||||
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
node := &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: string(nodeName),
|
||||||
|
},
|
||||||
|
Status: v1.NodeStatus{
|
||||||
|
VolumesAttached: []v1.AttachedVolume{
|
||||||
|
{
|
||||||
|
Name: "fake-plugin/fake-device1",
|
||||||
|
DevicePath: "/fake/path",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
||||||
|
|
||||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||||
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
||||||
|
@ -99,7 +99,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
|
|||||||
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
|
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
|
||||||
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
||||||
|
|
||||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
|
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
|
||||||
|
|
||||||
stopCh := runVolumeManager(manager)
|
stopCh := runVolumeManager(manager)
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
@ -161,7 +161,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
|
|||||||
|
|
||||||
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
||||||
|
|
||||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
|
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
|
||||||
|
|
||||||
stopCh := runVolumeManager(manager)
|
stopCh := runVolumeManager(manager)
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
@ -251,7 +251,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
|
||||||
|
|
||||||
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient)
|
manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
|
||||||
|
|
||||||
stopCh := runVolumeManager(manager)
|
stopCh := runVolumeManager(manager)
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
@ -292,12 +292,15 @@ func (p *fakePodStateProvider) ShouldPodContainersBeTerminating(uid kubetypes.UI
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface) VolumeManager {
|
func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface, node *v1.Node) VolumeManager {
|
||||||
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
|
plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
plugMgr := &volume.VolumePluginMgr{}
|
plugMgr := &volume.VolumePluginMgr{}
|
||||||
// TODO (#51147) inject mock prober
|
// TODO (#51147) inject mock prober
|
||||||
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil))
|
fakeVolumeHost := volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil)
|
||||||
|
fakeVolumeHost.WithNode(node)
|
||||||
|
|
||||||
|
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, nil /* prober */, fakeVolumeHost)
|
||||||
stateProvider := &fakePodStateProvider{}
|
stateProvider := &fakePodStateProvider{}
|
||||||
fakePathHandler := volumetest.NewBlockVolumePathHandler()
|
fakePathHandler := volumetest.NewBlockVolumePathHandler()
|
||||||
vm := NewVolumeManager(
|
vm := NewVolumeManager(
|
||||||
|
@ -449,6 +449,8 @@ type VolumeHost interface {
|
|||||||
// Returns the name of the node
|
// Returns the name of the node
|
||||||
GetNodeName() types.NodeName
|
GetNodeName() types.NodeName
|
||||||
|
|
||||||
|
GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error)
|
||||||
|
|
||||||
// Returns the event recorder of kubelet.
|
// Returns the event recorder of kubelet.
|
||||||
GetEventRecorder() record.EventRecorder
|
GetEventRecorder() record.EventRecorder
|
||||||
|
|
||||||
|
@ -1655,6 +1655,19 @@ func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumeP
|
|||||||
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
|
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*VolumePluginMgr, *FakeVolumePlugin) {
|
||||||
|
plugins := ProbeVolumePlugins(VolumeConfig{})
|
||||||
|
v := NewFakeKubeletVolumeHost(
|
||||||
|
t,
|
||||||
|
"", /* rootDir */
|
||||||
|
nil, /* kubeClient */
|
||||||
|
plugins, /* plugins */
|
||||||
|
)
|
||||||
|
v.WithNode(node)
|
||||||
|
|
||||||
|
return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
|
||||||
|
}
|
||||||
|
|
||||||
// CreateTestPVC returns a provisionable PVC for tests
|
// CreateTestPVC returns a provisionable PVC for tests
|
||||||
func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
|
func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim {
|
||||||
claim := v1.PersistentVolumeClaim{
|
claim := v1.PersistentVolumeClaim{
|
||||||
|
@ -64,6 +64,7 @@ type fakeVolumeHost struct {
|
|||||||
nodeLabels map[string]string
|
nodeLabels map[string]string
|
||||||
nodeName string
|
nodeName string
|
||||||
subpather subpath.Interface
|
subpather subpath.Interface
|
||||||
|
node *v1.Node
|
||||||
csiDriverLister storagelistersv1.CSIDriverLister
|
csiDriverLister storagelistersv1.CSIDriverLister
|
||||||
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
|
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
|
||||||
informerFactory informers.SharedInformerFactory
|
informerFactory informers.SharedInformerFactory
|
||||||
@ -153,6 +154,10 @@ func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
|
|||||||
return f.pluginMgr
|
return f.pluginMgr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
|
||||||
|
return map[v1.UniqueVolumeName]string{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
|
func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
|
||||||
// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
|
// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
|
||||||
wrapperVolumeName := "wrapped_" + volName
|
wrapperVolumeName := "wrapped_" + volName
|
||||||
@ -305,25 +310,25 @@ type fakeKubeletVolumeHost struct {
|
|||||||
var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
|
var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
|
||||||
var _ FakeVolumeHost = &fakeKubeletVolumeHost{}
|
var _ FakeVolumeHost = &fakeKubeletVolumeHost{}
|
||||||
|
|
||||||
func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
|
func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeKubeletVolumeHost {
|
||||||
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
|
func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeKubeletVolumeHost {
|
||||||
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
|
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) FakeVolumeHost {
|
func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeKubeletVolumeHost {
|
||||||
volHost := newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
volHost := newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
||||||
volHost.nodeLabels = labels
|
volHost.nodeLabels = labels
|
||||||
return volHost
|
return volHost
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
|
func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
|
||||||
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
|
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) FakeVolumeHost {
|
func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeKubeletVolumeHost {
|
||||||
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
|
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -351,6 +356,11 @@ func newFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset
|
|||||||
return host
|
return host
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeKubeletVolumeHost) WithNode(node *v1.Node) *fakeKubeletVolumeHost {
|
||||||
|
f.node = node
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fakeKubeletVolumeHost) SetKubeletError(err error) {
|
func (f *fakeKubeletVolumeHost) SetKubeletError(err error) {
|
||||||
f.mux.Lock()
|
f.mux.Lock()
|
||||||
defer f.mux.Unlock()
|
defer f.mux.Unlock()
|
||||||
@ -362,6 +372,17 @@ func (f *fakeKubeletVolumeHost) GetInformerFactory() informers.SharedInformerFac
|
|||||||
return f.informerFactory
|
return f.informerFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeKubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
|
||||||
|
result := map[v1.UniqueVolumeName]string{}
|
||||||
|
if f.node != nil {
|
||||||
|
for _, av := range f.node.Status.VolumesAttached {
|
||||||
|
result[av.Name] = av.DevicePath
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fakeKubeletVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
|
func (f *fakeKubeletVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
|
||||||
return f.csiDriverLister
|
return f.csiDriverLister
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ limitations under the License.
|
|||||||
package operationexecutor
|
package operationexecutor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -418,6 +419,23 @@ const (
|
|||||||
VolumeNotMounted VolumeMountState = "VolumeNotMounted"
|
VolumeNotMounted VolumeMountState = "VolumeNotMounted"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MountPreConditionFailed struct {
|
||||||
|
msg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err *MountPreConditionFailed) Error() string {
|
||||||
|
return err.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMountPreConditionFailedError(msg string) *MountPreConditionFailed {
|
||||||
|
return &MountPreConditionFailed{msg: msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsMountFailedPreconditionError(err error) bool {
|
||||||
|
var failedPreconditionError *MountPreConditionFailed
|
||||||
|
return errors.As(err, &failedPreconditionError)
|
||||||
|
}
|
||||||
|
|
||||||
// GenerateMsgDetailed returns detailed msgs for volumes to mount
|
// GenerateMsgDetailed returns detailed msgs for volumes to mount
|
||||||
func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
|
func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
|
||||||
detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
|
detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
|
||||||
|
@ -48,9 +48,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
unknownVolumePlugin string = "UnknownVolumePlugin"
|
unknownVolumePlugin string = "UnknownVolumePlugin"
|
||||||
unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
|
unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
|
||||||
DetachOperationName string = "volume_detach"
|
DetachOperationName string = "volume_detach"
|
||||||
|
VerifyControllerAttachedVolumeOpName string = "verify_controller_attached_volume"
|
||||||
)
|
)
|
||||||
|
|
||||||
// InTreeToCSITranslator contains methods required to check migratable status
|
// InTreeToCSITranslator contains methods required to check migratable status
|
||||||
@ -1514,6 +1515,20 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
|||||||
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
|
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For attachable volume types, lets check if volume is attached by reading from node lister.
|
||||||
|
// This would avoid exponential back-off and creation of goroutine unnecessarily. We still
|
||||||
|
// verify status of attached volume by directly reading from API server later on.This is necessarily
|
||||||
|
// to ensure any race conditions because of cached state in the informer.
|
||||||
|
if volumeToMount.PluginIsAttachable {
|
||||||
|
cachedAttachedVolumes, _ := og.volumePluginMgr.Host.GetAttachedVolumesFromNodeStatus()
|
||||||
|
if cachedAttachedVolumes != nil {
|
||||||
|
_, volumeFound := cachedAttachedVolumes[volumeToMount.VolumeName]
|
||||||
|
if !volumeFound {
|
||||||
|
return volumetypes.GeneratedOperations{}, NewMountPreConditionFailedError(fmt.Sprintf("volume %s is not yet in node's status", volumeToMount.VolumeName))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
|
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
|
||||||
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
|
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
|
||||||
if !volumeToMount.PluginIsAttachable {
|
if !volumeToMount.PluginIsAttachable {
|
||||||
@ -1579,7 +1594,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
|
|||||||
}
|
}
|
||||||
|
|
||||||
return volumetypes.GeneratedOperations{
|
return volumetypes.GeneratedOperations{
|
||||||
OperationName: "verify_controller_attached_volume",
|
OperationName: VerifyControllerAttachedVolumeOpName,
|
||||||
OperationFunc: verifyControllerAttachedVolumeFunc,
|
OperationFunc: verifyControllerAttachedVolumeFunc,
|
||||||
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
|
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
|
||||||
EventRecorderFunc: nil, // nil because we do not want to generate event on error
|
EventRecorderFunc: nil, // nil because we do not want to generate event on error
|
||||||
|
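For readers skimming the diff: the error-classification pattern added in the operation executor (MountPreConditionFailed, IsMountFailedPreconditionError, and the extra clause in the reconciler's isExpectedError) boils down to matching a dedicated error type with errors.As so the failure is treated as expected rather than retried with backoff. Below is a minimal, self-contained sketch of that pattern using only the standard library; it mirrors the names from the diff but is not the upstream code:

```go
package main

import (
    "errors"
    "fmt"
)

// MountPreConditionFailed marks failures whose precondition is simply not met
// yet (e.g. the volume is not in node.Status.VolumesAttached), so callers can
// treat them as expected instead of feeding them to exponential backoff.
type MountPreConditionFailed struct{ msg string }

func (e *MountPreConditionFailed) Error() string { return e.msg }

// IsMountFailedPreconditionError reports whether err, or any error it wraps,
// is a *MountPreConditionFailed.
func IsMountFailedPreconditionError(err error) bool {
    var target *MountPreConditionFailed
    return errors.As(err, &target)
}

func main() {
    err := fmt.Errorf("mount failed: %w", &MountPreConditionFailed{msg: "volume not yet in node's status"})
    if IsMountFailedPreconditionError(err) {
        fmt.Println("expected error: skip exponential backoff") // printed
    }
}
```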