Use "require_disk" mark

Signed-off-by: Robert Baldyga <robert.baldyga@intel.com>
This commit is contained in:
Robert Baldyga 2019-10-22 13:07:49 +02:00
parent 6bcc95d0cd
commit b65ff40bf0
14 changed files with 155 additions and 235 deletions

@ -1 +1 @@
Subproject commit 5667b7bbf9bb60eedefb20b0625d1b9f32f214f2
Subproject commit 68afc1655f85c1fa3d523c50a02bcf56ed47cc67

View File

@ -14,10 +14,7 @@ LOGGER = logging.getLogger(__name__)
@pytest.mark.parametrize("shortcut", [True, False])
@pytest.mark.parametrize('prepare_and_cleanup',
[{"core_count": 0, "cache_count": 0}],
indirect=True)
def test_cli_help(prepare_and_cleanup, shortcut):
def test_cli_help(shortcut):
prepare()
LOGGER.info("Test run")
output = casadm.help(shortcut)

View File

@ -7,18 +7,15 @@
import pytest
from api.cas import casadm, casadm_parser
from core.test_run import TestRun
from storage_devices.disk import DiskType
from storage_devices.disk import DiskType, DiskTypeSet
from test_utils.size import Unit, Size
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane]))
@pytest.mark.parametrize("shortcut", [True, False])
@pytest.mark.parametrize('prepare_and_cleanup',
[{"core_count": 0, "cache_count": 1, "cache_type": "optane"}, ],
indirect=True)
def test_cli_start_stop_default_value(prepare_and_cleanup, shortcut):
def test_cli_start_stop_default_value(shortcut):
with TestRun.LOGGER.step("Prepare devices"):
cache_device = next(
disk for disk in TestRun.dut.disks if disk.disk_type == DiskType.optane)
cache_device = TestRun.disks['cache']
cache_device.create_partitions([Size(500, Unit.MebiByte)])
cache_device = cache_device.partitions[0]
@ -47,13 +44,10 @@ def test_cli_start_stop_default_value(prepare_and_cleanup, shortcut):
TestRun.LOGGER.error("There is no 'No caches running' info in casadm -L output")
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane]))
@pytest.mark.parametrize("shortcut", [True, False])
@pytest.mark.parametrize('prepare_and_cleanup',
[{"core_count": 1, "cache_count": 1, "cache_type": "optane"}],
indirect=True)
def test_cli_add_remove_default_value(prepare_and_cleanup, shortcut):
cache_device = next(
disk for disk in TestRun.dut.disks if disk.disk_type == DiskType.optane)
def test_cli_add_remove_default_value(shortcut):
cache_device = TestRun.disks['cache']
cache_device.create_partitions([Size(500, Unit.MebiByte)])
cache_device = cache_device.partitions[0]
cache = casadm.start_cache(cache_device, shortcut=shortcut, force=True)

View File

