tests: add test_load_corrupted

Signed-off-by: Michal Mielewczyk <michal.mielewczyk@intel.com>
This commit is contained in:
Michal Mielewczyk 2022-04-19 13:25:02 +02:00
parent ab99c08504
commit 5dd65bb64b

View File

@ -15,7 +15,7 @@ from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_utils.size import Size, Unit
from api.cas.cli_messages import check_stderr_msg, missing_param, disallowed_param
from api.cas.cache_config import CacheLineSize, CacheMode
from api.cas.cli import standby_activate_cmd
from api.cas.cli import standby_activate_cmd, standby_load_cmd
from api.cas.ioclass_config import IoClass
from test_tools.dd import Dd
from test_utils.os_utils import sync
@ -81,6 +81,50 @@ def test_activate_corrupted():
md_dump.remove()
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.nand, DiskType.optane]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_load_corrupted():
"""
title: Standby-load corrupted metadata
description: |
Try to load standby instance from corrupted metadata
pass_criteria:
- Kernel panic doesn't occur
"""
with TestRun.step("Prepare devices for the cache and core."):
cache_device = TestRun.disks["cache"]
cache_device.create_partitions([Size(200, Unit.MebiByte)])
cache_device = cache_device.partitions[0]
core_device = TestRun.disks["core"]
core_device.create_partitions([Size(500, Unit.MebiByte)])
core_device = core_device.partitions[0]
with TestRun.step("Prepare metadata dump"):
cache_id = 1
cls = CacheLineSize.LINE_32KiB
md_dump = prepare_md_dump(cache_device, core_device, cls, cache_id)
for offset in get_offsets_to_corrupt(md_dump.size, block_size):
with TestRun.step(f"Corrupt {block_size} on the offset {offset*block_size}"):
corrupted_md = prepare_corrupted_md(md_dump, offset, block_size)
with TestRun.step(f"Copy corrupted metadata to the cache-to-be device"):
Dd().input(corrupted_md.full_path).output(cache_device.path).run()
sync()
with TestRun.step("Try to load cache instance"):
output = TestRun.executor.run(standby_load_cmd(cache_dev=cache_device.path))
with TestRun.step("Per iteration cleanup"):
if output.exit_code:
casadm.stop_all_caches()
corrupted_md.remove(force=True, ignore_errors=True)
with TestRun.step("Test cleanup"):
md_dump.remove()
def get_offsets_to_corrupt(md_size, bs, count=100):
offsets = list(range(0, int(md_size.value), bs.value))
offsets = random.choices(offsets, k=min(len(offsets), count))