kubernetes/test/e2e/storage/testsuites/multivolume.go

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testsuites
import (
"context"
"fmt"
"time"
"github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
)
type multiVolumeTestSuite struct {
tsInfo storageframework.TestSuiteInfo
}
var _ storageframework.TestSuite = &multiVolumeTestSuite{}
// InitCustomMultiVolumeTestSuite returns a multiVolumeTestSuite that implements the
// TestSuite interface, using the given custom test patterns.
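//
// A minimal wiring sketch (hedged; the pattern choice here is illustrative,
// not prescriptive):
//
//	patterns := []storageframework.TestPattern{
//		storageframework.FsVolModeDynamicPV,
//		storageframework.BlockVolModeDynamicPV,
//	}
//	suite := InitCustomMultiVolumeTestSuite(patterns)
//	_ = suite.GetTestSuiteInfo().Name // "multiVolume [Slow]"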
func InitCustomMultiVolumeTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
return &multiVolumeTestSuite{
tsInfo: storageframework.TestSuiteInfo{
Name: "multiVolume [Slow]",
TestPatterns: patterns,
SupportedSizeRange: e2evolume.SizeRange{
Min: "1Mi",
},
},
}
}
// InitMultiVolumeTestSuite returns a multiVolumeTestSuite that implements the
// TestSuite interface, using the test suite's default patterns.
func InitMultiVolumeTestSuite() storageframework.TestSuite {
patterns := []storageframework.TestPattern{
storageframework.FsVolModePreprovisionedPV,
storageframework.FsVolModeDynamicPV,
storageframework.BlockVolModePreprovisionedPV,
storageframework.BlockVolModeDynamicPV,
storageframework.Ext4DynamicPV,
storageframework.XfsDynamicPV,
storageframework.NtfsDynamicPV,
}
return InitCustomMultiVolumeTestSuite(patterns)
}
func (t *multiVolumeTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
return t.tsInfo
}
func (t *multiVolumeTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
dInfo := driver.GetDriverInfo()
skipVolTypePatterns(pattern, driver, storageframework.NewVolTypeMap(storageframework.PreprovisionedPV))
if pattern.VolMode == v1.PersistentVolumeBlock && !dInfo.Capabilities[storageframework.CapBlock] {
e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", dInfo.Name, pattern.VolMode)
}
}
func (t *multiVolumeTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
type local struct {
config *storageframework.PerTestConfig
cs clientset.Interface
ns *v1.Namespace
driver storageframework.TestDriver
resources []*storageframework.VolumeResource
migrationCheck *migrationOpCheck
}
var (
dInfo = driver.GetDriverInfo()
l local
)
// Beware that it also registers an AfterEach which renders f unusable. Any code using
// f must run inside an It or Context callback.
f := framework.NewFrameworkWithCustomTimeouts("multivolume", storageframework.GetDriverTimeouts(driver))
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
init := func(ctx context.Context) {
l = local{}
l.ns = f.Namespace
l.cs = f.ClientSet
l.driver = driver
// Now do the more expensive test initialization.
l.config = driver.PrepareTest(ctx, f)
l.migrationCheck = newMigrationOpCheck(ctx, f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
}
cleanup := func(ctx context.Context) {
var errs []error
for _, resource := range l.resources {
errs = append(errs, resource.CleanupResource(ctx))
}
framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resources")
l.migrationCheck.validateMigrationVolumeOpCounts(ctx)
}
// This tests the following configuration:
// [pod1] same node [pod2]
// [ node1 ] ==> [ node1 ]
// / \ <- same volume mode / \
// [volume1] [volume2] [volume1] [volume2]
ginkgo.It("should access to two volumes with the same volume mode and retain data across pod recreation on the same node", func(ctx context.Context) {
// Currently, multiple volumes are not generally available for pre-provisioned volumes,
// because containerized storage servers such as iSCSI and rbd return a static volume
// from inside the container instead of actually creating a new volume per request.
if pattern.VolType == storageframework.PreprovisionedPV {
e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
}
init(ctx)
ginkgo.DeferCleanup(cleanup)
var pvcs []*v1.PersistentVolumeClaim
numVols := 2
for i := 0; i < numVols; i++ {
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
pvcs = append(pvcs, resource.Pvc)
}
TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, pvcs, true /* sameNode */)
})
// This tests the following configuration:
// [pod1] different node [pod2]
// [ node1 ] ==> [ node2 ]
// / \ <- same volume mode / \
// [volume1] [volume2] [volume1] [volume2]
ginkgo.It("should access to two volumes with the same volume mode and retain data across pod recreation on different node", func(ctx context.Context) {
// Currently, multiple volumes are not generally available for pre-provisioned volumes,
// because containerized storage servers such as iSCSI and rbd return a static volume
// from inside the container instead of actually creating a new volume per request.
if pattern.VolType == storageframework.PreprovisionedPV {
e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
}
init(ctx)
ginkgo.DeferCleanup(cleanup)
// Check different-node test requirement
if l.driver.GetDriverInfo().Capabilities[storageframework.CapSingleNodeVolume] {
e2eskipper.Skipf("Driver %s only supports %v -- skipping", l.driver.GetDriverInfo().Name, storageframework.CapSingleNodeVolume)
}
if l.config.ClientNodeSelection.Name != "" {
e2eskipper.Skipf("Driver %q requires to deploy on a specific node - skipping", l.driver.GetDriverInfo().Name)
}
if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
framework.Failf("Error setting topology requirements: %v", err)
}
var pvcs []*v1.PersistentVolumeClaim
numVols := 2
for i := 0; i < numVols; i++ {
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
pvcs = append(pvcs, resource.Pvc)
}
TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, pvcs, false /* sameNode */)
})
// This tests the following configuration (only the <block, filesystem> pattern is tested):
// [pod1] same node [pod2]
// [ node1 ] ==> [ node1 ]
// / \ <- different volume mode / \
// [volume1] [volume2] [volume1] [volume2]
ginkgo.It("should access to two volumes with different volume mode and retain data across pod recreation on the same node", func(ctx context.Context) {
if pattern.VolMode == v1.PersistentVolumeFilesystem {
e2eskipper.Skipf("Filesystem volume case should be covered by block volume case -- skipping")
}
// Currently, multiple volumes are not generally available for pre-provisioned volumes,
// because containerized storage servers such as iSCSI and rbd return a static volume
// from inside the container instead of actually creating a new volume per request.
if pattern.VolType == storageframework.PreprovisionedPV {
e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
}
init(ctx)
ginkgo.DeferCleanup(cleanup)
var pvcs []*v1.PersistentVolumeClaim
numVols := 2
for i := 0; i < numVols; i++ {
curPattern := pattern
if i != 0 {
// The 1st volume keeps the pattern's block mode; use filesystem mode for the 2nd and later volumes.
curPattern.VolMode = v1.PersistentVolumeFilesystem
}
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, driver, l.config, curPattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
pvcs = append(pvcs, resource.Pvc)
}
TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, pvcs, true /* sameNode */)
})
// This tests the following configuration (only the <block, filesystem> pattern is tested):
// [pod1] different node [pod2]
// [ node1 ] ==> [ node2 ]
// / \ <- different volume mode / \
// [volume1] [volume2] [volume1] [volume2]
ginkgo.It("should access to two volumes with different volume mode and retain data across pod recreation on different node", func(ctx context.Context) {
if pattern.VolMode == v1.PersistentVolumeFilesystem {
e2eskipper.Skipf("Filesystem volume case should be covered by block volume case -- skipping")
}
// Currently, multiple volumes are not generally available for pre-provisioned volumes,
// because containerized storage servers such as iSCSI and rbd return a static volume
// from inside the container instead of actually creating a new volume per request.
if pattern.VolType == storageframework.PreprovisionedPV {
e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
}
init(ctx)
ginkgo.DeferCleanup(cleanup)
// Check different-node test requirement
if l.driver.GetDriverInfo().Capabilities[storageframework.CapSingleNodeVolume] {
e2eskipper.Skipf("Driver %s only supports %v -- skipping", l.driver.GetDriverInfo().Name, storageframework.CapSingleNodeVolume)
}
if l.config.ClientNodeSelection.Name != "" {
e2eskipper.Skipf("Driver %q requires to deploy on a specific node - skipping", l.driver.GetDriverInfo().Name)
}
if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
framework.Failf("Error setting topology requirements: %v", err)
}
var pvcs []*v1.PersistentVolumeClaim
numVols := 2
for i := 0; i < numVols; i++ {
curPattern := pattern
if i != 0 {
// The 1st volume keeps the pattern's block mode; use filesystem mode for the 2nd and later volumes.
curPattern.VolMode = v1.PersistentVolumeFilesystem
}
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, driver, l.config, curPattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
pvcs = append(pvcs, resource.Pvc)
}
TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, pvcs, false /* sameNode */)
})
// This tests the following configuration:
// [pod1] [pod2]
// [ node1 ]
// \ / <- same volume mode
// [volume1]
ginkgo.It("should concurrently access the single volume from pods on the same node", func(ctx context.Context) {
init(ctx)
ginkgo.DeferCleanup(cleanup)
numPods := 2
if !l.driver.GetDriverInfo().Capabilities[storageframework.CapMultiPODs] {
e2eskipper.Skipf("Driver %q does not support multiple concurrent pods - skipping", dInfo.Name)
}
// Create volume
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
// Test access to the volume from two pods on the same node
TestConcurrentAccessToSingleVolume(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, resource.Pvc, numPods, true /* sameNode */, false /* readOnly */)
})
// This tests the following configuration:
// [pod1] [pod2]
// [ node1 ]
// | | <- same volume mode
// [volume1] -> [restored volume1 snapshot]
ginkgo.It("should concurrently access the volume and restored snapshot from pods on the same node [LinuxOnly][Feature:VolumeSnapshotDataSource][Feature:VolumeSourceXFS]", func(ctx context.Context) {
init(ctx)
ginkgo.DeferCleanup(cleanup)
if !l.driver.GetDriverInfo().Capabilities[storageframework.CapSnapshotDataSource] {
e2eskipper.Skipf("Driver %q does not support volume snapshots - skipping", dInfo.Name)
}
if pattern.SnapshotType == "" {
e2eskipper.Skipf("Driver %q does not support snapshots - skipping", dInfo.Name)
}
// Create a volume
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
pvcs := []*v1.PersistentVolumeClaim{resource.Pvc}
// Create a snapshot of it
expectedContent := fmt.Sprintf("volume content %d", time.Now().UTC().UnixNano())
sDriver, ok := driver.(storageframework.SnapshottableTestDriver)
if !ok {
framework.Failf("Driver %q has CapSnapshotDataSource but does not implement SnapshottableTestDriver", dInfo.Name)
}
testConfig := storageframework.ConvertTestConfig(l.config)
dc := l.config.Framework.DynamicClient
dataSourceRef := prepareSnapshotDataSourceForProvisioning(ctx, f, testConfig, l.config, pattern, l.cs, dc, resource.Pvc, resource.Sc, sDriver, pattern.VolMode, expectedContent)
// Create 2nd PVC for testing
pvc2 := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: resource.Pvc.Name + "-restored",
Namespace: resource.Pvc.Namespace,
},
}
resource.Pvc.Spec.DeepCopyInto(&pvc2.Spec)
pvc2.Spec.VolumeName = ""
pvc2.Spec.DataSourceRef = dataSourceRef
pvc2, err := l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Create(ctx, pvc2, metav1.CreateOptions{})
framework.ExpectNoError(err)
pvcs = append(pvcs, pvc2)
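// Delete the restored PVC when the test ends; IgnoreNotFound tolerates the
// case where earlier cleanup has already removed it.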
ginkgo.DeferCleanup(framework.IgnoreNotFound(l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Delete), pvc2.Name, metav1.DeleteOptions{})
// Test access to both volumes on the same node.
TestConcurrentAccessToRelatedVolumes(ctx, l.config.Framework, l.cs, l.ns.Name, l.config.ClientNodeSelection, pvcs, expectedContent)
})
// This tests the following configuration:
// [pod1] [pod2]
// [ node1 ]
// | | <- same volume mode
// [volume1] -> [cloned volume1]
ginkgo.It("should concurrently access the volume and its clone from pods on the same node [LinuxOnly][Feature:VolumeSourceXFS]", func(ctx context.Context) {
init(ctx)
ginkgo.DeferCleanup(cleanup)
if !l.driver.GetDriverInfo().Capabilities[storageframework.CapPVCDataSource] {
e2eskipper.Skipf("Driver %q does not support volume clone - skipping", dInfo.Name)
}
// Create a volume
expectedContent := fmt.Sprintf("volume content %d", time.Now().UTC().UnixNano())
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
pvcs := []*v1.PersistentVolumeClaim{resource.Pvc}
testConfig := storageframework.ConvertTestConfig(l.config)
dataSourceRef := preparePVCDataSourceForProvisioning(ctx, f, testConfig, l.cs, resource.Pvc, resource.Sc, pattern.VolMode, expectedContent)
// Create 2nd PVC for testing
pvc2 := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: resource.Pvc.Name + "-cloned",
Namespace: resource.Pvc.Namespace,
},
}
resource.Pvc.Spec.DeepCopyInto(&pvc2.Spec)
pvc2.Spec.VolumeName = ""
pvc2.Spec.DataSourceRef = dataSourceRef
pvc2, err := l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Create(ctx, pvc2, metav1.CreateOptions{})
framework.ExpectNoError(err)
pvcs = append(pvcs, pvc2)
ginkgo.DeferCleanup(framework.IgnoreNotFound(l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Delete), pvc2.Name, metav1.DeleteOptions{})
// Test access to both volumes on the same node.
TestConcurrentAccessToRelatedVolumes(ctx, l.config.Framework, l.cs, l.ns.Name, l.config.ClientNodeSelection, pvcs, expectedContent)
})
// This tests the following configuration:
// [pod1] [pod2]
// [ node1 ]
// \ / <- same volume mode (read only)
// [volume1]
ginkgo.It("should concurrently access the single read-only volume from pods on the same node", func(ctx context.Context) {
init(ctx)
ginkgo.DeferCleanup(cleanup)
numPods := 2
if !l.driver.GetDriverInfo().Capabilities[storageframework.CapMultiPODs] {
e2eskipper.Skipf("Driver %q does not support multiple concurrent pods - skipping", dInfo.Name)
}
if l.driver.GetDriverInfo().Name == "vsphere" && pattern == storageframework.BlockVolModeDynamicPV {
e2eskipper.Skipf("Driver %q does not support read only raw block volumes - skipping", dInfo.Name)
}
// Create volume
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
// Initialize the volume with a filesystem - it's going to be mounted as read-only below.
initializeVolume(ctx, l.cs, f.Timeouts, l.ns.Name, resource.Pvc, l.config.ClientNodeSelection)
// Test access to the volume from pods on a single node
TestConcurrentAccessToSingleVolume(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, resource.Pvc, numPods, true /* sameNode */, true /* readOnly */)
})
// This tests the following configuration:
// [pod1] [pod2]
// [ node1 ] [ node2 ]
// \ / <- same volume mode
// [volume1]
ginkgo.It("should concurrently access the single volume from pods on different node", func(ctx context.Context) {
init(ctx)
ginkgo.DeferCleanup(cleanup)
numPods := 2
if !l.driver.GetDriverInfo().Capabilities[storageframework.CapRWX] {
e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", l.driver.GetDriverInfo().Name, storageframework.CapRWX)
}
// Check different-node test requirement
if l.config.ClientNodeSelection.Name != "" {
e2eskipper.Skipf("Driver %q requires to deploy on a specific node - skipping", l.driver.GetDriverInfo().Name)
}
// For multi-node tests there must be enough nodes with the same topology to schedule the pods
if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
framework.Failf("Error setting topology requirements: %v", err)
}
// Create volume
testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
l.resources = append(l.resources, resource)
// Test access to the volume from pods on different nodes
TestConcurrentAccessToSingleVolume(ctx, l.config.Framework, l.cs, l.ns.Name,
l.config.ClientNodeSelection, resource.Pvc, numPods, false /* sameNode */, false /* readOnly */)
})
}
// testAccessMultipleVolumes tests access to multiple volumes from a single pod on the specified node.
// If readSeedBase > 0, a read check is done before the write/read checks, assuming that data has already been written.
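//
// A hedged call sketch, mirroring how TestAccessMultipleVolumesAcrossPodRecreation
// drives this helper (pass a negative readSeedBase to skip the initial read check):
//
//	writeSeed := time.Now().UTC().UnixNano()
//	nodeName := testAccessMultipleVolumes(ctx, f, cs, ns, node, pvcs, -1, writeSeed)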
func testAccessMultipleVolumes(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
node e2epod.NodeSelection, pvcs []*v1.PersistentVolumeClaim, readSeedBase int64, writeSeedBase int64) string {
ginkgo.By(fmt.Sprintf("Creating pod on %+v with multiple volumes", node))
podConfig := e2epod.Config{
NS: ns,
PVCs: pvcs,
SeLinuxLabel: e2epod.GetLinuxLabel(),
NodeSelection: node,
ImageID: e2epod.GetDefaultTestImageID(),
}
pod, err := e2epod.CreateSecPodWithNodeSelection(ctx, cs, &podConfig, f.Timeouts.PodStart)
defer func() {
framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
}()
framework.ExpectNoError(err)
byteLen := 64
for i, pvc := range pvcs {
// CreateSecPodWithNodeSelection makes the volumes accessible via /mnt/volume<i+1>
index := i + 1
path := fmt.Sprintf("/mnt/volume%d", index)
ginkgo.By(fmt.Sprintf("Checking if the volume%d exists as expected volume mode (%s)", index, *pvc.Spec.VolumeMode))
e2evolume.CheckVolumeModeOfPath(f, pod, *pvc.Spec.VolumeMode, path)
if readSeedBase > 0 {
ginkgo.By(fmt.Sprintf("Checking if read from the volume%d works properly", index))
storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, false, path, byteLen, readSeedBase+int64(i))
}
ginkgo.By(fmt.Sprintf("Checking if write to the volume%d works properly", index))
storageutils.CheckWriteToPath(f, pod, *pvc.Spec.VolumeMode, false, path, byteLen, writeSeedBase+int64(i))
ginkgo.By(fmt.Sprintf("Checking if read from the volume%d works properly", index))
storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, false, path, byteLen, writeSeedBase+int64(i))
}
pod, err = cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "get pod")
return pod.Spec.NodeName
}
// TestAccessMultipleVolumesAcrossPodRecreation tests access to multiple volumes from a single pod,
// then recreates the pod on the same or a different node depending on requiresSameNode,
// and rechecks access to the volumes from the recreated pod.
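// For example: the first pass writes with seed w; the second pass reads with
// readSeedBase = w (verifying the data survived pod recreation) and then
// writes again with a fresh seed.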
func TestAccessMultipleVolumesAcrossPodRecreation(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
node e2epod.NodeSelection, pvcs []*v1.PersistentVolumeClaim, requiresSameNode bool) {
// No data has been written to the volumes yet, so pass a negative value to skip the initial read check
readSeedBase := int64(-1)
writeSeedBase := time.Now().UTC().UnixNano()
// Test access to multiple volumes on the specified node
nodeName := testAccessMultipleVolumes(ctx, f, cs, ns, node, pvcs, readSeedBase, writeSeedBase)
// Set affinity depending on requiresSameNode
if requiresSameNode {
e2epod.SetAffinity(&node, nodeName)
} else {
e2epod.SetAntiAffinity(&node, nodeName)
}
// Test access to multiple volumes again on the node updated above
// Set the previous writeSeed as the current readSeed to check that the previously written data is retained
readSeedBase = writeSeedBase
// Update writeSeed with a new value
writeSeedBase = time.Now().UTC().UnixNano()
_ = testAccessMultipleVolumes(ctx, f, cs, ns, node, pvcs, readSeedBase, writeSeedBase)
}
// TestConcurrentAccessToSingleVolume tests access to a single volume from multiple pods,
// then deletes the last pod and rechecks access to the volume to verify that deleting one
// pod does not affect the others. Pods are deployed on the same node or on different nodes
// depending on requiresSameNode. Read/write checks are done across pods: pod{n} verifies
// that it can read back both what pod{n-1} and pod{n} wrote.
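//
// A hedged usage sketch (pvc and node come from the caller's test setup):
//
//	TestConcurrentAccessToSingleVolume(ctx, f, cs, ns, node, pvc,
//		2 /* numPods */, true /* requiresSameNode */, false /* readOnly */)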
func TestConcurrentAccessToSingleVolume(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
node e2epod.NodeSelection, pvc *v1.PersistentVolumeClaim, numPods int, requiresSameNode bool,
readOnly bool) {
var pods []*v1.Pod
// Create each pod with pvc
for i := 0; i < numPods; i++ {
index := i + 1
ginkgo.By(fmt.Sprintf("Creating pod%d with a volume on %+v", index, node))
podConfig := e2epod.Config{
NS: ns,
PVCs: []*v1.PersistentVolumeClaim{pvc},
SeLinuxLabel: e2epod.GetLinuxLabel(),
NodeSelection: node,
PVCsReadOnly: readOnly,
ImageID: e2epod.GetTestImageID(imageutils.JessieDnsutils),
}
pod, err := e2epod.CreateSecPodWithNodeSelection(ctx, cs, &podConfig, f.Timeouts.PodStart)
framework.ExpectNoError(err)
// The pod must get deleted before this function returns because the caller may try to
// delete volumes as part of the tests. Keeping the pod running would block that.
// If the test times out, then the namespace deletion will take care of it.
defer func() {
framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
}()
pod, err = cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, fmt.Sprintf("get pod%d", index))
pods = append(pods, pod)
actualNodeName := pod.Spec.NodeName
// Set affinity depending on requiresSameNode
if requiresSameNode {
e2epod.SetAffinity(&node, actualNodeName)
} else {
e2epod.SetAntiAffinity(&node, actualNodeName)
}
}
path := "/mnt/volume1"
var seed int64
byteLen := 64
directIO := false
// direct IO is needed for Block-mode PVs
if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
if len(pods) < 1 {
framework.Failf("Number of pods shouldn't be less than 1, but got %d", len(pods))
}
// byteLen should be the size of a sector to enable direct I/O
byteLen = storageutils.GetSectorSize(f, pods[0], path)
directIO = true
}
// Check if volume can be accessed from each pod
for i, pod := range pods {
index := i + 1
ginkgo.By(fmt.Sprintf("Checking if the volume in pod%d exists as expected volume mode (%s)", index, *pvc.Spec.VolumeMode))
e2evolume.CheckVolumeModeOfPath(f, pod, *pvc.Spec.VolumeMode, path)
if readOnly {
ginkgo.By("Skipping volume content checks, volume is read-only")
continue
}
if i != 0 {
ginkgo.By(fmt.Sprintf("From pod%d, checking if reading the data that pod%d write works properly", index, index-1))
// For 1st pod, no one has written data yet, so pass the read check
storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
}
// Update the seed and check if write/read works properly
seed = time.Now().UTC().UnixNano()
ginkgo.By(fmt.Sprintf("Checking if write to the volume in pod%d works properly", index))
storageutils.CheckWriteToPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
ginkgo.By(fmt.Sprintf("Checking if read from the volume in pod%d works properly", index))
storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
}
if len(pods) < 2 {
framework.Failf("Number of pods shouldn't be less than 2, but got %d", len(pods))
}
// Delete the last pod and remove from slice of pods
lastPod := pods[len(pods)-1]
framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, lastPod))
pods = pods[:len(pods)-1]
// Recheck if the PV can be accessed from each pod after the last pod's deletion
for i, pod := range pods {
index := i + 1
// Pods are indexed in the order they were created above
ginkgo.By(fmt.Sprintf("Rechecking if the volume in pod%d exists as expected volume mode (%s)", index, *pvc.Spec.VolumeMode))
e2evolume.CheckVolumeModeOfPath(f, pod, *pvc.Spec.VolumeMode, "/mnt/volume1")
if readOnly {
ginkgo.By("Skipping volume content checks, volume is read-only")
continue
}
if i == 0 {
// For the 1st pod, there should now be data that the deleted last pod wrote
ginkgo.By(fmt.Sprintf("From pod%d, rechecking if reading the data that the last pod wrote works properly", index))
} else {
ginkgo.By(fmt.Sprintf("From pod%d, rechecking if reading the data that pod%d wrote works properly", index, index-1))
}
}
storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
// Update the seed and check if write/read works properly
seed = time.Now().UTC().UnixNano()
ginkgo.By(fmt.Sprintf("Rechecking if write to the volume in pod%d works properly", index))
storageutils.CheckWriteToPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
ginkgo.By(fmt.Sprintf("Rechecking if read from the volume in pod%d works properly", index))
storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
}
}
// TestConcurrentAccessToRelatedVolumes tests access to multiple volumes from multiple pods.
// Each provided PVC is used by a single pod. The test ensures that volumes created from
// another volume (i.e. a clone) or from a volume snapshot can be used together with the original volume.
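//
// A hedged usage sketch (pvcs holds the original PVC plus its clone or
// snapshot-restore; expectedContent is what was written before cloning):
//
//	TestConcurrentAccessToRelatedVolumes(ctx, f, cs, ns, node, pvcs, expectedContent)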
func TestConcurrentAccessToRelatedVolumes(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
node e2epod.NodeSelection, pvcs []*v1.PersistentVolumeClaim, expectedContent string) {
var pods []*v1.Pod
// Create each pod with pvc
for i := range pvcs {
index := i + 1
ginkgo.By(fmt.Sprintf("Creating pod%d with a volume on %+v", index, node))
podConfig := e2epod.Config{
NS: ns,
PVCs: []*v1.PersistentVolumeClaim{pvcs[i]},
SeLinuxLabel: e2epod.GetLinuxLabel(),
NodeSelection: node,
PVCsReadOnly: false,
ImageID: e2epod.GetTestImageID(imageutils.JessieDnsutils),
}
pod, err := e2epod.CreateSecPodWithNodeSelection(ctx, cs, &podConfig, f.Timeouts.PodStart)
defer func() {
framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
}()
framework.ExpectNoError(err)
pods = append(pods, pod)
actualNodeName := pod.Spec.NodeName
// Always run the subsequent pods on the same node.
e2epod.SetAffinity(&node, actualNodeName)
}
for i, pvc := range pvcs {
index := i + 1
var commands []string
var fileName, errMsg string
if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
fileName = "/mnt/volume1"
commands = e2evolume.GenerateReadBlockCmd(fileName, len(expectedContent))
errMsg = fmt.Sprintf("failed: finding the contents of the block volume %s.", fileName)
} else {
fileName = "/mnt/volume1/index.html"
commands = e2evolume.GenerateReadFileCmd(fileName)
errMsg = fmt.Sprintf("failed: finding the contents of the mounted file %s.", fileName)
}
// Check that all pods have the same content
ginkgo.By(fmt.Sprintf("Checking if the volume in pod%d has expected initial content", index))
_, err := e2eoutput.LookForStringInPodExec(pods[i].Namespace, pods[i].Name, commands, expectedContent, time.Minute)
framework.ExpectNoError(err, errMsg)
}
}
// getCurrentTopologiesNumber goes through all Nodes and returns the unique driver topologies and the count of Nodes per topology.
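// For example, with keys = {"topology.kubernetes.io/zone"} and ready nodes in
// zones a, a, b, this returns ([{zone: a}, {zone: b}], [2, 1], nil).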
func getCurrentTopologiesNumber(cs clientset.Interface, nodes *v1.NodeList, keys []string) ([]topology, []int, error) {
topos := []topology{}
topoCount := []int{}
// TODO: scale?
for _, n := range nodes.Items {
topo := map[string]string{}
for _, k := range keys {
v, ok := n.Labels[k]
if ok {
topo[k] = v
}
}
found := false
for i, existingTopo := range topos {
if topologyEqual(existingTopo, topo) {
found = true
topoCount[i]++
break
}
}
if !found && len(topo) > 0 {
framework.Logf("found topology %v", topo)
topos = append(topos, topo)
topoCount = append(topoCount, 1)
}
}
return topos, topoCount, nil
}
// ensureTopologyRequirements checks that there are enough nodes in the cluster for a test and
// sets the nodeSelection affinity according to the given topology keys for drivers that provide them.
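//
// A hedged usage sketch, mirroring the calls in DefineTests above:
//
//	if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
//		framework.Failf("Error setting topology requirements: %v", err)
//	}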
func ensureTopologyRequirements(ctx context.Context, nodeSelection *e2epod.NodeSelection, cs clientset.Interface, driverInfo *storageframework.DriverInfo, minCount int) error {
nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
framework.ExpectNoError(err)
if len(nodes.Items) < minCount {
e2eskipper.Skipf("Number of available nodes is less than %d - skipping", minCount)
}
topologyKeys := driverInfo.TopologyKeys
if len(topologyKeys) == 0 {
// The driver does not have any topology restrictions
return nil
}
topologyList, topologyCount, err := getCurrentTopologiesNumber(cs, nodes, topologyKeys)
if err != nil {
return err
}
suitableTopologies := []topology{}
for i, topo := range topologyList {
if topologyCount[i] >= minCount {
suitableTopologies = append(suitableTopologies, topo)
}
}
if len(suitableTopologies) == 0 {
e2eskipper.Skipf("No topology with at least %d nodes found - skipping", minCount)
}
// Take the first suitable topology
e2epod.SetNodeAffinityTopologyRequirement(nodeSelection, suitableTopologies[0])
return nil
}
// initializeVolume creates a filesystem on the given volume, so that it can later be mounted as read-only
func initializeVolume(ctx context.Context, cs clientset.Interface, t *framework.TimeoutContext, ns string, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection) {
if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
// Block volumes do not need to be initialized.
return
}
ginkgo.By(fmt.Sprintf("Initializing a filesystem on PVC %s", pvc.Name))
// Just create a pod with the volume as read-write. Kubernetes will create a filesystem there
// if it does not exist yet.
podConfig := e2epod.Config{
NS: ns,
PVCs: []*v1.PersistentVolumeClaim{pvc},
SeLinuxLabel: e2epod.GetLinuxLabel(),
NodeSelection: node,
ImageID: e2epod.GetDefaultTestImageID(),
}
pod, err := e2epod.CreateSecPod(ctx, cs, &podConfig, t.PodStart)
defer func() {
framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
}()
framework.ExpectNoError(err)
}