open-cas-linux/test/functional/api/cas/ioclass_config.py
Rafal Stefanowski acec05060d Fix license
Change license to BSD-3-Clause

Signed-off-by: Rafal Stefanowski <rafal.stefanowski@intel.com>
2021-10-28 12:46:42 +02:00

253 lines
8.8 KiB
Python

#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import enum
import functools
import random
import re
import string
from datetime import timedelta
from packaging import version
from core.test_run import TestRun
from test_tools import fs_utils
from test_utils import os_utils
from test_utils.generator import random_string
# Default location of the IO class configuration file; used as the default
# value of the `ioclass_config_path` parameters in this module.
default_config_file_path = "/tmp/opencas_ioclass.conf"
# NOTE(review): presumably the highest IO class id accepted by CAS - it is not
# referenced in the code visible here, confirm before relying on it.
MAX_IO_CLASS_ID = 32
# Inclusive upper bound used when drawing random eviction priorities.
MAX_IO_CLASS_PRIORITY = 255
# Id, eviction priority and rule of the default IO class (see IoClass.default()).
DEFAULT_IO_CLASS_ID = 0
DEFAULT_IO_CLASS_PRIORITY = 255
DEFAULT_IO_CLASS_RULE = "unclassified"
# NOTE(review): presumably the longest time tests should wait for IO to be
# classified - not referenced in the code visible here, confirm with callers.
MAX_CLASSIFICATION_DELAY = timedelta(seconds=6)
# Column names written as the first line of every IO class config file.
IO_CLASS_CONFIG_HEADER = "IO class id,IO class name,Eviction priority,Allocation"
@functools.total_ordering
class IoClass:
    """One row of an IO class configuration file.

    A row has four comma separated fields: class id, rule, eviction priority
    and allocation. Instances compare and sort by these four fields.
    """

    def __init__(self, class_id: int, rule: str = '', priority: int = None,
                 allocation: str = "1.00"):
        """Store the four fields of a config row.

        :param class_id: IO class id
        :param rule: classification rule string
        :param priority: eviction priority; None is serialized as an empty field
        :param allocation: allocation value kept as a string, e.g. "1.00"
        """
        self.id = class_id
        self.rule = rule
        self.priority = priority
        self.allocation = allocation

    def __repr__(self):
        return (f"IoClass(class_id={self.id!r}, rule={self.rule!r}, "
                f"priority={self.priority!r}, allocation={self.allocation!r})")

    def __str__(self):
        """Return the instance as a single CSV row (no trailing newline)."""
        priority_field = "" if self.priority is None else self.priority
        return f'{self.id},{self.rule},{priority_field},{self.allocation}'

    def _comparison_key(self):
        """Return a tuple used for equality and ordering.

        The priority is split into (is-set, value) so that a missing priority
        can be ordered against an integer one without raising TypeError; a
        missing priority sorts before any set priority.
        """
        has_priority = self.priority is not None
        return (self.id, self.rule, has_priority,
                self.priority if has_priority else 0, self.allocation)

    def __eq__(self, other):
        if not isinstance(other, IoClass):
            return NotImplemented
        return self._comparison_key() == other._comparison_key()

    def __lt__(self, other):
        if not isinstance(other, IoClass):
            return NotImplemented
        return self._comparison_key() < other._comparison_key()

    # Instances are mutable (see set_random_rule), so they stay unhashable,
    # exactly as they were when only __eq__ was defined.
    __hash__ = None

    @staticmethod
    def from_string(ioclass_str: str):
        """Parse one config row (fields separated by ',' or '|') into an IoClass.

        An empty priority field is parsed as None, which makes
        ``IoClass.from_string(str(io_class))`` work for classes without priority.

        :raises ValueError: if the id or a non-empty priority is not an integer
        :raises IndexError: if the row has fewer than four fields
        """
        # NOTE(review): the previous code also called ioclass_str.replace('', ''),
        # which is a no-op; presumably a character to strip was lost - confirm.
        parts = [part.strip() for part in re.split(r'[,|]', ioclass_str)]
        return IoClass(
            class_id=int(parts[0]),
            rule=parts[1],
            priority=int(parts[2]) if parts[2] else None,
            allocation=parts[3])

    @staticmethod
    def list_to_csv(ioclass_list: list, add_default_rule: bool = True):
        """Serialize IO classes to config file content, header line first.

        :param ioclass_list: IoClass objects to serialize; not modified
        :param add_default_rule: when True and no class with the default id is
            present, IoClass.default() is emitted right after the header
        :return: rows joined with newlines (no trailing newline)
        """
        rows = list(ioclass_list)
        if add_default_rule and not any(c.id == DEFAULT_IO_CLASS_ID for c in rows):
            rows.insert(0, IoClass.default())
        return '\n'.join([IO_CLASS_CONFIG_HEADER] + [str(row) for row in rows])

    @staticmethod
    def csv_to_list(csv: str):
        """Parse config file content into a list of IoClass objects.

        Header lines and blank lines are skipped.
        """
        ioclass_list = []
        for line in csv.splitlines():
            stripped = line.strip()
            if not stripped or stripped == IO_CLASS_CONFIG_HEADER:
                continue
            ioclass_list.append(IoClass.from_string(line))
        return ioclass_list

    @staticmethod
    def save_list_to_config_file(ioclass_list: list,
                                 add_default_rule: bool = True,
                                 ioclass_config_path: str = default_config_file_path):
        """Write the classes, serialized by list_to_csv(), to ioclass_config_path."""
        TestRun.LOGGER.info(f"Creating config file {ioclass_config_path}")
        fs_utils.write_file(ioclass_config_path,
                            IoClass.list_to_csv(ioclass_list, add_default_rule))

    @staticmethod
    def default(priority=DEFAULT_IO_CLASS_PRIORITY, allocation="1.00"):
        """Return the default IO class with the given priority and allocation."""
        return IoClass(DEFAULT_IO_CLASS_ID, DEFAULT_IO_CLASS_RULE, priority, allocation)

    @staticmethod
    def compare_ioclass_lists(list1: list, list2: list):
        """Return True when both lists hold equal classes, regardless of order."""
        return sorted(list1) == sorted(list2)

    @staticmethod
    def generate_random_ioclass_list(count: int, max_priority: int = MAX_IO_CLASS_PRIORITY):
        """Build a list starting with the default class followed by random classes.

        The default class gets a random priority and allocation; classes with
        ids 1..count-1 additionally get a random rule. The default class is
        always present, so the result has at least one element.
        """
        random_list = [IoClass.default(priority=random.randint(0, max_priority),
                                       allocation=f"{random.randint(0, 100) / 100:0.2f}")]
        for class_id in range(1, count):
            io_class = IoClass(class_id, priority=random.randint(0, max_priority),
                               allocation=f"{random.randint(0, 100) / 100:0.2f}")
            random_list.append(io_class.set_random_rule())
        return random_list

    def set_random_rule(self):
        """Replace self.rule with a randomly chosen, randomly parameterized rule.

        :return: self, so the call can be chained
        """
        rules = ["metadata", "direct", "file_size", "directory", "io_class",
                 "extension", "file_name_prefix", "lba", "pid", "process_name",
                 "file_offset", "request_size"]
        # "wlth" is only offered on kernels 4.13 and newer.
        if os_utils.get_kernel_version() >= version.Version("4.13"):
            rules.append("wlth")
        self.rule = IoClass.add_random_params(random.choice(rules))
        return self

    @staticmethod
    def add_random_params(rule: str):
        """Append random parameters matching the rule type to a rule name.

        Rule names without a known parameter format are returned without
        parameters. "&done" is appended to the result at random.
        """
        if rule == "directory":
            allowed_chars = string.ascii_letters + string.digits + '/'
            rule += f":/{random_string(random.randint(1, 40), allowed_chars)}"
        elif rule in ["file_size", "lba", "pid", "file_offset", "request_size", "wlth"]:
            operator = Operator(random.randrange(len(Operator)))
            rule += f":{operator.name}:{random.randrange(1000000)}"
        elif rule == "io_class":
            # NOTE(review): the bound is the maximum priority although the
            # parameter looks like a class id - confirm whether
            # MAX_IO_CLASS_ID was intended.
            rule += f":{random.randrange(MAX_IO_CLASS_PRIORITY + 1)}"
        elif rule in ["extension", "process_name", "file_name_prefix"]:
            rule += f":{random_string(random.randint(1, 10))}"
        if random.randrange(2):
            rule += "&done"
        return rule