@ -36,8 +36,52 @@ pytest_options = {}
def get_pytest_options(request):
pytest_options["remote"] = request.config.getoption("--remote")
pytest_options["branch"] = request.config.getoption("--repo-tag")
pytest_options["force_reinstall"] = request.config.getoption("--force-reinstall")
pytest_options["log_path"] = request.config.getoption("--log-path")
def pytest_runtest_setup(item):
# There should be dut config file added to config package and
# pytest should be executed with option --dut-config=conf_name'.
#
# 'ip' field should be filled with valid IP string to use remote ssh executor
# or it should be commented out when user want to execute tests on local machine
#
# User can also have own test wrapper, which runs test prepare, cleanup, etc.
# Then it should be placed in plugins package
TestRun.prepare(item)
test_name = item.name.split('[')[0]
TestRun.LOGGER = create_log(item.config.getoption('--log-path'), test_name)
with TestRun.LOGGER.step("Dut prepare"):
try:
try:
with open(item.config.getoption('--dut-config')) as cfg:
dut_config = yaml.safe_load(cfg)
except Exception:
dut_config = {}
if 'test_wrapper' in sys.modules:
if 'ip' in dut_config:
try:
IP(dut_config['ip'])
except ValueError:
raise Exception("IP address from configuration file is in invalid format.")
dut_config = test_wrapper.prepare(dut_config)
TestRun.setup(dut_config)
if 'test_wrapper' in sys.modules:
test_wrapper.try_setup_serial_log(dut_config)
TestRun.plugins['opencas'] = {'already_updated': False}
except Exception as e:
TestRun.LOGGER.exception(f"{str(e)}\n{traceback.format_exc()}")
TestRun.LOGGER.info(f"DUT info: {TestRun.dut}")
base_prepare(item)
TestRun.LOGGER.write_to_command_log("Test body")
TestRun.LOGGER.start_group("Test body")
def pytest_runtest_teardown():
@ -69,54 +113,8 @@ def pytest_runtest_teardown():
TestRun.LOGGER.get_additional_logs()
@pytest.fixture()
def prepare_and_cleanup(request):
"""
This fixture returns the dictionary, which contains DUT ip, IPMI, spider, list of disks.
This fixture also returns the executor of commands
"""
# There should be dut config file added to config package and
# pytest should be executed with option --dut-config=conf_name'.
#
# 'ip' field should be filled with valid IP string to use remote ssh executor
# or it should be commented out when user want to execute tests on local machine
#
# User can also have own test wrapper, which runs test prepare, cleanup, etc.
# Then it should be placed in plugins package
test_name = request.node.name.split('[')[0]
TestRun.LOGGER = create_log(f'{get_log_path_param()}', test_name)
with TestRun.LOGGER.step("Dut prepare"):
try:
try:
with open(request.config.getoption('--dut-config')) as cfg:
dut_config = yaml.safe_load(cfg)
except Exception:
dut_config = {}
if 'test_wrapper' in sys.modules:
if 'ip' in dut_config:
try:
IP(dut_config['ip'])
except ValueError:
raise Exception("IP address from configuration file is in invalid format.")
dut_config = test_wrapper.prepare(request.param, dut_config)
TestRun.prepare(dut_config)
if 'test_wrapper' in sys.modules:
test_wrapper.try_setup_serial_log(dut_config)
TestRun.plugins['opencas'] = {'already_updated': False}
except Exception as e:
TestRun.LOGGER.exception(f"{str(e)}\n{traceback.format_exc()}")
TestRun.LOGGER.info(f"DUT info: {TestRun.dut}")
base_prepare()
TestRun.LOGGER.write_to_command_log("Test body")
TestRun.LOGGER.start_group("Test body")
def pytest_configure(config):
TestRun.configure(config)
def pytest_addoption(parser):
@ -137,14 +135,6 @@ def get_branch():
return pytest_options["branch"]
def get_force_param():
return pytest_options["force_reinstall"]
def get_log_path_param():
return pytest_options["log_path"]
def unmount_cas_devices():
output = TestRun.executor.run("cat /proc/mounts | grep cas")
# If exit code is '1' but stdout is empty, there is no mounted cas devices
@ -173,7 +163,11 @@ def kill_all_io():
TestRun.executor.run("pkill --signal SIGKILL fio*")
def base_prepare():
def get_force_param(item):
return item.config.getoption("--force-reinstall") is not "False"
def base_prepare(item):
with TestRun.LOGGER.step("Cleanup before test"):
Udev.enable()
kill_all_io()
@ -185,7 +179,7 @@ def base_prepare():
except Exception:
pass # TODO: Reboot DUT if test is executed remotely
if get_force_param() is not "False" and not TestRun.plugins['opencas']['already_updated']:
if get_force_param(item) and not TestRun.plugins['opencas']['already_updated']:
installer.reinstall_opencas()
elif not installer.check_if_installed():
installer.install_opencas()

View File

