tests: update tests

Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
This commit is contained in:
Kamil Gierszewski
2024-08-29 12:04:26 +02:00
parent ec0e03fb39
commit e8bdcdae4f
23 changed files with 2129 additions and 1665 deletions

View File

@@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -1,13 +1,12 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
import pytest
from datetime import timedelta
from api.cas import casadm
from api.cas.cache_config import (
CacheMode,
@@ -19,7 +18,7 @@ from api.cas.cache_config import (
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from core.test_run import TestRun
from test_utils.size import Size, Unit
from test_utils.os_utils import Udev, sync
from test_utils.os_utils import Udev
from test_tools.fio.fio import Fio
from test_tools.fio.fio_param import ReadWrite, IoEngine
@@ -39,216 +38,201 @@ cas_cleaner_process_name = "cas_cl_"
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_cleaning_policies_in_write_back(cleaning_policy):
def test_cleaning_policies_in_write_back(cleaning_policy: CleaningPolicy):
"""
title: Test for cleaning policy operation in Write-Back cache mode.
description: |
Check if ALRU, NOP and ACP cleaning policies preserve their
parameters when changed and if they flush dirty data properly
in Write-Back cache mode.
pass_criteria:
- Flush parameters preserve their values when changed.
- Dirty data is flushed or not according to the policy used.
title: Test for cleaning policy operation in Write-Back cache mode.
description: |
Check if ALRU, NOP and ACP cleaning policies preserve their
parameters when changed and if they flush dirty data properly
in Write-Back cache mode.
pass_criteria:
- Flush parameters preserve their values when changed.
- Dirty data is flushed or not according to the policy used.
"""
with TestRun.step("Partition cache and core devices"):
cache_dev, core_dev = storage_prepare()
with TestRun.step("Prepare cache and core devices"):
cache_dev = TestRun.disks["cache"]
core_dev = TestRun.disks["core"]
cache_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev.create_partitions([Size(2, Unit.GibiByte)] * cores_count)
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(
f"Start cache in Write-Back mode with {cleaning_policy} cleaning policy"
):
with TestRun.step(f"Start cache in Write-Back mode with {cleaning_policy} cleaning policy"):
cache = casadm.start_cache(cache_dev.partitions[0], CacheMode.WB, force=True)
set_cleaning_policy_and_params(cache, cleaning_policy)
cache.set_cleaning_policy(cleaning_policy=cleaning_policy)
set_cleaning_policy_params(cache, cleaning_policy)
with TestRun.step("Check for running CAS cleaner"):
if TestRun.executor.run(f"pgrep {cas_cleaner_process_name}").exit_code != 0:
output = TestRun.executor.run(f"pgrep {cas_cleaner_process_name}")
if output.exit_code != 0:
TestRun.fail("CAS cleaner process is not running!")
with TestRun.step(f"Add {cores_count} cores to the cache"):
core = []
for i in range(cores_count):
core.append(cache.add_core(core_dev.partitions[i]))
cores = [cache.add_core(partition) for partition in core_dev.partitions]
with TestRun.step("Run fio"):
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(4, Unit.KibiByte))
.size(io_size)
.read_write(ReadWrite.randwrite)
.direct(True)
)
for core in cores:
fio.add_job().target(core.path)
with TestRun.step("Run 'fio'"):
fio = fio_prepare()
for i in range(cores_count):
fio.add_job().target(core[i].path)
fio.run()
time.sleep(3)
core_writes_before_wait_for_cleaning = (
cache.get_statistics().block_stats.core.writes
)
core_writes_before_wait_for_cleaning = cache.get_statistics().block_stats.core.writes
with TestRun.step(f"Wait {time_to_wait} seconds"):
time.sleep(time_to_wait)
with TestRun.step("Check write statistics for core device"):
core_writes_after_wait_for_cleaning = (
cache.get_statistics().block_stats.core.writes
)
core_writes_after_wait_for_cleaning = cache.get_statistics().block_stats.core.writes
check_cleaning_policy_operation(
cleaning_policy,
core_writes_before_wait_for_cleaning,
core_writes_after_wait_for_cleaning,
)
with TestRun.step("Stop all caches"):
casadm.stop_all_caches()
Udev.enable()
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_cleaning_policies_in_write_through(cleaning_policy):
"""
title: Test for cleaning policy operation in Write-Through cache mode.
description: |
Check if ALRU, NOP and ACP cleaning policies preserve their
parameters when changed and if they flush dirty data properly
in Write-Through cache mode.
pass_criteria:
- Flush parameters preserve their values when changed.
- Dirty data is flushed or not according to the policy used.
title: Test for cleaning policy operation in Write-Through cache mode.
description: |
Check if ALRU, NOP and ACP cleaning policies preserve their
parameters when changed and if they flush dirty data properly
in Write-Through cache mode.
pass_criteria:
- Flush parameters preserve their values when changed.
- Dirty data is flushed or not according to the policy used.
"""
with TestRun.step("Partition cache and core devices"):
cache_dev, core_dev = storage_prepare()
with TestRun.step("Prepare cache and core devices"):
cache_dev = TestRun.disks["cache"]
core_dev = TestRun.disks["core"]
cache_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev.create_partitions([Size(2, Unit.GibiByte)] * cores_count)
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(
f"Start cache in Write-Through mode with {cleaning_policy} cleaning policy"
):
with TestRun.step(f"Start cache in Write-Through mode with {cleaning_policy} cleaning policy"):
cache = casadm.start_cache(cache_dev.partitions[0], CacheMode.WT, force=True)
set_cleaning_policy_and_params(cache, cleaning_policy)
set_cleaning_policy_params(cache, cleaning_policy)
with TestRun.step("Check for running CAS cleaner"):
if TestRun.executor.run(f"pgrep {cas_cleaner_process_name}").exit_code != 0:
output = TestRun.executor.run(f"pgrep {cas_cleaner_process_name}")
if output.exit_code != 0:
TestRun.fail("CAS cleaner process is not running!")
with TestRun.step(f"Add {cores_count} cores to the cache"):
core = []
for i in range(cores_count):
core.append(cache.add_core(core_dev.partitions[i]))
cores = [cache.add_core(partition) for partition in core_dev.partitions]
with TestRun.step("Change cache mode to Write-Back"):
cache.set_cache_mode(CacheMode.WB)
with TestRun.step("Run 'fio'"):
fio = fio_prepare()
for i in range(cores_count):
fio.add_job().target(core[i].path)
with TestRun.step("Populate cache with dirty data"):
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(4, Unit.KibiByte))
.size(io_size)
.read_write(ReadWrite.randwrite)
.direct(1)
)
for core in cores:
fio.add_job().target(core.path)
fio.run()
time.sleep(3)
with TestRun.step("Change cache mode back to Write-Through"):
cache.set_cache_mode(CacheMode.WT, flush=False)
core_writes_before_wait_for_cleaning = (
cache.get_statistics().block_stats.core.writes
)
core_writes_before_wait_for_cleaning = cache.get_statistics().block_stats.core.writes
with TestRun.step(f"Wait {time_to_wait} seconds"):
time.sleep(time_to_wait)
with TestRun.step("Check write statistics for core device"):
core_writes_after_wait_for_cleaning = (
cache.get_statistics().block_stats.core.writes
)
core_writes_after_wait_for_cleaning = cache.get_statistics().block_stats.core.writes
check_cleaning_policy_operation(
cleaning_policy,
core_writes_before_wait_for_cleaning,
core_writes_after_wait_for_cleaning,
)
with TestRun.step("Stop all caches"):
casadm.stop_all_caches()
Udev.enable()
def storage_prepare():
cache_dev = TestRun.disks["cache"]
cache_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev = TestRun.disks["core"]
parts = [Size(2, Unit.GibiByte)] * cores_count
core_dev.create_partitions(parts)
return cache_dev, core_dev
def set_cleaning_policy_and_params(cache, cleaning_policy):
if cleaning_policy != CleaningPolicy.DEFAULT:
cache.set_cleaning_policy(cleaning_policy)
def set_cleaning_policy_params(cache, cleaning_policy):
current_cleaning_policy = cache.get_cleaning_policy()
if current_cleaning_policy != cleaning_policy:
TestRun.LOGGER.error(
f"Cleaning policy is {current_cleaning_policy}, "
f"should be {cleaning_policy}"
f"Cleaning policy is {current_cleaning_policy}, should be {cleaning_policy}"
)
if cleaning_policy == CleaningPolicy.alru:
alru_params = FlushParametersAlru()
alru_params.wake_up_time = Time(seconds=10)
alru_params.staleness_time = Time(seconds=2)
alru_params.flush_max_buffers = 100
alru_params.activity_threshold = Time(milliseconds=1000)
cache.set_params_alru(alru_params)
current_alru_params = cache.get_flush_parameters_alru()
if current_alru_params != alru_params:
failed_params = ""
if current_alru_params.wake_up_time != alru_params.wake_up_time:
failed_params += (
f"Wake Up time is {current_alru_params.wake_up_time}, "
f"should be {alru_params.wake_up_time}\n"
)
if current_alru_params.staleness_time != alru_params.staleness_time:
failed_params += (
f"Staleness Time is {current_alru_params.staleness_time}, "
f"should be {alru_params.staleness_time}\n"
)
if current_alru_params.flush_max_buffers != alru_params.flush_max_buffers:
failed_params += (
f"Flush Max Buffers is {current_alru_params.flush_max_buffers}, "
f"should be {alru_params.flush_max_buffers}\n"
)
if current_alru_params.activity_threshold != alru_params.activity_threshold:
failed_params += (
f"Activity Threshold is {current_alru_params.activity_threshold}, "
f"should be {alru_params.activity_threshold}\n"
)
TestRun.LOGGER.error(f"ALRU parameters did not switch properly:\n{failed_params}")
match cleaning_policy:
case CleaningPolicy.acp:
acp_params = FlushParametersAcp()
acp_params.wake_up_time = Time(milliseconds=100)
acp_params.flush_max_buffers = 64
cache.set_params_acp(acp_params)
current_acp_params = cache.get_flush_parameters_acp()
if current_acp_params != acp_params:
failed_params = ""
if cleaning_policy == CleaningPolicy.acp:
acp_params = FlushParametersAcp()
acp_params.wake_up_time = Time(milliseconds=100)
acp_params.flush_max_buffers = 64
cache.set_params_acp(acp_params)
current_acp_params = cache.get_flush_parameters_acp()
if current_acp_params != acp_params:
failed_params = ""
if current_acp_params.wake_up_time != acp_params.wake_up_time:
failed_params += (
f"Wake Up time is {current_acp_params.wake_up_time}, "
f"should be {acp_params.wake_up_time}\n"
)
if current_acp_params.flush_max_buffers != acp_params.flush_max_buffers:
failed_params += (
f"Flush Max Buffers is {current_acp_params.flush_max_buffers}, "
f"should be {acp_params.flush_max_buffers}\n"
)
TestRun.LOGGER.error(f"ACP parameters did not switch properly:\n{failed_params}")
if current_acp_params.wake_up_time != acp_params.wake_up_time:
failed_params += (
f"Wake Up time is {current_acp_params.wake_up_time}, "
f"should be {acp_params.wake_up_time}\n"
)
if current_acp_params.flush_max_buffers != acp_params.flush_max_buffers:
failed_params += (
f"Flush Max Buffers is {current_acp_params.flush_max_buffers}, "
f"should be {acp_params.flush_max_buffers}\n"
)
TestRun.LOGGER.error(f"ACP parameters did not switch properly:\n{failed_params}")
def fio_prepare():
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(4, Unit.KibiByte))
.size(io_size)
.read_write(ReadWrite.randwrite)
.direct(1)
)
return fio
case CleaningPolicy.alru:
alru_params = FlushParametersAlru()
alru_params.wake_up_time = Time(seconds=10)
alru_params.staleness_time = Time(seconds=2)
alru_params.flush_max_buffers = 100
alru_params.activity_threshold = Time(milliseconds=1000)
cache.set_params_alru(alru_params)
current_alru_params = cache.get_flush_parameters_alru()
if current_alru_params != alru_params:
failed_params = ""
if current_alru_params.wake_up_time != alru_params.wake_up_time:
failed_params += (
f"Wake Up time is {current_alru_params.wake_up_time}, "
f"should be {alru_params.wake_up_time}\n"
)
if current_alru_params.staleness_time != alru_params.staleness_time:
failed_params += (
f"Staleness Time is {current_alru_params.staleness_time}, "
f"should be {alru_params.staleness_time}\n"
)
if current_alru_params.flush_max_buffers != alru_params.flush_max_buffers:
failed_params += (
f"Flush Max Buffers is {current_alru_params.flush_max_buffers}, "
f"should be {alru_params.flush_max_buffers}\n"
)
if current_alru_params.activity_threshold != alru_params.activity_threshold:
failed_params += (
f"Activity Threshold is {current_alru_params.activity_threshold}, "
f"should be {alru_params.activity_threshold}\n"
)
TestRun.LOGGER.error(f"ALRU parameters did not switch properly:\n{failed_params}")
def check_cleaning_policy_operation(
@@ -256,36 +240,37 @@ def check_cleaning_policy_operation(
core_writes_before_wait_for_cleaning,
core_writes_after_wait_for_cleaning,
):
if cleaning_policy == CleaningPolicy.alru:
if core_writes_before_wait_for_cleaning.value != 0:
TestRun.LOGGER.error(
"CAS cleaner started to clean dirty data right after IO! "
"According to ALRU parameters set in this test cleaner should "
"wait 10 seconds after IO before cleaning dirty data."
)
if core_writes_after_wait_for_cleaning <= core_writes_before_wait_for_cleaning:
TestRun.LOGGER.error(
"ALRU cleaning policy is not working properly! "
"Core writes should increase in time while cleaning dirty data."
)
if cleaning_policy == CleaningPolicy.nop:
if (
core_writes_after_wait_for_cleaning.value != 0
or core_writes_before_wait_for_cleaning.value != 0
):
TestRun.LOGGER.error(
"NOP cleaning policy is not working properly! "
"There should be no core writes as there is no cleaning of dirty data."
)
if cleaning_policy == CleaningPolicy.acp:
if core_writes_before_wait_for_cleaning.value == 0:
TestRun.LOGGER.error(
"CAS cleaner did not start cleaning dirty data right after IO! "
"According to ACP policy cleaner should start "
"cleaning dirty data right after IO."
)
if core_writes_after_wait_for_cleaning <= core_writes_before_wait_for_cleaning:
TestRun.LOGGER.error(
"ACP cleaning policy is not working properly! "
"Core writes should increase in time while cleaning dirty data."
)
match cleaning_policy:
case CleaningPolicy.alru:
if core_writes_before_wait_for_cleaning != Size.zero():
TestRun.LOGGER.error(
"CAS cleaner started to clean dirty data right after IO! "
"According to ALRU parameters set in this test cleaner should "
"wait 10 seconds after IO before cleaning dirty data"
)
if core_writes_after_wait_for_cleaning <= core_writes_before_wait_for_cleaning:
TestRun.LOGGER.error(
"ALRU cleaning policy is not working properly! "
"Core writes should increase in time while cleaning dirty data"
)
case CleaningPolicy.nop:
if (
core_writes_after_wait_for_cleaning != Size.zero()
or core_writes_before_wait_for_cleaning.value != Size.zero()
):
TestRun.LOGGER.error(
"NOP cleaning policy is not working properly! "
"There should be no core writes as there is no cleaning of dirty data"
)
case CleaningPolicy.acp:
if core_writes_before_wait_for_cleaning == Size.zero():
TestRun.LOGGER.error(
"CAS cleaner did not start cleaning dirty data right after IO! "
"According to ACP policy cleaner should start "
"cleaning dirty data right after IO"
)
if core_writes_after_wait_for_cleaning <= core_writes_before_wait_for_cleaning:
TestRun.LOGGER.error(
"ACP cleaning policy is not working properly! "
"Core writes should increase in time while cleaning dirty data"
)

View File

@@ -1,195 +1,221 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from time import sleep
import pytest
from time import sleep
from api.cas import casadm, casadm_parser, cli
from api.cas.cache_config import CacheMode, CleaningPolicy, CacheModeTrait, SeqCutOffPolicy
from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.dd import Dd
from test_tools.fio.fio import Fio
from test_tools.fio.fio_param import IoEngine, ReadWrite
from test_utils.output import CmdException
from test_utils.size import Size, Unit
cache_size = Size(2, Unit.GibiByte)
caches_number = 3
@pytest.mark.parametrize("cache_mode", CacheMode.with_traits(CacheModeTrait.LazyWrites))
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeSet([DiskType.hdd, DiskType.hdd4k]))
def test_concurrent_cores_flush(cache_mode):
def test_concurrent_cores_flush(cache_mode: CacheMode):
"""
title: Fail to flush two cores simultaneously.
description: |
CAS should return an error on attempt to flush second core if there is already
one flush in progress.
pass_criteria:
- No system crash.
- First core flushing should finish successfully.
- It should not be possible to run flushing command on cores within
the same cache simultaneously.
title: Fail to flush two cores simultaneously.
description: |
CAS should return an error on attempt to flush second core if there is already
one flush in progress.
pass_criteria:
- No system crash.
- First core flushing should finish successfully.
- It should not be possible to run flushing command on cores within
the same cache simultaneously.
"""
with TestRun.step("Prepare cache and core."):
cache_dev = TestRun.disks['cache']
cache_dev.create_partitions([cache_size])
with TestRun.step("Prepare cache and core devices"):
cache_dev = TestRun.disks["cache"]
core_dev = TestRun.disks["core"]
cache_dev.create_partitions([Size(2, Unit.GibiByte)])
core_dev.create_partitions([Size(5, Unit.GibiByte)] * 2)
cache_part = cache_dev.partitions[0]
core_dev = TestRun.disks['core']
core_dev.create_partitions([cache_size * 2] * 2)
core_part1 = core_dev.partitions[0]
core_part2 = core_dev.partitions[1]
with TestRun.step("Start cache."):
with TestRun.step("Start cache"):
cache = casadm.start_cache(cache_part, cache_mode, force=True)
with TestRun.step("Disable cleaning and sequential cutoff."):
cache.set_cleaning_policy(CleaningPolicy.nop)
cache.set_seq_cutoff_policy(SeqCutOffPolicy.never)
with TestRun.step(f"Add both core devices to cache."):
with TestRun.step(f"Add both core devices to cache"):
core1 = cache.add_core(core_part1)
core2 = cache.add_core(core_part2)
with TestRun.step("Run workload on concurrent cores."):
block_size = Size(4, Unit.MebiByte)
count = int(cache_size.value / 2 / block_size.value)
with TestRun.step("Disable cleaning and sequential cutoff"):
cache.set_cleaning_policy(CleaningPolicy.nop)
cache.set_seq_cutoff_policy(SeqCutOffPolicy.never)
dd_pid = Dd().output(core1.path) \
.input("/dev/urandom") \
.block_size(block_size) \
.count(count) \
.run_in_background()
with TestRun.step("Run concurrent fio on both cores"):
fio_pids = []
for core in [core1, core2]:
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.target(core.path)
.size(core.size)
.block_size(Size(4, Unit.MebiByte))
.read_write(ReadWrite.write)
.direct(1)
)
fio_pid = fio.run_in_background()
fio_pids.append(fio_pid)
Dd().output(core2.path) \
.input("/dev/urandom") \
.block_size(block_size) \
.count(count) \
.run()
for fio_pid in fio_pids:
if not TestRun.executor.check_if_process_exists(fio_pid):
TestRun.fail("Fio failed to start")
with TestRun.step("Check if both DD operations finished."):
while TestRun.executor.run(f"ls /proc/{dd_pid}").exit_code == 0:
sleep(1)
with TestRun.step("Wait for fio to finish"):
for fio_pid in fio_pids:
while TestRun.executor.check_if_process_exists(fio_pid):
sleep(1)
with TestRun.step("Check if both cores contain dirty blocks."):
if int(core1.get_dirty_blocks()) == 0:
TestRun.fail("The first core does not contain dirty blocks.")
if int(core2.get_dirty_blocks()) == 0:
TestRun.fail("The second core does not contain dirty blocks.")
core2_dirty_blocks_before = int(core2.get_dirty_blocks())
with TestRun.step("Check if both cores contain dirty blocks"):
if core1.get_dirty_blocks() == Size.zero():
TestRun.fail("The first core does not contain dirty blocks")
if core2.get_dirty_blocks() == Size.zero():
TestRun.fail("The second core does not contain dirty blocks")
core2_dirty_blocks_before = core2.get_dirty_blocks()
with TestRun.step("Start flushing the first core."):
TestRun.executor.run_in_background(
with TestRun.step("Start flushing the first core in background"):
output_pid = TestRun.executor.run_in_background(
cli.flush_core_cmd(str(cache.cache_id), str(core1.core_id))
)
if not TestRun.executor.check_if_process_exists(output_pid):
TestRun.fail("Failed to start core flush in background")
with TestRun.step("Wait some time and start flushing the second core."):
sleep(2)
percentage = casadm_parser.get_flushing_progress(cache.cache_id, core1.core_id)
with TestRun.step("Wait until flush starts"):
while TestRun.executor.check_if_process_exists(output_pid):
try:
casadm_parser.get_flushing_progress(cache.cache_id, core1.core_id)
break
except CmdException:
pass
with TestRun.step(
"Wait until first core reach 40% flush and start flush operation on the second core"
):
percentage = 0
while percentage < 40:
percentage = casadm_parser.get_flushing_progress(cache.cache_id, core1.core_id)
try:
core2.flush_core()
TestRun.fail("The first core is flushing right now so flush attempt of the second core "
"should fail.")
TestRun.fail(
"The first core is flushing right now so flush attempt of the second core "
"should fail"
)
except CmdException:
TestRun.LOGGER.info("The first core is flushing right now so the second core's flush "
"fails as expected.")
TestRun.LOGGER.info(
"The first core is flushing right now so the second core's flush "
"fails as expected"
)
with TestRun.step("Wait for the first core to finish flushing."):
with TestRun.step("Wait for the first core to finish flushing"):
try:
percentage = casadm_parser.get_flushing_progress(cache.cache_id, core1.core_id)
percentage = 0
while percentage < 100:
percentage = casadm_parser.get_flushing_progress(cache.cache_id, core1.core_id)
sleep(1)
except CmdException:
TestRun.LOGGER.info("The first core is not flushing dirty data anymore.")
TestRun.LOGGER.info("The first core is not flushing dirty data anymore")
with TestRun.step("Check number of dirty data on both cores."):
if int(core1.get_dirty_blocks()) > 0:
TestRun.LOGGER.error("The quantity of dirty cache lines on the first core "
"after completed flush should be zero.")
with TestRun.step("Check number of dirty data on both cores"):
if core1.get_dirty_blocks() > Size.zero():
TestRun.LOGGER.error(
"The quantity of dirty cache lines on the first core "
"after completed flush should be zero"
)
core2_dirty_blocks_after = int(core2.get_dirty_blocks())
core2_dirty_blocks_after = core2.get_dirty_blocks()
if core2_dirty_blocks_before != core2_dirty_blocks_after:
TestRun.LOGGER.error("The quantity of dirty cache lines on the second core "
"after failed flush should not change.")
with TestRun.step("Stop cache."):
cache.stop()
TestRun.LOGGER.error(
"The quantity of dirty cache lines on the second core "
"after failed flush should not change"
)
@pytest.mark.parametrize("cache_mode", CacheMode.with_traits(CacheModeTrait.LazyWrites))
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_concurrent_caches_flush(cache_mode):
def test_concurrent_caches_flush(cache_mode: CacheMode):
"""
title: Success to flush two caches simultaneously.
description: |
CAS should successfully flush multiple caches if there is already other flush in progress.
pass_criteria:
- No system crash.
- Flush for each cache should finish successfully.
title: Success to flush two caches simultaneously.
description: |
CAS should successfully flush multiple caches if there is already other flush in progress.
pass_criteria:
- No system crash.
- Flush for each cache should finish successfully.
"""
with TestRun.step("Prepare caches and cores."):
cache_dev = TestRun.disks['cache']
cache_dev.create_partitions([cache_size] * caches_number)
core_dev = TestRun.disks['core']
core_dev.create_partitions([cache_size * 2] * caches_number)
caches_number = 3
with TestRun.step(f"Start {caches_number} caches."):
caches = []
for part in cache_dev.partitions:
caches.append(casadm.start_cache(part, cache_mode, force=True))
with TestRun.step("Prepare cache and core devices"):
cache_dev = TestRun.disks["cache"]
core_dev = TestRun.disks["core"]
with TestRun.step("Disable cleaning and sequential cutoff."):
cache_dev.create_partitions([Size(2, Unit.GibiByte)] * caches_number)
core_dev.create_partitions([Size(2, Unit.GibiByte) * 2] * caches_number)
with TestRun.step(f"Start {caches_number} caches"):
caches = [
casadm.start_cache(cache_dev=part, cache_mode=cache_mode, force=True)
for part in cache_dev.partitions
]
with TestRun.step("Disable cleaning and sequential cutoff"):
for cache in caches:
cache.set_cleaning_policy(CleaningPolicy.nop)
cache.set_seq_cutoff_policy(SeqCutOffPolicy.never)
with TestRun.step(f"Add core devices to caches."):
cores = []
for i, cache in enumerate(caches):
cores.append(cache.add_core(core_dev.partitions[i]))
with TestRun.step(f"Add core devices to caches"):
cores = [cache.add_core(core_dev=core_dev.partitions[i]) for i, cache in enumerate(caches)]
with TestRun.step("Run workload on each OpenCAS device."):
# Each cache has one core fully saturated with dirty blocks.
block_size = Size(4, Unit.MebiByte)
count = int(cache_size.value / block_size.value)
total_saturation = block_size * count
with TestRun.step("Run fio on all cores"):
fio_pids = []
for core in cores:
Dd().output(core.path) \
.input("/dev/urandom") \
.block_size(block_size) \
.count(count) \
.run()
with TestRun.step("Check if each cache is full of dirty blocks."):
for cache in caches:
if not int(cache.get_dirty_blocks()) != total_saturation.get_value(Unit.Blocks4096):
TestRun.fail(f"The cache {cache.cache_id} does not contain dirty blocks.")
with TestRun.step("Start flushing all caches simultaneously."):
flush_pids = []
for cache in caches:
flush_pids.append(
TestRun.executor.run_in_background(cli.flush_cache_cmd(str(cache.cache_id)))
fio = (
Fio()
.create_command()
.target(core)
.io_engine(IoEngine.libaio)
.block_size(Size(4, Unit.MebiByte))
.size(core.size)
.read_write(ReadWrite.write)
.direct(1)
)
fio_pids.append(fio.run_in_background())
with TestRun.step("Wait for all caches to finish flushing."):
is_flushing = [True] * len(flush_pids)
while any(is_flushing):
for i, pid in enumerate(flush_pids):
is_flushing[i] = (TestRun.executor.run(f"ls /proc/{pid}").exit_code == 0)
with TestRun.step("Check number of dirty data on each cache."):
with TestRun.step("Check if each cache is full of dirty blocks"):
for cache in caches:
if int(cache.get_dirty_blocks()) > 0:
TestRun.LOGGER.error(f"The quantity of dirty cache lines on the cache "
f"{str(cache.cache_id)} after complete flush should be zero.")
if not cache.get_dirty_blocks() != core.size:
TestRun.fail(f"The cache {cache.cache_id} does not contain dirty blocks")
with TestRun.step("Stop all caches."):
casadm.stop_all_caches()
with TestRun.step("Start flush operation on all caches simultaneously"):
flush_pids = [
TestRun.executor.run_in_background(cli.flush_cache_cmd(str(cache.cache_id)))
for cache in caches
]
with TestRun.step("Wait for all caches to finish flushing"):
for flush_pid in flush_pids:
while TestRun.executor.check_if_process_exists(flush_pid):
sleep(1)
with TestRun.step("Check number of dirty data on each cache"):
for cache in caches:
if cache.get_dirty_blocks() > Size.zero():
TestRun.LOGGER.error(
f"The quantity of dirty cache lines on the cache "
f"{str(cache.cache_id)} after complete flush should be zero"
)

View File

@@ -1,7 +1,9 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import random
import pytest
@@ -21,35 +23,36 @@ cores_amount = 3
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_remove_core_when_other_mounted_auto_numeration():
"""
title: |
Test for removing one core from the cache when the other core is mounted.
Cores are numerated automatically.
description: |
Test of the ability to remove the unmounted core from the cache when the other core
is mounted and its ID starts with a different digit.
pass_criteria:
- No system crash.
- Removing unmounted core finished with success.
title: Remove one core when other are mounted - auto-numerated.
description: |
Test of the ability to remove the unmounted core from the cache when the other core
is mounted and its ID starts with a different digit.
pass_criteria:
- No system crash.
- Removing unmounted core finished with success.
"""
with TestRun.step("Prepare devices."):
cache_device = TestRun.disks['cache']
with TestRun.step("Prepare cache and core devices"):
cache_device = TestRun.disks["cache"]
core_device = TestRun.disks["core"]
cache_device.create_partitions([Size(50, Unit.MebiByte)])
cache_part = cache_device.partitions[0]
core_device = TestRun.disks['core']
core_device.create_partitions([Size(200, Unit.MebiByte)] * cores_amount)
with TestRun.step("Start cache."):
cache = casadm.start_cache(cache_part, force=True)
with TestRun.step("Start cache"):
cache = casadm.start_cache(cache_device.partitions[0], force=True)
with TestRun.step("Add cores to cache and mount them except the first one."):
with TestRun.step(f"Add {cores_amount} cores to cache and mount them except the first one"):
free_core = cache.add_core(core_device.partitions[0])
mounted_cores = []
for i, part in enumerate(core_device.partitions[1:]):
part.create_filesystem(Filesystem.xfs)
mounted_cores.append(cache.add_core(part))
mounted_cores[i].mount(f"{mount_point}{cache.cache_id}-{mounted_cores[i].core_id}")
mounted_cores[i].mount(
mount_point=f"{mount_point}{cache.cache_id}-{mounted_cores[i].core_id}"
)
with TestRun.step("Remove the unmounted core."):
with TestRun.step("Remove the unmounted core"):
try:
cache.remove_core(free_core.core_id)
except CmdException as exc:
@@ -60,38 +63,42 @@ def test_remove_core_when_other_mounted_auto_numeration():
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_remove_core_when_other_mounted_custom_numeration():
"""
title: |
Test for removing one core from the cache when the other core is mounted.
Cores have custom numeration, starting with the same digit.
description: |
Test of the ability to remove the unmounted core from the cache when the other core
is mounted and its ID starts with the same digit.
pass_criteria:
- No system crash.
- Removing unmounted core finished with success.
title: Remove one core when other are mounted - custom numeration.
description: |
Test of the ability to remove the unmounted core from the cache when the other core
is mounted and its ID starts with the same digit.
pass_criteria:
- No system crash.
- Removing unmounted core finished with success.
"""
with TestRun.step("Prepare devices."):
cache_device = TestRun.disks['cache']
with TestRun.step("Prepare cache and core devices"):
cache_device = TestRun.disks["cache"]
core_device = TestRun.disks["core"]
cache_device.create_partitions([Size(50, Unit.MebiByte)])
cache_part = cache_device.partitions[0]
core_device = TestRun.disks['core']
core_device.create_partitions([Size(200, Unit.MebiByte)] * cores_amount)
core_device.create_partitions([Size(200, Unit.MebiByte)] * 3)
with TestRun.step("Start cache."):
cache = casadm.start_cache(cache_part, force=True)
with TestRun.step("Start cache"):
cache = casadm.start_cache(cache_device.partitions[0], force=True)
with TestRun.step("Add cores to cache and mount them except the first one."):
with TestRun.step(f"Add {cores_amount} cores to cache and mount them except the first one"):
random_prefix = random.randint(1, 9)
random_interfix = random.randint(1, 9)
free_core = cache.add_core(core_device.partitions[0], random_prefix)
free_core = cache.add_core(core_dev=core_device.partitions[0], core_id=random_prefix)
mounted_cores = []
for i, part in enumerate(core_device.partitions[1:]):
part.create_filesystem(Filesystem.xfs)
mounted_cores.append(cache.add_core(part, f"{random_prefix}{random_interfix}{i}"))
mounted_cores[i].mount(f"{mount_point}{cache.cache_id}-{mounted_cores[i].core_id}")
mounted_cores.append(
cache.add_core(core_dev=part, core_id=int(f"{random_prefix}{random_interfix}{i}"))
)
mounted_cores[i].mount(
mount_point=f"{mount_point}{cache.cache_id}-{mounted_cores[i].core_id}"
)
with TestRun.step("Remove the unmounted core."):
with TestRun.step("Remove the unmounted core"):
try:
cache.remove_core(free_core.core_id)
except CmdException as exc:

View File

@@ -1,14 +1,14 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
from datetime import timedelta
import pytest
from datetime import timedelta
from api.cas import casadm
from api.cas.cache_config import CacheMode
from core.test_run import TestRun
@@ -25,12 +25,12 @@ io_size = Size(10000, Unit.Blocks4096)
"cache_mode",
[
(CacheMode.WT, CacheMode.WB),
(CacheMode.WB, CacheMode.PT),
(CacheMode.WB, CacheMode.WT),
(CacheMode.PT, CacheMode.WT),
(CacheMode.WT, CacheMode.WA),
(CacheMode.WT, CacheMode.WO),
(CacheMode.WB, CacheMode.PT),
(CacheMode.WB, CacheMode.WT),
(CacheMode.WB, CacheMode.WO),
(CacheMode.PT, CacheMode.WT),
(CacheMode.PT, CacheMode.WO),
(CacheMode.WA, CacheMode.WO),
(CacheMode.WO, CacheMode.WT),
@@ -43,26 +43,31 @@ io_size = Size(10000, Unit.Blocks4096)
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_cache_stop_and_load(cache_mode):
"""
title: Test for stopping and loading cache back with dynamic cache mode switching.
description: |
Validate the ability of the CAS to switch cache modes at runtime and
check if all of them are working properly after switching and
after stopping and reloading cache back.
Check also other parameters consistency after reload.
pass_criteria:
- In all cache modes data reads and writes are handled properly before and after reload.
- All cache parameters preserve their values after reload.
title: Test for stopping and loading cache back with dynamic cache mode switching.
description: |
Validate the ability of the CAS to switch cache modes at runtime and
check if all of them are working properly after switching and
after stopping and reloading cache back.
Check also other parameters consistency after reload.
pass_criteria:
- In all cache modes data reads and writes are handled properly before and after reload.
- All cache parameters preserve their values after reload.
"""
with TestRun.step("Partition cache and core devices"):
cache_dev, core_dev = storage_prepare()
cache_dev = TestRun.disks["cache"]
cache_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev = TestRun.disks["core"]
core_dev.create_partitions([Size(2, Unit.GibiByte)])
with TestRun.step(f"Start cache in {cache_mode[0]} mode"):
cache = casadm.start_cache(cache_dev, cache_mode[0], force=True)
with TestRun.step(f"Disable udev"):
Udev.disable()
with TestRun.step(f"Start cache in {cache_mode[0]} mode"):
cache = casadm.start_cache(cache_dev.partitions[0], cache_mode[0], force=True)
with TestRun.step("Add core to the cache"):
core = cache.add_core(core_dev)
core = cache.add_core(core_dev.partitions[0])
with TestRun.step(f"Change cache mode to {cache_mode[1]}"):
cache.set_cache_mode(cache_mode[1], flush=True)
@@ -73,7 +78,7 @@ def test_cache_stop_and_load(cache_mode):
with TestRun.step("Stop and load cache back"):
cache.stop()
cache = casadm.load_cache(cache_dev)
cache = casadm.load_cache(cache_dev.partitions[0])
with TestRun.step("Check parameters consistency"):
if check_cache_config != cache.get_cache_config():
@@ -95,18 +100,12 @@ def test_cache_stop_and_load(cache_mode):
)
TestRun.fail(f"Parameters do not match after reload:\n{failed_params}")
with TestRun.step(
f"Check if {cache_mode[1]} cache mode works properly after reload"
):
with TestRun.step(f"Check if {cache_mode[1]} cache mode works properly after reload"):
if cache_mode[1] == CacheMode.WA or cache_mode[1] == CacheMode.WO:
check_separated_read_write_after_reload(cache, core, cache_mode[1], io_size)
else:
check_cache_mode_operation(cache, core, cache_mode[1])
with TestRun.step("Stop all caches"):
casadm.stop_all_caches()
Udev.enable()
@pytest.mark.parametrize(
"cache_mode_1,cache_mode_2,flush",
@@ -138,26 +137,38 @@ def test_cache_stop_and_load(cache_mode):
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_cache_mode_switching_during_io(cache_mode_1, cache_mode_2, flush, io_mode):
"""
title: Test for dynamic cache mode switching during IO.
description: |
Validate the ability of CAS to switch cache modes
during working IO on CAS device.
pass_criteria:
- Cache mode is switched without errors.
title: Test for dynamic cache mode switching during IO.
description: |
Validate the ability of CAS to switch cache modes
during working IO on CAS device.
pass_criteria:
- Cache mode is switched without errors.
"""
with TestRun.step("Partition cache and core devices"):
cache_dev, core_dev = storage_prepare()
with TestRun.step("Prepare cache and core devices"):
cache_dev = TestRun.disks["cache"]
core_dev = TestRun.disks["core"]
cache_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev.create_partitions([Size(2, Unit.GibiByte)])
with TestRun.step(f"Start cache in {cache_mode_1} mode"):
cache = casadm.start_cache(cache_dev, cache_mode_1, force=True)
cache = casadm.start_cache(
cache_dev=cache_dev.partitions[0], cache_mode=cache_mode_1, force=True
)
with TestRun.step("Add core to the cache"):
core = cache.add_core(core_dev)
core = cache.add_core(core_dev.partitions[0])
with TestRun.step("Run 'fio'"):
with TestRun.step("Run fio in background"):
fio = (
fio_prepare(core, io_mode)
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.size(io_size)
.read_write(io_mode)
.target(core.path)
.direct(1)
.verify(VerifyMethod.sha1)
.run_time(timedelta(minutes=4))
.time_based()
@@ -168,9 +179,7 @@ def test_cache_mode_switching_during_io(cache_mode_1, cache_mode_2, flush, io_mo
with TestRun.step(
f"Change cache mode to {cache_mode_2} with flush cache option set to: {flush}"
):
cache_mode_switch_output = cache.set_cache_mode(cache_mode_2, flush)
if cache_mode_switch_output.exit_code != 0:
TestRun.fail("Cache mode switch failed!")
cache.set_cache_mode(cache_mode=cache_mode_2, flush=flush)
with TestRun.step(f"Check if cache mode has switched properly during IO"):
cache_mode_after_switch = cache.get_cache_mode()
@@ -188,162 +197,147 @@ def test_cache_mode_switching_during_io(cache_mode_1, cache_mode_2, flush, io_mo
casadm.stop_all_caches()
def storage_prepare():
cache_dev = TestRun.disks["cache"]
cache_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev = TestRun.disks["core"]
core_dev.create_partitions([Size(2, Unit.GibiByte)])
return cache_dev.partitions[0], core_dev.partitions[0]
def check_cache_mode_operation(cache, core, cache_mode):
cache.reset_counters()
if cache_mode == CacheMode.WT:
io_mode = ReadWrite.randwrite
run_io_and_verify(cache, core, io_mode)
if cache_mode == CacheMode.WB or cache_mode == CacheMode.PT:
io_mode = ReadWrite.randrw
run_io_and_verify(cache, core, io_mode)
if cache_mode == CacheMode.WA or cache_mode == CacheMode.WO:
io_mode = ReadWrite.randread
run_io_and_verify(cache, core, io_mode)
cache.reset_counters()
io_mode = ReadWrite.randwrite
run_io_and_verify(cache, core, io_mode)
match cache_mode:
case CacheMode.WT:
io_mode = ReadWrite.randwrite
run_io_and_verify(cache, core, io_mode)
case CacheMode.WB | CacheMode.PT:
io_mode = ReadWrite.randrw
run_io_and_verify(cache, core, io_mode)
case CacheMode.WA | CacheMode.WO:
io_mode = ReadWrite.randread
run_io_and_verify(cache, core, io_mode)
cache.reset_counters()
io_mode = ReadWrite.randwrite
run_io_and_verify(cache, core, io_mode)
def run_io_and_verify(cache, core, io_mode):
fio_prepare(core, io_mode).run()
sync()
cache_mode = cache.get_cache_mode()
cache_stats = cache.get_statistics()
core_stats = core.get_statistics()
if cache_mode == CacheMode.WB:
if (
core_stats.block_stats.core.writes.value != 0
or core_stats.block_stats.exp_obj.writes.value <= 0
):
TestRun.fail(
"Write-Back cache mode is not working properly! "
"There should be some writes to CAS device and none to the core."
)
if cache_mode == CacheMode.PT:
if (
cache_stats.block_stats.cache.writes.value != 0
or cache_stats.block_stats.cache.reads.value != 0
):
TestRun.fail(
"Pass-Through cache mode is not working properly! "
"There should be no reads or writes from/to cache."
)
if cache_mode == CacheMode.WT:
if cache_stats.block_stats.cache != cache_stats.block_stats.core:
TestRun.fail(
"Write-Through cache mode is not working properly! "
"'cache writes' and 'core writes' counts should be the same."
)
if cache_mode == CacheMode.WA:
if io_mode == ReadWrite.randread:
cache_block_stats = cache.get_statistics().block_stats
core_block_stats = core.get_statistics().block_stats
match cache_mode:
case CacheMode.WB:
if (
cache_stats.block_stats.cache.writes != io_size
or cache_stats.block_stats.core.reads != io_size
cache_block_stats.core.writes.value != 0
or cache_block_stats.exp_obj.writes.value <= 0
):
TestRun.fail(
"Write-Around cache mode is not working properly for data reads! "
"'cache writes' and 'core reads' should equal total data reads."
"Write-Back cache mode is not working properly! "
"There should be some writes to CAS device and none to the core"
)
if io_mode == ReadWrite.randwrite:
if cache_stats.block_stats.cache.writes != io_size:
TestRun.fail(
"Write-Around cache mode is not working properly for data writes! "
"There should be no writes to cache since previous read operation."
)
if cache_mode == CacheMode.WO:
if io_mode == ReadWrite.randread:
case CacheMode.PT:
if (
cache_stats.block_stats.cache.writes.value != 0
or cache_stats.block_stats.cache.reads.value != 0
cache_block_stats.cache.writes.value != 0
or cache_block_stats.cache.reads.value != 0
):
TestRun.fail(
"Write-Only cache mode is not working properly for data reads! "
"There should be no reads or writes from/to cache."
"Pass-Through cache mode is not working properly! "
"There should be no reads or writes from/to cache"
)
if io_mode == ReadWrite.randwrite:
if (
core_stats.block_stats.core.writes.value != 0
or core_stats.block_stats.exp_obj.writes != io_size
):
case CacheMode.WT:
if cache_block_stats.cache != cache_block_stats.core:
TestRun.fail(
"Write-Only cache mode is not working properly for data writes! "
"All writes should be passed to CAS device and none to the core."
"Write-Through cache mode is not working properly! "
"'cache writes' and 'core writes' counts should be the same"
)
case CacheMode.WA:
if io_mode == ReadWrite.randread:
if (
cache_block_stats.cache.writes != io_size
or cache_block_stats.core.reads != io_size
):
TestRun.fail(
"Write-Around cache mode is not working properly for data reads! "
"'cache writes' and 'core reads' should equal total data reads"
)
if io_mode == ReadWrite.randwrite:
if cache_block_stats.cache.writes != io_size:
TestRun.fail(
"Write-Around cache mode is not working properly for data writes! "
"There should be no writes to cache since previous read operation"
)
case CacheMode.WO:
if io_mode == ReadWrite.randread:
if (
cache_block_stats.cache.writes.value != 0
or cache_block_stats.cache.reads.value != 0
):
TestRun.fail(
"Write-Only cache mode is not working properly for data reads! "
"There should be no reads or writes from/to cache"
)
if io_mode == ReadWrite.randwrite:
if (
core_block_stats.core.writes.value != 0
or core_block_stats.exp_obj.writes != io_size
):
TestRun.fail(
"Write-Only cache mode is not working properly for data writes! "
"All writes should be passed to CAS device and none to the core"
)
def check_separated_read_write_after_reload(cache, core, cache_mode, io_size):
# io_size_after_reload should be set to a greater value then global io_size value
io_size_after_reload = Size(12000, Unit.Blocks4096)
if io_size_after_reload <= io_size:
TestRun.fail(
"io_size_after_reload value is not greater then global io_size value!"
)
TestRun.fail("io_size_after_reload value is not greater then global io_size value!")
io_mode = ReadWrite.randread
fio_prepare(core, io_mode, io_size_after_reload).run()
sync()
cache_stats = cache.get_statistics()
cache_block_stats = cache.get_statistics().block_stats
io_new_data = io_size_after_reload - io_size
if cache_mode == CacheMode.WA:
if (
cache_stats.block_stats.cache.writes != io_new_data
or cache_stats.block_stats.core.reads != io_new_data
cache_block_stats.cache.writes != io_new_data
or cache_block_stats.core.reads != io_new_data
):
TestRun.fail(
"Write-Around cache mode is not working properly for data reads after reload! "
"'cache writes' and 'core reads' should equal "
"the difference from previous data reads."
"the difference from previous data reads"
)
if cache_mode == CacheMode.WO:
if (
cache_stats.block_stats.cache.writes.value != 0
or cache_stats.block_stats.cache.reads != io_size
cache_block_stats.cache.writes != Size.zero()
or cache_block_stats.cache.reads != io_size
):
TestRun.fail(
"Write-Only cache mode is not working properly for data reads after reload! "
"There should be no writes to cache and reads "
"from cache should equal previous writes to it."
"from cache should equal previous writes to it"
)
cache.reset_counters()
io_mode = ReadWrite.randwrite
fio_prepare(core, io_mode, io_size_after_reload).run()
sync()
cache_stats = cache.get_statistics()
core_stats = core.get_statistics()
cache_block_stats = cache.get_statistics().block_stats
core_block_stats = core.get_statistics().block_stats
if cache_mode == CacheMode.WA:
if cache_stats.block_stats.cache.writes != io_size_after_reload:
TestRun.fail(
"Write-Around cache mode is not working properly for data writes after reload! "
"There should be no writes to cache since previous read operation."
)
if cache_mode == CacheMode.WO:
if (
core_stats.block_stats.core.writes.value != 0
or core_stats.block_stats.exp_obj.writes != io_size_after_reload
):
TestRun.fail(
"Write-Only cache mode is not working properly for data writes after reload! "
"All writes should be passed to CAS device and none to the core."
)
match cache_mode:
case CacheMode.WA:
if cache_block_stats.cache.writes != io_size_after_reload:
TestRun.fail(
"Write-Around cache mode is not working properly for data writes after reload! "
"There should be no writes to cache since previous read operation"
)
case CacheMode.WO:
if (
core_block_stats.core.writes != Size.zero()
or core_block_stats.exp_obj.writes != io_size_after_reload
):
TestRun.fail(
"Write-Only cache mode is not working properly for data writes after reload! "
"All writes should be passed to CAS device and none to the core"
)
def fio_prepare(core, io_mode, io_size=io_size):

View File

@@ -1,5 +1,6 @@
#
# Copyright(c) 2020-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -15,41 +16,40 @@ from test_utils.size import Size, Unit
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_remove_multilevel_core():
"""
title: Test of the ability to remove a core used in a multilevel cache.
description: |
Negative test if OpenCAS does not allow to remove a core when the related exported object
is used as a core device for another cache instance.
pass_criteria:
- No system crash.
- OpenCAS does not allow removing a core used in a multilevel cache instance.
title: Test of the ability to remove a core used in a multilevel cache.
description: |
Negative test if OpenCAS does not allow to remove a core when the related exported object
is used as a core device for another cache instance.
pass_criteria:
- No system crash.
- OpenCAS does not allow removing a core used in a multilevel cache instance.
"""
with TestRun.step("Prepare two devices for cache and one for core."):
cache_dev = TestRun.disks['cache']
with TestRun.step("Prepare cache and core devices"):
cache_dev = TestRun.disks["cache"]
core_dev = TestRun.disks["core"]
cache_dev.create_partitions([Size(512, Unit.MebiByte)] * 2)
cache_part1 = cache_dev.partitions[0]
cache_part2 = cache_dev.partitions[1]
core_dev = TestRun.disks['core']
core_dev.create_partitions([Size(1, Unit.GibiByte)])
core_dev = core_dev.partitions[0]
with TestRun.step("Start the first cache instance"):
cache1 = casadm.start_cache(cache_part1, force=True)
cache1 = casadm.start_cache(cache_dev.partitions[0], force=True)
with TestRun.step("Add a core device to the first cache instance."):
core1 = cache1.add_core(core_dev)
core1 = cache1.add_core(core_dev.partitions[0])
with TestRun.step("Start the second cache instance"):
cache2 = casadm.start_cache(cache_part2, force=True)
cache2 = casadm.start_cache(cache_dev.partitions[1], force=True)
with TestRun.step("Add the first cache's exported object as a core "
"to the second cache instance."):
with TestRun.step(
"Add the first cache's exported object as a core to the second cache instance."
):
cache2.add_core(core1)
with TestRun.step("Try to remove core from the first level cache."):
output = TestRun.executor.run_expect_fail(cli.remove_core_cmd(cache_id=str(cache1.cache_id),
core_id=str(core1.core_id),
force=True))
output = TestRun.executor.run_expect_fail(
cli.remove_core_cmd(
cache_id=str(cache1.cache_id), core_id=str(core1.core_id), force=True
)
)
cli_messages.check_stderr_msg(output, cli_messages.remove_multilevel_core)
with TestRun.step("Stop cache."):
casadm.stop_all_caches()

View File

@@ -1,15 +1,21 @@
#
# Copyright(c) 2020-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import random
from time import sleep
import posixpath
import random
import pytest
from time import sleep
from api.cas import casadm
from api.cas.cache_config import CacheMode, SeqCutOffPolicy, CacheModeTrait
from api.cas.cache_config import (
CacheMode,
SeqCutOffPolicy,
CacheModeTrait,
)
from core.test_run_utils import TestRun
from storage_devices.disk import DiskTypeSet, DiskTypeLowerThan, DiskType
from test_tools.dd import Dd
@@ -19,45 +25,54 @@ from test_tools.fio.fio_param import IoEngine, ReadWrite
from test_utils.os_utils import Udev
from test_utils.size import Size, Unit
random_thresholds = random.sample(range(1028, 1024 ** 2, 4), 3)
random_thresholds = random.sample(range(1028, 1024**2, 4), 3)
random_stream_numbers = random.sample(range(2, 128), 3)
mount_point = "/mnt"
@pytest.mark.os_dependent
@pytest.mark.parametrizex("streams_number", [1, 128] + random_stream_numbers)
@pytest.mark.parametrizex("threshold",
[Size(1, Unit.MebiByte), Size(1, Unit.GibiByte)]
+ [Size(x, Unit.KibiByte) for x in random_thresholds])
@pytest.mark.parametrizex(
"threshold",
[Size(1, Unit.MebiByte), Size(1, Unit.GibiByte)]
+ [Size(x, Unit.KibiByte) for x in random_thresholds],
)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_multistream_seq_cutoff_functional(threshold, streams_number):
def test_multistream_seq_cutoff_functional(streams_number, threshold):
"""
title: Functional test for multistream sequential cutoff
description: |
Testing if amount of data written to cache and core is correct after running sequential
writes from multiple streams with different sequential cut-off thresholds.
pass_criteria:
- Amount of data written to cache is equal to amount set with sequential cutoff threshold
- Amount of data written in pass-through is equal to io size run after reaching the
- Amount of data written to cache is equal to amount set with sequential cutoff threshold
- Amount of data written in pass-through is equal to io size run after reaching the
sequential cutoff threshold
"""
with TestRun.step("Start cache and add core device."):
cache_disk = TestRun.disks['cache']
core_disk = TestRun.disks['core']
cache = casadm.start_cache(cache_disk, CacheMode.WB, force=True)
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(f"Start cache in Write-Back"):
cache_disk = TestRun.disks["cache"]
core_disk = TestRun.disks["core"]
cache = casadm.start_cache(cache_disk, CacheMode.WB, force=True)
core = cache.add_core(core_disk)
with TestRun.step(f"Set seq-cutoff policy to always, threshold to {threshold} "
f"and reset statistics counters."):
with TestRun.step(
f"Set seq-cutoff policy to always, threshold to {threshold} "
f"and reset statistics counters"
):
core.set_seq_cutoff_policy(SeqCutOffPolicy.always)
core.set_seq_cutoff_threshold(threshold)
core.set_seq_cutoff_promotion_count(1)
core.reset_counters()
with TestRun.step(f"Run {streams_number} I/O streams with amount of sequential writes equal to "
f"seq-cutoff threshold value minus one 4k block."):
with TestRun.step(
f"Run {streams_number} I/O streams with amount of sequential writes equal to "
f"seq-cutoff threshold value minus one 4k block"
):
kib_between_streams = 100
range_step = int(threshold.get_value(Unit.KibiByte)) + kib_between_streams
max_range_offset = streams_number * range_step
@@ -69,68 +84,50 @@ def test_multistream_seq_cutoff_functional(threshold, streams_number):
TestRun.LOGGER.info(f"Statistics before I/O:\n{core_statistics_before}")
offset = Size(offsets[i], Unit.KibiByte)
run_dd(core.path, count=int(threshold.get_value(Unit.Blocks4096) - 1),
seek=int(offset.get_value(Unit.Blocks4096)))
run_dd(
core.path,
count=int(threshold.get_value(Unit.Blocks4096) - 1),
seek=int(offset.get_value(Unit.Blocks4096)),
)
core_statistics_after = core.get_statistics()
check_statistics(core_statistics_before,
core_statistics_after,
expected_pt=0,
expected_writes_to_cache=threshold - Size(1, Unit.Blocks4096))
check_statistics(
core_statistics_before,
core_statistics_after,
expected_pt=0,
expected_writes_to_cache=threshold - Size(1, Unit.Blocks4096),
)
core_statistics_before = core_statistics_after
with TestRun.step("Write random number of 4k block requests to each stream and check if all "
"writes were sent in pass-through mode."):
with TestRun.step(
"Write random number of 4k block requests to each stream and check if all "
"writes were sent in pass-through mode"
):
core_statistics_before = core.get_statistics()
random.shuffle(offsets)
for i in TestRun.iteration(range(0, len(offsets))):
TestRun.LOGGER.info(f"Statistics before second I/O:\n{core_statistics_before}")
additional_4k_blocks_writes = random.randint(1, kib_between_streams / 4)
additional_4k_blocks_writes = random.randint(1, kib_between_streams // 4)
offset = Size(offsets[i], Unit.KibiByte)
run_dd(
core.path, count=additional_4k_blocks_writes,
seek=int(offset.get_value(Unit.Blocks4096)
+ threshold.get_value(Unit.Blocks4096) - 1))
core.path,
count=additional_4k_blocks_writes,
seek=int(
offset.get_value(Unit.Blocks4096) + threshold.get_value(Unit.Blocks4096) - 1
),
)
core_statistics_after = core.get_statistics()
check_statistics(core_statistics_before,
core_statistics_after,
expected_pt=additional_4k_blocks_writes,
expected_writes_to_cache=Size.zero())
check_statistics(
core_statistics_before,
core_statistics_after,
expected_pt=additional_4k_blocks_writes,
expected_writes_to_cache=Size.zero(),
)
core_statistics_before = core_statistics_after
def check_statistics(stats_before, stats_after, expected_pt, expected_writes_to_cache):
TestRun.LOGGER.info(f"Statistics after I/O:\n{stats_after}")
writes_to_cache_before = stats_before.block_stats.cache.writes
writes_to_cache_after = stats_after.block_stats.cache.writes
pt_writes_before = stats_before.request_stats.pass_through_writes
pt_writes_after = stats_after.request_stats.pass_through_writes
actual_pt = pt_writes_after - pt_writes_before
actual_writes_to_cache = writes_to_cache_after - writes_to_cache_before
if actual_pt != expected_pt:
TestRun.LOGGER.error(f"Expected pass-through writes: {expected_pt}\n"
f"Actual pass-through writes: {actual_pt}")
if actual_writes_to_cache != expected_writes_to_cache:
TestRun.LOGGER.error(
f"Expected writes to cache: {expected_writes_to_cache}\n"
f"Actual writes to cache: {actual_writes_to_cache}")
def run_dd(target_path, count, seek):
dd = Dd() \
.input("/dev/zero") \
.output(target_path) \
.block_size(Size(1, Unit.Blocks4096)) \
.count(count) \
.oflag("direct") \
.seek(seek)
dd.run()
TestRun.LOGGER.info(f"dd command:\n{dd}")
@pytest.mark.os_dependent
@pytest.mark.parametrizex("streams_seq_rand", [(64, 64), (64, 192)])
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@@ -146,31 +143,41 @@ def test_multistream_seq_cutoff_stress_raw(streams_seq_rand):
pass_criteria:
- No system crash
"""
with TestRun.step("Start cache and add core device."):
cache_disk = TestRun.disks['cache']
core_disk = TestRun.disks['core']
with TestRun.step("Prepare cache and core devices"):
cache_disk = TestRun.disks["cache"]
core_disk = TestRun.disks["core"]
cache_disk.create_partitions([Size(1.5, Unit.GibiByte)])
cache_dev = cache_disk.partitions[0]
cache = casadm.start_cache(cache_dev, CacheMode.WB, force=True)
with TestRun.step(f"Disable udev"):
Udev.disable()
with TestRun.step(f"Start cache in Write-Back mode and add core"):
cache = casadm.start_cache(
cache_dev=cache_disk.partitions[0], cache_mode=CacheMode.WB, force=True
)
core = cache.add_core(core_disk)
with TestRun.step(f"Set seq-cutoff policy to always and threshold to 512KiB."):
with TestRun.step(f"Set seq-cutoff policy to always and threshold to 512KiB"):
core.set_seq_cutoff_policy(SeqCutOffPolicy.always)
core.set_seq_cutoff_threshold(Size(512, Unit.KibiByte))
with TestRun.step("Reset core statistics counters."):
with TestRun.step("Reset core statistics counters"):
core.reset_counters()
with TestRun.step("Run I/O"):
with TestRun.step("Run FIO on core device"):
stream_size = min(core_disk.size / 256, Size(256, Unit.MebiByte))
sequential_streams = streams_seq_rand[0]
random_streams = streams_seq_rand[1]
fio = (Fio().create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(1, Unit.Blocks4096))
.direct()
.offset_increment(stream_size))
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(1, Unit.Blocks4096))
.direct()
.offset_increment(stream_size)
)
for i in range(0, sequential_streams + random_streams):
fio_job = fio.add_job(job_name=f"stream_{i}")
@@ -181,8 +188,8 @@ def test_multistream_seq_cutoff_stress_raw(streams_seq_rand):
else:
fio_job.read_write(ReadWrite.randwrite)
pid = fio.run_in_background()
while TestRun.executor.check_if_process_exists(pid):
fio_pid = fio.run_in_background()
while TestRun.executor.check_if_process_exists(fio_pid):
sleep(5)
TestRun.LOGGER.info(f"{core.get_statistics()}")
@@ -199,46 +206,51 @@ def test_multistream_seq_cutoff_stress_fs(streams_seq_rand, filesystem, cache_mo
description: |
Testing the stability of a system when there are multiple sequential and random I/O streams
running against the exported object with a filesystem when the sequential cutoff policy is
set to always and the sequential cutoff threshold is set to a value which is able
to be reached by sequential I/O streams.
set to always and the sequential cutoff threshold is configured to a value that can be
achieved by sequential I/O streams.
pass_criteria:
- No system crash
"""
mount_point = "/mnt"
with TestRun.step("Prepare devices. Create filesystem on core device."):
cache_disk = TestRun.disks['cache']
core_disk = TestRun.disks['core']
with TestRun.step(f"Disable udev"):
Udev.disable()
with TestRun.step("Create filesystem on core device"):
cache_disk = TestRun.disks["cache"]
core_disk = TestRun.disks["core"]
core_disk.create_filesystem(filesystem)
with TestRun.step("Start cache and add core."):
cache = casadm.start_cache(cache_disk, cache_mode, force=True)
Udev.disable()
core = cache.add_core(core_disk)
with TestRun.step("Start cache and add core"):
cache = casadm.start_cache(cache_dev=cache_disk, cache_mode=cache_mode, force=True)
core = cache.add_core(core_dev=core_disk)
with TestRun.step("Mount core."):
core.mount(mount_point)
with TestRun.step("Mount core"):
core.mount(mount_point=mount_point)
with TestRun.step(f"Set seq-cutoff policy to always and threshold to 20MiB."):
core.set_seq_cutoff_policy(SeqCutOffPolicy.always)
core.set_seq_cutoff_threshold(Size(20, Unit.MebiByte))
with TestRun.step(f"Set seq-cutoff policy to always and threshold to 20MiB"):
core.set_seq_cutoff_policy(policy=SeqCutOffPolicy.always)
core.set_seq_cutoff_threshold(threshold=Size(20, Unit.MebiByte))
with TestRun.step("Reset core statistics counters."):
with TestRun.step("Reset core statistic counters"):
core.reset_counters()
with TestRun.step("Run I/O"):
with TestRun.step("Run fio on exported object"):
sequential_streams = streams_seq_rand[0]
random_streams = streams_seq_rand[1]
stream_size = core_disk.size / 256
fio = (Fio().create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(1, Unit.Blocks4096))
.direct()
.offset_increment(stream_size))
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(1, Unit.Blocks4096))
.direct()
.offset_increment(stream_size)
)
for i in range(0, sequential_streams + random_streams):
fio_job = fio.add_job(job_name=f"stream_{i}")
fio_job.size(stream_size)
fio_job.target(os.path.join(mount_point, f"file_{i}"))
fio_job.target(posixpath.join(mount_point, f"file_{i}"))
if i < sequential_streams:
fio_job.read_write(ReadWrite.write)
else:
@@ -248,3 +260,38 @@ def test_multistream_seq_cutoff_stress_fs(streams_seq_rand, filesystem, cache_mo
while TestRun.executor.check_if_process_exists(pid):
sleep(5)
TestRun.LOGGER.info(f"{core.get_statistics()}")
def run_dd(target_path, count, seek):
dd = (
Dd()
.input("/dev/zero")
.output(target_path)
.block_size(Size(1, Unit.Blocks4096))
.count(count)
.oflag("direct")
.seek(seek)
)
dd.run()
TestRun.LOGGER.info(f"dd command:\n{dd}")
def check_statistics(stats_before, stats_after, expected_pt, expected_writes_to_cache):
TestRun.LOGGER.info(f"Statistics after I/O:\n{stats_after}")
writes_to_cache_before = stats_before.block_stats.cache.writes
writes_to_cache_after = stats_after.block_stats.cache.writes
pt_writes_before = stats_before.request_stats.pass_through_writes
pt_writes_after = stats_after.request_stats.pass_through_writes
actual_pt = pt_writes_after - pt_writes_before
actual_writes_to_cache = writes_to_cache_after - writes_to_cache_before
if actual_pt != expected_pt:
TestRun.LOGGER.error(
f"Expected pass-through writes: {expected_pt}\n"
f"Actual pass-through writes: {actual_pt}"
)
if actual_writes_to_cache != expected_writes_to_cache:
TestRun.LOGGER.error(
f"Expected writes to cache: {expected_writes_to_cache}\n"
f"Actual writes to cache: {actual_writes_to_cache}"
)

View File

@@ -1,14 +1,13 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import random
from enum import Enum, auto
import pytest
from enum import Enum, auto
from api.cas import casadm
from api.cas.cache_config import SeqCutOffPolicy, CacheMode, CacheLineSize
from api.cas.core import SEQ_CUTOFF_THRESHOLD_MAX
@@ -26,21 +25,19 @@ class VerifyType(Enum):
EQUAL = auto()
@pytest.mark.parametrize("thresholds_list", [[
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
]])
@pytest.mark.parametrize("cache_mode, io_type, io_type_last", [
(CacheMode.WB, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WT, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WA, ReadWrite.read, ReadWrite.randread),
(CacheMode.WO, ReadWrite.write, ReadWrite.randwrite)])
@pytest.mark.parametrizex("cls", CacheLineSize)
@pytest.mark.parametrize(
"cache_mode, io_type, io_type_last",
[
(CacheMode.WB, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WT, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WO, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WA, ReadWrite.read, ReadWrite.randread),
],
)
@pytest.mark.parametrizex("cache_line_size", CacheLineSize)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_seq_cutoff_multi_core(thresholds_list, cache_mode, io_type, io_type_last, cls):
def test_seq_cutoff_multi_core(cache_mode, io_type, io_type_last, cache_line_size):
"""
title: Sequential cut-off tests during sequential and random IO 'always' policy with 4 cores
description: |
@@ -48,87 +45,120 @@ def test_seq_cutoff_multi_core(thresholds_list, cache_mode, io_type, io_type_las
sequential cut-off thresholds on each core, while running sequential IO on 3 out of 4
cores and random IO against the last core, is correct.
pass_criteria:
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off threshold for three first cores.
- Amount of written blocks to cache is equal to io size run against last core.
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off threshold for three first cores.
- Amount of written blocks to cache is equal to io size run against last core.
"""
with TestRun.step(f"Test prepare (start cache (cache line size: {cls}) and add cores)"):
cache, cores = prepare(cores_count=len(thresholds_list), cache_line_size=cls)
writes_before = []
io_sizes = []
thresholds = []
with TestRun.step("Prepare cache and core devices"):
cache_device = TestRun.disks["cache"]
core_device = TestRun.disks["core"]
cache_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX * 4 + Size(value=5, unit=Unit.GibiByte))]
)
core_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX + Size(value=10, unit=Unit.GibiByte))] * 4
)
cache_part = cache_device.partitions[0]
core_parts = core_device.partitions
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(
f"Start cache in {cache_mode} mode and add {len(core_parts)} cores to the cache"
):
cache = casadm.start_cache(
cache_dev=cache_part, cache_mode=cache_mode, force=True, cache_line_size=cache_line_size
)
core_list = [cache.add_core(core_dev=core_part) for core_part in core_parts]
with TestRun.step("Set sequential cut-off parameters for all cores"):
writes_before_list = []
fio_additional_size = Size(10, Unit.Blocks4096)
for i in range(len(thresholds_list)):
thresholds.append(Size(thresholds_list[i], Unit.KibiByte))
io_sizes.append((thresholds[i] + fio_additional_size).align_down(0x1000))
thresholds_list = [
Size.generate_random_size(
min_size=1,
max_size=SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte),
unit=Unit.KibiByte,
)
for _ in core_list
]
io_sizes_list = [
(threshold_size + fio_additional_size).align_down(0x1000)
for threshold_size in thresholds_list
]
with TestRun.step(f"Setting cache mode to {cache_mode}"):
cache.set_cache_mode(cache_mode)
for i, core in TestRun.iteration(enumerate(cores), "Set sequential cut-off parameters for "
"all cores"):
with TestRun.step(f"Setting core sequential cut off policy to {SeqCutOffPolicy.always}"):
for core, threshold in zip(core_list, thresholds_list):
core.set_seq_cutoff_policy(SeqCutOffPolicy.always)
core.set_seq_cutoff_threshold(threshold)
with TestRun.step(f"Setting core sequential cut off threshold to {thresholds[i]}"):
core.set_seq_cutoff_threshold(thresholds[i])
with TestRun.step("Creating FIO command (one job per core)"):
with TestRun.step("Prepare sequential IO against first three cores"):
block_size = Size(4, Unit.KibiByte)
fio = (Fio().create_command()
.io_engine(IoEngine.libaio)
.block_size(block_size)
.direct())
fio = Fio().create_command().io_engine(IoEngine.libaio).block_size(block_size).direct(True)
# Run sequential IO against first three cores
for i, core in enumerate(cores[:-1]):
fio_job = fio.add_job(job_name=f"core_{core.core_id}")
fio_job.size(io_sizes[i])
for core, io_size in zip(core_list[:-1], io_sizes_list[:-1]):
fio_job = fio.add_job(f"core_{core.core_id}")
fio_job.size(io_size)
fio_job.read_write(io_type)
fio_job.target(core.path)
writes_before.append(core.get_statistics().block_stats.cache.writes)
writes_before_list.append(core.get_statistics().block_stats.cache.writes)
# Run random IO against the last core
fio_job = fio.add_job(job_name=f"core_{cores[-1].core_id}")
fio_job.size(io_sizes[-1])
with TestRun.step("Prepare random IO against the last core"):
fio_job = fio.add_job(f"core_{core_list[-1].core_id}")
fio_job.size(io_sizes_list[-1])
fio_job.read_write(io_type_last)
fio_job.target(cores[-1].path)
writes_before.append(cores[-1].get_statistics().block_stats.cache.writes)
fio_job.target(core_list[-1].path)
writes_before_list.append(core_list[-1].get_statistics().block_stats.cache.writes)
with TestRun.step("Running IO against all cores"):
with TestRun.step("Run fio against all cores"):
fio.run()
with TestRun.step("Verifying writes to cache count after IO"):
margins = []
for i, core in enumerate(cores[:-1]):
promotion_count = core.get_seq_cut_off_parameters().promotion_count
cutoff_threshold = thresholds[i]
margins.append(min(block_size * (promotion_count - 1), cutoff_threshold))
margin = sum(margins)
with TestRun.step("Verify writes to cache count after IO"):
margins = [
min(block_size * (core.get_seq_cut_off_parameters().promotion_count - 1), threshold)
for core, threshold in zip(core_list[:-1], thresholds_list[:-1])
]
margin = Size.zero()
for size in margins:
margin += size
for i, core in enumerate(cores[:-1]):
verify_writes_count(core, writes_before[i], thresholds[i], io_sizes[i],
VerifyType.POSITIVE, io_margin=margin)
for core, writes, threshold, io_size in zip(
core_list[:-1], writes_before_list[:-1], thresholds_list[:-1], io_sizes_list[:-1]
):
verify_writes_count(
core=core,
writes_before=writes,
threshold=threshold,
io_size=io_size,
ver_type=VerifyType.POSITIVE,
io_margin=margin,
)
verify_writes_count(cores[-1], writes_before[-1], thresholds[-1], io_sizes[-1],
VerifyType.EQUAL)
verify_writes_count(
core=core_list[-1],
writes_before=writes_before_list[-1],
threshold=thresholds_list[-1],
io_size=io_sizes_list[-1],
ver_type=VerifyType.EQUAL,
)
@pytest.mark.parametrize("thresholds_list", [[
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte))),
]])
@pytest.mark.parametrize("cache_mode, io_type, io_type_last", [
(CacheMode.WB, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WT, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WA, ReadWrite.read, ReadWrite.randread),
(CacheMode.WO, ReadWrite.write, ReadWrite.randwrite)])
@pytest.mark.parametrizex("cls", CacheLineSize)
@pytest.mark.parametrize(
"cache_mode, io_type, io_type_last",
[
(CacheMode.WB, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WT, ReadWrite.write, ReadWrite.randwrite),
(CacheMode.WA, ReadWrite.read, ReadWrite.randread),
(CacheMode.WO, ReadWrite.write, ReadWrite.randwrite),
],
)
@pytest.mark.parametrizex("cache_line_size", CacheLineSize)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_seq_cutoff_multi_core_io_pinned(thresholds_list, cache_mode, io_type, io_type_last, cls):
def test_seq_cutoff_multi_core_io_pinned(cache_mode, io_type, io_type_last, cache_line_size):
"""
title: Sequential cut-off tests during sequential and random IO 'always' policy with 4 cores
description: |
@@ -136,77 +166,120 @@ def test_seq_cutoff_multi_core_io_pinned(thresholds_list, cache_mode, io_type, i
sequential cut-off thresholds on each core, while running sequential IO, pinned,
on 3 out of 4 cores and random IO against the last core, is correct.
pass_criteria:
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off threshold for three first cores.
- Amount of written blocks to cache is equal to io size run against last core.
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off threshold for three first cores.
- Amount of written blocks to cache is equal to io size run against last core.
"""
with TestRun.step(f"Test prepare (start cache (cache line size: {cls}) and add cores)"):
cache, cores = prepare(cores_count=len(thresholds_list), cache_line_size=cls)
writes_before = []
io_sizes = []
thresholds = []
with TestRun.step("Partition cache and core devices"):
cache_device = TestRun.disks["cache"]
core_device = TestRun.disks["core"]
cache_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX * 4 + Size(value=5, unit=Unit.GibiByte))]
)
core_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX + Size(value=10, unit=Unit.GibiByte))] * 4
)
cache_part = cache_device.partitions[0]
core_parts = core_device.partitions
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(
f"Start cache in {cache_mode} mode and add {len(core_parts)} cores to the cache"
):
cache = casadm.start_cache(
cache_dev=cache_part,
cache_mode=cache_mode,
force=True,
cache_line_size=cache_line_size,
)
core_list = [cache.add_core(core_dev=core_part) for core_part in core_parts]
with TestRun.step(f"Set sequential cut-off parameters for all cores"):
writes_before_list = []
fio_additional_size = Size(10, Unit.Blocks4096)
for i in range(len(thresholds_list)):
thresholds.append(Size(thresholds_list[i], Unit.KibiByte))
io_sizes.append((thresholds[i] + fio_additional_size).align_down(0x1000))
thresholds_list = [
Size.generate_random_size(
min_size=1,
max_size=SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte),
unit=Unit.KibiByte,
)
for _ in core_list
]
io_sizes_list = [
(threshold_size + fio_additional_size).align_down(0x1000)
for threshold_size in thresholds_list
]
with TestRun.step(f"Setting cache mode to {cache_mode}"):
cache.set_cache_mode(cache_mode)
for i, core in TestRun.iteration(enumerate(cores), "Set sequential cut-off parameters for "
"all cores"):
with TestRun.step(f"Setting core sequential cut off policy to {SeqCutOffPolicy.always}"):
for core, threshold in zip(core_list, thresholds_list):
core.set_seq_cutoff_policy(SeqCutOffPolicy.always)
core.set_seq_cutoff_threshold(threshold)
with TestRun.step(f"Setting core sequential cut off threshold to {thresholds[i]}"):
core.set_seq_cutoff_threshold(thresholds[i])
with TestRun.step("Creating FIO command (one job per core)"):
fio = (Fio().create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(1, Unit.Blocks4096))
.direct()
.cpus_allowed(get_dut_cpu_physical_cores())
.cpus_allowed_policy(CpusAllowedPolicy.split))
with TestRun.step("Prepare sequential IO against first three cores"):
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(1, Unit.Blocks4096))
.direct(True)
.cpus_allowed(get_dut_cpu_physical_cores())
.cpus_allowed_policy(CpusAllowedPolicy.split)
)
# Run sequential IO against first three cores
for i, core in enumerate(cores[:-1]):
for core, io_size in zip(core_list[:-1], io_sizes_list[:-1]):
fio_job = fio.add_job(job_name=f"core_{core.core_id}")
fio_job.size(io_sizes[i])
fio_job.size(io_size)
fio_job.read_write(io_type)
fio_job.target(core.path)
writes_before.append(core.get_statistics().block_stats.cache.writes)
writes_before_list.append(core.get_statistics().block_stats.cache.writes)
# Run random IO against the last core
fio_job = fio.add_job(job_name=f"core_{cores[-1].core_id}")
fio_job.size(io_sizes[-1])
fio_job = fio.add_job(job_name=f"core_{core_list[-1].core_id}")
fio_job.size(io_sizes_list[-1])
fio_job.read_write(io_type_last)
fio_job.target(cores[-1].path)
writes_before.append(cores[-1].get_statistics().block_stats.cache.writes)
fio_job.target(core_list[-1].path)
writes_before_list.append(core_list[-1].get_statistics().block_stats.cache.writes)
with TestRun.step("Running IO against all cores"):
fio.run()
with TestRun.step("Verifying writes to cache count after IO"):
for i, core in enumerate(cores[:-1]):
verify_writes_count(core, writes_before[i], thresholds[i], io_sizes[i],
VerifyType.POSITIVE)
for core, writes, threshold, io_size in zip(
core_list[:-1], writes_before_list[:-1], thresholds_list[:-1], io_sizes_list[:-1]
):
verify_writes_count(
core=core,
writes_before=writes,
threshold=threshold,
io_size=io_size,
ver_type=VerifyType.POSITIVE,
)
verify_writes_count(cores[-1], writes_before[-1], thresholds[-1], io_sizes[-1],
VerifyType.EQUAL)
verify_writes_count(
core=core_list[-1],
writes_before=writes_before_list[-1],
threshold=thresholds_list[-1],
io_size=io_sizes_list[-1],
ver_type=VerifyType.EQUAL,
)
@pytest.mark.parametrize("threshold_param", [
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte)))
])
@pytest.mark.parametrize("policy, verify_type", [(SeqCutOffPolicy.never, VerifyType.NEGATIVE),
(SeqCutOffPolicy.always, VerifyType.POSITIVE),
(SeqCutOffPolicy.full, VerifyType.NEGATIVE)])
@pytest.mark.parametrizex("cls", CacheLineSize)
@pytest.mark.parametrize(
"policy, verify_type",
[
(SeqCutOffPolicy.never, VerifyType.NEGATIVE),
(SeqCutOffPolicy.always, VerifyType.POSITIVE),
(SeqCutOffPolicy.full, VerifyType.NEGATIVE),
],
)
@pytest.mark.parametrizex("cache_line_size", CacheLineSize)
@pytest.mark.parametrizex("io_dir", [ReadWrite.write, ReadWrite.read])
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_seq_cutoff_thresh(threshold_param, cls, io_dir, policy, verify_type):
def test_seq_cutoff_thresh(cache_line_size, io_dir, policy, verify_type):
"""
title: Sequential cut-off tests for writes and reads for 'never', 'always' and 'full' policies
description: |
@@ -215,47 +288,80 @@ def test_seq_cutoff_thresh(threshold_param, cls, io_dir, policy, verify_type):
is valid for sequential cut-off threshold parameter, assuming that cache occupancy
doesn't reach 100% during test.
pass_criteria:
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off parameter in case of 'always' policy.
- Amount of written blocks to cache is at least equal io size in case of 'never' and 'full'
policy.
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off parameter in case of 'always' policy.
- Amount of written blocks to cache is at least equal io size in case of 'never' and 'full'
policy.
"""
with TestRun.step(f"Test prepare (start cache (cache line size: {cls}) and add cores)"):
cache, cores = prepare(cores_count=1, cache_line_size=cls)
with TestRun.step("Partition cache and core devices"):
cache_device = TestRun.disks["cache"]
core_device = TestRun.disks["core"]
cache_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX * 4 + Size(value=5, unit=Unit.GibiByte))]
)
core_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX + Size(value=10, unit=Unit.GibiByte))]
)
cache_part = cache_device.partitions[0]
core_part = core_device.partitions[0]
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(f"Start cache and add core"):
cache = casadm.start_cache(
cache_dev=cache_part,
force=True,
cache_line_size=cache_line_size,
)
core = cache.add_core(core_dev=core_part)
fio_additional_size = Size(10, Unit.Blocks4096)
threshold = Size(threshold_param, Unit.KibiByte)
threshold = Size.generate_random_size(
min_size=1,
max_size=SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte),
unit=Unit.KibiByte,
)
io_size = (threshold + fio_additional_size).align_down(0x1000)
with TestRun.step(f"Setting cache sequential cut off policy mode to {policy}"):
cache.set_seq_cutoff_policy(policy)
with TestRun.step(f"Setting cache sequential cut off policy threshold to "
f"{threshold}"):
with TestRun.step(f"Setting cache sequential cut off policy threshold to {threshold}"):
cache.set_seq_cutoff_threshold(threshold)
with TestRun.step(f"Running sequential IO ({io_dir})"):
with TestRun.step("Prepare sequential IO against core"):
sync()
writes_before = cores[0].get_statistics().block_stats.cache.writes
(Fio().create_command()
.io_engine(IoEngine.libaio)
.size(io_size)
.read_write(io_dir)
.target(f"{cores[0].path}")
.direct()
).run()
writes_before = core.get_statistics().block_stats.cache.writes
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.size(io_size)
.read_write(io_dir)
.target(f"{core.path}")
.direct()
)
with TestRun.step("Run fio"):
fio.run()
with TestRun.step("Verify writes to cache count"):
verify_writes_count(cores[0], writes_before, threshold, io_size, verify_type)
verify_writes_count(
core=core,
writes_before=writes_before,
threshold=threshold,
io_size=io_size,
ver_type=verify_type,
)
@pytest.mark.parametrize("threshold_param", [
random.randint(1, int(SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte)))
])
@pytest.mark.parametrizex("cls", CacheLineSize)
@pytest.mark.parametrizex("cache_line_size", CacheLineSize)
@pytest.mark.parametrizex("io_dir", [ReadWrite.write, ReadWrite.read])
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_seq_cutoff_thresh_fill(threshold_param, cls, io_dir):
def test_seq_cutoff_thresh_fill(cache_line_size, io_dir):
"""
title: Sequential cut-off tests during writes and reads on full cache for 'full' policy
description: |
@@ -263,93 +369,116 @@ def test_seq_cutoff_thresh_fill(threshold_param, cls, io_dir):
cache for 'full' sequential cut-off policy with cache configured with different cache
line sizes is valid for sequential cut-off threshold parameter.
pass_criteria:
- Amount of written blocks to cache is big enough to fill cache when 'never' sequential
cut-off policy is set
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off parameter in case of 'full' policy.
- Amount of written blocks to cache is big enough to fill cache when 'never' sequential
cut-off policy is set
- Amount of written blocks to cache is less or equal than amount set
with sequential cut-off parameter in case of 'full' policy.
"""
with TestRun.step(f"Test prepare (start cache (cache line size: {cls}) and add cores)"):
cache, cores = prepare(cores_count=1, cache_line_size=cls)
with TestRun.step("Partition cache and core devices"):
cache_device = TestRun.disks["cache"]
core_device = TestRun.disks["core"]
cache_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX + Size(value=5, unit=Unit.GibiByte))]
)
core_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX + Size(value=10, unit=Unit.GibiByte))]
)
cache_part = cache_device.partitions[0]
core_part = core_device.partitions[0]
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step(f"Start cache and add core"):
cache = casadm.start_cache(
cache_dev=cache_part,
force=True,
cache_line_size=cache_line_size,
)
core = cache.add_core(core_dev=core_part)
fio_additional_size = Size(10, Unit.Blocks4096)
threshold = Size(threshold_param, Unit.KibiByte)
threshold = Size.generate_random_size(
min_size=1,
max_size=SEQ_CUTOFF_THRESHOLD_MAX.get_value(Unit.KibiByte),
unit=Unit.KibiByte,
)
io_size = (threshold + fio_additional_size).align_down(0x1000)
with TestRun.step(f"Setting cache sequential cut off policy mode to "
f"{SeqCutOffPolicy.never}"):
with TestRun.step(f"Setting cache sequential cut off policy mode to {SeqCutOffPolicy.never}"):
cache.set_seq_cutoff_policy(SeqCutOffPolicy.never)
with TestRun.step("Filling cache (sequential writes IO with size of cache device)"):
with TestRun.step("Prepare sequential IO against core"):
sync()
(Fio().create_command()
.io_engine(IoEngine.libaio)
.size(cache.cache_device.size)
.read_write(io_dir)
.target(f"{cores[0].path}")
.direct()
).run()
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.size(cache.size)
.read_write(io_dir)
.target(f"{core.path}")
.direct()
)
with TestRun.step("Check if cache is filled enough (expecting occupancy not less than "
"95%)"):
occupancy = cache.get_statistics(percentage_val=True).usage_stats.occupancy
if occupancy < 95:
TestRun.fail(f"Cache occupancy is too small: {occupancy}, expected at least 95%")
with TestRun.step("Run fio"):
fio.run()
with TestRun.step(f"Setting cache sequential cut off policy mode to "
f"{SeqCutOffPolicy.full}"):
with TestRun.step("Check if cache is filled enough (expecting occupancy not less than 95%)"):
occupancy_percentage = cache.get_statistics(percentage_val=True).usage_stats.occupancy
if occupancy_percentage < 95:
TestRun.fail(
f"Cache occupancy is too small: {occupancy_percentage}, expected at least 95%"
)
with TestRun.step(f"Setting cache sequential cut off policy mode to {SeqCutOffPolicy.full}"):
cache.set_seq_cutoff_policy(SeqCutOffPolicy.full)
with TestRun.step(f"Setting cache sequential cut off policy threshold to "
f"{threshold}"):
with TestRun.step(f"Setting cache sequential cut off policy threshold to {threshold}"):
cache.set_seq_cutoff_threshold(threshold)
with TestRun.step(f"Running sequential IO ({io_dir})"):
sync()
writes_before = cores[0].get_statistics().block_stats.cache.writes
(Fio().create_command()
.io_engine(IoEngine.libaio)
.size(io_size)
.read_write(io_dir)
.target(f"{cores[0].path}")
.direct()
).run()
writes_before = core.get_statistics().block_stats.cache.writes
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.size(io_size)
.read_write(io_dir)
.target(f"{core.path}")
.direct()
)
with TestRun.step("Run fio"):
fio.run()
with TestRun.step("Verify writes to cache count"):
verify_writes_count(cores[0], writes_before, threshold, io_size, VerifyType.POSITIVE)
verify_writes_count(core, writes_before, threshold, io_size, VerifyType.POSITIVE)
def verify_writes_count(core, writes_before, threshold, io_size, ver_type=VerifyType.NEGATIVE,
io_margin=Size(8, Unit.KibiByte)):
def verify_writes_count(
core,
writes_before,
threshold,
io_size,
ver_type=VerifyType.NEGATIVE,
io_margin=Size(8, Unit.KibiByte),
):
writes_after = core.get_statistics().block_stats.cache.writes
writes_difference = writes_after - writes_before
if ver_type is VerifyType.NEGATIVE:
if writes_difference < io_size:
TestRun.fail(f"Wrong writes count: {writes_difference} , expected at least "
f"{io_size}")
elif ver_type is VerifyType.POSITIVE:
if writes_difference >= threshold + io_margin:
TestRun.fail(f"Wrong writes count: {writes_difference} , expected at most "
f"{threshold + io_margin}")
elif ver_type is VerifyType.EQUAL:
if writes_difference != io_size:
TestRun.fail(f"Wrong writes count: {writes_difference} , expected {io_size}")
def prepare(cores_count=1, cache_line_size: CacheLineSize = None):
cache_device = TestRun.disks['cache']
core_device = TestRun.disks['core']
cache_device.create_partitions(
[(SEQ_CUTOFF_THRESHOLD_MAX * cores_count + Size(5, Unit.GibiByte)).align_down(0x1000)])
partitions = \
[(SEQ_CUTOFF_THRESHOLD_MAX + Size(10, Unit.GibiByte)).align_down(0x1000)] * cores_count
core_device.create_partitions(partitions)
cache_part = cache_device.partitions[0]
core_parts = core_device.partitions
TestRun.LOGGER.info("Starting cache")
cache = casadm.start_cache(cache_part, force=True, cache_line_size=cache_line_size)
Udev.disable()
TestRun.LOGGER.info("Adding core devices")
core_list = []
for core_part in core_parts:
core_list.append(cache.add_core(core_dev=core_part))
return cache, core_list
match ver_type:
case VerifyType.NEGATIVE:
if writes_difference < io_size:
TestRun.fail(
f"Wrong writes count: {writes_difference} , expected at least {io_size}"
)
case VerifyType.POSITIVE:
if writes_difference >= threshold + io_margin:
TestRun.fail(
f"Wrong writes count: {writes_difference} , expected at most "
f"{threshold + io_margin}"
)
case VerifyType.EQUAL:
if writes_difference != io_size:
TestRun.fail(f"Wrong writes count: {writes_difference} , expected {io_size}")