class Operator(enum.Enum):
    """Comparison operators usable in numeric rule conditions.

    Member names are written verbatim into rule strings. Values are
    consecutive integers starting at 0, so a member can be picked with
    ``Operator(random.randrange(len(Operator)))``.
    """

    eq = 0  # conventionally: equal
    gt = 1  # conventionally: greater than
    ge = 2  # conventionally: greater than or equal
    lt = 3  # conventionally: less than
    le = 4  # conventionally: less than or equal
# TODO: replace the module-level functions below with equivalents built on IoClass
def create_ioclass_config(
    add_default_rule: bool = True, ioclass_config_path: str = default_config_file_path
):
    """Create an IO class config file containing the header line.

    :param add_default_rule: when True, a row for the default IO class with
        allocation "1.00" is appended after the header
    :param ioclass_config_path: path of the file to create (overwritten)
    :raises Exception: if any of the shell commands exits with non-zero code
    """
    TestRun.LOGGER.info(f"Creating config file {ioclass_config_path}")

    # The header command truncates the file; the optional one appends to it.
    commands = [f'echo {IO_CLASS_CONFIG_HEADER} > {ioclass_config_path}']
    if add_default_rule:
        commands.append(
            f'echo "{DEFAULT_IO_CLASS_ID},{DEFAULT_IO_CLASS_RULE},{DEFAULT_IO_CLASS_PRIORITY},"'
            f'"1.00" >> {ioclass_config_path}'
        )

    for command in commands:
        result = TestRun.executor.run(command)
        if result.exit_code != 0:
            raise Exception(
                "Failed to create ioclass config file. "
                f"stdout: {result.stdout} \n stderr :{result.stderr}"
            )
