open-cas-linux/test/functional/api/cas/ioclass_config.py
Kamil Gierszewski f0753339dd
test-api: change default file path
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2025-01-10 14:54:44 +01:00

298 lines
9.5 KiB
Python

#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024-2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import enum
import functools
import random
import re
import string
from datetime import timedelta
from packaging import version
from core.test_run import TestRun
from test_tools.fs_tools import write_file
from test_tools.os_tools import get_kernel_version
# Default location of the IO class config file, inside the test run data directory.
default_config_file_path = TestRun.TEST_RUN_DATA_PATH + "/opencas_ioclass.conf"
# NOTE(review): presumably the upper bounds accepted for an IO class id and an
# eviction priority - confirm against the Open CAS documentation.
MAX_IO_CLASS_ID = 32
MAX_IO_CLASS_PRIORITY = 255
# Fields of the default IO class entry (see IoClass.default() and create_ioclass_config()).
DEFAULT_IO_CLASS_ID = 0
DEFAULT_IO_CLASS_PRIORITY = 255
DEFAULT_IO_CLASS_RULE = "unclassified"
# NOTE(review): not used in this module; presumably the longest time tests wait
# for IO to be classified - confirm against callers.
MAX_CLASSIFICATION_DELAY = timedelta(seconds=6)
# Header line placed as the first line of every generated config file.
IO_CLASS_CONFIG_HEADER = "IO class id,IO class name,Eviction priority,Allocation"
@functools.total_ordering
class IoClass:
    """One entry (row) of an IO class configuration file.

    An entry consists of four fields: the class id, the classification rule,
    the eviction priority and the allocation. Instances compare and sort by
    the tuple ``(id, rule, priority, allocation)``.
    """

    def __init__(
        self,
        class_id: int,
        rule: str = "",
        priority: int = None,
        allocation: str = "1.00",
    ):
        self.id = class_id
        self.rule = rule
        self.priority = priority
        self.allocation = allocation

    def __str__(self):
        """Return the entry as a CSV line: ``id,rule,priority,allocation``.

        A ``None`` priority is rendered as an empty field.
        """
        priority = "" if self.priority is None else self.priority
        return f"{self.id},{self.rule},{priority},{self.allocation}"

    def _as_tuple(self):
        """Return the fields used for equality and ordering."""
        return (self.id, self.rule, self.priority, self.allocation)

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # handling instead of failing with AttributeError on foreign types.
        if not isinstance(other, IoClass):
            return NotImplemented
        return self._as_tuple() == other._as_tuple()

    def __lt__(self, other):
        if not isinstance(other, IoClass):
            return NotImplemented
        return self._as_tuple() < other._as_tuple()

    @staticmethod
    def from_string(ioclass_str: str):
        """Parse one config/table line into an ``IoClass``.

        Fields may be separated by ``,`` or ``|`` and are stripped of
        surrounding whitespace. An empty priority field (as produced by
        ``__str__`` for ``priority=None``) is parsed back to ``None``.

        Raises:
            ValueError: if the id, priority or allocation field is not numeric.
            IndexError: if the line has fewer than four fields.
        """
        # NOTE(review): the original call was ``replace("", "")``, a no-op that
        # looks like a character lost in transit. Stripping the double vertical
        # box-drawing border (U+2551) used by table output is presumably the
        # intent - confirm against the upstream file.
        cleaned = ioclass_str.replace("\u2551", "")
        parts = [part.strip() for part in re.split("[,|]", cleaned)]
        priority_field = parts[2]
        return IoClass(
            class_id=int(parts[0]),
            rule=parts[1],
            priority=int(priority_field) if priority_field else None,
            allocation=f"{float(parts[3]):.2f}",
        )

    @staticmethod
    def list_to_csv(ioclass_list: list, add_default_rule: bool = True):
        """Render a list of IO classes as config file content.

        The header line always comes first. When ``add_default_rule`` is set
        and no entry has the default class id, the default IO class is
        inserted right after the header. The input list is not modified.
        """
        rows = list(ioclass_list)
        if add_default_rule and not any(c.id == DEFAULT_IO_CLASS_ID for c in rows):
            rows.insert(0, IoClass.default())
        return "\n".join([IO_CLASS_CONFIG_HEADER, *(str(c) for c in rows)])

    @staticmethod
    def csv_to_list(csv: str):
        """Parse config file content into a list of ``IoClass`` objects.

        Header lines and blank lines are skipped.
        """
        ioclass_list = []
        for line in csv.splitlines():
            stripped = line.strip()
            # Blank lines would otherwise crash int() in from_string().
            if not stripped or stripped == IO_CLASS_CONFIG_HEADER:
                continue
            ioclass_list.append(IoClass.from_string(line))
        return ioclass_list

    @staticmethod
    def save_list_to_config_file(
        ioclass_list: list,
        add_default_rule: bool = True,
        ioclass_config_path: str = default_config_file_path,
    ):
        """Write the given IO classes to ``ioclass_config_path`` in CSV form."""
        TestRun.LOGGER.info(f"Creating config file {ioclass_config_path}")
        write_file(
            ioclass_config_path, IoClass.list_to_csv(ioclass_list, add_default_rule)
        )

    @staticmethod
    def default(priority=DEFAULT_IO_CLASS_PRIORITY, allocation="1.00"):
        """Return the default IO class with the given priority and allocation."""
        return IoClass(DEFAULT_IO_CLASS_ID, DEFAULT_IO_CLASS_RULE, priority, allocation)

    @staticmethod
    def default_header_dict():
        """Return a mapping of short field keys to the header column names."""
        return {
            "id": "IO class id",
            "name": "IO class name",
            "eviction_prio": "Eviction priority",
            "allocation": "Allocation",
        }

    @staticmethod
    def default_header():
        """Return the header line built from ``default_header_dict()``."""
        return ",".join(IoClass.default_header_dict().values())

    @staticmethod
    def compare_ioclass_lists(list1: list, list2: list):
        """Return True when both lists hold equal entries, ignoring order."""
        return sorted(list1) == sorted(list2)

    @staticmethod
    def generate_random_ioclass_list(count: int, max_priority: int = MAX_IO_CLASS_PRIORITY):
        """Return a list of IO classes with random priorities, allocations and rules.

        The first element is always the default IO class (with random priority
        and allocation); it is followed by classes with ids ``1 .. count - 1``,
        so the list is never empty.
        """
        random_list = [
            IoClass.default(
                priority=random.randint(0, max_priority),
                allocation=f"{random.randint(0, 100) / 100:0.2f}",
            )
        ]
        for i in range(1, count):
            random_list.append(
                IoClass(
                    i,
                    priority=random.randint(0, max_priority),
                    allocation=f"{random.randint(0, 100) / 100:0.2f}",
                ).set_random_rule()
            )
        return random_list

    def set_random_rule(self):
        """Replace ``self.rule`` with a randomly generated rule and return ``self``."""
        rules = [
            "metadata",
            "direct",
            "file_size",
            "directory",
            "io_class",
            "extension",
            "file_name_prefix",
            "lba",
            "pid",
            "process_name",
            "file_offset",
            "request_size",
        ]
        # The "wlth" rule is only offered on kernels 4.13 and newer.
        if get_kernel_version() >= version.Version("4.13"):
            rules.append("wlth")
        rule = random.choice(rules)
        self.rule = IoClass.add_random_params(rule)
        return self

    @staticmethod
    def add_random_params(rule: str):
        """Append random parameters matching the rule type; return the new rule.

        Rule names not handled below get no parameters. ``&done`` is appended
        at random to roughly half of the results.
        """
        if rule == "directory":
            allowed_chars = string.ascii_letters + string.digits + "/"
            rule += f":/{''.join(random.choices(allowed_chars, k=random.randint(1, 40)))}"
        elif rule in ["file_size", "lba", "pid", "file_offset", "request_size", "wlth"]:
            operator_name = Operator(random.randrange(len(Operator))).name
            rule += f":{operator_name}:{random.randrange(1000000)}"
        elif rule == "io_class":
            # NOTE(review): the bound is MAX_IO_CLASS_PRIORITY although the value
            # looks like a class id - confirm whether MAX_IO_CLASS_ID was intended.
            rule += f":{random.randrange(MAX_IO_CLASS_PRIORITY + 1)}"
        elif rule in ["extension", "process_name", "file_name_prefix"]:
            allowed_chars = string.ascii_letters + string.digits
            rule += f":{''.join(random.choices(allowed_chars, k=random.randint(1, 10)))}"
        if random.randrange(2):
            rule += "&done"
        return rule
