open-cas-linux/test/functional/tests/stats/test_stat_max.py
Katarzyna Treder 8a5d531a32 OS tools refactor
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 07:58:50 +01:00

179 lines
6.5 KiB
Python

#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from time import sleep
import pytest
from api.cas import casadm
from api.cas.cache_config import CacheMode, CleaningPolicy
from api.cas.casadm import StatsFilter
from api.cas.statistics import get_stats_dict, get_stat_value
from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.fio.fio import Fio
from test_tools.fio.fio_param import ReadWrite, IoEngine
from test_tools.udev import Udev
from types.size import Size, Unit
# Size of each cache partition created on the cache device.
cache_size = Size(1, Unit.GibiByte)
# Size of each core partition created on the core device.
core_size = Size(2, Unit.GibiByte)
# Amount of data written by each fio job.
io_size = Size(10, Unit.MebiByte)
# Statistic sections compared between cache and cores (usage, requests, blocks).
stat_filter = [StatsFilter.usage, StatsFilter.req, StatsFilter.blk]
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_stat_max_cache():
    """
    title: CAS statistics values for maximum cache devices.
    description: |
      Check CAS ability to display correct values in statistics
      for 16 cache devices per cache mode.
    pass_criteria:
      - Cores' statistics match cache's statistics.
    """
    caches_per_cache_mode = 16
    cores_per_cache = 1
    caches_count = caches_per_cache_mode * len(CacheMode)

    with TestRun.step(
        f"Create {caches_count} cache and "
        f"{cores_per_cache * caches_count} core partitions"
    ):
        cache_dev = TestRun.disks["cache"]
        cache_dev.create_partitions([cache_size] * caches_count)
        core_dev = TestRun.disks["core"]
        core_dev.create_partitions([core_size] * cores_per_cache * caches_count)
        # Prevent udev from auto-attaching devices during the test.
        Udev.disable()

    with TestRun.step(
        f"Start {caches_count} caches ({caches_per_cache_mode} for "
        f"every cache mode) and add {cores_per_cache} core(s) per cache"
    ):
        # Start all caches first: caches_per_cache_mode instances per cache mode.
        caches = []
        for mode_index, cache_mode in enumerate(CacheMode):
            for offset in range(caches_per_cache_mode):
                partition = cache_dev.partitions[
                    mode_index * caches_per_cache_mode + offset
                ]
                caches.append(
                    casadm.start_cache(partition, cache_mode=cache_mode, force=True)
                )
        # Then configure each cache and attach its cores.
        cores = []
        for cache_index, cache in enumerate(caches):
            cache.set_cleaning_policy(CleaningPolicy.nop)
            cores.append([
                cache.add_core(
                    core_dev.partitions[cache_index * cores_per_cache + core_offset]
                )
                for core_offset in range(cores_per_cache)
            ])

    with TestRun.step("Run 'fio'"):
        fio = fio_prepare()
        for core_list in cores:
            for core in core_list:
                fio.add_job().target(core.path)
        fio.run()
        # Give statistics a moment to settle after I/O completes.
        sleep(3)

    with TestRun.step("Check if cache's statistics match cores' statistics"):
        for cache, core_list in zip(caches, cores):
            cache_stats = get_stats_dict(filter=stat_filter, cache_id=cache.cache_id)
            cores_stats = [
                get_stats_dict(
                    filter=stat_filter, cache_id=cache.cache_id, core_id=core.core_id
                )
                for core in core_list
            ]
            fail_message = f"For cache ID {cache.cache_id} ({cache.get_cache_mode()}) "
            stats_compare(cache_stats, cores_stats, cores_per_cache, fail_message)
@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
@pytest.mark.parametrizex("cache_mode", CacheMode)
def test_stat_max_core(cache_mode):
    """
    title: CAS statistics values for maximum core devices.
    description: |
      Check CAS ability to display correct values in statistics
      for 62 core devices.
    pass_criteria:
      - Cores' statistics match cache's statistics.
    """
    cores_per_cache = 62

    with TestRun.step(f"Create 1 cache and {cores_per_cache} core partitions"):
        cache_dev = TestRun.disks["cache"]
        cache_dev.create_partitions([cache_size])
        core_dev = TestRun.disks["core"]
        core_dev.create_partitions([core_size] * cores_per_cache)
        # Prevent udev from auto-attaching devices during the test.
        Udev.disable()

    with TestRun.step(f"Start cache in {cache_mode} cache mode and add {cores_per_cache} cores"):
        cache = casadm.start_cache(
            cache_dev.partitions[0], cache_mode=cache_mode, force=True
        )
        cache.set_cleaning_policy(CleaningPolicy.nop)
        cores = [cache.add_core(partition)
                 for partition in core_dev.partitions[:cores_per_cache]]

    with TestRun.step("Run 'fio'"):
        fio = fio_prepare()
        for core in cores:
            fio.add_job().target(core.path)
        fio.run()
        # Give statistics a moment to settle after I/O completes.
        sleep(3)

    with TestRun.step("Check if cache's statistics match cores' statistics"):
        cache_stats = get_stats_dict(filter=stat_filter, cache_id=cache.cache_id)
        cores_stats = [
            get_stats_dict(
                filter=stat_filter, cache_id=cache.cache_id, core_id=core.core_id
            )
            for core in cores
        ]
        fail_message = f"In {cache_mode} cache mode "
        stats_compare(cache_stats, cores_stats, cores_per_cache, fail_message)
def fio_prepare():
    """Build the shared fio workload: libaio engine, random writes of
    io_size, direct I/O (page cache bypassed). Targets are added by callers."""
    return (
        Fio()
        .create_command()
        .io_engine(IoEngine.libaio)
        .read_write(ReadWrite.randwrite)
        .size(io_size)
        .direct()
    )
def stats_compare(cache_stats, cores_stats, cores_per_cache, fail_message):
    """Check that every cache statistic equals the sum of the per-core values.

    'Free ...' entries and percentage entries ('[%]') are skipped, since they
    are not additive across cores. Each mismatch is reported through
    TestRun.LOGGER.error prefixed with fail_message.
    """
    for stat_name in cache_stats:
        # Non-additive statistics cannot be compared by summation.
        if stat_name.startswith("Free ") or stat_name.endswith("[%]"):
            continue
        try:
            # Normalize textual stats to comparable values in place.
            cache_stats[stat_name] = get_stat_value(cache_stats, stat_name)
            for j in range(cores_per_cache):
                cores_stats[j][stat_name] = get_stat_value(cores_stats[j], stat_name)
        except AttributeError:
            # Value not convertible - fall back to comparing it as-is.
            pass
        core_stat_sum = sum(cores_stats[j][stat_name] for j in range(cores_per_cache))
        if core_stat_sum != cache_stats[stat_name]:
            TestRun.LOGGER.error(fail_message + (
                f"sum of cores' '{stat_name}' values is "
                f"{core_stat_sum}, should equal cache value: "
                f"{cache_stats[stat_name]}\n"))