def remove_ioclass_config(ioclass_config_path: str = default_config_file_path):
    """Delete the IO class config file with ``rm -f``.

    :param ioclass_config_path: path of the file to delete
    :raises Exception: if the command exits with non-zero code
    """
    TestRun.LOGGER.info(f"Removing config file {ioclass_config_path}")
    result = TestRun.executor.run(f"rm -f {ioclass_config_path}")
    if result.exit_code == 0:
        return
    raise Exception(
        "Failed to remove config file. "
        f"stdout: {result.stdout} \n stderr :{result.stderr}"
    )
def add_ioclass(
    ioclass_id: int,
    rule: str,
    eviction_priority: int,
    allocation,
    ioclass_config_path: str = default_config_file_path,
):
    """Append one IO class row to the config file.

    The row is the four arguments joined with commas, in the order
    id, rule, eviction priority, allocation.

    :raises Exception: if the append command exits with non-zero code
    """
    row = f"{ioclass_id},{rule},{eviction_priority},{allocation}"
    TestRun.LOGGER.info(f"Adding rule {row} to config file {ioclass_config_path}")

    result = TestRun.executor.run(f'echo "{row}" >> {ioclass_config_path}')
    if result.exit_code == 0:
        return
    raise Exception(
        "Failed to append ioclass to config file. "
        f"stdout: {result.stdout} \n stderr :{result.stderr}"
    )
def get_ioclass(ioclass_id: int, ioclass_config_path: str = default_config_file_path):
    """Return the raw config file row of the IO class with the given id.

    Lines whose first field is not an integer (the header line, blank lines)
    are skipped instead of aborting the lookup with ValueError.

    :param ioclass_id: id of the IO class to look up
    :param ioclass_config_path: path of the config file to read
    :return: the first matching line as a string, or None if no row matches
    :raises Exception: if reading the file fails
    """
    TestRun.LOGGER.info(
        f"Retrieving rule no. {ioclass_id} " + f"from config file {ioclass_config_path}"
    )
    output = TestRun.executor.run(f"cat {ioclass_config_path}")
    if output.exit_code != 0:
        raise Exception(
            "Failed to read ioclass config file. "
            + f"stdout: {output.stdout} \n stderr :{output.stderr}"
        )

    for line in output.stdout.splitlines():
        try:
            line_id = int(line.split(",")[0])
        except ValueError:
            # Not a rule row - a valid config file starts with a header line.
            continue
        if line_id == ioclass_id:
            return line
    return None
def remove_ioclass(
    ioclass_id: int, ioclass_config_path: str = default_config_file_path
):
    """Rewrite the config file without the rows of the given IO class id.

    :param ioclass_id: id of the IO class to drop
    :param ioclass_config_path: path of the config file to rewrite
    :raises Exception: if the file cannot be read or written, or if no row
        with the given id was found
    """
    TestRun.LOGGER.info(
        f"Removing rule no.{ioclass_id} from config file {ioclass_config_path}"
    )

    read_result = TestRun.executor.run(f"cat {ioclass_config_path}")
    if read_result.exit_code != 0:
        raise Exception(
            "Failed to read ioclass config file. "
            f"stdout: {read_result.stdout} \n stderr :{read_result.stderr}"
        )

    original_lines = read_result.stdout.splitlines()
    # The first line of a valid config file is the header, never a rule, so
    # it is carried over untouched and excluded from id matching.
    header, rule_lines = original_lines[0], original_lines[1:]
    kept_lines = [header]
    for rule_line in rule_lines:
        if int(rule_line.split(",")[0]) != ioclass_id:
            kept_lines.append(rule_line)

    # Same length means nothing was filtered out, i.e. the id was not present.
    if len(kept_lines) == len(original_lines):
        raise Exception(
            f"Failed to remove ioclass {ioclass_id} from config file {ioclass_config_path}"
        )

    new_content = "\n".join(kept_lines)
    write_result = TestRun.executor.run(
        f'echo "{new_content}" > {ioclass_config_path}'
    )
    if write_result.exit_code != 0:
        raise Exception(
            "Failed to save new ioclass config. "
            f"stdout: {write_result.stdout} \n stderr :{write_result.stderr}"
        )