@enum.unique
class Operator(enum.Enum):
    """Comparison operators available to numeric IO class rules.

    The member *name* is what ends up in a generated rule string; the integer
    value only serves to pick a member by index.
    """

    eq = 0
    gt = 1
    ge = 2
    lt = 3
    le = 4
# TODO: replace below methods with methods using IoClass
def create_ioclass_config(
    add_default_rule: bool = True, ioclass_config_path: str = default_config_file_path
):
    """Create (or overwrite) the IO class config file through the test executor.

    The header line is written first; when ``add_default_rule`` is set, the
    default IO class entry is appended after it.

    Raises:
        Exception: if any of the shell commands exits with a non-zero code.
    """
    TestRun.LOGGER.info(f"Creating config file {ioclass_config_path}")

    # Commands are executed in order; the first failure aborts the sequence.
    commands = [f"echo {IO_CLASS_CONFIG_HEADER} > {ioclass_config_path}"]
    if add_default_rule:
        default_fields = (
            f"{DEFAULT_IO_CLASS_ID},{DEFAULT_IO_CLASS_RULE},{DEFAULT_IO_CLASS_PRIORITY},"
        )
        commands.append(f'echo "{default_fields}""1.00" >> {ioclass_config_path}')

    for command in commands:
        result = TestRun.executor.run(command)
        if result.exit_code != 0:
            raise Exception(
                "Failed to create ioclass config file. "
                f"stdout: {result.stdout} \n stderr :{result.stderr}"
            )
