diff --git a/test/functional/requirements.txt b/test/functional/requirements.txt index 4e9f197..daaa74f 100644 --- a/test/functional/requirements.txt +++ b/test/functional/requirements.txt @@ -1 +1,2 @@ attotime>=0.2.0 +schema==0.7.2 diff --git a/test/functional/tests/performance/conftest.py b/test/functional/tests/performance/conftest.py new file mode 100644 index 0000000..3c9e441 --- /dev/null +++ b/test/functional/tests/performance/conftest.py @@ -0,0 +1,48 @@ +# +# Copyright(c) 2020 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause-Clear +# + +from datetime import datetime as dt +import os +import json +import pytest + +from utils.performance import PerfContainer, ConfigParameter, BuildTypes +from core.test_run import TestRun +from api.cas.casadm_parser import get_casadm_version + + +@pytest.fixture() +def perf_collector(request): + container = PerfContainer() + yield container + + if container.is_empty: + # No performance metrics submitted by test, no sense in writing any log + TestRun.LOGGER.warning("No performance metrics collected by test using perf_collector") + return + + container.insert_config_param(request.node.name.split("[")[0], ConfigParameter.TEST_NAME) + container.insert_config_param(get_casadm_version(), ConfigParameter.CAS_VERSION) + container.insert_config_param(TestRun.disks["cache"].disk_type, ConfigParameter.CACHE_TYPE) + container.insert_config_param(TestRun.disks["core"].disk_type, ConfigParameter.CORE_TYPE) + container.insert_config_param(dt.now(), ConfigParameter.TIMESTAMP) + container.insert_config_param( + request.config.getoption("--build-type"), ConfigParameter.BUILD_TYPE + ) + if TestRun.dut.ip: + container.insert_config_param(TestRun.dut.ip, ConfigParameter.DUT) + + perf_log_path = os.path.join(TestRun.LOGGER.base_dir, "perf.json") + + with open(perf_log_path, "w") as dump_file: + json.dump(container.to_serializable_dict(), dump_file, indent=4) + + +def pytest_addoption(parser): + parser.addoption("--build-type", choices=BuildTypes, default="other") + + +def pytest_configure(config): + config.addinivalue_line("markers", "performance: performance test") diff --git a/test/functional/tests/performance/test_100p_hits.py b/test/functional/tests/performance/test_100p_hits.py new file mode 100644 index 0000000..6703b55 --- /dev/null +++ b/test/functional/tests/performance/test_100p_hits.py @@ -0,0 +1,156 @@ +# +# Copyright(c) 2020 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause-Clear +# + +import pytest + +from api.cas import casadm +from api.cas.cache_config import ( + CacheMode, + CacheLineSize, + SeqCutOffPolicy, + CleaningPolicy, +) +from utils.performance import WorkloadParameter +from core.test_run import TestRun +from test_tools.fio.fio import Fio +from test_tools.fio.fio_param import IoEngine, ReadWrite +from test_utils.os_utils import Udev, set_wbt_lat, get_dut_cpu_physical_cores +from test_utils.size import Size, Unit +from test_utils.output import CmdException +from storage_devices.disk import DiskTypeSet, DiskTypeLowerThan, DiskType + + +@pytest.mark.performance() +@pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand])) +@pytest.mark.require_disk("core", DiskTypeLowerThan("cache")) +@pytest.mark.parametrize("queue_depth", [1, 4, 16, 64, 256]) +@pytest.mark.parametrize("numjobs", [1, 4, 16, 64, 256]) +@pytest.mark.parametrize("cache_line_size", CacheLineSize) +def test_4k_100p_hit_reads_wt(queue_depth, numjobs, cache_line_size, perf_collector, request): + """ + title: Test CAS performance in 100% Cache Hit scenario + description: | + Characterize cache device with workload (parametrized by qd and job number), and then run + the same workload on cached volume. + pass_criteria: + - always passes + """ + TESTING_WORKSET = Size(20, Unit.GiB) + + fio_cfg = ( + Fio() + .create_command() + .io_engine(IoEngine.libaio) + .block_size(Size(4, Unit.KiB)) + .read_write(ReadWrite.randread) + .io_depth(queue_depth) + .cpus_allowed(get_dut_cpu_physical_cores()) + .direct() + ) + + with TestRun.step("Characterize cache device"): + cache_dev_characteristics = characterize_cache_device( + request.node.name, fio_cfg, queue_depth, numjobs, TESTING_WORKSET + ) + fio_cfg.clear_jobs() + + with TestRun.step("Prepare cache and core"): + cache, core = prepare_config(cache_line_size, CacheMode.WT) + + fio_cfg = fio_cfg.target(core) + spread_jobs(fio_cfg, numjobs, TESTING_WORKSET) + + with TestRun.step("Fill the cache"): + prefill_cache(core, TESTING_WORKSET) + + with TestRun.step("Run fio"): + cache_results = fio_cfg.run()[0] + + perf_collector.insert_workload_param(numjobs, WorkloadParameter.NUM_JOBS) + perf_collector.insert_workload_param(queue_depth, WorkloadParameter.QUEUE_DEPTH) + perf_collector.insert_cache_metrics_from_fio_job(cache_dev_characteristics) + perf_collector.insert_exp_obj_metrics_from_fio_job(cache_results) + perf_collector.insert_config_from_cache(cache) + + +def prefill_cache(core, size): + ( + Fio() + .create_command() + .io_engine(IoEngine.libaio) + .block_size(Size(4, Unit.KiB)) + .read_write(ReadWrite.write) + .target(core) + .size(size) + .direct() + .run() + ) + + +@pytest.fixture(scope="session", autouse=True) +def disable_wbt_throttling(): + cache_device = TestRun.disks["cache"] + core_device = TestRun.disks["core"] + + try: + set_wbt_lat(cache_device, 0) + except CmdException: + TestRun.LOGGER.warning("Couldn't disable write-back throttling for cache device") + try: + set_wbt_lat(core_device, 0) + except CmdException: + TestRun.LOGGER.warning("Couldn't disable write-back throttling for core device") + + +def prepare_config(cache_line_size, cache_mode): + cache_device = TestRun.disks["cache"] + core_device = TestRun.disks["core"] + + core_device.create_partitions([Size(3, Unit.GiB)]) + + cache = casadm.start_cache( + cache_device, cache_mode=cache_mode, cache_line_size=cache_line_size, force=True, + ) + cache.set_seq_cutoff_policy(SeqCutOffPolicy.never) + + cache.set_cleaning_policy(CleaningPolicy.nop) + + Udev.disable() + + core = cache.add_core(core_device.partitions[0]) + + return cache, core + + +def spread_jobs(fio_cfg, numjobs, size): + offset = (size / numjobs).align_down(Unit.Blocks512.value) + + for i in range(numjobs): + fio_cfg.add_job(f"job_{i+1}").offset(offset * i).size(offset * (i + 1)) + + +def characterize_cache_device(test_name, fio_cfg, queue_depth, numjobs, size): + cache_device = TestRun.disks["cache"] + + try: + return TestRun.dev_characteristics[test_name][queue_depth][numjobs] + except AttributeError: + pass + except KeyError: + pass + + spread_jobs(fio_cfg, numjobs, size) + result = fio_cfg.target(cache_device).run()[0] + + if not hasattr(TestRun, "dev_characteristics"): + TestRun.dev_characteristics = {} + if test_name not in TestRun.dev_characteristics: + TestRun.dev_characteristics[test_name] = {} + if queue_depth not in TestRun.dev_characteristics[test_name]: + TestRun.dev_characteristics[test_name][queue_depth] = {} + + TestRun.dev_characteristics[test_name][queue_depth][numjobs] = result + + return result diff --git a/test/functional/utils/__init__.py b/test/functional/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/functional/utils/performance.py b/test/functional/utils/performance.py new file mode 100644 index 0000000..44b474d --- /dev/null +++ b/test/functional/utils/performance.py @@ -0,0 +1,216 @@ +# +# Copyright(c) 2020 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause-Clear +# + +from enum import Enum +from types import MethodType +from datetime import datetime + +from schema import Schema, Use, And, SchemaError, Or + + +class ValidatableParameter(Enum): + """ + Parameter enumeration together with schema for validating this parameter + If given parameter is always valid put False as its value, otherwise use proper Schema object + """ + + def __new__(cls, schema: Schema): + if not (isinstance(schema, Schema) or not schema): + raise Exception( + f"Invalid {cls.__name__} value. Expected: Schema instance or False, got: {schema}" + ) + + # Trick for changing value which is supplied by enum + # This way we can access Schema from enumeration member instance and still have Enum + # properties maintained + obj = object.__new__(cls) + obj._value_ = obj + obj.schema = schema + obj.validate = MethodType(cls.validate, obj) + + return obj + + def validate(self, param): + if self.schema: + param = self.schema.validate(param) + + return param + + def __repr__(self): + return f"<{type(self).__name__}.{self.name}>" + + def __str__(self): + return str(self.name) + + +class PercentileMetric: + def __init__(self, value): + value = float(value) + + if not 0 < value < 100: + raise SchemaError("Invalid percentile value") + + self.value = value + + def __str__(self): + return f"p{self.value:g}".replace(".", "_") + + +class IOMetric(ValidatableParameter): + read_IOPS = Schema(Use(int)) + write_IOPS = Schema(Use(int)) + read_BW = Schema(Use(int)) + write_BW = Schema(Use(int)) + read_CLAT_AVG = Schema(Use(int)) + write_CLAT_AVG = Schema(Use(int)) + read_CLAT_PERCENTILES = Schema({Use(PercentileMetric): Use(int)}) + write_CLAT_PERCENTILES = Schema({Use(PercentileMetric): Use(int)}) + +BuildTypes = ["master", "pr", "other"] + +class ConfigParameter(ValidatableParameter): + CAS_VERSION = Schema(Use(str)) + DUT = Schema(Use(str)) + TEST_NAME = Schema(str) + BUILD_TYPE = Schema(Or(*BuildTypes)) + CACHE_CONFIG = Schema( + {"cache_mode": Use(str), "cache_line_size": Use(str), "cleaning_policy": Use(str)} + ) + CACHE_TYPE = Schema(Use(str)) + CORE_TYPE = Schema(Use(str)) + TIMESTAMP = Schema(And(datetime, Use(str))) + + +class WorkloadParameter(ValidatableParameter): + NUM_JOBS = Schema(Use(int)) + QUEUE_DEPTH = Schema(Use(int)) + + +class MetricContainer: + def __init__(self, metric_type): + self.metrics = {} + self.metric_type = metric_type + + def insert_metric(self, metric, kind): + if not isinstance(kind, self.metric_type): + raise Exception(f"Invalid metric type. Expected: {self.metric_type}, got: {type(kind)}") + + if kind.value: + metric = kind.value.validate(metric) + + self.metrics[kind] = metric + + @property + def is_empty(self): + return len(self.metrics) == 0 + + def to_serializable_dict(self): + # No easy way for json.dump to deal with custom classes (especially custom Enums) + def stringify_dict(d): + new_dict = {} + for k, v in d.items(): + k = str(k) + + if isinstance(v, dict): + v = stringify_dict(v) + elif isinstance(v, int): + pass + elif isinstance(v, float): + pass + else: + v = str(v) + + new_dict[k] = v + + return new_dict + + return stringify_dict(self.metrics) + + +class PerfContainer: + def __init__(self): + self.conf_params = MetricContainer(ConfigParameter) + + self.workload_params = MetricContainer(WorkloadParameter) + + self.cache_metrics = MetricContainer(IOMetric) + self.core_metrics = MetricContainer(IOMetric) + self.exp_obj_metrics = MetricContainer(IOMetric) + + def insert_config_param(self, param, kind: ConfigParameter): + self.conf_params.insert_metric(param, kind) + + def insert_config_from_cache(self, cache): + cache_config = { + "cache_mode": cache.get_cache_mode(), + "cache_line_size": cache.get_cache_line_size(), + "cleaning_policy": cache.get_cleaning_policy(), + } + + self.conf_params.insert_metric(cache_config, ConfigParameter.CACHE_CONFIG) + + def insert_workload_param(self, param, kind: WorkloadParameter): + self.workload_params.insert_metric(param, kind) + + @staticmethod + def _insert_metrics_from_fio(container, result): + result = result.job + + container.insert_metric(result.read.iops, IOMetric.read_IOPS) + container.insert_metric(result.write.iops, IOMetric.write_IOPS) + container.insert_metric(result.read.bw, IOMetric.read_BW) + container.insert_metric(result.write.bw, IOMetric.write_BW) + container.insert_metric(result.read.clat_ns.mean, IOMetric.read_CLAT_AVG) + container.insert_metric(result.write.clat_ns.mean, IOMetric.write_CLAT_AVG) + if hasattr(result.read.clat_ns, "percentile"): + container.insert_metric( + vars(result.read.clat_ns.percentile), IOMetric.read_CLAT_PERCENTILES + ) + if hasattr(result.write.clat_ns, "percentile"): + container.insert_metric( + vars(result.write.clat_ns.percentile), IOMetric.write_CLAT_PERCENTILES + ) + + def insert_cache_metric(self, metric, kind: IOMetric): + self.cache_metrics.insert_metric(metric, kind) + + def insert_cache_metrics_from_fio_job(self, fio_results): + self._insert_metrics_from_fio(self.cache_metrics, fio_results) + + def insert_core_metric(self, metric, kind: IOMetric): + self.core_metrics.insert_metric(metric, kind) + + def insert_core_metrics_from_fio_job(self, fio_results): + self._insert_metrics_from_fio(self.core_metrics, fio_results) + + def insert_exp_obj_metric(self, metric, kind: IOMetric): + self.exp_obj_metrics.insert_metric(metric, kind) + + def insert_exp_obj_metrics_from_fio_job(self, fio_results): + self._insert_metrics_from_fio(self.exp_obj_metrics, fio_results) + + @property + def is_empty(self): + return ( + self.conf_params.is_empty + and self.workload_params.is_empty + and self.cache_metrics.is_empty + and self.core_metrics.is_empty + and self.exp_obj_metrics.is_empty + ) + + def to_serializable_dict(self): + ret = {**self.conf_params.to_serializable_dict()} + + if not self.workload_params.is_empty: + ret["workload_params"] = self.workload_params.to_serializable_dict() + if not self.cache_metrics.is_empty: + ret["cache_io"] = self.cache_metrics.to_serializable_dict() + if not self.core_metrics.is_empty: + ret["core_io"] = self.core_metrics.to_serializable_dict() + if not self.exp_obj_metrics.is_empty: + ret["exp_obj_io"] = self.exp_obj_metrics.to_serializable_dict() + + return ret