
The path module has a few different functions: Clean, Split, Join, Ext, Dir, Base, IsAbs. These functions do not take into account the OS-specific path separator, meaning that they won't behave as intended on Windows. For example, Dir is supposed to return all but the last element of the path. For the path "C:\some\dir\somewhere", it is supposed to return "C:\some\dir\", however, it returns ".". Instead of these functions, the ones in filepath should be used instead.
2455 lines
78 KiB
Go
2455 lines
78 KiB
Go
/*
|
|
Copyright 2016 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package reconciler
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
csitrans "k8s.io/csi-translation-lib"
|
|
"k8s.io/kubernetes/pkg/volume/csimigration"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"k8s.io/mount-utils"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
k8stypes "k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/kubernetes/fake"
|
|
core "k8s.io/client-go/testing"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
|
"k8s.io/kubernetes/pkg/volume"
|
|
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
|
|
"k8s.io/kubernetes/pkg/volume/util"
|
|
"k8s.io/kubernetes/pkg/volume/util/hostutil"
|
|
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
|
"k8s.io/kubernetes/pkg/volume/util/types"
|
|
)
|
|
|
|
const (
|
|
// reconcilerLoopSleepDuration is the amount of time the reconciler loop
|
|
// waits between successive executions
|
|
reconcilerLoopSleepDuration = 1 * time.Nanosecond
|
|
// waitForAttachTimeout is the maximum amount of time a
|
|
// operationexecutor.Mount call will wait for a volume to be attached.
|
|
waitForAttachTimeout = 1 * time.Second
|
|
nodeName = k8stypes.NodeName("mynodename")
|
|
kubeletPodsDir = "fake-dir"
|
|
testOperationBackOffDuration = 100 * time.Millisecond
|
|
reconcilerSyncWaitDuration = 10 * time.Second
|
|
)
|
|
|
|
func hasAddedPods() bool { return true }
|
|
|
|
// Calls Run()
|
|
// Verifies there are no calls to attach, detach, mount, unmount, etc.
|
|
func Test_Run_Positive_DoNothing(t *testing.T) {
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler,
|
|
))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
false, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroWaitForAttachCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroMountDeviceCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroSetUpCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Calls Run()
|
|
// Verifies there is are attach/mount/etc calls and no detach/unmount calls.
|
|
func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
false, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyAttachCallCount(
|
|
1 /* expectedAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
|
|
1 /* expectedMountDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifySetUpCallCount(
|
|
1 /* expectedSetUpCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Calls Run()
|
|
// Verifies there is are attach/mount/etc calls and no detach/unmount calls.
|
|
func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
|
|
// Arrange
|
|
intreeToCSITranslator := csitrans.New()
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Spec: v1.NodeSpec{},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient(v1.AttachedVolume{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
|
|
DevicePath: "fake/path",
|
|
})
|
|
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
migratedSpec, err := csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, intreeToCSITranslator)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error while translating spec %v: %v", volumeSpec, err)
|
|
}
|
|
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName,
|
|
pod,
|
|
migratedSpec,
|
|
migratedSpec.Name(),
|
|
"", /* volumeGidValue */
|
|
nil, /* SELinuxContexts */
|
|
)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
|
|
1 /* expectedMountDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifySetUpCallCount(
|
|
1 /* expectedSetUpCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Enables controllerAttachDetachEnabled.
|
|
// Calls Run()
|
|
// Verifies there is one mount call and no unmount calls.
|
|
// Verifies there are no attach/detach calls.
|
|
func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
|
|
// Arrange
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
|
|
1 /* expectedMountDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifySetUpCallCount(
|
|
1 /* expectedSetUpCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Enables controllerAttachDetachEnabled.
|
|
// volume is not repored-in-use
|
|
// Calls Run()
|
|
// Verifies that there is not wait-for-mount call
|
|
// Verifies that there is no exponential-backoff triggered
|
|
func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
time.Sleep(reconcilerSyncWaitDuration)
|
|
|
|
ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName)
|
|
if !ok {
|
|
t.Errorf("operation on volume %s is not safe to retry", generatedVolumeName)
|
|
}
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
0 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
|
|
0 /* expectedMountDeviceCallCount */, fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Calls Run()
|
|
// Verifies there is one attach/mount/etc call and no detach calls.
|
|
// Deletes volume/pod from desired state of world.
|
|
// Verifies detach/unmount calls are issued.
|
|
func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
false, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyAttachCallCount(
|
|
1 /* expectedAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
|
|
1 /* expectedMountDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifySetUpCallCount(
|
|
1 /* expectedSetUpCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
|
|
// Act
|
|
dsw.DeletePodFromVolume(podName, generatedVolumeName)
|
|
waitForDetach(t, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
|
|
1 /* expectedTearDownCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyDetachCallCount(
|
|
1 /* expectedDetachCallCount */, fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Enables controllerAttachDetachEnabled.
|
|
// Calls Run()
|
|
// Verifies one mount call is made and no unmount calls.
|
|
// Deletes volume/pod from desired state of world.
|
|
// Verifies one unmount call is made.
|
|
// Verifies there are no attach/detach calls made.
|
|
func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
|
|
// Arrange
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
|
|
1 /* expectedMountDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifySetUpCallCount(
|
|
1 /* expectedSetUpCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
|
|
// Act
|
|
dsw.DeletePodFromVolume(podName, generatedVolumeName)
|
|
waitForDetach(t, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyTearDownCallCount(
|
|
1 /* expectedTearDownCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Calls Run()
|
|
// Verifies there are attach/get map paths/setupDevice calls and
|
|
// no detach/teardownDevice calls.
|
|
func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: v1.PodSpec{},
|
|
}
|
|
|
|
mode := v1.PersistentVolumeBlock
|
|
gcepv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
|
|
PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
|
|
AccessModes: []v1.PersistentVolumeAccessMode{
|
|
v1.ReadWriteOnce,
|
|
v1.ReadOnlyMany,
|
|
},
|
|
VolumeMode: &mode,
|
|
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
|
|
},
|
|
}
|
|
|
|
gcepvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
VolumeName: "volume-name",
|
|
VolumeMode: &mode,
|
|
},
|
|
Status: v1.PersistentVolumeClaimStatus{
|
|
Phase: v1.ClaimBound,
|
|
Capacity: gcepv.Spec.Capacity,
|
|
},
|
|
}
|
|
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
false, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
|
|
volumeSpec := &volume.Spec{
|
|
PersistentVolume: gcepv,
|
|
}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyAttachCallCount(
|
|
1 /* expectedAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
|
|
1 /* expectedGetMapDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Enables controllerAttachDetachEnabled.
|
|
// Calls Run()
|
|
// Verifies there are two get map path calls, a setupDevice call
|
|
// and no teardownDevice call.
|
|
// Verifies there are no attach/detach calls.
|
|
func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: v1.PodSpec{},
|
|
}
|
|
|
|
mode := v1.PersistentVolumeBlock
|
|
gcepv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
|
|
PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
|
|
AccessModes: []v1.PersistentVolumeAccessMode{
|
|
v1.ReadWriteOnce,
|
|
v1.ReadOnlyMany,
|
|
},
|
|
VolumeMode: &mode,
|
|
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
|
|
},
|
|
}
|
|
gcepvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
VolumeName: "volume-name",
|
|
VolumeMode: &mode,
|
|
},
|
|
Status: v1.PersistentVolumeClaimStatus{
|
|
Phase: v1.ClaimBound,
|
|
Capacity: gcepv.Spec.Capacity,
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{
|
|
PersistentVolume: gcepv,
|
|
}
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "/fake/path",
|
|
})
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
|
|
1 /* expectedGetMapDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Calls Run()
|
|
// Verifies there is one attach call, two get map path calls,
|
|
// setupDevice call and no detach calls.
|
|
// Deletes volume/pod from desired state of world.
|
|
// Verifies one detach/teardownDevice calls are issued.
|
|
func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: v1.PodSpec{},
|
|
}
|
|
|
|
mode := v1.PersistentVolumeBlock
|
|
gcepv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
|
|
PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
|
|
AccessModes: []v1.PersistentVolumeAccessMode{
|
|
v1.ReadWriteOnce,
|
|
v1.ReadOnlyMany,
|
|
},
|
|
VolumeMode: &mode,
|
|
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
|
|
},
|
|
}
|
|
gcepvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
VolumeName: "volume-name",
|
|
VolumeMode: &mode,
|
|
},
|
|
Status: v1.PersistentVolumeClaimStatus{
|
|
Phase: v1.ClaimBound,
|
|
Capacity: gcepv.Spec.Capacity,
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{
|
|
PersistentVolume: gcepv,
|
|
}
|
|
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
false, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyAttachCallCount(
|
|
1 /* expectedAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
|
|
1 /* expectedGetMapDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
|
|
// Act
|
|
dsw.DeletePodFromVolume(podName, generatedVolumeName)
|
|
waitForDetach(t, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
|
|
1 /* expectedTearDownDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyDetachCallCount(
|
|
1 /* expectedDetachCallCount */, fakePlugin))
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Enables controllerAttachDetachEnabled.
|
|
// Calls Run()
|
|
// Verifies two map path calls are made and no teardownDevice/detach calls.
|
|
// Deletes volume/pod from desired state of world.
|
|
// Verifies one teardownDevice call is made.
|
|
// Verifies there are no attach/detach calls made.
|
|
func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
Namespace: "ns",
|
|
},
|
|
Spec: v1.PodSpec{},
|
|
}
|
|
|
|
mode := v1.PersistentVolumeBlock
|
|
gcepv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
Capacity: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
|
|
PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
|
|
AccessModes: []v1.PersistentVolumeAccessMode{
|
|
v1.ReadWriteOnce,
|
|
v1.ReadOnlyMany,
|
|
},
|
|
VolumeMode: &mode,
|
|
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
|
|
},
|
|
}
|
|
gcepvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
VolumeName: "volume-name",
|
|
VolumeMode: &mode,
|
|
},
|
|
Status: v1.PersistentVolumeClaimStatus{
|
|
Phase: v1.ClaimBound,
|
|
Capacity: gcepv.Spec.Capacity,
|
|
},
|
|
}
|
|
|
|
volumeSpec := &volume.Spec{
|
|
PersistentVolume: gcepv,
|
|
}
|
|
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "/fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Arrange
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "/fake/path",
|
|
})
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
runReconciler(reconciler)
|
|
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
|
|
1 /* expectedWaitForAttachCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
|
|
1 /* expectedGetMapDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
|
|
// Act
|
|
dsw.DeletePodFromVolume(podName, generatedVolumeName)
|
|
waitForDetach(t, generatedVolumeName, asw)
|
|
|
|
// Assert
|
|
assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
|
|
1 /* expectedTearDownDeviceCallCount */, fakePlugin))
|
|
assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
|
|
}
|
|
|
|
func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
|
|
testCases := map[string]struct {
|
|
volumePlugins []volume.VolumePlugin
|
|
expectErr bool
|
|
expectedErrMsg string
|
|
}{
|
|
"volumePlugin is nil": {
|
|
volumePlugins: []volume.VolumePlugin{},
|
|
expectErr: true,
|
|
expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed",
|
|
},
|
|
"blockVolumePlugin is nil": {
|
|
volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
|
|
expectErr: true,
|
|
expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
|
|
},
|
|
}
|
|
|
|
for name, tc := range testCases {
|
|
t.Run(name, func(t *testing.T) {
|
|
volumePluginMgr := &volume.VolumePluginMgr{}
|
|
volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
nil, /* kubeClient */
|
|
volumePluginMgr,
|
|
nil, /* fakeRecorder */
|
|
nil))
|
|
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{},
|
|
}
|
|
volumeMode := v1.PersistentVolumeBlock
|
|
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
|
|
volumeToMount := operationexecutor.VolumeToMount{
|
|
Pod: pod,
|
|
VolumeSpec: tmpSpec}
|
|
err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
|
|
// Assert
|
|
if assert.Error(t, err) {
|
|
assert.Contains(t, err.Error(), tc.expectedErrMsg)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
|
|
testCases := map[string]struct {
|
|
volumePlugins []volume.VolumePlugin
|
|
expectErr bool
|
|
expectedErrMsg string
|
|
}{
|
|
"volumePlugin is nil": {
|
|
volumePlugins: []volume.VolumePlugin{},
|
|
expectErr: true,
|
|
expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed",
|
|
},
|
|
"blockVolumePlugin is nil": {
|
|
volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
|
|
expectErr: true,
|
|
expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
|
|
},
|
|
}
|
|
|
|
for name, tc := range testCases {
|
|
t.Run(name, func(t *testing.T) {
|
|
volumePluginMgr := &volume.VolumePluginMgr{}
|
|
volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
nil, /* kubeClient */
|
|
volumePluginMgr,
|
|
nil, /* fakeRecorder */
|
|
nil))
|
|
volumeMode := v1.PersistentVolumeBlock
|
|
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
|
|
volumeToUnmount := operationexecutor.MountedVolume{
|
|
PluginName: "fake-file-plugin",
|
|
VolumeSpec: tmpSpec}
|
|
err := oex.UnmountVolume(volumeToUnmount, asw, "" /* podsDir */)
|
|
// Assert
|
|
if assert.Error(t, err) {
|
|
assert.Contains(t, err.Error(), tc.expectedErrMsg)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
|
|
testCases := map[string]struct {
|
|
volumePlugins []volume.VolumePlugin
|
|
expectErr bool
|
|
expectedErrMsg string
|
|
}{
|
|
"volumePlugin is nil": {
|
|
volumePlugins: []volume.VolumePlugin{},
|
|
expectErr: true,
|
|
expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
|
|
},
|
|
"blockVolumePlugin is nil": {
|
|
volumePlugins: volumetesting.NewFakeFileVolumePlugin(),
|
|
expectErr: true,
|
|
expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
|
|
},
|
|
}
|
|
|
|
for name, tc := range testCases {
|
|
t.Run(name, func(t *testing.T) {
|
|
volumePluginMgr := &volume.VolumePluginMgr{}
|
|
volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
nil, /* kubeClient */
|
|
volumePluginMgr,
|
|
nil, /* fakeRecorder */
|
|
nil))
|
|
var hostutil hostutil.HostUtils
|
|
volumeMode := v1.PersistentVolumeBlock
|
|
tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
|
|
deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
|
|
err := oex.UnmountDevice(deviceToDetach, asw, hostutil)
|
|
// Assert
|
|
if assert.Error(t, err) {
|
|
assert.Contains(t, err.Error(), tc.expectedErrMsg)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// Populates desiredStateOfWorld cache with one volume/pod.
|
|
// Enables controllerAttachDetachEnabled.
|
|
// Calls Run()
|
|
// Wait for volume mounted.
|
|
// Mark volume as fsResizeRequired in ASW.
|
|
// Verifies volume's fsResizeRequired flag is cleared later.
|
|
func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
|
|
blockMode := v1.PersistentVolumeBlock
|
|
fsMode := v1.PersistentVolumeFilesystem
|
|
|
|
var tests = []struct {
|
|
name string
|
|
volumeMode *v1.PersistentVolumeMode
|
|
expansionFailed bool
|
|
uncertainTest bool
|
|
pvName string
|
|
pvcSize resource.Quantity
|
|
pvcStatusSize resource.Quantity
|
|
oldPVSize resource.Quantity
|
|
newPVSize resource.Quantity
|
|
}{
|
|
{
|
|
name: "expand-fs-volume",
|
|
volumeMode: &fsMode,
|
|
pvName: "pv",
|
|
pvcSize: resource.MustParse("10G"),
|
|
pvcStatusSize: resource.MustParse("10G"),
|
|
newPVSize: resource.MustParse("15G"),
|
|
oldPVSize: resource.MustParse("10G"),
|
|
},
|
|
{
|
|
name: "expand-raw-block",
|
|
volumeMode: &blockMode,
|
|
pvName: "pv",
|
|
pvcSize: resource.MustParse("10G"),
|
|
pvcStatusSize: resource.MustParse("10G"),
|
|
newPVSize: resource.MustParse("15G"),
|
|
oldPVSize: resource.MustParse("10G"),
|
|
},
|
|
{
|
|
name: "expand-fs-volume with in-use error",
|
|
volumeMode: &fsMode,
|
|
expansionFailed: true,
|
|
pvName: volumetesting.FailWithInUseVolumeName,
|
|
pvcSize: resource.MustParse("10G"),
|
|
pvcStatusSize: resource.MustParse("10G"),
|
|
newPVSize: resource.MustParse("15G"),
|
|
oldPVSize: resource.MustParse("13G"),
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
pv := getTestPV(tc.pvName, tc.volumeMode, tc.oldPVSize)
|
|
pvc := getTestPVC("pv", tc.volumeMode, tc.pvcSize, tc.pvcStatusSize)
|
|
pod := getTestPod(pvc.Name)
|
|
|
|
// deep copy before reconciler runs to avoid data race.
|
|
pvWithSize := pv.DeepCopy()
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Spec: v1.NodeSpec{},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
|
|
DevicePath: "fake/path",
|
|
})
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
|
|
volumeSpec := &volume.Spec{PersistentVolume: pv}
|
|
podName := util.GetUniquePodName(pod)
|
|
volumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
|
|
|
|
// Start the reconciler to fill ASW.
|
|
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
|
|
go func() {
|
|
defer close(stoppedChan)
|
|
reconciler.Run(stopChan)
|
|
}()
|
|
waitForMount(t, fakePlugin, volumeName, asw)
|
|
// Stop the reconciler.
|
|
close(stopChan)
|
|
<-stoppedChan
|
|
|
|
// Simulate what DSOWP does
|
|
pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
|
|
volumeSpec = &volume.Spec{PersistentVolume: pvWithSize}
|
|
dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxContexts */)
|
|
|
|
t.Logf("Changing size of the volume to %s", tc.newPVSize.String())
|
|
newSize := tc.newPVSize.DeepCopy()
|
|
dsw.UpdatePersistentVolumeSize(volumeName, &newSize)
|
|
|
|
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxLabel */)
|
|
if tc.expansionFailed {
|
|
if cache.IsFSResizeRequiredError(podExistErr) {
|
|
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
|
|
}
|
|
} else {
|
|
if !cache.IsFSResizeRequiredError(podExistErr) {
|
|
t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
|
|
}
|
|
go reconciler.Run(wait.NeverStop)
|
|
|
|
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
|
|
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxContext */)
|
|
return mounted && err == nil, nil
|
|
})
|
|
if waitErr != nil {
|
|
t.Fatalf("Volume resize should succeeded %v", waitErr)
|
|
}
|
|
}
|
|
|
|
})
|
|
}
|
|
}
|
|
|
|
func getTestPVC(pvName string, volumeMode *v1.PersistentVolumeMode, specSize, statusSize resource.Quantity) *v1.PersistentVolumeClaim {
|
|
pvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pvc",
|
|
UID: "pvcuid",
|
|
},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
Resources: v1.ResourceRequirements{
|
|
Requests: v1.ResourceList{
|
|
v1.ResourceStorage: specSize,
|
|
},
|
|
},
|
|
VolumeName: pvName,
|
|
VolumeMode: volumeMode,
|
|
},
|
|
Status: v1.PersistentVolumeClaimStatus{
|
|
Capacity: v1.ResourceList{
|
|
v1.ResourceStorage: statusSize,
|
|
},
|
|
},
|
|
}
|
|
return pvc
|
|
}
|
|
|
|
func getTestPV(pvName string, volumeMode *v1.PersistentVolumeMode, pvSize resource.Quantity) *v1.PersistentVolume {
|
|
pv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: pvName,
|
|
UID: "pvuid",
|
|
},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
ClaimRef: &v1.ObjectReference{Name: "pvc"},
|
|
VolumeMode: volumeMode,
|
|
Capacity: v1.ResourceList{
|
|
v1.ResourceStorage: pvSize,
|
|
},
|
|
},
|
|
}
|
|
return pv
|
|
}
|
|
|
|
func getTestPod(claimName string) *v1.Pod {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: claimName,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
return pod
|
|
}
|
|
|
|
func Test_UncertainDeviceGlobalMounts(t *testing.T) {
|
|
var tests = []struct {
|
|
name string
|
|
deviceState operationexecutor.DeviceMountState
|
|
unmountDeviceCallCount int
|
|
volumeName string
|
|
supportRemount bool
|
|
}{
|
|
{
|
|
name: "timed out operations should result in device marked as uncertain",
|
|
deviceState: operationexecutor.DeviceMountUncertain,
|
|
unmountDeviceCallCount: 1,
|
|
volumeName: volumetesting.TimeoutOnMountDeviceVolumeName,
|
|
},
|
|
{
|
|
name: "failed operation should result in not-mounted device",
|
|
deviceState: operationexecutor.DeviceNotMounted,
|
|
unmountDeviceCallCount: 0,
|
|
volumeName: volumetesting.FailMountDeviceVolumeName,
|
|
},
|
|
{
|
|
name: "timeout followed by failed operation should result in non-mounted device",
|
|
deviceState: operationexecutor.DeviceNotMounted,
|
|
unmountDeviceCallCount: 0,
|
|
volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
|
|
},
|
|
{
|
|
name: "success followed by timeout operation should result in mounted device",
|
|
deviceState: operationexecutor.DeviceGloballyMounted,
|
|
unmountDeviceCallCount: 1,
|
|
volumeName: volumetesting.SuccessAndTimeoutDeviceName,
|
|
supportRemount: true,
|
|
},
|
|
{
|
|
name: "success followed by failed operation should result in mounted device",
|
|
deviceState: operationexecutor.DeviceGloballyMounted,
|
|
unmountDeviceCallCount: 1,
|
|
volumeName: volumetesting.SuccessAndFailOnMountDeviceName,
|
|
supportRemount: true,
|
|
},
|
|
}
|
|
|
|
modes := []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem}
|
|
|
|
for modeIndex := range modes {
|
|
for tcIndex := range tests {
|
|
mode := modes[modeIndex]
|
|
tc := tests[tcIndex]
|
|
testName := fmt.Sprintf("%s [%s]", tc.name, mode)
|
|
uniqueTestString := fmt.Sprintf("global-mount-%s", testName)
|
|
uniquePodDir := fmt.Sprintf("%s-%x", kubeletPodsDir, md5.Sum([]byte(uniqueTestString)))
|
|
t.Run(testName+"[", func(t *testing.T) {
|
|
t.Parallel()
|
|
pv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: tc.volumeName,
|
|
UID: "pvuid",
|
|
},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
ClaimRef: &v1.ObjectReference{Name: "pvc"},
|
|
VolumeMode: &mode,
|
|
},
|
|
}
|
|
pvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pvc",
|
|
UID: "pvcuid",
|
|
},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
VolumeName: tc.volumeName,
|
|
VolumeMode: &mode,
|
|
},
|
|
}
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: pvc.Name,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Spec: v1.NodeSpec{},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
fakePlugin.SupportsRemount = tc.supportRemount
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
|
|
DevicePath: "fake/path",
|
|
})
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
&mount.FakeMounter{},
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
uniquePodDir)
|
|
volumeSpec := &volume.Spec{PersistentVolume: pv}
|
|
podName := util.GetUniquePodName(pod)
|
|
volumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
|
|
|
|
// Start the reconciler to fill ASW.
|
|
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
|
|
go func() {
|
|
reconciler.Run(stopChan)
|
|
close(stoppedChan)
|
|
}()
|
|
waitForVolumeToExistInASW(t, volumeName, asw)
|
|
if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
|
|
// Wait upto 10s for reconciler to catch up
|
|
time.Sleep(reconcilerSyncWaitDuration)
|
|
}
|
|
|
|
if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
|
|
tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
|
|
// wait for mount and then break it via remount
|
|
waitForMount(t, fakePlugin, volumeName, asw)
|
|
asw.MarkRemountRequired(podName)
|
|
time.Sleep(reconcilerSyncWaitDuration)
|
|
}
|
|
|
|
if tc.deviceState == operationexecutor.DeviceMountUncertain {
|
|
waitForUncertainGlobalMount(t, volumeName, asw)
|
|
}
|
|
|
|
if tc.deviceState == operationexecutor.DeviceGloballyMounted {
|
|
waitForMount(t, fakePlugin, volumeName, asw)
|
|
}
|
|
|
|
dsw.DeletePodFromVolume(podName, volumeName)
|
|
waitForDetach(t, volumeName, asw)
|
|
if mode == v1.PersistentVolumeFilesystem {
|
|
err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
|
|
} else {
|
|
if tc.unmountDeviceCallCount == 0 {
|
|
err = volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin)
|
|
} else {
|
|
err = volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
|
|
}
|
|
}
|
|
if err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func Test_UncertainVolumeMountState(t *testing.T) {
|
|
var tests = []struct {
|
|
name string
|
|
volumeState operationexecutor.VolumeMountState
|
|
unmountDeviceCallCount int
|
|
unmountVolumeCount int
|
|
volumeName string
|
|
supportRemount bool
|
|
pvcStatusSize resource.Quantity
|
|
pvSize resource.Quantity
|
|
}{
|
|
{
|
|
name: "timed out operations should result in volume marked as uncertain",
|
|
volumeState: operationexecutor.VolumeMountUncertain,
|
|
unmountDeviceCallCount: 1,
|
|
unmountVolumeCount: 1,
|
|
volumeName: volumetesting.TimeoutOnSetupVolumeName,
|
|
},
|
|
{
|
|
name: "failed operation should result in not-mounted volume",
|
|
volumeState: operationexecutor.VolumeNotMounted,
|
|
unmountDeviceCallCount: 1,
|
|
unmountVolumeCount: 0,
|
|
volumeName: volumetesting.FailOnSetupVolumeName,
|
|
},
|
|
{
|
|
name: "timeout followed by failed operation should result in non-mounted volume",
|
|
volumeState: operationexecutor.VolumeNotMounted,
|
|
unmountDeviceCallCount: 1,
|
|
unmountVolumeCount: 0,
|
|
volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName,
|
|
},
|
|
{
|
|
name: "success followed by timeout operation should result in mounted volume",
|
|
volumeState: operationexecutor.VolumeMounted,
|
|
unmountDeviceCallCount: 1,
|
|
unmountVolumeCount: 1,
|
|
volumeName: volumetesting.SuccessAndTimeoutSetupVolumeName,
|
|
supportRemount: true,
|
|
},
|
|
{
|
|
name: "success followed by failed operation should result in mounted volume",
|
|
volumeState: operationexecutor.VolumeMounted,
|
|
unmountDeviceCallCount: 1,
|
|
unmountVolumeCount: 1,
|
|
volumeName: volumetesting.SuccessAndFailOnSetupVolumeName,
|
|
supportRemount: true,
|
|
},
|
|
{
|
|
name: "mount success but fail to expand filesystem",
|
|
volumeState: operationexecutor.VolumeMountUncertain,
|
|
unmountDeviceCallCount: 1,
|
|
unmountVolumeCount: 1,
|
|
volumeName: volumetesting.FailVolumeExpansion,
|
|
supportRemount: true,
|
|
pvSize: resource.MustParse("10G"),
|
|
pvcStatusSize: resource.MustParse("2G"),
|
|
},
|
|
}
|
|
modes := []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem}
|
|
|
|
for modeIndex := range modes {
|
|
for tcIndex := range tests {
|
|
mode := modes[modeIndex]
|
|
tc := tests[tcIndex]
|
|
testName := fmt.Sprintf("%s [%s]", tc.name, mode)
|
|
uniqueTestString := fmt.Sprintf("local-mount-%s", testName)
|
|
uniquePodDir := fmt.Sprintf("%s-%x", kubeletPodsDir, md5.Sum([]byte(uniqueTestString)))
|
|
t.Run(testName, func(t *testing.T) {
|
|
t.Parallel()
|
|
pv := &v1.PersistentVolume{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: tc.volumeName,
|
|
UID: "pvuid",
|
|
},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
ClaimRef: &v1.ObjectReference{Name: "pvc"},
|
|
VolumeMode: &mode,
|
|
},
|
|
}
|
|
if tc.pvSize.CmpInt64(0) > 0 {
|
|
pv.Spec.Capacity = v1.ResourceList{
|
|
v1.ResourceStorage: tc.pvSize,
|
|
}
|
|
}
|
|
pvc := &v1.PersistentVolumeClaim{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pvc",
|
|
UID: "pvcuid",
|
|
},
|
|
Spec: v1.PersistentVolumeClaimSpec{
|
|
VolumeName: tc.volumeName,
|
|
VolumeMode: &mode,
|
|
},
|
|
}
|
|
if tc.pvcStatusSize.CmpInt64(0) > 0 {
|
|
pvc.Status = v1.PersistentVolumeClaimStatus{
|
|
Capacity: v1.ResourceList{
|
|
v1.ResourceStorage: tc.pvcStatusSize,
|
|
},
|
|
}
|
|
}
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: pvc.Name,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
|
|
DevicePath: "fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
fakePlugin.SupportsRemount = tc.supportRemount
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
|
|
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
|
|
DevicePath: "fake/path",
|
|
})
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
&mount.FakeMounter{},
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
uniquePodDir)
|
|
volumeSpec := &volume.Spec{PersistentVolume: pv}
|
|
podName := util.GetUniquePodName(pod)
|
|
volumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
|
|
|
|
// Start the reconciler to fill ASW.
|
|
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
|
|
go func() {
|
|
reconciler.Run(stopChan)
|
|
close(stoppedChan)
|
|
}()
|
|
waitForVolumeToExistInASW(t, volumeName, asw)
|
|
// all of these tests rely on device to be globally mounted and hence waiting for global
|
|
// mount ensures that unmountDevice is called as expected.
|
|
waitForGlobalMount(t, volumeName, asw)
|
|
if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
|
|
// Wait upto 10s for reconciler to catchup
|
|
time.Sleep(reconcilerSyncWaitDuration)
|
|
}
|
|
|
|
if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
|
|
tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
|
|
// wait for mount and then break it via remount
|
|
waitForMount(t, fakePlugin, volumeName, asw)
|
|
asw.MarkRemountRequired(podName)
|
|
time.Sleep(reconcilerSyncWaitDuration)
|
|
}
|
|
|
|
if tc.volumeState == operationexecutor.VolumeMountUncertain {
|
|
waitForUncertainPodMount(t, volumeName, podName, asw)
|
|
}
|
|
|
|
if tc.volumeState == operationexecutor.VolumeMounted {
|
|
waitForMount(t, fakePlugin, volumeName, asw)
|
|
}
|
|
|
|
dsw.DeletePodFromVolume(podName, volumeName)
|
|
waitForDetach(t, volumeName, asw)
|
|
|
|
if mode == v1.PersistentVolumeFilesystem {
|
|
if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
if err := volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
} else {
|
|
if tc.unmountVolumeCount == 0 {
|
|
if err := volumetesting.VerifyZeroUnmapPodDeviceCallCount(fakePlugin); err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
} else {
|
|
if err := volumetesting.VerifyUnmapPodDeviceCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
}
|
|
if tc.unmountDeviceCallCount == 0 {
|
|
if err := volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin); err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
} else {
|
|
if err := volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
|
|
t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
|
|
// check if volume is globally mounted in uncertain state
|
|
err := retryWithExponentialBackOff(
|
|
testOperationBackOffDuration,
|
|
func() (bool, error) {
|
|
unmountedVolumes := asw.GetUnmountedVolumes()
|
|
for _, v := range unmountedVolumes {
|
|
if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName)
|
|
}
|
|
}
|
|
|
|
func waitForGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
|
|
// check if volume is globally mounted
|
|
err := retryWithExponentialBackOff(
|
|
testOperationBackOffDuration,
|
|
func() (bool, error) {
|
|
mountedVolumes := asw.GetGloballyMountedVolumes()
|
|
for _, v := range mountedVolumes {
|
|
if v.VolumeName == volumeName {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("expected volume devices %s to be mounted globally", volumeName)
|
|
}
|
|
}
|
|
|
|
func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podName types.UniquePodName, asw cache.ActualStateOfWorld) {
|
|
// check if volume is locally pod mounted in uncertain state
|
|
err := retryWithExponentialBackOff(
|
|
testOperationBackOffDuration,
|
|
func() (bool, error) {
|
|
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}, "" /* SELinuxContext */)
|
|
if mounted || err != nil {
|
|
return false, nil
|
|
}
|
|
allMountedVolumes := asw.GetAllMountedVolumes()
|
|
for _, v := range allMountedVolumes {
|
|
if v.VolumeName == volumeName {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName)
|
|
}
|
|
}
|
|
|
|
func waitForMount(
|
|
t *testing.T,
|
|
fakePlugin *volumetesting.FakeVolumePlugin,
|
|
volumeName v1.UniqueVolumeName,
|
|
asw cache.ActualStateOfWorld) {
|
|
err := retryWithExponentialBackOff(
|
|
testOperationBackOffDuration,
|
|
func() (bool, error) {
|
|
mountedVolumes := asw.GetMountedVolumes()
|
|
for _, mountedVolume := range mountedVolumes {
|
|
if mountedVolume.VolumeName == volumeName {
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
|
|
}
|
|
}
|
|
|
|
func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
|
|
err := retryWithExponentialBackOff(
|
|
testOperationBackOffDuration,
|
|
func() (bool, error) {
|
|
if asw.VolumeExists(volumeName) {
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
},
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName)
|
|
}
|
|
}
|
|
|
|
func waitForDetach(
|
|
t *testing.T,
|
|
volumeName v1.UniqueVolumeName,
|
|
asw cache.ActualStateOfWorld) {
|
|
err := retryWithExponentialBackOff(
|
|
testOperationBackOffDuration,
|
|
func() (bool, error) {
|
|
if asw.VolumeExists(volumeName) {
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
|
|
}
|
|
}
|
|
|
|
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
|
|
backoff := wait.Backoff{
|
|
Duration: initialDuration,
|
|
Factor: 3,
|
|
Jitter: 0,
|
|
Steps: 6,
|
|
}
|
|
return wait.ExponentialBackoff(backoff, fn)
|
|
}
|
|
|
|
func createTestClient(attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
|
|
fakeClient := &fake.Clientset{}
|
|
if len(attachedVolumes) == 0 {
|
|
attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "fake/path",
|
|
})
|
|
}
|
|
fakeClient.AddReactor("get", "nodes",
|
|
func(action core.Action) (bool, runtime.Object, error) {
|
|
return true, &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: attachedVolumes,
|
|
},
|
|
}, nil
|
|
},
|
|
)
|
|
|
|
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
|
|
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
|
|
})
|
|
return fakeClient
|
|
}
|
|
|
|
func runReconciler(reconciler Reconciler) {
|
|
go reconciler.Run(wait.NeverStop)
|
|
}
|
|
|
|
func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
|
|
fakeClient := &fake.Clientset{}
|
|
if len(attachedVolumes) == 0 {
|
|
attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
|
|
Name: "fake-plugin/pv",
|
|
DevicePath: "fake/path",
|
|
})
|
|
}
|
|
fakeClient.AddReactor("get", "nodes",
|
|
func(action core.Action) (bool, runtime.Object, error) {
|
|
return true, &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: attachedVolumes,
|
|
},
|
|
}, nil
|
|
})
|
|
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
|
|
return true, pvc, nil
|
|
})
|
|
fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
|
|
return true, pv, nil
|
|
})
|
|
fakeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
|
|
if action.GetSubresource() == "status" {
|
|
return true, pvc, nil
|
|
}
|
|
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
|
|
})
|
|
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
|
|
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
|
|
})
|
|
return fakeClient
|
|
}
|
|
|
|
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
|
|
// Arrange
|
|
node := &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "/fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
reconciler := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(nil),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
kubeletPodsDir)
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "pod1",
|
|
UID: "pod1uid",
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: "volume-name",
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: "fake-device1",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Some steps are executes out of order in callbacks, follow the numbers.
|
|
|
|
// 1. Add a volume to DSW and wait until it's mounted
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
// copy before reconciler runs to avoid data race.
|
|
volumeSpecCopy := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
generatedVolumeName, err := dsw.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
|
|
|
if err != nil {
|
|
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
// Start the reconciler to fill ASW.
|
|
stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
|
|
go func() {
|
|
reconciler.Run(stopChan)
|
|
close(stoppedChan)
|
|
}()
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
// Stop the reconciler.
|
|
close(stopChan)
|
|
<-stoppedChan
|
|
|
|
finished := make(chan interface{})
|
|
fakePlugin.Lock()
|
|
fakePlugin.UnmountDeviceHook = func(mountPath string) error {
|
|
// Act:
|
|
// 3. While a volume is being unmounted, add it back to the desired state of world
|
|
klog.InfoS("UnmountDevice called")
|
|
var generatedVolumeNameCopy v1.UniqueVolumeName
|
|
generatedVolumeNameCopy, err = dsw.AddPodToVolume(
|
|
podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
|
|
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeNameCopy})
|
|
return nil
|
|
}
|
|
|
|
fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) {
|
|
// Assert
|
|
// 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
|
|
if devicePath == "" {
|
|
klog.ErrorS(nil, "Expected WaitForAttach called with devicePath from Node.Status")
|
|
close(finished)
|
|
return "", fmt.Errorf("Expected devicePath from Node.Status")
|
|
}
|
|
close(finished)
|
|
return devicePath, nil
|
|
}
|
|
fakePlugin.Unlock()
|
|
|
|
// Start the reconciler again.
|
|
go reconciler.Run(wait.NeverStop)
|
|
|
|
// 2. Delete the volume from DSW (and wait for callbacks)
|
|
dsw.DeletePodFromVolume(podName, generatedVolumeName)
|
|
|
|
<-finished
|
|
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
|
}
|
|
|
|
func getFakeNode() *v1.Node {
|
|
return &v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: string(nodeName),
|
|
},
|
|
Status: v1.NodeStatus{
|
|
VolumesAttached: []v1.AttachedVolume{
|
|
{
|
|
Name: "fake-plugin/fake-device1",
|
|
DevicePath: "/fake/path",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: podName,
|
|
UID: k8stypes.UID(podUUID),
|
|
},
|
|
Spec: v1.PodSpec{
|
|
Volumes: []v1.Volume{
|
|
{
|
|
Name: outerName,
|
|
VolumeSource: v1.VolumeSource{
|
|
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
|
PDName: innerName,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
return pod
|
|
}
|
|
|
|
func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) {
|
|
node := getFakeNode()
|
|
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir)
|
|
tmpKubeletPodDir := filepath.Join(kubeletDir, "pods")
|
|
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
|
|
|
|
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
|
|
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
|
kubeClient := createTestClient()
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
|
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
|
kubeClient,
|
|
volumePluginMgr,
|
|
fakeRecorder,
|
|
fakeHandler))
|
|
mountPoints := []mount.MountPoint{}
|
|
for _, volumePath := range volumePaths {
|
|
mountPoints = append(mountPoints, mount.MountPoint{Path: volumePath})
|
|
}
|
|
rc := NewReconciler(
|
|
kubeClient,
|
|
true, /* controllerAttachDetachEnabled */
|
|
reconcilerLoopSleepDuration,
|
|
waitForAttachTimeout,
|
|
nodeName,
|
|
dsw,
|
|
asw,
|
|
hasAddedPods,
|
|
oex,
|
|
mount.NewFakeMounter(mountPoints),
|
|
hostutil.NewFakeHostUtil(nil),
|
|
volumePluginMgr,
|
|
tmpKubeletPodDir)
|
|
return rc, fakePlugin
|
|
}
|
|
|
|
func TestSyncStates(t *testing.T) {
|
|
type podInfo struct {
|
|
podName string
|
|
podUID string
|
|
outerVolumeName string
|
|
innerVolumeName string
|
|
}
|
|
defaultPodInfo := podInfo{
|
|
podName: "pod1",
|
|
podUID: "pod1uid",
|
|
outerVolumeName: "volume-name",
|
|
innerVolumeName: "volume-name",
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
volumePaths []string
|
|
createMountPoint bool
|
|
podInfos []podInfo
|
|
postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
|
|
verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
|
|
}{
|
|
{
|
|
name: "when two pods are using same volume and both are deleted",
|
|
volumePaths: []string{
|
|
filepath.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
|
|
filepath.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
|
|
},
|
|
createMountPoint: true,
|
|
podInfos: []podInfo{},
|
|
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
|
|
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
|
|
if len(mountedPods) != 2 {
|
|
return fmt.Errorf("expected 2 pods to in asw got %d", len(mountedPods))
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
name: "when two pods are using same volume and one of them is deleted",
|
|
volumePaths: []string{
|
|
filepath.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
|
|
filepath.Join("pod2uid", "volumes", "fake-plugin", "volume-name"),
|
|
},
|
|
createMountPoint: true,
|
|
podInfos: []podInfo{defaultPodInfo},
|
|
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
|
|
// for pod that is deleted, volume is considered as mounted
|
|
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
|
|
if len(mountedPods) != 1 {
|
|
return fmt.Errorf("expected 1 pods to in asw got %d", len(mountedPods))
|
|
}
|
|
if types.UniquePodName("pod2uid") != mountedPods[0].PodName {
|
|
return fmt.Errorf("expected mounted pod to be %s got %s", "pod2uid", mountedPods[0].PodName)
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
{
|
|
name: "when reconstruction fails for a volume, volumes should be cleaned up",
|
|
volumePaths: []string{
|
|
filepath.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
|
|
},
|
|
createMountPoint: false,
|
|
podInfos: []podInfo{},
|
|
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
|
|
return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) {
|
|
err := volumetesting.VerifyTearDownCallCount(1, fakePlugin)
|
|
if err != nil {
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
})
|
|
},
|
|
},
|
|
{
|
|
name: "when volume exists in dsow, volume should be recorded in skipped during reconstruction",
|
|
volumePaths: []string{
|
|
filepath.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
|
|
},
|
|
createMountPoint: true,
|
|
podInfos: []podInfo{defaultPodInfo},
|
|
postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
|
|
skippedVolumes := rcInstance.skippedDuringReconstruction
|
|
if len(skippedVolumes) != 1 {
|
|
return fmt.Errorf("expected 1 pods to in skippedDuringReconstruction got %d", len(skippedVolumes))
|
|
}
|
|
rcInstance.processReconstructedVolumes()
|
|
return nil
|
|
},
|
|
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
|
|
mountedPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
|
|
if len(mountedPods) != 1 {
|
|
return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods))
|
|
}
|
|
mountedPodVolume := mountedPods[0]
|
|
addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName)
|
|
if !addedViaReconstruction {
|
|
return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
|
|
}
|
|
|
|
// check device mount state
|
|
attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes()
|
|
if len(attachedVolumes) != 1 {
|
|
return fmt.Errorf("expected 1 volume to be unmounted, got %d", len(attachedVolumes))
|
|
}
|
|
firstAttachedVolume := attachedVolumes[0]
|
|
if !firstAttachedVolume.DeviceMayBeMounted() {
|
|
return fmt.Errorf("expected %s volume to be mounted in uncertain state", firstAttachedVolume.VolumeName)
|
|
}
|
|
|
|
// also skippedVolumes map should be empty
|
|
skippedVolumes := rcInstance.skippedDuringReconstruction
|
|
if len(skippedVolumes) > 0 {
|
|
return fmt.Errorf("expected 0 pods in skipped volumes found %d", len(skippedVolumes))
|
|
}
|
|
return nil
|
|
},
|
|
},
|
|
}
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
tmpKubeletDir, err := os.MkdirTemp("", "")
|
|
if err != nil {
|
|
t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
|
|
}
|
|
defer os.RemoveAll(tmpKubeletDir)
|
|
|
|
// create kubelet pod directory
|
|
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
|
|
os.MkdirAll(tmpKubeletPodDir, 0755)
|
|
|
|
mountPaths := []string{}
|
|
|
|
// create pod and volume directories so as reconciler can find them.
|
|
for _, volumePath := range tc.volumePaths {
|
|
vp := filepath.Join(tmpKubeletPodDir, volumePath)
|
|
if tc.createMountPoint {
|
|
mountPaths = append(mountPaths, vp)
|
|
}
|
|
os.MkdirAll(vp, 0755)
|
|
}
|
|
|
|
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
|
|
rcInstance, _ := rc.(*reconciler)
|
|
|
|
for _, tpodInfo := range tc.podInfos {
|
|
pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
|
|
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
|
podName := util.GetUniquePodName(pod)
|
|
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
|
|
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* SELinuxContext */)
|
|
if err != nil {
|
|
t.Fatalf("error adding volume %s to dsow: %v", volumeSpec.Name(), err)
|
|
}
|
|
rcInstance.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, "")
|
|
}
|
|
|
|
rcInstance.syncStates(tmpKubeletPodDir)
|
|
if tc.postSyncStatCallback != nil {
|
|
err := tc.postSyncStatCallback(rcInstance, fakePlugin)
|
|
if err != nil {
|
|
t.Errorf("test %s, postSyncStatCallback failed: %v", tc.name, err)
|
|
}
|
|
}
|
|
|
|
if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
|
|
t.Errorf("test %s failed: %v", tc.name, err)
|
|
}
|
|
})
|
|
}
|
|
}
|