def remove_ioclass_config(ioclass_config_path: str = default_config_file_path):
    """Delete the IO class config file with ``rm -f`` through the test executor.

    Raises:
        Exception: if the command exits with a non-zero code.
    """
    TestRun.LOGGER.info(f"Removing config file {ioclass_config_path}")
    result = TestRun.executor.run(f"rm -f {ioclass_config_path}")
    if result.exit_code == 0:
        return
    raise Exception(
        f"Failed to remove config file. stdout: {result.stdout} \n stderr :{result.stderr}"
    )
def add_ioclass(
    ioclass_id: int,
    rule: str,
    eviction_priority: int,
    allocation,
    ioclass_config_path: str = default_config_file_path,
):
    """Append a single IO class entry to the config file.

    The entry is written as ``id,rule,priority,allocation`` using a shell
    ``echo`` redirected in append mode.

    Raises:
        Exception: if the command exits with a non-zero code.
    """
    entry = "{},{},{},{}".format(ioclass_id, rule, eviction_priority, allocation)
    TestRun.LOGGER.info(f"Adding rule {entry} to config file {ioclass_config_path}")

    append_command = f'echo "{entry}" >> {ioclass_config_path}'
    result = TestRun.executor.run(append_command)
    if result.exit_code == 0:
        return
    raise Exception(
        "Failed to append ioclass to config file. "
        f"stdout: {result.stdout} \n stderr :{result.stderr}"
    )
def get_ioclass(ioclass_id: int, ioclass_config_path: str = default_config_file_path):
    """Return the raw config line of the IO class with the given id.

    The file is read with ``cat`` through the test executor. Lines whose first
    field is not an integer (the header line, blank lines) are skipped.

    Returns:
        The first matching line as a string, or ``None`` when no entry has
        the requested id.

    Raises:
        Exception: if the config file cannot be read.
    """
    TestRun.LOGGER.info(f"Retrieving rule no. {ioclass_id} from config file {ioclass_config_path}")
    output = TestRun.executor.run(f"cat {ioclass_config_path}")
    if output.exit_code != 0:
        raise Exception(
            "Failed to read ioclass config file. "
            + f"stdout: {output.stdout} \n stderr :{output.stderr}"
        )

    for ioclass in output.stdout.splitlines():
        try:
            current_id = int(ioclass.split(",")[0])
        except ValueError:
            # The header ("IO class id,...") and blank lines carry no numeric id;
            # without this guard the header made every lookup fail.
            continue
        if current_id == ioclass_id:
            return ioclass
    return None
def remove_ioclass(ioclass_id: int, ioclass_config_path: str = default_config_file_path):
    """Remove every entry with the given id from the config file.

    The file is read with ``cat``, filtered, and written back with ``echo``
    through the test executor. The first line is treated as the header and is
    always kept. Lines whose first field is not an integer are kept as well.

    Raises:
        Exception: if the file cannot be read or written, if it is empty, or
            if no entry with ``ioclass_id`` was found.
    """
    TestRun.LOGGER.info(f"Removing rule no.{ioclass_id} from config file {ioclass_config_path}")
    output = TestRun.executor.run(f"cat {ioclass_config_path}")
    if output.exit_code != 0:
        raise Exception(
            "Failed to read ioclass config file. "
            + f"stdout: {output.stdout} \n stderr :{output.stderr}"
        )

    old_ioclass_config = output.stdout.splitlines()
    if not old_ioclass_config:
        # Without this guard an empty file surfaced as an opaque IndexError.
        raise Exception(
            f"Failed to remove ioclass {ioclass_id}: config file {ioclass_config_path} is empty"
        )

    def _has_target_id(line: str) -> bool:
        """Tell whether the line's first field is the id being removed."""
        try:
            return int(line.split(",")[0]) == ioclass_id
        except ValueError:
            # Blank or malformed line: not the entry we are looking for.
            return False

    # First line in valid config file is always a header, not a rule.
    config_header = old_ioclass_config[0]
    new_ioclass_config = [config_header] + [
        line for line in old_ioclass_config[1:] if not _has_target_id(line)
    ]

    # Same length means nothing was filtered out, i.e. the id was not present.
    if len(new_ioclass_config) == len(old_ioclass_config):
        raise Exception(
            f"Failed to remove ioclass {ioclass_id} from config file {ioclass_config_path}"
        )

    new_ioclass_config_str = "\n".join(new_ioclass_config)
    output = TestRun.executor.run(f'echo "{new_ioclass_config_str}" > {ioclass_config_path}')
    if output.exit_code != 0:
        raise Exception(
            "Failed to save new ioclass config. "
            + f"stdout: {output.stdout} \n stderr :{output.stderr}"
        )