@ -7,6 +7,7 @@ import pytest
from test_tools.disk_utils import Filesystem
from test_utils.size import Size, Unit
from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet
from tests.conftest import base_prepare
from test_utils.filesystem.file import File
from test_utils.filesystem.directory import Directory
@ -17,14 +18,12 @@ def setup_module():
TestRun.LOGGER.warning("Entering setup method")
@pytest.mark.parametrize('prepare_and_cleanup',
[{"cache_type": "nand", "cache_count": 1}],
indirect=True)
def test_create_example_partitions(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
def test_create_example_partitions():
prepare()
TestRun.LOGGER.info("Test run")
TestRun.LOGGER.info(f"DUT info: {TestRun.dut}")
test_disk = TestRun.dut.disks[0]
test_disk = TestRun.disks['cache']
part_sizes = []
for i in range(1, 6):
part_sizes.append(Size(10 * i + 100, Unit.MebiByte))
@ -33,10 +32,7 @@ def test_create_example_partitions(prepare_and_cleanup):
test_disk.partitions[0].create_filesystem(Filesystem.ext3)
@pytest.mark.parametrize('prepare_and_cleanup',
[{"cache_type": "nand", "cache_count": 1}],
indirect=True)
def test_create_example_files(prepare_and_cleanup):
def test_create_example_files():
prepare()
TestRun.LOGGER.info("Test run")
file1 = File.create_file("example_file")

View File

@ -7,17 +7,16 @@
import pytest
from core.test_run import TestRun
from tests.conftest import base_prepare
from storage_devices.disk import DiskType
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_utils.size import Size, Unit
from api.cas.cache_config import CacheMode
from api.cas import casadm
from test_tools.dd import Dd
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_core_inactive(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_core_inactive():
"""
1. Start cache with 3 cores.
2. Stop cache.
@ -48,10 +47,9 @@ def test_core_inactive(prepare_and_cleanup):
assert stats["inactive core devices"] == 1
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_core_inactive_stats(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_core_inactive_stats():
"""
1. Start cache with 3 cores.
2. Switch cache into WB mode.
@ -136,18 +134,8 @@ def test_core_inactive_stats(prepare_and_cleanup):
def prepare():
base_prepare()
cache_device = next(
disk
for disk in TestRun.dut.disks
if disk.disk_type in [DiskType.optane, DiskType.nand]
)
core_device = next(
disk
for disk in TestRun.dut.disks
if (
disk.disk_type.value > cache_device.disk_type.value and disk != cache_device
)
)
cache_device = TestRun.disks['cache']
core_device = TestRun.disks['core']
cache_device.create_partitions([Size(500, Unit.MebiByte)])
core_device.create_partitions(

View File

@ -16,14 +16,8 @@ mountpoint = "/tmp/cas1-1"
def prepare():
ioclass_config.remove_ioclass_config()
cache_device = next(filter(
lambda disk: disk.disk_type in [DiskType.optane, DiskType.nand],
TestRun.dut.disks
))
core_device = next(filter(
lambda disk: disk.disk_type.value > cache_device.disk_type.value,
TestRun.dut.disks
))
cache_device = TestRun.disks['cache']
core_device = TestRun.disks['core']
cache_device.create_partitions([Size(500, Unit.MebiByte)])
core_device.create_partitions([Size(1, Unit.GibiByte)])

View File

@ -14,14 +14,14 @@ from test_tools.disk_utils import Filesystem
from test_utils.filesystem.directory import Directory
from test_utils.filesystem.file import File
from test_utils.os_utils import drop_caches, DropCachesMode, sync, Udev
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from .io_class_common import *
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_directory_depth(prepare_and_cleanup, filesystem):
def test_ioclass_directory_depth(filesystem):
"""
Test if directory classification works properly for deeply nested directories for read and
write operations.
@ -109,11 +109,10 @@ def test_ioclass_directory_depth(prepare_and_cleanup, filesystem):
f"Expected: {base_occupancy + test_file_2.size}, actual: {new_occupancy}"
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_directory_dir_operations(prepare_and_cleanup, filesystem):
def test_ioclass_directory_dir_operations(filesystem):
"""
Test if directory classification works properly after directory operations like move or rename.
The operations themselves should not cause reclassification but IO after those operations
@ -280,11 +279,10 @@ def test_ioclass_directory_dir_operations(prepare_and_cleanup, filesystem):
directory=dir_1, with_delay=True)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_directory_file_operations(prepare_and_cleanup, filesystem):
def test_ioclass_directory_file_operations(filesystem):
"""
Test if directory classification works properly after file operations like move or rename.
The operations themselves should not cause reclassification but IO after those operations

View File

@ -11,13 +11,13 @@ from test_tools.dd import Dd
from test_tools.disk_utils import Filesystem
from test_utils.filesystem.file import File
from test_utils.os_utils import sync, Udev, DropCachesMode, drop_caches
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from .io_class_common import *
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_file_extension(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_file_extension():
cache, core = prepare()
iterations = 50
ioclass_id = 1
@ -77,10 +77,9 @@ def test_ioclass_file_extension(prepare_and_cleanup):
assert stats["dirty"].get_value(Unit.Blocks4096) == 0
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_file_extension_preexisting_filesystem(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_file_extension_preexisting_filesystem():
"""Create files on filesystem, add device with filesystem as a core,
write data to files and check if they are cached properly"""
cache, core = prepare()
@ -143,10 +142,9 @@ def test_ioclass_file_extension_preexisting_filesystem(prepare_and_cleanup):
)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_file_offset(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_file_offset():
cache, core = prepare()
ioclass_id = 1
@ -220,11 +218,10 @@ def test_ioclass_file_offset(prepare_and_cleanup):
), f"Inappropriately cached offset: {file_offset}"
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_file_size(prepare_and_cleanup, filesystem):
def test_ioclass_file_size(filesystem):
"""
File size IO class rules are configured in a way that each tested file size is unambiguously
classified.

View File

@ -9,13 +9,13 @@ import pytest
from test_tools.dd import Dd
from test_utils.os_utils import sync, Udev
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from .io_class_common import *
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_process_name(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_process_name():
"""Check if data generated by process with particular name is cached"""
cache, core = prepare()
@ -54,10 +54,9 @@ def test_ioclass_process_name(prepare_and_cleanup):
assert stats["dirty"].get_value(Unit.Blocks4096) == (i + 1) * dd_count
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_pid(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_pid():
cache, core = prepare()
ioclass_id = 1

View File

@ -14,13 +14,13 @@ from test_tools.fio.fio import Fio
from test_tools.fio.fio_param import ReadWrite, IoEngine
from test_utils.filesystem.file import File
from test_utils.os_utils import sync, Udev
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from .io_class_common import *
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_lba(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_lba():
"""Write data to random lba and check if it is cached according to range
defined in ioclass rule"""
cache, core = prepare()
@ -92,10 +92,9 @@ def test_ioclass_lba(prepare_and_cleanup):
), f"Inappropriately cached lba: {rand_lba}"
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_request_size(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_request_size():
cache, core = prepare()
ioclass_id = 1
@ -161,11 +160,10 @@ def test_ioclass_request_size(prepare_and_cleanup):
assert stats["dirty"].get_value(Unit.Blocks4096) == 0
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", list(Filesystem) + [False])
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_direct(prepare_and_cleanup, filesystem):
def test_ioclass_direct(filesystem):
"""
Perform buffered/direct IO to/from files or raw block device.
Data from buffered IO should be cached.
@ -247,11 +245,10 @@ def test_ioclass_direct(prepare_and_cleanup, filesystem):
f"Expected: {base_occupancy + io_size}, actual: {new_occupancy}"
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_metadata(prepare_and_cleanup, filesystem):
def test_ioclass_metadata(filesystem):
"""
Perform operations on files that cause metadata update.
Determine if every such operation results in increased writes to cached metadata.
@ -338,11 +335,10 @@ def test_ioclass_metadata(prepare_and_cleanup, filesystem):
pytest.xfail("No requests to metadata while deleting directory with files!")
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_id_as_condition(prepare_and_cleanup, filesystem):
def test_ioclass_id_as_condition(filesystem):
"""
Load config in which IO class ids are used as conditions in other IO class definitions.
Check if performed IO is properly classified.
@ -478,11 +474,10 @@ def test_ioclass_id_as_condition(prepare_and_cleanup, filesystem):
f"Expected: {base_occupancy + ioclass_file_size}, actual: {new_occupancy}"
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_conditions_or(prepare_and_cleanup, filesystem):
def test_ioclass_conditions_or(filesystem):
"""
Load config with IO class combining 5 contradicting conditions connected by OR operator.
Check if every IO fulfilling one condition is classified properly.
@ -527,11 +522,10 @@ def test_ioclass_conditions_or(prepare_and_cleanup, filesystem):
f"Expected: {base_occupancy + file_size}, actual: {new_occupancy}"
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize("filesystem", Filesystem)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_conditions_and(prepare_and_cleanup, filesystem):
def test_ioclass_conditions_and(filesystem):
"""
Load config with IO class combining 5 conditions contradicting at least one other condition
connected by AND operator.

View File

@ -8,14 +8,13 @@ import pytest
from api.cas import casadm, casadm_parser
from tests.conftest import base_prepare
from core.test_run import TestRun
from storage_devices.disk import DiskType
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_utils.size import Size, Unit
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_load_occupied_id(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_load_occupied_id():
"""
1. Start new cache instance (don't specify cache id)
2. Add core to newly create cache.
@ -26,18 +25,8 @@ def test_load_occupied_id(prepare_and_cleanup):
"""
prepare()
cache_device = next(
disk
for disk in TestRun.dut.disks
if disk.disk_type in [DiskType.optane, DiskType.nand]
)
core_device = next(
disk
for disk in TestRun.dut.disks
if (
disk.disk_type.value > cache_device.disk_type.value and disk != cache_device
)
)
cache_device = TestRun.disks['cache']
core_device = TestRun.disks['core']
TestRun.LOGGER.info("Creating partitons for test")
cache_device.create_partitions([Size(500, Unit.MebiByte), Size(500, Unit.MebiByte)])

View File

@ -12,7 +12,7 @@ from test_tools.dd import Dd
from api.cas.cache_config import CacheMode, CleaningPolicy
from tests.conftest import base_prepare
from core.test_run import TestRun
from storage_devices.disk import DiskType
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_utils.size import Size, Unit
from test_utils.os_utils import Udev
@ -72,6 +72,8 @@ write_wo_zero_stats = [
]
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize(
"cache_mode,zero_stats",
[
@ -82,10 +84,7 @@ write_wo_zero_stats = [
(CacheMode.WO, write_wo_zero_stats),
],
)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_block_stats_write(prepare_and_cleanup, cache_mode, zero_stats):
def test_block_stats_write(cache_mode, zero_stats):
"""Perform read and write operations to cache instance in different cache modes
and check if block stats values are correct"""
cache, cores = prepare(cache_mode)
@ -198,6 +197,8 @@ read_wo_zero_stats = [
]
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrize(
"cache_mode,zero_stats",
[
@ -208,10 +209,7 @@ read_wo_zero_stats = [
(CacheMode.WO, read_wo_zero_stats),
],
)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_block_stats_read(prepare_and_cleanup, cache_mode, zero_stats):
def test_block_stats_read(cache_mode, zero_stats):
"""Perform read and write operations to cache instance in different cache modes
and check if block stats values are correct"""
cache, cores = prepare(cache_mode)
@ -294,16 +292,8 @@ def flush(cache):
def prepare(cache_mode: CacheMode):
base_prepare()
ioclass_config.remove_ioclass_config()
cache_device = next(
disk
for disk in TestRun.dut.disks
if disk.disk_type in [DiskType.optane, DiskType.nand]
)
core_device = next(
disk
for disk in TestRun.dut.disks
if (disk.disk_type.value > cache_device.disk_type.value and disk != cache_device)
)
cache_device = TestRun.disks['cache']
core_device = TestRun.disks['core']
cache_device.create_partitions([Size(500, Unit.MebiByte)])
core_device.create_partitions(

View File

@ -12,7 +12,7 @@ from api.cas import casadm_parser
from api.cas.cache_config import CleaningPolicy
from tests.conftest import base_prepare
from core.test_run import TestRun
from storage_devices.disk import DiskType
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.disk_utils import Filesystem
from test_utils.size import Size, Unit
from test_utils.os_utils import sync, Udev
@ -23,10 +23,9 @@ mountpoint = "/tmp/cas1-1"
cache_id = 1
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_stats_set(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_stats_set():
"""Try to retrieve stats for all set ioclasses"""
prepare()
min_ioclass_id = 1
@ -56,10 +55,9 @@ def test_ioclass_stats_set(prepare_and_cleanup):
)
@pytest.mark.parametrize(
"prepare_and_cleanup", [{"core_count": 1, "cache_count": 1}], indirect=True
)
def test_ioclass_stats_sum(prepare_and_cleanup):
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_ioclass_stats_sum():
"""Check if stats for all set ioclasses sum up to cache stats"""
cache, core = prepare()
min_ioclass_id = 1
@ -148,16 +146,8 @@ def flush_cache(cache_id):
def prepare():
base_prepare()
ioclass_config.remove_ioclass_config()
cache_device = next(
disk
for disk in TestRun.dut.disks
if disk.disk_type in [DiskType.optane, DiskType.nand]
)
core_device = next(
disk
for disk in TestRun.dut.disks
if (disk.disk_type.value > cache_device.disk_type.value and disk != cache_device)
)
cache_device = TestRun.disks['cache']
core_device = TestRun.disks['core']
cache_device.create_partitions([Size(500, Unit.MebiByte)])
core_device.create_partitions([Size(2, Unit.GibiByte)])