Compare commits

..

64 Commits

Author SHA1 Message Date
Katarzyna Treder
ace1ff49d4 Merge pull request #38 from Kamoppl/kamilg/fix_paths_type
test-framework: force TF to use linux path
2025-04-08 07:42:59 +02:00
Kamil Gierszewski
e6b4eaa6a3 test-framework: force using linux paths in TF
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2025-04-01 13:55:25 +02:00
Katarzyna Treder
1d7589644a Merge pull request #35 from katlapinka/kasiat/smaller-logs
Making logs smaller by getting it only for the time of test execution
2025-03-19 10:10:50 +01:00
Katarzyna Treder
bcc5c38c27 Making logs smaller by getting it only for the time of test execution
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2025-03-04 08:23:21 +01:00
Katarzyna Treder
d7fb6885bf Merge pull request #36 from katlapinka/kasiat/test-data-path
Set TEST_RUN_DATA_PATH for test-framework base tests
2025-03-04 08:18:45 +01:00
Katarzyna Treder
fe060c9c59 Set TEST_RUN_DATA_PATH for test-framework base tests
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2025-02-27 11:17:03 +01:00
Kamil Gierszewski
cb50633e34 Merge pull request #33 from Kamoppl/kamilg/update_tf_dec
Few tf fixes/improvements
2025-01-30 22:28:43 +01:00
Kamil Gierszewski
3374881bdd test-framework: fix config ip handle
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2025-01-02 01:44:16 +01:00
Kamil Gierszewski
0b9ea5f0ff test-framework: code refactor in power plugin
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2025-01-02 01:42:37 +01:00
Kamil Gierszewski
5f6ce5d2a4 test-framework: minor refactor
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2025-01-02 01:36:42 +01:00
Katarzyna Treder
aea7532756 Merge pull request #34 from katlapinka/kasiat/tf-refactor
TF refactor
2024-12-31 12:44:20 +01:00
Katarzyna Treder
f7f2914e41 Add trim verification fio param
This param will be needed to replace some vdbench configs after removing
vdbench plugin

Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-31 11:51:17 +01:00
Katarzyna Treder
5bd6a656c5 Fix imports
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-31 11:51:05 +01:00
Katarzyna Treder
fd869a0afc Refactor disk tools and fs tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 19:13:39 +01:00
Katarzyna Treder
6dd9c9ca8c Iostat refactor
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 17:54:57 +01:00
Katarzyna Treder
c218f64c81 Types fix - rename to type_def
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 13:55:06 +01:00
Katarzyna Treder
6970a43107 Disk finder refactor
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 09:03:49 +01:00
Katarzyna Treder
7512420e2a OS tools refactor:
- make udev, runlevel, memory and wget separate files
- rename os_utils to os_tools and move it to test_tools
- remove ModuleRemoveMethod and set modprobe as default
- fix regex in is_kernel_module_loaded method
- remove get_sys_block_path
- move get_block_device_names method to disk tools

Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-11 07:54:25 +01:00
Katarzyna Treder
ae9b036b47 Rename systemd to systemctl and move it to tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:46:04 +01:00
Katarzyna Treder
a6848ed825 Remove kedr
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:39:38 +01:00
Katarzyna Treder
a6551b032a Move scsi_debug to tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:35:01 +01:00
Katarzyna Treder
40f850c1f9 Move git to tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:31:39 +01:00
Katarzyna Treder
324edb0eba Move fstab to tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:28:16 +01:00
Katarzyna Treder
2390e4c2af Remove generator from test utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:20:54 +01:00
Katarzyna Treder
7f5bbb5240 Move dmesg to tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 14:05:43 +01:00
Katarzyna Treder
a954e47b33 Move linux command and wait method to common tools
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 13:57:18 +01:00
Katarzyna Treder
f60e90192e Move error device to storage devices as separate file
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 13:46:25 +01:00
Katarzyna Treder
8a7e848afc Move checksec to scripts
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 13:38:45 +01:00
Katarzyna Treder
de4305af8c Move singleton to common utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 13:24:33 +01:00
Katarzyna Treder
5409fa9a40 Move retry to connection utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:54:18 +01:00
Katarzyna Treder
f508707394 Move asynchronous to connection utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:50:13 +01:00
Katarzyna Treder
161cc7957a Move output to connection utils
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:29:44 +01:00
Katarzyna Treder
513f006d17 Move Time to types
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:19:57 +01:00
Katarzyna Treder
622c64c46b Move size to types
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-10 12:18:07 +01:00
Katarzyna Treder
072c72b08c Merge pull request #31 from katlapinka/kasiat/initramfs-update
Add method for updating initramfs accordingly to OS version
2024-11-27 10:24:06 +01:00
Katarzyna Treder
2bec959544 Add method for updating initramfs accordingly to OS version
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-27 09:19:03 +01:00
Katarzyna Treder
2b085c1cff Merge pull request #32 from katlapinka/kasiat/device-get-serial
Add method for getting device sysfs serial
2024-11-27 07:47:16 +01:00
Katarzyna Treder
b7aa8a1974 Add method for getting device sysfs serial
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-26 12:48:00 +01:00
Katarzyna Treder
6dc07dad3e Merge pull request #27 from katlapinka/kasiat/lvm-resolve-os-disk
Prevent removing OS partition logical volume
2024-11-26 12:40:51 +01:00
Katarzyna Treder
4e0dad274f Prevent removing OS partition logical volume
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-26 12:40:10 +01:00
Katarzyna Treder
704038297f Fix for getting disk serial number
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-26 12:40:10 +01:00
Katarzyna Treder
8739a782e1 Merge pull request #28 from katlapinka/kasiat/lvm-os-disk-filter
Add OS disk filters to LVM config file
2024-11-26 12:39:20 +01:00
Katarzyna Treder
9feb5a7245 Add OS disk filter to LVM config file
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-26 12:38:44 +01:00
Katarzyna Treder
5a16cd1f51 Merge pull request #30 from Deixx/fix-logs
Fix log gathering
2024-11-25 09:53:58 +01:00
Daniel Madej
eaabd446b8 Fix log gathering
For DUTs without IP defined use host when creating log dir
(IP set to None caused an exception)
Use paramiko method for copying log files
(works when running tests from Windows)

Signed-off-by: Daniel Madej <daniel.madej@huawei.com>
2024-11-25 09:50:47 +01:00
Katarzyna Treder
a29e65f38d Merge pull request #29 from Deixx/stdout-to-file
Add logging stdout to a file in test result directory
2024-11-25 09:12:04 +01:00
Daniel Madej
1d68e0d571 Add logging stdout to a file in test result directory
Signed-off-by: Daniel Madej <daniel.madej@huawei.com>
2024-11-18 14:46:51 +01:00
Katarzyna Treder
5d8afd3fba Merge pull request #24 from katlapinka/kasiat/update-fio-version
Update fio version and make it possible to set it in config file
2024-11-12 11:33:55 +01:00
Katarzyna Treder
d7f9d90667 Update fio version and make it possible to set it in config file
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-12 11:33:05 +01:00
Katarzyna Treder
a5a05f4ac2 Merge pull request #23 from Kamoppl/kamilg/speed_up_TF
Kamilg/update tf
2024-11-12 11:24:58 +01:00
Kamil Gierszewski
6ea1c16066 test-framework: Fail the test if something broke in prepare
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2024-11-12 11:23:27 +01:00
Kamil Gierszewski
0edba4f01b test-framework: Make kill_all_io faster
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2024-11-12 11:23:27 +01:00
Kamil Gierszewski
f59fc28ef2 test-framework: Wait for raids after creating, not while discovering
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2024-11-12 11:23:27 +01:00
Kamil Gierszewski
7b741e2c96 test-framework: Parallelize SATA plug_all command
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2024-11-12 11:23:27 +01:00
Kamil Gierszewski
5bbbf559fd test-framework: Make pre-logger exceptions readable
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2024-11-12 11:23:27 +01:00
Kamil Gierszewski
5226f62cf2 test-framework: Make BaseLogResult comparable
Signed-off-by: Kamil Gierszewski <kamil.gierszewski@huawei.com>
2024-11-12 11:23:27 +01:00
Katarzyna Treder
4739cfab27 Merge pull request #25 from katlapinka/kasiat/block-dev-list
Add method for gathering list of block devices names
2024-11-12 07:41:54 +01:00
Katarzyna Treder
c961c08d93 Add method for gathering list of block devices names
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-07 10:59:03 +01:00
Katarzyna Treder
775036de91 Merge pull request #26 from katlapinka/kasiat/lvm-api-update
LVM API refactor
2024-11-07 10:54:25 +01:00
Katarzyna Treder
3ceea87e87 LVM API refactor
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-11-05 11:44:52 +01:00
Katarzyna Treder
3dacb82a13 Merge pull request #22 from katlapinka/kasiat/peach-fix-command-type
Fix parameter type in peach fuzzer get_fuzzed_command method
2024-10-15 11:13:11 +02:00
Katarzyna Treder
b2b20cb714 Fix parameter type in peach fuzzer get_fuzzed_command method
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-10-15 11:12:38 +02:00
Katarzyna Treder
b8929972b4 Merge pull request #21 from katlapinka/kasiat/os-utils-user
Introduce methods for creating or checking if user exists
2024-10-15 09:31:15 +02:00
Katarzyna Treder
3e4a19ef34 Introduce methods for creating or checking if user exists
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-10-08 12:28:50 +02:00
71 changed files with 1172 additions and 1213 deletions

View File

@@ -8,7 +8,7 @@ import time
from datetime import timedelta
from core.test_run import TestRun
from test_utils.output import CmdException
from connection.utils.output import CmdException
class BaseExecutor:

View File

@@ -9,8 +9,8 @@ from datetime import timedelta
from connection.base_executor import BaseExecutor
from core.test_run import TestRun
from test_tools.fs_utils import copy
from test_utils.output import Output, CmdException
from test_tools.fs_tools import copy
from connection.utils.output import Output, CmdException
class LocalExecutor(BaseExecutor):

View File

@@ -3,17 +3,18 @@
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import re
import paramiko
import socket
import subprocess
from datetime import timedelta, datetime
import paramiko
from datetime import timedelta, datetime
from connection.base_executor import BaseExecutor
from core.test_run import TestRun, Blocked
from test_utils.output import Output
from connection.utils.output import Output
class SshExecutor(BaseExecutor):
@@ -46,7 +47,7 @@ class SshExecutor(BaseExecutor):
hostname = target["hostname"]
key_filename = target.get("identityfile", None)
user = target.get("user", user)
port = target.get("port", port)
port = int(target.get("port", port))
if target.get("proxyjump", None) is not None:
proxy = config.lookup(target["proxyjump"])
jump = paramiko.SSHClient()

View File

@@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -1,9 +1,10 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import concurrent
from concurrent.futures import ThreadPoolExecutor
def start_async_func(func, *args):
@@ -14,5 +15,5 @@ def start_async_func(func, *args):
- done() method returns True when task ended (have a result or ended with an exception)
otherwise returns False
"""
executor = concurrent.futures.ThreadPoolExecutor()
executor = ThreadPoolExecutor()
return executor.submit(func, *args)

View File

@@ -1,5 +1,6 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -1,5 +1,6 @@
#
# Copyright(c) 2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -33,7 +34,7 @@ class Retry:
try:
result = func()
return True
except:
except Exception:
return False
cls.run_while_false(wrapped_func, retries=retries, timeout=timeout)

View File

@@ -22,6 +22,7 @@ class TestRun:
plugin_manager = None
duts = None
disks = None
TEST_RUN_DATA_PATH = None
@classmethod
@contextmanager

View File

@@ -1,6 +1,6 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -19,10 +19,11 @@ from core.pair_testing import generate_pair_testing_testcases, register_testcase
from core.plugins import PluginManager
from log.base_log import BaseLogResult
from storage_devices.disk import Disk
from test_utils import disk_finder
from test_tools import disk_finder
from test_utils.dut import Dut
TestRun = core.test_run.TestRun
TestRun.TEST_RUN_DATA_PATH = "/tmp/test_data"
@classmethod
@@ -132,7 +133,8 @@ def __presetup(cls):
if cls.config['type'] == 'ssh':
try:
IP(cls.config['ip'])
cls.config['host'] = cls.config['ip']
if not cls.config['host']:
cls.config['host'] = cls.config['ip']
except ValueError:
TestRun.block("IP address from config is in invalid format.")
except KeyError:
@@ -211,7 +213,7 @@ def __makereport(cls, item, call, res):
if res.outcome == "skipped":
cls.LOGGER.skip("Test skipped.")
if res.when == "call" and cls.LOGGER.get_result() == BaseLogResult.FAILED:
if res.when in ["call", "setup"] and cls.LOGGER.get_result() >= BaseLogResult.FAILED:
res.outcome = "failed"
# To print additional message in final test report, assign it to res.longrepr

View File

@@ -1,6 +1,6 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -46,18 +46,25 @@ class PowerControlPlugin:
def teardown(self):
pass
def power_cycle(self):
self.executor.run_expect_success(f"sudo virsh reset {TestRun.dut.virsh['vm_name']}")
def power_cycle(self, wait_for_connection: bool = False, delay_until_reboot: int = 0) -> None:
self.executor.run_expect_success(f"sudo virsh destroy {TestRun.dut.virsh['vm_name']}")
TestRun.executor.disconnect()
TestRun.executor.wait_for_connection(timedelta(seconds=TestRun.dut.virsh["reboot_timeout"]))
self.executor.run_expect_success(
f"(sleep {delay_until_reboot} && sudo virsh start {TestRun.dut.virsh['vm_name']}) &"
)
if wait_for_connection:
TestRun.executor.wait_for_connection(
timedelta(seconds=TestRun.dut.virsh["reboot_timeout"])
)
def check_if_vm_exists(self, vm_name) -> bool:
return self.executor.run(f"sudo virsh list|grep -w {vm_name}").exit_code == 0
def parse_virsh_config(self, vm_name, reboot_timeout=DEFAULT_REBOOT_TIMEOUT) -> dict | None:
if not self.check_if_vm_exists(vm_name=vm_name):
raise ValueError(f"Virsh power plugin error: couldn't find VM {vm_name} on host "
f"{self.host}")
raise ValueError(
f"Virsh power plugin error: couldn't find VM {vm_name} on host {self.host}"
)
return {
"vm_name": vm_name,
"reboot_timeout": reboot_timeout,

View File

@@ -6,8 +6,8 @@ from time import sleep
from core.test_run_utils import TestRun
from storage_devices.device import Device
from test_utils import os_utils
from test_utils.output import CmdException
from connection.utils.output import CmdException
from test_tools.os_tools import load_kernel_module, is_kernel_module_loaded, unload_kernel_module
class ScsiDebug:
@@ -24,7 +24,7 @@ class ScsiDebug:
def reload(self):
self.teardown()
sleep(1)
load_output = os_utils.load_kernel_module(self.module_name, self.params)
load_output = load_kernel_module(self.module_name, self.params)
if load_output.exit_code != 0:
raise CmdException(f"Failed to load {self.module_name} module", load_output)
TestRun.LOGGER.info(f"{self.module_name} loaded successfully.")
@@ -32,8 +32,8 @@ class ScsiDebug:
TestRun.scsi_debug_devices = Device.get_scsi_debug_devices()
def teardown(self):
if os_utils.is_kernel_module_loaded(self.module_name):
os_utils.unload_kernel_module(self.module_name)
if is_kernel_module_loaded(self.module_name):
unload_kernel_module(self.module_name)
plugin_class = ScsiDebug

View File

@@ -1,98 +0,0 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
import posixpath
from datetime import timedelta
from core.test_run import TestRun
from test_tools import fs_utils
class Vdbench:
def __init__(self, params, config):
print("VDBench plugin initialization")
self.run_time = timedelta(seconds=60)
try:
self.working_dir = config["working_dir"]
self.reinstall = config["reinstall"]
self.source_dir = config["source_dir"]
except Exception:
raise Exception("Missing fields in config! ('working_dir', 'source_dir' and "
"'reinstall' required)")
self.result_dir = posixpath.join(self.working_dir, 'result.tod')
def pre_setup(self):
pass
def post_setup(self):
print("VDBench plugin post setup")
if not self.reinstall and fs_utils.check_if_directory_exists(self.working_dir):
return
if fs_utils.check_if_directory_exists(self.working_dir):
fs_utils.remove(self.working_dir, True, True)
fs_utils.create_directory(self.working_dir)
TestRun.LOGGER.info("Copying vdbench to working dir.")
fs_utils.copy(
source=self.source_dir, destination=self.working_dir, force=True, recursive=True
)
def teardown(self):
pass
def create_config(self, config, run_time: timedelta):
self.run_time = run_time
if config[-1] != ",":
config += ","
config += f"elapsed={int(run_time.total_seconds())}"
TestRun.LOGGER.info(f"Vdbench config:\n{config}")
fs_utils.write_file(posixpath.join(self.working_dir, "param.ini"), config)
def run(self):
cmd = f"{posixpath.join(self.working_dir, 'vdbench')} " \
f"-f {posixpath.join(self.working_dir, 'param.ini')} " \
f"-vr -o {self.result_dir}"
full_cmd = f"screen -dmS vdbench {cmd}"
TestRun.executor.run(full_cmd)
start_time = time.time()
timeout = self.run_time * 1.5
while True:
if not TestRun.executor.run(f"ps aux | grep '{cmd}' | grep -v grep").exit_code == 0:
return self.analyze_log()
if time.time() - start_time > timeout.total_seconds():
TestRun.LOGGER.error("Vdbench timeout.")
return False
time.sleep(1)
def analyze_log(self):
output = TestRun.executor.run(
f"ls -1td {self.result_dir[0:len(self.result_dir) - 3]}* | head -1")
log_path = posixpath.join(output.stdout if output.exit_code == 0 else self.result_dir,
"logfile.html")
log_file = fs_utils.read_file(log_path)
if "Vdbench execution completed successfully" in log_file:
TestRun.LOGGER.info("Vdbench execution completed successfully.")
return True
if "Data Validation error" in log_file or "data_errors=1" in log_file:
TestRun.LOGGER.error("Data corruption occurred!")
elif "Heartbeat monitor:" in log_file:
TestRun.LOGGER.error("Vdbench: heartbeat.")
else:
TestRun.LOGGER.error("Vdbench unknown result.")
return False
plugin_class = Vdbench

View File

@@ -1,13 +1,14 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies
# SPDX-License-Identifier: BSD-3-Clause
#
from enum import Enum
from enum import IntEnum
from re import sub
class BaseLogResult(Enum):
class BaseLogResult(IntEnum):
DEBUG = 10
PASSED = 11
WORKAROUND = 12

View File

@@ -1,10 +1,13 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import logging
import os
import posixpath
import re
import sys
from contextlib import contextmanager
from datetime import datetime
@@ -15,8 +18,8 @@ import portalocker
from log.html_log_config import HtmlLogConfig
from log.html_log_manager import HtmlLogManager
from log.html_presentation_policy import html_policy
from test_utils.output import Output
from test_utils.singleton import Singleton
from connection.utils.output import Output
from test_utils.common.singleton import Singleton
def create_log(log_base_path, test_module, additional_args=None):
@@ -35,6 +38,7 @@ def create_log(log_base_path, test_module, additional_args=None):
finally:
log.begin(test_name)
print(f"\n<LogFile>{os.path.join(log.base_dir, 'main.html')}</LogFile>")
Log.add_file_logger(log.base_dir)
if error_msg:
log.exception(error_msg)
return log
@@ -44,6 +48,7 @@ class Log(HtmlLogManager, metaclass=Singleton):
logger = None
LOG_FORMAT = '%(asctime)s %(levelname)s:\t%(message)s'
DATE_FORMAT = "%Y/%m/%d %H:%M:%S"
unique_test_identifier = ""
command_id = 0
lock = Lock()
@@ -77,6 +82,14 @@ class Log(HtmlLogManager, metaclass=Singleton):
cls.logger = logger
logger.info("Logger successfully initialized.")
@classmethod
def add_file_logger(cls, log_path):
file_handler = logging.FileHandler(os.path.join(log_path, 'stdout.log'))
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(Log.LOG_FORMAT, Log.DATE_FORMAT)
file_handler.setFormatter(formatter)
cls.logger.addHandler(file_handler)
@contextmanager
def step(self, message):
self.step_info(message)
@@ -179,23 +192,32 @@ class Log(HtmlLogManager, metaclass=Singleton):
def get_additional_logs(self):
from core.test_run import TestRun
from test_tools.fs_utils import check_if_file_exists
from test_tools.fs_tools import check_if_file_exists
messages_log = "/var/log/messages"
if not check_if_file_exists(messages_log):
messages_log = "/var/log/syslog"
log_files = {"messages.log": messages_log,
"dmesg.log": "/tmp/dmesg"}
log_files = {"messages.log": posixpath.join(TestRun.TEST_RUN_DATA_PATH, "messages"),
"dmesg.log": posixpath.join(TestRun.TEST_RUN_DATA_PATH, "dmesg")}
extra_logs = TestRun.config.get("extra_logs", {})
log_files.update(extra_logs)
TestRun.executor.run(f"dmesg > {log_files['dmesg.log']}")
# Escape special characters from test identifier to be properly processed by awk
test_identifier = re.escape(TestRun.LOGGER.unique_test_identifier)
TestRun.executor.run(
f"dmesg | awk '/{test_identifier}/,0' > {log_files['dmesg.log']}")
TestRun.executor.run(
f"awk '/{test_identifier}/,0' {messages_log} > {log_files['messages.log']}")
dut_identifier = TestRun.dut.ip if TestRun.dut.ip else TestRun.dut.config["host"]
for log_name, log_source_path in log_files.items():
try:
log_destination_path = os.path.join(
self.base_dir, f"dut_info", TestRun.dut.ip, log_name
self.base_dir, "dut_info", dut_identifier, log_name
)
TestRun.executor.rsync_from(log_source_path, log_destination_path)
TestRun.executor.copy_from(log_source_path, log_destination_path)
except Exception as e:
TestRun.LOGGER.warning(
f"There was a problem during gathering {log_name} log.\n{str(e)}"
@@ -218,3 +240,11 @@ class Log(HtmlLogManager, metaclass=Singleton):
}
}
json.dump(data, summary)
def print_test_identifier_to_logs(self):
from core.test_run import TestRun
# Add test identifier to dmesg
TestRun.executor.run(f"echo {self.unique_test_identifier} > /dev/kmsg")
# Add test identifier to messages log
TestRun.executor.run(f"logger {self.unique_test_identifier}")

4
scripts/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -3,29 +3,32 @@
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
from core.test_run import TestRun
from test_tools import disk_utils, fs_utils
from test_tools.disk_utils import get_device_filesystem_type, get_sysfs_path
from test_tools import disk_tools
from test_tools.disk_tools import get_sysfs_path, validate_dev_path, get_size
from test_tools.fs_tools import (get_device_filesystem_type, Filesystem, wipefs,
readlink, write_file, mkfs, ls, parse_ls_output)
from test_utils.io_stats import IoStats
from test_utils.size import Size, Unit
from type_def.size import Size, Unit
class Device:
def __init__(self, path):
disk_utils.validate_dev_path(path)
validate_dev_path(path)
self.path = path
self.size = Size(disk_utils.get_size(self.get_device_id()), Unit.Byte)
self.size = Size(get_size(self.get_device_id()), Unit.Byte)
self.filesystem = get_device_filesystem_type(self.get_device_id())
self.mount_point = None
def create_filesystem(self, fs_type: disk_utils.Filesystem, force=True, blocksize=None):
disk_utils.create_filesystem(self, fs_type, force, blocksize)
def create_filesystem(self, fs_type: Filesystem, force=True, blocksize=None):
mkfs(self, fs_type, force, blocksize)
self.filesystem = fs_type
def wipe_filesystem(self, force=True):
disk_utils.wipe_filesystem(self, force)
wipefs(self, force)
self.filesystem = None
def is_mounted(self):
@@ -34,13 +37,13 @@ class Device:
return False
else:
mount_point_line = output.stdout.split('\n')[1]
device_path = fs_utils.readlink(self.path)
device_path = readlink(self.path)
self.mount_point = mount_point_line[0:mount_point_line.find(device_path)].strip()
return True
def mount(self, mount_point, options: [str] = None):
if not self.is_mounted():
if disk_utils.mount(self, mount_point, options):
if disk_tools.mount(self, mount_point, options):
self.mount_point = mount_point
else:
raise Exception(f"Device is already mounted! Actual mount point: {self.mount_point}")
@@ -48,7 +51,7 @@ class Device:
def unmount(self):
if not self.is_mounted():
TestRun.LOGGER.info("Device is not mounted.")
elif disk_utils.unmount(self):
elif disk_tools.unmount(self):
self.mount_point = None
def get_device_link(self, directory: str):
@@ -56,27 +59,26 @@ class Device:
return next(i for i in items if i.full_path.startswith(directory))
def get_device_id(self):
return fs_utils.readlink(self.path).split('/')[-1]
return readlink(self.path).split('/')[-1]
def get_all_device_links(self, directory: str):
from test_tools import fs_utils
output = fs_utils.ls(f"$(find -L {directory} -samefile {self.path})")
return fs_utils.parse_ls_output(output, self.path)
output = ls(f"$(find -L {directory} -samefile {self.path})")
return parse_ls_output(output, self.path)
def get_io_stats(self):
return IoStats.get_io_stats(self.get_device_id())
def get_sysfs_property(self, property_name):
path = posixpath.join(disk_utils.get_sysfs_path(self.get_device_id()),
path = posixpath.join(get_sysfs_path(self.get_device_id()),
"queue", property_name)
return TestRun.executor.run_expect_success(f"cat {path}").stdout
def set_sysfs_property(self, property_name, value):
TestRun.LOGGER.info(
f"Setting {property_name} for device {self.get_device_id()} to {value}.")
path = posixpath.join(disk_utils.get_sysfs_path(self.get_device_id()), "queue",
path = posixpath.join(get_sysfs_path(self.get_device_id()), "queue",
property_name)
fs_utils.write_file(path, str(value))
write_file(path, str(value))
def set_max_io_size(self, new_max_io_size: Size):
self.set_sysfs_property("max_sectors_kb",
@@ -101,6 +103,11 @@ class Device:
return int(TestRun.executor.run_expect_success(
f"cat {get_sysfs_path(self.get_device_id())}/device/numa_node").stdout)
def get_serial(self):
sysfs_path = get_sysfs_path(self.get_device_id())
serial_path = posixpath.join(sysfs_path, "device", "serial")
return TestRun.executor.run_expect_success(f"cat {serial_path}").stdout
def __str__(self):
return (
f'system path: {self.path}, short link: /dev/{self.get_device_id()},'

View File

@@ -11,12 +11,14 @@ from datetime import timedelta
from enum import IntEnum
from core.test_run import TestRun
from connection.utils.output import Output
from storage_devices.device import Device
from test_tools import disk_utils, fs_utils, nvme_cli
from test_utils import disk_finder
from test_utils.os_utils import wait
from test_utils.output import Output
from test_utils.size import Unit
from test_tools import disk_tools, nvme_cli
from test_tools.common.wait import wait
from test_tools.disk_finder import get_block_devices_list, resolve_to_by_id_link
from test_tools.disk_tools import PartitionTable
from test_tools.fs_tools import readlink, is_mounted, ls_item, parse_ls_output
from type_def.size import Unit
class DiskType(IntEnum):
@@ -137,33 +139,33 @@ class Disk(Device):
)
return recognized_types[0]
def create_partitions(self, sizes: [], partition_table_type=disk_utils.PartitionTable.gpt):
disk_utils.create_partitions(self, sizes, partition_table_type)
def create_partitions(self, sizes: [], partition_table_type=PartitionTable.gpt):
disk_tools.create_partitions(self, sizes, partition_table_type)
def remove_partition(self, part):
part_number = int(part.path.split("part")[1])
disk_utils.remove_parition(self, part_number)
disk_tools.remove_parition(self, part_number)
self.partitions.remove(part)
def umount_all_partitions(self):
TestRun.LOGGER.info(f"Unmounting all partitions from: {self.path}")
cmd = f"umount -l {fs_utils.readlink(self.path)}*?"
cmd = f"umount -l {readlink(self.path)}*?"
TestRun.executor.run(cmd)
def remove_partitions(self):
for part in self.partitions:
if part.is_mounted():
if is_mounted(part.path):
part.unmount()
if disk_utils.remove_partitions(self):
if disk_tools.remove_partitions(self):
self.partitions.clear()
def is_detected(self):
if self.serial_number:
serial_numbers = disk_finder.get_all_serial_numbers()
serial_numbers = Disk.get_all_serial_numbers()
return self.serial_number in serial_numbers
elif self.path:
output = fs_utils.ls_item(f"{self.path}")
return fs_utils.parse_ls_output(output)[0] is not None
output = ls_item(f"{self.path}")
return parse_ls_output(output)[0] is not None
raise Exception("Couldn't check if device is detected by the system")
def wait_for_plug_status(self, should_be_visible):
@@ -214,6 +216,40 @@ class Disk(Device):
for disk_type in cls.types_registry:
disk_type.plug_all()
@staticmethod
def get_all_serial_numbers():
serial_numbers = {}
block_devices = get_block_devices_list()
for dev in block_devices:
serial = Disk.get_disk_serial_number(dev)
try:
path = resolve_to_by_id_link(dev)
except Exception:
continue
if serial:
serial_numbers[serial] = path
else:
TestRun.LOGGER.warning(f"Device {path} ({dev}) does not have a serial number.")
serial_numbers[path] = path
return serial_numbers
@staticmethod
def get_disk_serial_number(dev_path):
commands = [
f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || "
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | "
"awk -F '=' '{print $NF}'",
f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | "
"awk '{print $NF}'",
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | "
"awk -F '=' '{print $NF}'"
]
for command in commands:
serial = TestRun.executor.run(command).stdout
if serial:
return serial.split('\n')[0]
return None
@static_init
class NvmeDisk(Disk):
@@ -255,8 +291,8 @@ class NvmeDisk(Disk):
base = f"/sys/block/{device_id}/device"
for suffix in ["/remove", "/device/remove"]:
try:
output = fs_utils.ls_item(base + suffix)
fs_utils.parse_ls_output(output)[0]
output = ls_item(base + suffix)
parse_ls_output(output)[0]
except TypeError:
continue
return base + suffix
@@ -288,8 +324,8 @@ class SataDisk(Disk):
@classmethod
def plug_all(cls) -> Output:
cmd = (
f"for i in $(find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f); do echo "
f"'- - -' > $i; done;"
"find -H /sys/devices/ -path '*/scsi_host/*/scan' -type f |"
" xargs -P20 -I % sh -c \"echo '- - -' | tee %\""
)
output = TestRun.executor.run_expect_success(cmd)
return output
@@ -311,8 +347,8 @@ class SataDisk(Disk):
@staticmethod
def get_sysfs_addr(device_id):
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0]
output = ls_item(ls_command)
sysfs_addr = parse_ls_output(output)[0]
if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")
return sysfs_addr.full_path
@@ -378,8 +414,8 @@ class VirtioDisk(Disk):
@staticmethod
def get_sysfs_addr(device_id: str) -> str:
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0]
output = ls_item(ls_command)
sysfs_addr = parse_ls_output(output)[0]
if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")

View File

@@ -3,14 +3,13 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
import os
import posixpath
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.drbdadm import Drbdadm
from test_utils.filesystem.symlink import Symlink
from test_utils.output import CmdException
from connection.utils.output import CmdException
class Drbd(Device):

View File

@@ -0,0 +1,87 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.device_mapper import DmTable, DeviceMapper
from test_tools.disk_finder import resolve_to_by_id_link
class ErrorDevice(Device):
def __init__(self, name: str, base_device: Device, table: DmTable = None):
self.device = base_device
self.mapper = DeviceMapper(name)
self.name = name
self.table = DmTable.passthrough_table(base_device) if not table else table
self.active = False
self.start()
self.path = resolve_to_by_id_link(self.mapper.get_path().replace('/dev/', ''))
@property
def system_path(self):
if self.active:
output = TestRun.executor.run_expect_success(f"realpath {self.mapper.get_path()}")
return output.stdout
return None
@property
def size(self):
if self.active:
return self.table.get_size()
return None
def start(self):
self.mapper.create(self.table)
self.active = True
def stop(self):
self.mapper.remove()
self.active = False
def change_table(self, table: DmTable, permanent=True):
if self.active:
self.mapper.suspend()
self.mapper.reload(table)
self.mapper.resume()
if permanent:
self.table = table
def suspend_errors(self):
empty_table = DmTable.passthrough_table(self.device)
TestRun.LOGGER.info(f"Suspending issuing errors for error device '{self.name}'")
self.change_table(empty_table, False)
def resume_errors(self):
TestRun.LOGGER.info(f"Resuming issuing errors for error device '{self.name}'")
self.change_table(self.table, False)
def suspend(self):
if not self.active:
TestRun.LOGGER.warning(
f"cannot suspend error device '{self.name}'! It's already running"
)
self.mapper.suspend()
self.active = False
def resume(self):
if self.active:
TestRun.LOGGER.warning(
f"cannot resume error device '{self.name}'! It's already running"
)
self.mapper.resume()
self.active = True

View File

@@ -1,20 +1,19 @@
#
# Copyright(c) 2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import threading
import threading
from typing import Union
from api.cas.core import Core
from core.test_run import TestRun
from storage_devices.device import Device
from storage_devices.disk import Disk, NvmeDisk
from storage_devices.partition import Partition
from test_tools.fs_utils import readlink
from test_utils.disk_finder import resolve_to_by_id_link
from storage_devices.disk import Disk
from test_tools.fs_tools import readlink
from test_tools.disk_finder import resolve_to_by_id_link, get_system_disks
from test_utils.filesystem.symlink import Symlink
from test_utils.size import Size
from type_def.size import Size
lvm_config_path = "/etc/lvm/lvm.conf"
filter_prototype_regex = r"^\sfilter\s=\s\["
@@ -29,16 +28,12 @@ class LvmConfiguration:
lvm_filters: [] = None,
pv_num: int = None,
vg_num: int = None,
lv_num: int = None,
cache_num: int = None,
cas_dev_num: int = None
lv_num: int = None
):
self.lvm_filters = lvm_filters
self.pv_num = pv_num
self.vg_num = vg_num
self.lv_num = lv_num
self.cache_num = cache_num
self.cas_dev_num = cas_dev_num
@staticmethod
def __read_definition_from_lvm_config(
@@ -122,7 +117,7 @@ class LvmConfiguration:
return cls.__read_definition_from_lvm_config(global_filter_prototype_regex)
@classmethod
def add_block_devices_to_lvm_config(
def add_block_device_to_lvm_config(
cls,
device_type: str
):
@@ -137,44 +132,27 @@ class LvmConfiguration:
filters: []
):
if filters is None:
TestRun.LOGGER.error(f"Lvm filters for lvm config not provided.")
raise ValueError(f"Lvm filters for lvm config not provided.")
for f in filters:
cls.__add_filter_to_lvm_config(f)
@classmethod
def configure_dev_types_in_config(
cls,
devices: ([Device], Device)
):
if isinstance(devices, list):
devs = []
for device in devices:
dev = device.parent_device if isinstance(device, Partition) else device
devs.append(dev)
if any(isinstance(dev, Core) for dev in devs):
cls.add_block_devices_to_lvm_config("cas")
if any(isinstance(dev, NvmeDisk) for dev in devs):
cls.add_block_devices_to_lvm_config("nvme")
else:
dev = devices.parent_device if isinstance(devices, Partition) else devices
if isinstance(dev, Core):
cls.add_block_devices_to_lvm_config("cas")
if isinstance(dev, NvmeDisk):
cls.add_block_devices_to_lvm_config("nvme")
@classmethod
def configure_filters(
cls,
lvm_filters: [],
devices: ([Device], Device)
):
if lvm_filters:
TestRun.LOGGER.info(f"Preparing configuration for LVMs - filters.")
LvmConfiguration.add_filters_to_lvm_config(lvm_filters)
cls.configure_dev_types_in_config(devices)
os_disk_filters = [
f"a|/dev/{disk}|" for disk in get_system_disks()
] if Lvm.get_os_vg() else None
if os_disk_filters:
TestRun.LOGGER.info(f"Add OS disks to LVM filters.")
LvmConfiguration.add_filters_to_lvm_config(os_disk_filters)
@staticmethod
def remove_global_filter_from_config():
@@ -186,6 +164,12 @@ class LvmConfiguration:
cmd = f"sed -i '/{filter_prototype_regex}/d' {lvm_config_path}"
TestRun.executor.run(cmd)
@staticmethod
def set_use_devices_file(use_devices_file=True):
cmd = (fr"sed -i 's/^\s*#*\s*\(use_devicesfile\).*/\t\1 = "
fr"{1 if use_devices_file else 0}/' {lvm_config_path}")
TestRun.executor.run(cmd)
class VolumeGroup:
__unique_vg_id = 0
@@ -266,7 +250,7 @@ class VolumeGroup:
if vg_name in volume_groups:
return cls(vg_name)
else:
TestRun.LOGGER.error("Had not found newly created VG.")
raise Exception("Had not found newly created VG.")
@staticmethod
def remove(vg_name: str):
@@ -332,13 +316,10 @@ class Lvm(Disk):
@classmethod
def configure_global_filter(
cls,
dev_first: Device,
lv_amount: int,
pv_devs: ([Device], Device)
):
device_first = dev_first.parent_device if isinstance(dev_first, Partition) else dev_first
if lv_amount > 1 and isinstance(device_first, Core):
if lv_amount > 1:
global_filter_def = LvmConfiguration.read_global_filter_definition_from_lvm_config()
if not isinstance(pv_devs, list):
pv_devs = [pv_devs]
@@ -388,13 +369,13 @@ class Lvm(Disk):
cls,
devices: ([Device], Device),
lvm_configuration: LvmConfiguration,
lvm_as_core: bool = False
global_filter: bool = False
):
pv_per_vg = int(lvm_configuration.pv_num / lvm_configuration.vg_num)
lv_per_vg = int(lvm_configuration.lv_num / lvm_configuration.vg_num)
lv_size_percentage = int(100 / lv_per_vg)
LvmConfiguration.configure_filters(lvm_configuration.lvm_filters, devices)
LvmConfiguration.configure_filters(lvm_configuration.lvm_filters)
logical_volumes = []
@@ -405,17 +386,15 @@ class Lvm(Disk):
end_range = start_range + pv_per_vg
for i in range(start_range, end_range):
pv_devs.append(devices[i])
device_first = devices[0]
else:
pv_devs = devices
device_first = devices
for j in range(lv_per_vg):
lv = cls.create(lv_size_percentage, pv_devs)
logical_volumes.append(lv)
if lvm_as_core:
cls.configure_global_filter(device_first, lv_per_vg, pv_devs)
if global_filter:
cls.configure_global_filter(lv_per_vg, pv_devs)
return logical_volumes
@@ -431,7 +410,7 @@ class Lvm(Disk):
elif isinstance(volume_size_or_percent, int):
size_cmd = f"--extents {volume_size_or_percent}%VG"
else:
TestRun.LOGGER.error(f"Incorrect type of the first argument (volume_size_or_percent).")
raise ValueError(f"Incorrect type of the first argument (volume_size_or_percent).")
if not name:
name = cls.__get_unique_lv_name()
@@ -481,13 +460,8 @@ class Lvm(Disk):
return cls.discover_logical_volumes()
@staticmethod
def remove(lv_name: str, vg_name: str):
if not lv_name:
raise ValueError("LV name needed for LV remove operation.")
if not vg_name:
raise ValueError("VG name needed for LV remove operation.")
cmd = f"lvremove -f {vg_name}/{lv_name}"
def remove(lv_path: str):
cmd = f"lvremove -f {lv_path}"
return TestRun.executor.run(cmd)
@staticmethod
@@ -498,22 +472,41 @@ class Lvm(Disk):
cmd = f"pvremove {pv_name}"
return TestRun.executor.run(cmd)
@staticmethod
def get_os_vg():
disks = get_system_disks()
cmd = (f"pvdisplay -c | grep -e /dev/{' -e /dev/'.join(disks)} | "
"awk -F':' '$2 != \"\" {print $2}'") # display non-empty groups
os_vg_names = TestRun.executor.run(cmd).stdout
if os_vg_names:
return set(os_vg_names.split("\n")) # remove duplicates
return []
@staticmethod
def get_non_os_vg():
disks = get_system_disks()
cmd = (f"pvdisplay -c | grep -Ev \'/dev/{'|/dev'.join(disks)}' | "
"awk -F':' '$2 != \"\" {print $2}\'") # display non-empty groups
non_os_vg_names = TestRun.executor.run(cmd).stdout
if non_os_vg_names:
return set(non_os_vg_names.split("\n")) # remove duplicates
return []
@classmethod
def remove_all(cls):
cmd = f"lvdisplay | grep 'LV Path' | awk '{{print $3}}'"
lvm_paths = TestRun.executor.run(cmd).stdout.splitlines()
for lvm_path in lvm_paths:
lv_name = lvm_path.split('/')[-1]
vg_name = lvm_path.split('/')[-2]
cls.remove(lv_name, vg_name)
non_os_vg_names = Lvm.get_non_os_vg()
cmd = "lvdisplay -c | awk -F':' '{{print $1,$2}}'" # prints lv_path vg_name
lvs = [tuple(lv.strip().split(' ')) for lv in TestRun.executor.run(cmd).stdout.splitlines()]
[cls.remove(lv[0]) for lv in lvs if lv[1] in non_os_vg_names]
cmd = f"vgdisplay | grep 'VG Name' | awk '{{print $3}}'"
vg_names = TestRun.executor.run(cmd).stdout.splitlines()
for vg_name in vg_names:
for vg_name in non_os_vg_names:
TestRun.executor.run(f"vgchange -an {vg_name}")
VolumeGroup.remove(vg_name)
cmd = f"pvdisplay | grep 'PV Name' | awk '{{print $3}}'"
os_disks = get_system_disks()
# invert grep to make sure os_disks won`t be wiped during lvms cleanup
cmd += "".join([f" | grep -v {os_disk}" for os_disk in os_disks])
pv_names = TestRun.executor.run(cmd).stdout.splitlines()
for pv_name in pv_names:
cls.remove_pv(pv_name)

View File

@@ -5,11 +5,10 @@
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.fs_utils import ls, parse_ls_output
from test_utils.os_utils import (
from test_tools.fs_tools import ls, parse_ls_output
from test_tools.os_tools import (
unload_kernel_module,
is_kernel_module_loaded,
ModuleRemoveMethod,
reload_kernel_module,
)
@@ -37,7 +36,7 @@ class NullBlk(Device):
if not is_kernel_module_loaded(cls._module):
return
TestRun.LOGGER.info("Removing null_blk ")
unload_kernel_module(module_name=cls._module, unload_method=ModuleRemoveMethod.modprobe)
unload_kernel_module(module_name=cls._module)
@classmethod
def list(cls):

View File

@@ -4,13 +4,13 @@
#
from storage_devices.device import Device
from test_tools import disk_utils
from test_utils.size import Size
from test_tools.disk_tools import get_partition_path
from type_def.size import Size
class Partition(Device):
def __init__(self, parent_dev, type, number, begin: Size, end: Size):
Device.__init__(self, disk_utils.get_partition_path(parent_dev.path, number))
Device.__init__(self, get_partition_path(parent_dev.path, number))
self.number = number
self.parent_device = parent_dev
self.type = type

View File

@@ -2,16 +2,17 @@
# Copyright(c) 2020-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import threading
from enum import IntEnum, Enum
from core.test_run import TestRun
from storage_devices.device import Device
from storage_devices.disk import Disk
from test_tools.fs_utils import readlink
from test_tools.fs_tools import readlink
from test_tools.mdadm import Mdadm
from test_utils.disk_finder import resolve_to_by_id_link
from test_utils.size import Size, Unit
from test_tools.disk_finder import resolve_to_by_id_link
from type_def.size import Size, Unit
def get_devices_paths_string(devices: [Device]):

View File

@@ -7,11 +7,11 @@ import posixpath
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools import disk_utils
from test_tools.fs_utils import ls, parse_ls_output
from test_tools.disk_tools import get_size
from test_tools.fs_tools import ls, parse_ls_output
from test_utils.filesystem.symlink import Symlink
from test_utils.os_utils import reload_kernel_module, unload_kernel_module, is_kernel_module_loaded
from test_utils.size import Size, Unit
from test_tools.os_tools import reload_kernel_module, unload_kernel_module, is_kernel_module_loaded
from type_def.size import Size, Unit
class RamDisk(Device):
@@ -68,7 +68,7 @@ class RamDisk(Device):
ram_disks = cls._list_devices()
return (
len(ram_disks) >= disk_count
and Size(disk_utils.get_size(ram_disks[0].name), Unit.Byte).align_down(Unit.MiB.value)
and Size(get_size(ram_disks[0].name), Unit.Byte).align_down(Unit.MiB.value)
== disk_size.align_down(Unit.MiB.value)
)

View File

@@ -14,8 +14,9 @@ from datetime import timedelta
from core.test_run import TestRun
from storage_devices.device import Device
from test_utils.filesystem.directory import Directory
from test_utils.os_utils import is_mounted, drop_caches, DropCachesMode
from test_utils.size import Size, Unit
from test_tools.os_tools import drop_caches, DropCachesMode
from test_tools.fs_tools import is_mounted
from type_def.size import Size, Unit
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
PREFIX = "trace_"

View File

@@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -1,5 +1,6 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

20
test_tools/common/wait.py Normal file
View File

@@ -0,0 +1,20 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
from datetime import timedelta, datetime
def wait(predicate, timeout: timedelta, interval: timedelta = None):
start_time = datetime.now()
result = False
while start_time + timeout > datetime.now():
result = predicate()
if result:
break
if interval is not None:
time.sleep(interval.total_seconds())
return result

View File

@@ -1,16 +1,17 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import test_utils.linux_command as linux_comm
import test_utils.size as size
import type_def.size as size
from core.test_run import TestRun
from test_tools.common.linux_command import LinuxCommand
class Dd(linux_comm.LinuxCommand):
class Dd(LinuxCommand):
def __init__(self):
linux_comm.LinuxCommand.__init__(self, TestRun.executor, 'dd')
LinuxCommand.__init__(self, TestRun.executor, 'dd')
def block_size(self, value: size.Size):
return self.set_param('bs', int(value.get_value()))

View File

@@ -1,16 +1,17 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import test_utils.linux_command as linux_comm
import test_utils.size as size
from core.test_run import TestRun
from test_tools.common.linux_command import LinuxCommand
from type_def.size import Size
class Ddrescue(linux_comm.LinuxCommand):
class Ddrescue(LinuxCommand):
def __init__(self):
linux_comm.LinuxCommand.__init__(self, TestRun.executor, 'ddrescue')
LinuxCommand.__init__(self, TestRun.executor, 'ddrescue')
self.source_path = None
self.destination_path = None
self.param_name_prefix = "--"
@@ -35,13 +36,13 @@ class Ddrescue(linux_comm.LinuxCommand):
def force(self):
return self.set_flags("force")
def block_size(self, value: size.Size):
def block_size(self, value: Size):
return self.set_param('sector-size', int(value.get_value()))
def size(self, value: size.Size):
def size(self, value: Size):
return self.set_param('size', int(value.get_value()))
def __str__(self):
command = linux_comm.LinuxCommand.__str__(self)
command = LinuxCommand.__str__(self)
command += f" {self.source_path} {self.destination_path}"
return command

View File

@@ -1,5 +1,6 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -7,9 +8,8 @@ from enum import Enum
from core.test_run import TestRun
from storage_devices.device import Device
from test_utils.disk_finder import resolve_to_by_id_link
from test_utils.linux_command import LinuxCommand
from test_utils.size import Size, Unit
from test_tools.common.linux_command import LinuxCommand
from type_def.size import Size, Unit
class DmTarget(Enum):
@@ -30,11 +30,6 @@ class DmTarget(Enum):
return self.name.lower()
class DmTable:
class TableEntry:
pass
class DmTable:
class TableEntry:
def __init__(self, offset: int, length: int, target: DmTarget, *params):
@@ -131,7 +126,7 @@ class DmTable:
return self
def add_entry(self, entry: DmTable.TableEntry):
def add_entry(self, entry: TableEntry):
self.table.append(entry)
return self
@@ -250,80 +245,3 @@ class DeviceMapper(LinuxCommand):
return TestRun.executor.run_expect_success(
f"{self.command_name} reload {self.name} {self.wrap_table(table)}"
)
class ErrorDevice(Device):
def __init__(self, name: str, base_device: Device, table: DmTable = None):
self.device = base_device
self.mapper = DeviceMapper(name)
self.name = name
self.table = DmTable.passthrough_table(base_device) if not table else table
self.active = False
self.start()
self.path = resolve_to_by_id_link(self.mapper.get_path().replace('/dev/', ''))
@property
def system_path(self):
if self.active:
output = TestRun.executor.run_expect_success(f"realpath {self.mapper.get_path()}")
return output.stdout
return None
@property
def size(self):
if self.active:
return self.table.get_size()
return None
def start(self):
self.mapper.create(self.table)
self.active = True
def stop(self):
self.mapper.remove()
self.active = False
def change_table(self, table: DmTable, permanent=True):
if self.active:
self.mapper.suspend()
self.mapper.reload(table)
self.mapper.resume()
if permanent:
self.table = table
def suspend_errors(self):
empty_table = DmTable.passthrough_table(self.device)
TestRun.LOGGER.info(f"Suspending issuing errors for error device '{self.name}'")
self.change_table(empty_table, False)
def resume_errors(self):
TestRun.LOGGER.info(f"Resuming issuing errors for error device '{self.name}'")
self.change_table(self.table, False)
def suspend(self):
if not self.active:
TestRun.LOGGER.warning(
f"cannot suspend error device '{self.name}'! It's already running"
)
self.mapper.suspend()
self.active = False
def resume(self):
if self.active:
TestRun.LOGGER.warning(
f"cannot resume error device '{self.name}'! It's already running"
)
self.mapper.resume()
self.active = True

View File

@@ -1,15 +1,15 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import posixpath
from core.test_run import TestRun
from test_tools import disk_utils
from test_tools.fs_utils import check_if_file_exists, readlink
from test_utils import os_utils
from test_utils.output import CmdException
from test_tools.disk_tools import get_sysfs_path, get_block_size, get_size
from test_tools.fs_tools import check_if_file_exists, readlink
from connection.utils.output import CmdException
def find_disks():
@@ -49,7 +49,7 @@ def discover_hdd_devices(block_devices, devices_res):
for dev in block_devices:
if TestRun.executor.run_expect_success(f"cat /sys/block/{dev}/removable").stdout == "1":
continue # skip removable drives
block_size = disk_utils.get_block_size(dev)
block_size = get_block_size(dev)
if int(block_size) == 4096:
disk_type = 'hdd4k'
else:
@@ -61,7 +61,7 @@ def discover_hdd_devices(block_devices, devices_res):
f"sg_inq /dev/{dev} | grep -i 'serial number'"
).stdout.split(': ')[1].strip(),
"blocksize": block_size,
"size": disk_utils.get_size(dev)})
"size": get_size(dev)})
block_devices.clear()
@@ -98,57 +98,22 @@ def discover_ssd_devices(block_devices, devices_res):
"type": disk_type,
"path": resolve_to_by_id_link(device_path),
"serial": serial_number,
"blocksize": disk_utils.get_block_size(dev),
"size": disk_utils.get_size(dev)})
"blocksize": get_block_size(dev),
"size": get_size(dev)})
block_devices.remove(dev)
def get_disk_serial_number(dev_path):
commands = [
f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || "
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | "
"awk --field-separator '=' '{print $NF}'",
f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | "
"awk '{print $NF}'",
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | "
"awk --field-separator '=' '{print $NF}'"
]
for command in commands:
serial = TestRun.executor.run(command).stdout
if serial:
return serial.split('\n')[0]
return None
def get_all_serial_numbers():
serial_numbers = {}
block_devices = get_block_devices_list()
for dev in block_devices:
serial = get_disk_serial_number(dev)
try:
path = resolve_to_by_id_link(dev)
except Exception:
continue
if serial:
serial_numbers[serial] = path
else:
TestRun.LOGGER.warning(f"Device {path} ({dev}) does not have a serial number.")
serial_numbers[path] = path
return serial_numbers
def get_system_disks():
system_device = TestRun.executor.run_expect_success('mount | grep " / "').stdout.split()[0]
readlink_output = readlink(system_device)
device_name = readlink_output.split('/')[-1]
sys_block_path = os_utils.get_sys_block_path()
used_device_names = __get_slaves(device_name)
if not used_device_names:
used_device_names = [device_name]
disk_names = []
for device_name in used_device_names:
if check_if_file_exists(f'{sys_block_path}/{device_name}/partition'):
parent_device = readlink(f'{sys_block_path}/{device_name}/..').split('/')[-1]
if check_if_file_exists(os.path.join(get_sysfs_path(device_name), "partition")):
parent_device = readlink(os.path.join(get_sysfs_path(device_name), "..")).split('/')[-1]
disk_names.append(parent_device)
else:
disk_names.append(device_name)
@@ -159,7 +124,7 @@ def get_system_disks():
def __get_slaves(device_name: str):
try:
device_names = TestRun.executor.run_expect_success(
f'ls {os_utils.get_sys_block_path()}/{device_name}/slaves').stdout.splitlines()
f"ls {os.path.join(get_sysfs_path(device_name), 'slaves')}").stdout.splitlines()
except CmdException as e:
if "No such file or directory" not in e.output.stderr:
raise
@@ -177,7 +142,9 @@ def __get_slaves(device_name: str):
def resolve_to_by_id_link(path):
by_id_paths = TestRun.executor.run_expect_success("ls /dev/disk/by-id -1").stdout.splitlines()
dev_full_paths = [posixpath.join("/dev/disk/by-id", by_id_path) for by_id_path in by_id_paths]
dev_full_paths = [
posixpath.join("/dev/disk/by-id", by_id_path) for by_id_path in by_id_paths
]
for full_path in dev_full_paths:
# handle exception for broken links

View File

@@ -1,6 +1,6 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -8,23 +8,18 @@ import posixpath
import re
import time
from enum import Enum
from typing import List
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.dd import Dd
from test_tools.fs_utils import readlink, parse_ls_output, ls
from test_utils.output import CmdException
from test_utils.size import Size, Unit
from test_tools.fs_tools import readlink, parse_ls_output, ls, check_if_directory_exists, \
create_directory, wipefs, is_mounted
from test_tools.udev import Udev
from type_def.size import Size, Unit
SECTOR_SIZE = 512
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class PartitionTable(Enum):
msdos = 0
gpt = 1
@@ -42,21 +37,6 @@ class PartitionType(Enum):
unknown = 8
def create_filesystem(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def create_partition_table(device, partition_table_type: PartitionTable = PartitionTable.gpt):
TestRun.LOGGER.info(
f"Creating partition table ({partition_table_type.name}) for device: {device.path}")
@@ -122,11 +102,11 @@ def create_partition(
TestRun.executor.run_expect_success("udevadm settle")
if not check_partition_after_create(
part_size,
part_number,
device.path,
part_type,
aligned):
size=part_size,
part_number=part_number,
parent_dev_path=device.path,
part_type=part_type,
aligned=aligned):
raise Exception("Could not create partition!")
if part_type != PartitionType.extended:
@@ -167,7 +147,7 @@ def create_partitions(device, sizes: [], partition_table_type=PartitionTable.gpt
for s in sizes:
size = Size(
s.get_value(device.block_size) - device.block_size.value, device.block_size)
s.get_value(device.block_size) - 1, device.block_size)
if partition_table_type == PartitionTable.msdos and \
len(sizes) > 4 and len(device.partitions) == 3:
if available_disk_size(device) > msdos_part_max_size:
@@ -182,12 +162,12 @@ def create_partitions(device, sizes: [], partition_table_type=PartitionTable.gpt
partition_number_offset = 1
partition_number = len(device.partitions) + 1 + partition_number_offset
create_partition(device,
size,
partition_number,
partition_type,
Unit.MebiByte,
True)
create_partition(device=device,
part_size=size,
part_number=partition_number,
part_type=partition_type,
unit=device.block_size,
aligned=True)
def get_block_size(device):
@@ -217,7 +197,8 @@ def get_pci_address(device):
return pci_address
def check_partition_after_create(size, part_number, parent_dev_path, part_type, aligned):
def check_partition_after_create(size: Size, part_number: int, parent_dev_path: str,
part_type: PartitionType, aligned: bool):
partition_path = get_partition_path(parent_dev_path, part_number)
if "dev/cas" not in partition_path:
cmd = f"find {partition_path} -type l"
@@ -266,8 +247,7 @@ def get_first_partition_offset(device, aligned: bool):
def remove_partitions(device):
from test_utils.os_utils import Udev
if device.is_mounted():
if is_mounted(device.path):
device.unmount()
for partition in device.partitions:
@@ -275,7 +255,7 @@ def remove_partitions(device):
TestRun.LOGGER.info(f"Removing partitions from device: {device.path} "
f"({device.get_device_id()}).")
device.wipe_filesystem()
wipefs(device)
Udev.trigger()
Udev.settle()
output = TestRun.executor.run(f"ls {device.path}* -1")
@@ -286,8 +266,8 @@ def remove_partitions(device):
def mount(device, mount_point, options: [str] = None):
if not fs_utils.check_if_directory_exists(mount_point):
fs_utils.create_directory(mount_point, True)
if not check_if_directory_exists(mount_point):
create_directory(mount_point, True)
TestRun.LOGGER.info(f"Mounting device {device.path} ({device.get_device_id()}) "
f"to {mount_point}.")
cmd = f"mount {device.path} {mount_point}"
@@ -329,15 +309,6 @@ def unit_to_string(unit):
return unit_string.get(unit, "Invalid unit.")
def wipe_filesystem(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def check_if_device_supports_trim(device):
if device.get_device_id().startswith("nvme"):
return True
@@ -350,27 +321,6 @@ def check_if_device_supports_trim(device):
return int(command_output.stdout) > 0
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def _is_by_id_path(path: str):
"""check if given path already is proper by-id path"""
dev_by_id_dir = "/dev/disk/by-id"
@@ -406,3 +356,13 @@ def validate_dev_path(path: str):
return path
raise ValueError(f'By-id device link {path} is broken.')
def get_block_device_names_list(exclude_list: List[int] = None) -> List[str]:
cmd = "lsblk -lo NAME"
if exclude_list is not None:
cmd += f" -e {','.join(str(type_id) for type_id in exclude_list)}"
devices = TestRun.executor.run_expect_success(cmd).stdout
devices_list = devices.splitlines()
devices_list.sort()
return devices_list

View File

@@ -4,7 +4,7 @@
#
from core.test_run import TestRun
from test_utils.output import Output
from connection.utils.output import Output
def get_dmesg() -> str:

View File

@@ -1,32 +1,36 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import datetime
import uuid
import test_tools.fio.fio_param
import test_tools.fs_utils
from packaging.version import Version
from core.test_run import TestRun
from test_tools import fs_utils
from test_utils import os_utils
from connection.utils.output import CmdException
from test_tools import wget
from test_tools.fio.fio_param import FioParam, FioParamCmd, FioOutput, FioParamConfig
from test_tools.fs_tools import uncompress_archive
class Fio:
def __init__(self, executor_obj=None):
self.fio_version = "fio-3.30"
self.min_fio_version = Version(TestRun.config.get("fio_version", "3.30"))
self.default_run_time = datetime.timedelta(hours=1)
self.jobs = []
self.executor = executor_obj if executor_obj is not None else TestRun.executor
self.base_cmd_parameters: test_tools.fio.fio_param.FioParam = None
self.global_cmd_parameters: test_tools.fio.fio_param.FioParam = None
self.base_cmd_parameters: FioParam = None
self.global_cmd_parameters: FioParam = None
def create_command(self, output_type=test_tools.fio.fio_param.FioOutput.json):
self.base_cmd_parameters = test_tools.fio.fio_param.FioParamCmd(self, self.executor)
self.global_cmd_parameters = test_tools.fio.fio_param.FioParamConfig(self, self.executor)
def create_command(self, output_type=FioOutput.json):
self.base_cmd_parameters = FioParamCmd(self, self.executor)
self.global_cmd_parameters = FioParamConfig(self, self.executor)
self.fio_file = f'fio_run_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_{uuid.uuid4().hex}'
self.fio_file = \
f'fio_run_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_{uuid.uuid4().hex}'
self.base_cmd_parameters\
.set_param('eta', 'always')\
.set_param('output-format', output_type.value)\
@@ -37,14 +41,21 @@ class Fio:
return self.global_cmd_parameters
def is_installed(self):
return self.executor.run("fio --version").stdout.strip() == self.fio_version
try:
output = self.executor.run_expect_success("fio --version").stdout
except CmdException:
return False
current_version = Version(output.strip().removeprefix("fio-"))
return current_version >= self.min_fio_version
def install(self):
fio_url = f"http://brick.kernel.dk/snaps/{self.fio_version}.tar.bz2"
fio_package = os_utils.download_file(fio_url)
fs_utils.uncompress_archive(fio_package)
TestRun.executor.run_expect_success(f"cd {fio_package.parent_dir}/{self.fio_version}"
f" && ./configure && make -j && make install")
fio_url = f"http://brick.kernel.dk/snaps/fio-{self.min_fio_version}.tar.bz2"
fio_package = wget.download_file(fio_url)
uncompress_archive(fio_package)
TestRun.executor.run_expect_success(
f"cd {fio_package.parent_dir}/fio-{self.min_fio_version}"
f" && ./configure && make -j && make install"
)
def calculate_timeout(self):
if "time_based" not in self.global_cmd_parameters.command_flags:
@@ -95,7 +106,7 @@ class Fio:
command = f"echo '{self.execution_cmd_parameters()}' |" \
f" {str(self.base_cmd_parameters)} -"
else:
fio_parameters = test_tools.fio.fio_param.FioParamCmd(self, self.executor)
fio_parameters = FioParamCmd(self, self.executor)
fio_parameters.command_env_var.update(self.base_cmd_parameters.command_env_var)
fio_parameters.command_param.update(self.base_cmd_parameters.command_param)
fio_parameters.command_param.update(self.global_cmd_parameters.command_param)

View File

@@ -13,8 +13,8 @@ from connection.base_executor import BaseExecutor
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.fio.fio_result import FioResult
from test_utils.linux_command import LinuxCommand
from test_utils.size import Size
from test_tools.common.linux_command import LinuxCommand
from type_def.size import Size
class CpusAllowedPolicy(Enum):
@@ -316,6 +316,9 @@ class FioParam(LinuxCommand):
def verify_only(self, value: bool = True):
return self.set_flags('verify_only') if value else self.remove_param('verify_only')
def trim_verify_zero(self, value: bool = True):
return self.set_param('trim_verify_zero', int(value))
def write_hint(self, value: str):
return self.set_param('write_hint', value)

View File

@@ -4,7 +4,7 @@
#
import secrets
from aenum import Enum
from enum import Enum
class Pattern(Enum):

View File

@@ -4,8 +4,8 @@
#
from test_utils.size import Size, Unit, UnitPerSecond
from test_utils.time import Time
from type_def.size import Size, Unit, UnitPerSecond
from type_def.time import Time
class FioResult:
@@ -103,9 +103,6 @@ class FioResult:
def write_runtime(self):
return Time(microseconds=self.job.write.runtime)
def write_completion_latency_average(self):
return Time(nanoseconds=self.job.write.lat_ns.mean)
def write_completion_latency_min(self):
return Time(nanoseconds=self.job.write.lat_ns.min)
@@ -139,9 +136,6 @@ class FioResult:
def trim_runtime(self):
return Time(microseconds=self.job.trim.runtime)
def trim_completion_latency_average(self):
return Time(nanoseconds=self.job.trim.lat_ns.mean)
def trim_completion_latency_min(self):
return Time(nanoseconds=self.job.trim.lat_ns.min)

View File

@@ -7,15 +7,23 @@
import base64
import math
import re
import textwrap
from collections import namedtuple
from datetime import datetime, timedelta
from enum import Enum
from aenum import IntFlag # IntFlag from enum is not able to correctly parse string like "x|y|z"
from aenum import IntFlag, Enum
from connection.utils.output import CmdException
from core.test_run import TestRun
from test_tools.dd import Dd
from test_utils.size import Size, Unit
from type_def.size import Size, Unit
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class Permissions(IntFlag):
@@ -50,7 +58,7 @@ class PermissionSign(Enum):
set = '='
class FilesPermissions():
class FilesPermissions:
perms_exceptions = {}
def __init__(self, files_list: list):
@@ -393,3 +401,55 @@ def create_random_test_file(target_file_path: str,
dd.run()
file.refresh_item()
return file
def mkfs(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def wipefs(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0

21
test_tools/fstab.py Normal file
View File

@@ -0,0 +1,21 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from test_tools.fs_tools import append_line, remove_lines
from test_tools.systemctl import reload_daemon, restart_service
def add_mountpoint(device, mount_point, fs_type, mount_now=True):
append_line("/etc/fstab",
f"{device.path} {mount_point} {fs_type.name} defaults 0 0")
reload_daemon()
if mount_now:
restart_service("local-fs.target")
def remove_mountpoint(device):
remove_lines("/etc/fstab", device.path)
reload_daemon()

View File

@@ -3,13 +3,14 @@
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import itertools
import os
import posixpath
from core.test_run import TestRun
from connection.local_executor import LocalExecutor
from test_utils.output import CmdException
from connection.utils.output import CmdException
def __get_executor_and_repo_path(from_dut):
@@ -97,14 +98,10 @@ def get_commit_hash(version, from_dut: bool = False):
return output.stdout
def get_release_tags(forbidden_characters: list = None):
def get_tags():
repo_path = os.path.join(TestRun.usr.working_dir, ".git")
output = TestRun.executor.run_expect_success(f"git --git-dir={repo_path} tag").stdout
if not forbidden_characters:
return output.splitlines()
else:
return [v for v in output.splitlines() if all(c not in v for c in forbidden_characters)]
return output.splitlines()
def checkout_version(version):

19
test_tools/initramfs.py Normal file
View File

@@ -0,0 +1,19 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from test_tools.os_tools import get_distro, Distro
def update():
distro = get_distro()
TestRun.LOGGER.info("Updating initramfs")
match distro:
case Distro.DEBIAN | Distro.UBUNTU:
TestRun.executor.run_expect_success("update-initramfs -u")
case Distro.OPENEULER | Distro.CENTOS | Distro.REDHAT:
TestRun.executor.run_expect_success(
"dracut -f /boot/initramfs-$(uname -r).img $(uname -r)"
)

View File

@@ -3,12 +3,13 @@
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from storage_devices.device import Device
from test_utils.size import Size, Unit, UnitPerSecond
from test_utils.time import Time
import csv
from core.test_run import TestRun
from type_def.size import Size, Unit, UnitPerSecond
from type_def.time import Time
class IOstatExtended:
iostat_option = "x"
@@ -84,7 +85,7 @@ class IOstatExtended:
@classmethod
def get_iostat_list(
cls,
devices_list: [Device],
devices_list: [str],
since_boot: bool = True,
interval: int = 1,
):
@@ -138,7 +139,7 @@ class IOstatBasic:
@classmethod
def get_iostat_list(
cls,
devices_list: [Device],
devices_list: [str],
since_boot: bool = True,
interval: int = 1,
):
@@ -150,8 +151,8 @@ class IOstatBasic:
def _get_iostat_list(
class_type: type,
devices_list: [Device],
class_type: type(IOstatExtended) | type(IOstatBasic),
devices_list: [str],
since_boot: bool,
interval: int,
):
@@ -163,7 +164,7 @@ def _get_iostat_list(
if not since_boot:
iostat_cmd += f"-y {interval} 1 "
iostat_cmd += " ".join([name.get_device_id() for name in devices_list])
iostat_cmd += " ".join(devices_list)
sed_cmd = "sed -n '/^$/d;s/\s\+/,/g;/^Device/,$p'"

View File

@@ -1,129 +0,0 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import wget
import os
from enum import Enum
from core.test_run import TestRun
from test_tools import fs_utils
from test_utils.os_utils import DEBUGFS_MOUNT_POINT
KEDR_0_6_URL = "https://github.com/euspectre/kedr/archive/v0.6.tar.gz"
BUILD_DIR = "build"
LEAKS_LOGS_PATH = f"{DEBUGFS_MOUNT_POINT}/kedr_leak_check"
KMALLOC_FAULT_SIMULATION_PATH = "/sys/kernel/debug/kedr_fault_simulation"
class KedrProfile(Enum):
MEM_LEAK_CHECK = "leak_check.conf"
FAULT_SIM = "fsim.conf"
class Kedr:
@staticmethod
def is_installed():
return "KEDR version" in TestRun.executor.run("kedr --version").stdout.strip()
@classmethod
def install(cls):
if cls.is_installed():
TestRun.LOGGER.info("Kedr is already installed!")
return
# TODO check if cmake is installed before
# TODO consider using os_utils.download_file()
kedr_archive = wget.download(KEDR_0_6_URL)
TestRun.executor.rsync_to(
f"\"{kedr_archive}\"",
f"{TestRun.config['working_dir']}")
kedr_dir = TestRun.executor.run_expect_success(
f"cd {TestRun.config['working_dir']} && "
f"tar -ztf \"{kedr_archive}\" | sed -e 's@/.*@@' | uniq"
).stdout
TestRun.executor.run_expect_success(
f"cd {TestRun.config['working_dir']} && "
f"tar -xf \"{kedr_archive}\" && "
f"cd {kedr_dir} && "
f"mkdir -p {BUILD_DIR} && "
f"cd {BUILD_DIR} && "
f"cmake ../sources/ && "
f"make && "
f"make install"
)
os.remove(kedr_archive)
TestRun.LOGGER.info("Kedr installed succesfully")
@classmethod
def is_loaded(cls):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
if "KEDR status: loaded" in TestRun.executor.run_expect_success("kedr status").stdout:
return True
else:
return False
@classmethod
def start(cls, module, profile: KedrProfile = KedrProfile.MEM_LEAK_CHECK):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
TestRun.LOGGER.info(f"Starting kedr with {profile} profile")
start_cmd = f"kedr start {module} -f {profile.value}"
TestRun.executor.run_expect_success(start_cmd)
# TODO extend to scenarios other than kmalloc
def setup_fault_injections(condition: str = "1"):
TestRun.executor.run_expect_success(
f'echo "kmalloc" > {KMALLOC_FAULT_SIMULATION_PATH}/points/kmalloc/current_indicator')
TestRun.executor.run_expect_success(
f'echo "{condition}" > {KMALLOC_FAULT_SIMULATION_PATH}/points/kmalloc/expression')
@classmethod
def fsim_show_last_fault(cls):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
if not cls.is_loaded():
raise Exception("Kedr is not loaded!")
return fs_utils.read_file(f"{KMALLOC_FAULT_SIMULATION_PATH}/last_fault")
@classmethod
def stop(cls):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
TestRun.executor.run_expect_success("kedr stop")
@classmethod
def check_for_mem_leaks(cls, module):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
if not cls.is_loaded():
raise Exception("Kedr is not loaded!")
if fs_utils.check_if_directory_exists(f"{LEAKS_LOGS_PATH}/{module}"):
logs_path = f"{LEAKS_LOGS_PATH}/{module}"
elif fs_utils.check_if_directory_exists(f"{DEBUGFS_MOUNT_POINT}"):
logs_path = f"{LEAKS_LOGS_PATH}"
else:
raise Exception("Couldn't find kedr logs dir!")
leaks = fs_utils.read_file(f"{logs_path}/possible_leaks")
frees = fs_utils.read_file(f"{logs_path}/unallocated_frees")
summary = fs_utils.read_file(f"{logs_path}/info")
if leaks or frees:
raise Exception("Memory leaks found!\n"
f"Kedr summary: {summary}\n"
f"Possible memory leaks: {leaks}\n"
f"Unallocated frees: {frees}\n")

View File

@@ -4,12 +4,11 @@
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import re
from core.test_run import TestRun
from test_utils.output import CmdException
from connection.utils.output import CmdException
class RpmSet:

View File

@@ -1,11 +1,14 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import re
from core.test_run import TestRun
from test_utils.size import Unit
from type_def.size import Unit
from test_tools.udev import Udev
class Mdadm:
@@ -33,7 +36,11 @@ class Mdadm:
if conf.size:
cmd += f"--size={int(conf.size.get_value(Unit.KibiByte))} "
cmd += device_paths
return TestRun.executor.run_expect_success(cmd)
ret = TestRun.executor.run_expect_success(cmd)
Udev.trigger()
Udev.settle()
return ret
@staticmethod
def detail(raid_device_paths: str):
@@ -76,8 +83,6 @@ class Mdadm:
raids = []
uuid_path_prefix = "/dev/disk/by-id/md-uuid-"
# sometimes links for RAIDs are not properly created, force udev to create them
TestRun.executor.run("udevadm trigger && udevadm settle")
for line in output.stdout.splitlines():
split_line = line.split()

109
test_tools/memory.py Normal file
View File

@@ -0,0 +1,109 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import math
from connection.utils.output import CmdException
from core.test_run import TestRun
from test_tools.dd import Dd
from test_tools.fs_tools import check_if_directory_exists, create_directory, is_mounted
from test_tools.os_tools import OvercommitMemoryMode, drop_caches, DropCachesMode, \
MEMORY_MOUNT_POINT
from type_def.size import Size, Unit
def disable_memory_affecting_functions():
"""Disables system functions affecting memory"""
# Don't allow sshd to be killed in case of out-of-memory:
TestRun.executor.run(
"echo '-1000' > /proc/`cat /var/run/sshd.pid`/oom_score_adj"
)
TestRun.executor.run(
"echo -17 > /proc/`cat /var/run/sshd.pid`/oom_adj"
) # deprecated
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.NEVER.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo '100' > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '64 64 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapoff --all")
drop_caches(DropCachesMode.SLAB)
def defaultize_memory_affecting_functions():
"""Sets default values to system functions affecting memory"""
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.DEFAULT.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo 50 > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '256 256 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapon --all")
def get_mem_free():
"""Returns free amount of memory in bytes"""
output = TestRun.executor.run_expect_success("free -b")
output = output.stdout.splitlines()
for line in output:
if 'free' in line:
index = line.split().index('free') + 1 # 1st row has 1 element less than following rows
if 'Mem' in line:
mem_line = line.split()
return Size(int(mem_line[index]))
def get_mem_available():
"""Returns amount of available memory from /proc/meminfo"""
cmd = "cat /proc/meminfo | grep MemAvailable | awk '{ print $2 }'"
mem_available = TestRun.executor.run(cmd).stdout
return Size(int(mem_available), Unit.KibiByte)
def get_module_mem_footprint(module_name):
"""Returns allocated size of specific module's metadata from /proc/vmallocinfo"""
cmd = f"cat /proc/vmallocinfo | grep {module_name} | awk '{{ print $2 }}' "
output_lines = TestRun.executor.run(cmd).stdout.splitlines()
memory_used = 0
for line in output_lines:
memory_used += int(line)
return Size(memory_used)
def allocate_memory(size: Size):
"""Allocates given amount of memory"""
mount_ramfs()
TestRun.LOGGER.info(f"Allocating {size.get_value(Unit.MiB):0.2f} MiB of memory.")
bs = Size(1, Unit.Blocks512)
dd = (
Dd()
.block_size(bs)
.count(math.ceil(size / bs))
.input("/dev/zero")
.output(f"{MEMORY_MOUNT_POINT}/data")
)
output = dd.run()
if output.exit_code != 0:
raise CmdException("Allocating memory failed.", output)
def mount_ramfs():
"""Mounts ramfs to enable allocating memory space"""
if not check_if_directory_exists(MEMORY_MOUNT_POINT):
create_directory(MEMORY_MOUNT_POINT)
if not is_mounted(MEMORY_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t ramfs ramfs {MEMORY_MOUNT_POINT}")
def unmount_ramfs():
"""Unmounts ramfs and releases whole space allocated by it in memory"""
TestRun.executor.run_expect_success(f"umount {MEMORY_MOUNT_POINT}")

View File

@@ -1,7 +1,9 @@
#
# Copyright(c) 2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import json
from core.test_run import TestRun

229
test_tools/os_tools.py Normal file
View File

@@ -0,0 +1,229 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
import re
import time
from datetime import timedelta
from enum import IntFlag, Enum, StrEnum
from packaging import version
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.disk_tools import get_sysfs_path
from test_tools.fs_tools import check_if_file_exists, is_mounted
from connection.utils.retry import Retry
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
MEMORY_MOUNT_POINT = "/mnt/memspace"
class Distro(StrEnum):
UBUNTU = "ubuntu"
DEBIAN = "debian"
REDHAT = "rhel"
OPENEULER = "openeuler"
CENTOS = "centos"
class DropCachesMode(IntFlag):
PAGECACHE = 1
SLAB = 2
ALL = PAGECACHE | SLAB
class OvercommitMemoryMode(Enum):
DEFAULT = 0
ALWAYS = 1
NEVER = 2
class SystemManagerType(Enum):
sysv = 0
systemd = 1
def get_distro():
output = TestRun.executor.run(
"cat /etc/os-release | grep -e \"^ID=\" | awk -F= '{print$2}' | tr -d '\"'"
).stdout.lower()
try:
return Distro(output)
except ValueError:
raise ValueError(f"Could not resolve distro name. Command output: {output}")
def drop_caches(level: DropCachesMode = DropCachesMode.ALL):
TestRun.executor.run_expect_success(
f"echo {level.value} > /proc/sys/vm/drop_caches")
def get_number_of_processors_from_cpuinfo():
"""Returns number of processors (count) which are listed out in /proc/cpuinfo"""
cmd = f"cat /proc/cpuinfo | grep processor | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def get_number_of_processes(process_name):
cmd = f"ps aux | grep {process_name} | grep -v grep | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def get_kernel_version():
version_string = TestRun.executor.run_expect_success("uname -r").stdout
version_string = version_string.split('-')[0]
return version.Version(version_string)
def is_kernel_module_loaded(module_name):
command = f"lsmod | grep -E '^{module_name}\\b'"
output = TestRun.executor.run(command)
return output.exit_code == 0
def load_kernel_module(module_name, module_args: {str, str}=None):
cmd = f"modprobe {module_name}"
if module_args is not None:
for key, value in module_args.items():
cmd += f" {key}={value}"
return TestRun.executor.run(cmd)
def unload_kernel_module(module_name):
cmd = f"modprobe -r {module_name}"
return TestRun.executor.run_expect_success(cmd)
def get_kernel_module_parameter(module_name, parameter):
param_file_path = f"/sys/module/{module_name}/parameters/{parameter}"
if not check_if_file_exists(param_file_path):
raise FileNotFoundError(f"File {param_file_path} does not exist!")
return TestRun.executor.run(f"cat {param_file_path}").stdout
def mount_debugfs():
if not is_mounted(DEBUGFS_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t debugfs none {DEBUGFS_MOUNT_POINT}")
def reload_kernel_module(module_name, module_args: {str, str}=None):
if is_kernel_module_loaded(module_name):
unload_kernel_module(module_name)
Retry.run_while_false(
lambda: load_kernel_module(module_name, module_args).exit_code == 0,
timeout=timedelta(seconds=5)
)
def get_module_path(module_name):
cmd = f"modinfo {module_name}"
# module path is in second column of first line of `modinfo` output
module_info = TestRun.executor.run_expect_success(cmd).stdout
module_path = module_info.splitlines()[0].split()[1]
return module_path
def get_executable_path(exec_name):
cmd = f"which {exec_name}"
path = TestRun.executor.run_expect_success(cmd).stdout
return path
def kill_all_io(graceful=True):
if graceful:
# TERM signal should be used in preference to the KILL signal, since a
# process may install a handler for the TERM signal in order to perform
# clean-up steps before terminating in an orderly fashion.
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal KILL dd fio blktrace")
TestRun.executor.run("kill -9 `ps aux | grep -i vdbench.* | awk '{ print $2 }'`")
if TestRun.executor.run("pgrep -x dd").exit_code == 0:
raise Exception(f"Failed to stop dd!")
if TestRun.executor.run("pgrep -x fio").exit_code == 0:
raise Exception(f"Failed to stop fio!")
if TestRun.executor.run("pgrep -x blktrace").exit_code == 0:
raise Exception(f"Failed to stop blktrace!")
if TestRun.executor.run("pgrep vdbench").exit_code == 0:
raise Exception(f"Failed to stop vdbench!")
def sync():
TestRun.executor.run_expect_success("sync")
def get_dut_cpu_number():
return int(TestRun.executor.run_expect_success("nproc").stdout)
def get_dut_cpu_physical_cores():
""" Get list of CPU numbers that don't share physical cores """
output = TestRun.executor.run_expect_success("lscpu --all --parse").stdout
core_list = []
visited_phys_cores = []
for line in output.split("\n"):
if "#" in line:
continue
cpu_no, phys_core_no = line.split(",")[:2]
if phys_core_no not in visited_phys_cores:
core_list.append(cpu_no)
visited_phys_cores.append(phys_core_no)
return core_list
def set_wbt_lat(device: Device, value: int):
if value < 0:
raise ValueError("Write back latency can't be negative number")
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return TestRun.executor.run_expect_success(f"echo {value} > {wbt_lat_config_path}")
def get_wbt_lat(device: Device):
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return int(TestRun.executor.run_expect_success(f"cat {wbt_lat_config_path}").stdout)
def get_cores_ids_range(numa_node: int):
output = TestRun.executor.run_expect_success(f"lscpu --all --parse").stdout
parse_output = re.findall(r'(\d+),(\d+),(?:\d+),(\d+),,', output, re.I)
return [element[0] for element in parse_output if int(element[2]) == numa_node]
def create_user(username, additional_params=None):
command = "useradd "
if additional_params:
command += "".join([f"-{p} " for p in additional_params])
command += username
return TestRun.executor.run_expect_success(command)
def check_if_user_exists(username):
return TestRun.executor.run(f"id {username}").exit_code == 0

View File

@@ -12,10 +12,10 @@ import tempfile
import lxml.etree as etree
from collections import namedtuple
from test_tools import wget
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.fs_utils import create_directory, check_if_file_exists, write_file
from test_utils import os_utils
from test_tools.fs_tools import create_directory, check_if_file_exists, write_file, remove, \
check_if_directory_exists
class PeachFuzzer:
@@ -27,7 +27,7 @@ class PeachFuzzer:
peach_fuzzer_3_0_url = "https://sourceforge.net/projects/peachfuzz/files/Peach/3.0/" \
"peach-3.0.202-linux-x86_64-release.zip"
base_dir = "/root/Fuzzy"
base_dir = posixpath.join(TestRun.TEST_RUN_DATA_PATH, "Fuzzy")
peach_dir = "peach-3.0.202-linux-x86_64-release"
xml_config_template = os.path.join(os.path.dirname(__file__), "config_template.xml")
xml_config_file = posixpath.join(base_dir, "fuzzerConfig.xml")
@@ -38,7 +38,7 @@ class PeachFuzzer:
escape_chars = '\\\n"\'&|;()`<>$! '
@classmethod
def get_fuzzed_command(cls, command_template: bytes, count: int):
def get_fuzzed_command(cls, command_template: str, count: int):
"""
Generate command with fuzzed parameter provided on command_template.
:param command_template: string with command to be executed.
@@ -75,7 +75,7 @@ class PeachFuzzer:
cls._install()
if not cls._is_xml_config_prepared():
TestRun.block("No Peach Fuzzer XML config needed to generate fuzzed values was found!")
fs_utils.remove(cls.fuzzy_output_file, force=True, ignore_errors=True)
remove(cls.fuzzy_output_file, force=True, ignore_errors=True)
TestRun.LOGGER.info(f"Generate {count} unique fuzzed values")
cmd = f"cd {cls.base_dir}; {cls.peach_dir}/peach --range 0,{count - 1} " \
f"--seed {random.randrange(2 ** 32)} {cls.xml_config_file} > " \
@@ -155,7 +155,7 @@ class PeachFuzzer:
Install Peach Fuzzer on the DUT
"""
create_directory(cls.base_dir, True)
peach_archive = os_utils.download_file(
peach_archive = wget.download_file(
cls.peach_fuzzer_3_0_url, destination_dir=cls.base_dir
)
TestRun.executor.run_expect_success(
@@ -172,7 +172,7 @@ class PeachFuzzer:
"""
if not cls._is_mono_installed():
TestRun.block("Mono is not installed, can't continue with Peach Fuzzer!")
if fs_utils.check_if_directory_exists(posixpath.join(cls.base_dir, cls.peach_dir)):
if check_if_directory_exists(posixpath.join(cls.base_dir, cls.peach_dir)):
return "Peach" in TestRun.executor.run(
f"cd {cls.base_dir} && {cls.peach_dir}/peach --version").stdout.strip()
else:
@@ -197,7 +197,7 @@ class PeachFuzzer:
"""
Check if Peach Fuzzer XML config is present on the DUT
"""
if fs_utils.check_if_file_exists(cls.xml_config_file):
if check_if_file_exists(cls.xml_config_file):
return True
else:
return False

109
test_tools/runlevel.py Normal file
View File

@@ -0,0 +1,109 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from enum import IntEnum
from core.test_run import TestRun
from test_tools.os_tools import SystemManagerType
class Runlevel(IntEnum):
"""
Halt the system.
SysV Runlevel: 0
systemd Target: runlevel0.target, poweroff.target
"""
runlevel0 = 0
poweroff = runlevel0
"""
Single user mode.
SysV Runlevel: 1, s, single
systemd Target: runlevel1.target, rescue.target
"""
runlevel1 = 1
rescue = runlevel1
"""
User-defined/Site-specific runlevels. By default, identical to 3.
SysV Runlevel: 2, 4
systemd Target: runlevel2.target, runlevel4.target, multi-user.target
"""
runlevel2 = 2
"""
Multi-user, non-graphical. Users can usually login via multiple consoles or via the network.
SysV Runlevel: 3
systemd Target: runlevel3.target, multi-user.target
"""
runlevel3 = 3
multi_user = runlevel3
"""
Multi-user, graphical. Usually has all the services of runlevel 3 plus a graphical login.
SysV Runlevel: 5
systemd Target: runlevel5.target, graphical.target
"""
runlevel5 = 5
graphical = runlevel5
"""
Reboot
SysV Runlevel: 6
systemd Target: runlevel6.target, reboot.target
"""
runlevel6 = 6
reboot = runlevel6
"""
Emergency shell
SysV Runlevel: emergency
systemd Target: emergency.target
"""
runlevel7 = 7
emergency = runlevel7
def get_system_manager():
output = TestRun.executor.run_expect_success("ps -p 1").stdout
type = output.split('\n')[1].split()[3]
if type == "init":
return SystemManagerType.sysv
elif type == "systemd":
return SystemManagerType.systemd
raise Exception(f"Unknown system manager type ({type}).")
def change_runlevel(runlevel: Runlevel):
if runlevel == get_runlevel():
return
if Runlevel.runlevel0 < runlevel < Runlevel.runlevel6:
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
TestRun.executor.run_expect_success(f"systemctl set-default {runlevel.name}.target")
else:
TestRun.executor.run_expect_success(
f"sed -i 's/^.*id:.*$/id:{runlevel.value}:initdefault: /' /etc/inittab")
TestRun.executor.run_expect_success(f"init {runlevel.value}")
def get_runlevel():
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
result = TestRun.executor.run_expect_success("systemctl get-default")
try:
name = result.stdout.split(".")[0].replace("-", "_")
return Runlevel[name]
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")
else:
result = TestRun.executor.run_expect_success("runlevel")
try:
split_output = result.stdout.split()
runlevel = Runlevel(int(split_output[1]))
return runlevel
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")

View File

@@ -1,13 +1,15 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024-2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from pathlib import Path
from pathlib import PurePosixPath
from core.test_run import TestRun
systemd_service_directory = Path("/usr/lib/systemd/system/")
systemd_service_directory = PurePosixPath("/usr/lib/systemd/system/")
def enable_service(name):
TestRun.executor.run_expect_success(f"systemctl enable {name}")
@@ -23,3 +25,13 @@ def reload_daemon():
def restart_service(name):
TestRun.executor.run_expect_success(f"systemctl restart {name}")
def get_service_path(unit_name):
cmd = f"systemctl cat {unit_name}"
# path is in second column of first line of output
info = TestRun.executor.run_expect_success(cmd).stdout
path = info.splitlines()[0].split()[1]
return path

27
test_tools/udev.py Normal file
View File

@@ -0,0 +1,27 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
class Udev(object):
@staticmethod
def enable():
TestRun.LOGGER.info("Enabling udev")
TestRun.executor.run_expect_success("udevadm control --start-exec-queue")
@staticmethod
def disable():
TestRun.LOGGER.info("Disabling udev")
TestRun.executor.run_expect_success("udevadm control --stop-exec-queue")
@staticmethod
def trigger():
TestRun.executor.run_expect_success("udevadm trigger")
@staticmethod
def settle():
TestRun.executor.run_expect_success("udevadm settle")

17
test_tools/wget.py Normal file
View File

@@ -0,0 +1,17 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from test_utils.filesystem.file import File
def download_file(url, destination_dir="/tmp"):
# TODO use wget module instead
command = ("wget --tries=3 --timeout=5 --continue --quiet "
f"--directory-prefix={destination_dir} {url}")
TestRun.executor.run_expect_success(command)
path = f"{destination_dir.rstrip('/')}/{File.get_name(url)}"
return File(path)

View File

@@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -1,5 +1,6 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -3,8 +3,6 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
import os
from test_utils.filesystem.file import File

View File

@@ -1,14 +1,15 @@
#
# Copyright(c) 2022 Intel Corporation
# Copyright(c) 2025 Huawei Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from textwrap import dedent
from string import Template
from pathlib import Path
from pathlib import PurePosixPath
from .systemd import enable_service, reload_daemon, systemd_service_directory, disable_service
from test_tools.fs_utils import (
from test_tools.systemctl import enable_service, reload_daemon, systemd_service_directory
from test_tools.fs_tools import (
create_file,
write_file,
remove,
@@ -17,7 +18,7 @@ from test_tools.fs_utils import (
class EmergencyEscape:
escape_marker = "EMERGENCY_ESCAPE"
escape_service = Path("emergency-escape.service")
escape_service = PurePosixPath("emergency-escape.service")
escape_service_template = Template(
dedent(
f"""
@@ -38,7 +39,7 @@ class EmergencyEscape:
"""
).strip()
)
cleanup_service = Path("emergency-escape-cleanup.service")
cleanup_service = PurePosixPath("emergency-escape-cleanup.service")
cleanup_service_template = Template(
dedent(
"""

View File

@@ -1,10 +1,12 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.fs_utils import check_if_directory_exists
from test_tools import fs_tools
from test_tools.fs_tools import check_if_directory_exists, parse_ls_output, ls_item, ls
from test_utils.filesystem.fs_item import FsItem
@@ -13,14 +15,14 @@ class Directory(FsItem):
FsItem.__init__(self, full_path)
def ls(self):
output = fs_utils.ls(f"{self.full_path}")
return fs_utils.parse_ls_output(output, self.full_path)
output = ls(self.full_path)
return parse_ls_output(output, self.full_path)
@staticmethod
def create_directory(path: str, parents: bool = False):
fs_utils.create_directory(path, parents)
output = fs_utils.ls_item(path)
return fs_utils.parse_ls_output(output)[0]
fs_tools.create_directory(path, parents)
output = ls_item(path)
return parse_ls_output(output)[0]
@staticmethod
def create_temp_directory(parent_dir_path: str = "/tmp"):

View File

@@ -3,12 +3,15 @@
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from datetime import timedelta
from test_tools import fs_utils
from test_tools import fs_tools
from test_tools.dd import Dd
from test_tools.fs_tools import read_file, write_file, ls_item, parse_ls_output, remove, \
check_if_directory_exists
from test_utils.filesystem.fs_item import FsItem
from test_utils.size import Size
from type_def.size import Size
class File(FsItem):
@@ -16,22 +19,22 @@ class File(FsItem):
FsItem.__init__(self, full_path)
def compare(self, other_file, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.compare(str(self), str(other_file), timeout)
return fs_tools.compare(str(self), str(other_file), timeout)
def diff(self, other_file, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.diff(str(self), str(other_file), timeout)
return fs_tools.diff(str(self), str(other_file), timeout)
def md5sum(self, binary=True, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.md5sum(str(self), binary, timeout)
return fs_tools.md5sum(str(self), binary, timeout)
def crc32sum(self, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.crc32sum(str(self), timeout)
return fs_tools.crc32sum(str(self), timeout)
def read(self):
return fs_utils.read_file(str(self))
return read_file(str(self))
def write(self, content, overwrite: bool = True):
fs_utils.write_file(str(self), content, overwrite)
write_file(str(self), content, overwrite)
self.refresh_item()
def get_properties(self):
@@ -39,9 +42,9 @@ class File(FsItem):
@staticmethod
def create_file(path: str):
fs_utils.create_file(path)
output = fs_utils.ls_item(path)
return fs_utils.parse_ls_output(output)[0]
fs_tools.create_file(path)
output = ls_item(path)
return parse_ls_output(output)[0]
def padding(self, size: Size):
dd = Dd().input("/dev/zero").output(self).count(1).block_size(size)
@@ -49,7 +52,7 @@ class File(FsItem):
self.refresh_item()
def remove(self, force: bool = False, ignore_errors: bool = False):
fs_utils.remove(str(self), force=force, ignore_errors=ignore_errors)
remove(str(self), force=force, ignore_errors=ignore_errors)
def copy(self,
destination,
@@ -57,18 +60,18 @@ class File(FsItem):
recursive: bool = False,
dereference: bool = False,
timeout: timedelta = timedelta(minutes=30)):
fs_utils.copy(str(self), destination, force, recursive, dereference, timeout)
if fs_utils.check_if_directory_exists(destination):
fs_tools.copy(str(self), destination, force, recursive, dereference, timeout)
if check_if_directory_exists(destination):
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else:
path = destination
output = fs_utils.ls_item(path)
return fs_utils.parse_ls_output(output)[0]
output = ls_item(path)
return parse_ls_output(output)[0]
class FileProperties:
def __init__(self, file):
file = fs_utils.parse_ls_output(fs_utils.ls_item(file.full_path))[0]
file = parse_ls_output(ls_item(file.full_path))[0]
self.full_path = file.full_path
self.parent_dir = FsItem.get_parent_dir(self.full_path)
self.name = FsItem.get_name(self.full_path)

View File

@@ -1,11 +1,14 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
from test_tools import fs_utils
from test_tools import fs_tools
from test_tools.fs_tools import Permissions, PermissionsUsers, PermissionSign, \
check_if_directory_exists, ls_item, parse_ls_output
class FsItem:
@@ -38,19 +41,19 @@ class FsItem:
return self.full_path
def chmod_numerical(self, permissions: int, recursive: bool = False):
fs_utils.chmod_numerical(self.full_path, permissions, recursive)
fs_tools.chmod_numerical(self.full_path, permissions, recursive)
self.refresh_item()
def chmod(self,
permissions: fs_utils.Permissions,
users: fs_utils.PermissionsUsers,
sign: fs_utils.PermissionSign = fs_utils.PermissionSign.set,
permissions: Permissions,
users: PermissionsUsers,
sign: PermissionSign = PermissionSign.set,
recursive: bool = False):
fs_utils.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive)
fs_tools.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive)
self.refresh_item()
def chown(self, owner, group, recursive: bool = False):
fs_utils.chown(self.full_path, owner, group, recursive)
fs_tools.chown(self.full_path, owner, group, recursive)
self.refresh_item()
def copy(self,
@@ -58,20 +61,20 @@ class FsItem:
force: bool = False,
recursive: bool = False,
dereference: bool = False):
target_dir_exists = fs_utils.check_if_directory_exists(destination)
fs_utils.copy(str(self), destination, force, recursive, dereference)
target_dir_exists = check_if_directory_exists(destination)
fs_tools.copy(str(self), destination, force, recursive, dereference)
if target_dir_exists:
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else:
path = destination
output = fs_utils.ls_item(f"{path}")
return fs_utils.parse_ls_output(output)[0]
output = ls_item(f"{path}")
return parse_ls_output(output)[0]
def move(self,
destination,
force: bool = False):
target_dir_exists = fs_utils.check_if_directory_exists(destination)
fs_utils.move(str(self), destination, force)
target_dir_exists = check_if_directory_exists(destination)
fs_tools.move(str(self), destination, force)
if target_dir_exists:
self.full_path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else:
@@ -80,7 +83,7 @@ class FsItem:
return self
def refresh_item(self):
updated_file = fs_utils.parse_ls_output(fs_utils.ls_item(self.full_path))[0]
updated_file = parse_ls_output(ls_item(self.full_path))[0]
# keep order the same as in __init__()
self.parent_dir = updated_file.parent_dir
self.name = updated_file.name

View File

@@ -5,7 +5,7 @@
#
from core.test_run import TestRun
from test_tools.fs_utils import (
from test_tools.fs_tools import (
readlink,
create_directory,
check_if_symlink_exists,

View File

@@ -1,20 +0,0 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from test_tools import fs_utils
from test_utils import systemd
def add_mountpoint(device, mount_point, fs_type, mount_now=True):
fs_utils.append_line("/etc/fstab",
f"{device.path} {mount_point} {fs_type.name} defaults 0 0")
systemd.reload_daemon()
if mount_now:
systemd.restart_service("local-fs.target")
def remove_mountpoint(device):
fs_utils.remove_lines("/etc/fstab", device.path)
systemd.reload_daemon()

View File

@@ -1,11 +0,0 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import random
import string
def random_string(length: int, chars=string.ascii_letters + string.digits):
return ''.join(random.choice(chars) for i in range(length))

View File

@@ -5,7 +5,7 @@
import re
from core.test_run import TestRun
from test_utils.output import CmdException
from connection.utils.output import CmdException
SYSFS_LINE_FORMAT = r"^(\d+\s+){10,}\d+$"
PROCFS_LINE_FORMAT = r"^\d+\s+\d+\s+[\w-]+\s+" + SYSFS_LINE_FORMAT[1:]

View File

@@ -1,462 +0,0 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import math
import posixpath
import re
import time
from datetime import timedelta, datetime
from aenum import IntFlag, Enum, IntEnum
from packaging import version
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.dd import Dd
from test_tools.disk_utils import get_sysfs_path
from test_tools.fs_utils import check_if_directory_exists, create_directory, check_if_file_exists
from test_utils.filesystem.file import File
from test_utils.output import CmdException
from test_utils.retry import Retry
from test_utils.size import Size, Unit
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
MEMORY_MOUNT_POINT = "/mnt/memspace"
class DropCachesMode(IntFlag):
PAGECACHE = 1
SLAB = 2
ALL = PAGECACHE | SLAB
class OvercommitMemoryMode(Enum):
DEFAULT = 0
ALWAYS = 1
NEVER = 2
class Runlevel(IntEnum):
"""
Halt the system.
SysV Runlevel: 0
systemd Target: runlevel0.target, poweroff.target
"""
runlevel0 = 0
poweroff = runlevel0
"""
Single user mode.
SysV Runlevel: 1, s, single
systemd Target: runlevel1.target, rescue.target
"""
runlevel1 = 1
rescue = runlevel1
"""
User-defined/Site-specific runlevels. By default, identical to 3.
SysV Runlevel: 2, 4
systemd Target: runlevel2.target, runlevel4.target, multi-user.target
"""
runlevel2 = 2
"""
Multi-user, non-graphical. Users can usually login via multiple consoles or via the network.
SysV Runlevel: 3
systemd Target: runlevel3.target, multi-user.target
"""
runlevel3 = 3
multi_user = runlevel3
"""
Multi-user, graphical. Usually has all the services of runlevel 3 plus a graphical login.
SysV Runlevel: 5
systemd Target: runlevel5.target, graphical.target
"""
runlevel5 = 5
graphical = runlevel5
"""
Reboot
SysV Runlevel: 6
systemd Target: runlevel6.target, reboot.target
"""
runlevel6 = 6
reboot = runlevel6
"""
Emergency shell
SysV Runlevel: emergency
systemd Target: emergency.target
"""
runlevel7 = 7
emergency = runlevel7
class SystemManagerType(Enum):
sysv = 0
systemd = 1
def get_system_manager():
output = TestRun.executor.run_expect_success("ps -p 1").stdout
type = output.split('\n')[1].split()[3]
if type == "init":
return SystemManagerType.sysv
elif type == "systemd":
return SystemManagerType.systemd
raise Exception(f"Unknown system manager type ({type}).")
def change_runlevel(runlevel: Runlevel):
if runlevel == get_runlevel():
return
if Runlevel.runlevel0 < runlevel < Runlevel.runlevel6:
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
TestRun.executor.run_expect_success(f"systemctl set-default {runlevel.name}.target")
else:
TestRun.executor.run_expect_success(
f"sed -i 's/^.*id:.*$/id:{runlevel.value}:initdefault: /' /etc/inittab")
TestRun.executor.run_expect_success(f"init {runlevel.value}")
def get_runlevel():
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
result = TestRun.executor.run_expect_success("systemctl get-default")
try:
name = result.stdout.split(".")[0].replace("-", "_")
return Runlevel[name]
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")
else:
result = TestRun.executor.run_expect_success("runlevel")
try:
split_output = result.stdout.split()
runlevel = Runlevel(int(split_output[1]))
return runlevel
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")
class Udev(object):
@staticmethod
def enable():
TestRun.LOGGER.info("Enabling udev")
TestRun.executor.run_expect_success("udevadm control --start-exec-queue")
@staticmethod
def disable():
TestRun.LOGGER.info("Disabling udev")
TestRun.executor.run_expect_success("udevadm control --stop-exec-queue")
@staticmethod
def trigger():
TestRun.executor.run_expect_success("udevadm trigger")
@staticmethod
def settle():
TestRun.executor.run_expect_success("udevadm settle")
def drop_caches(level: DropCachesMode = DropCachesMode.ALL):
TestRun.executor.run_expect_success(
f"echo {level.value} > /proc/sys/vm/drop_caches")
def disable_memory_affecting_functions():
"""Disables system functions affecting memory"""
# Don't allow sshd to be killed in case of out-of-memory:
TestRun.executor.run(
"echo '-1000' > /proc/`cat /var/run/sshd.pid`/oom_score_adj"
)
TestRun.executor.run(
"echo -17 > /proc/`cat /var/run/sshd.pid`/oom_adj"
) # deprecated
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.NEVER.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo '100' > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '64 64 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapoff --all")
drop_caches(DropCachesMode.SLAB)
def defaultize_memory_affecting_functions():
"""Sets default values to system functions affecting memory"""
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.DEFAULT.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo 50 > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '256 256 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapon --all")
def get_mem_free():
"""Returns free amount of memory in bytes"""
output = TestRun.executor.run_expect_success("free -b")
output = output.stdout.splitlines()
for line in output:
if 'free' in line:
index = line.split().index('free') + 1 # 1st row has 1 element less than following rows
if 'Mem' in line:
mem_line = line.split()
return Size(int(mem_line[index]))
def get_mem_available():
"""Returns amount of available memory from /proc/meminfo"""
cmd = "cat /proc/meminfo | grep MemAvailable | awk '{ print $2 }'"
mem_available = TestRun.executor.run(cmd).stdout
return Size(int(mem_available), Unit.KibiByte)
def get_module_mem_footprint(module_name):
"""Returns allocated size of specific module's metadata from /proc/vmallocinfo"""
cmd = f"cat /proc/vmallocinfo | grep {module_name} | awk '{{ print $2 }}' "
output_lines = TestRun.executor.run(cmd).stdout.splitlines()
memory_used = 0
for line in output_lines:
memory_used += int(line)
return Size(memory_used)
def allocate_memory(size: Size):
"""Allocates given amount of memory"""
mount_ramfs()
TestRun.LOGGER.info(f"Allocating {size.get_value(Unit.MiB):0.2f} MiB of memory.")
bs = Size(1, Unit.Blocks512)
dd = (
Dd()
.block_size(bs)
.count(math.ceil(size / bs))
.input("/dev/zero")
.output(f"{MEMORY_MOUNT_POINT}/data")
)
output = dd.run()
if output.exit_code != 0:
raise CmdException("Allocating memory failed.", output)
def get_number_of_processors_from_cpuinfo():
"""Returns number of processors (count) which are listed out in /proc/cpuinfo"""
cmd = f"cat /proc/cpuinfo | grep processor | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def get_number_of_processes(process_name):
cmd = f"ps aux | grep {process_name} | grep -v grep | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def mount_ramfs():
"""Mounts ramfs to enable allocating memory space"""
if not check_if_directory_exists(MEMORY_MOUNT_POINT):
create_directory(MEMORY_MOUNT_POINT)
if not is_mounted(MEMORY_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t ramfs ramfs {MEMORY_MOUNT_POINT}")
def unmount_ramfs():
"""Unmounts ramfs and releases whole space allocated by it in memory"""
TestRun.executor.run_expect_success(f"umount {MEMORY_MOUNT_POINT}")
def download_file(url, destination_dir="/tmp"):
# TODO use wget module instead
command = ("wget --tries=3 --timeout=5 --continue --quiet "
f"--directory-prefix={destination_dir} {url}")
TestRun.executor.run_expect_success(command)
path = f"{destination_dir.rstrip('/')}/{File.get_name(url)}"
return File(path)
def get_kernel_version():
version_string = TestRun.executor.run_expect_success("uname -r").stdout
version_string = version_string.split('-')[0]
return version.Version(version_string)
class ModuleRemoveMethod(Enum):
rmmod = "rmmod"
modprobe = "modprobe -r"
def is_kernel_module_loaded(module_name):
output = TestRun.executor.run(f"lsmod | grep ^{module_name}")
return output.exit_code == 0
def get_sys_block_path():
sys_block = "/sys/class/block"
if not check_if_directory_exists(sys_block):
sys_block = "/sys/block"
return sys_block
def load_kernel_module(module_name, module_args: {str, str}=None):
cmd = f"modprobe {module_name}"
if module_args is not None:
for key, value in module_args.items():
cmd += f" {key}={value}"
return TestRun.executor.run(cmd)
def unload_kernel_module(module_name, unload_method: ModuleRemoveMethod = ModuleRemoveMethod.rmmod):
cmd = f"{unload_method.value} {module_name}"
return TestRun.executor.run_expect_success(cmd)
def get_kernel_module_parameter(module_name, parameter):
param_file_path = f"/sys/module/{module_name}/parameters/{parameter}"
if not check_if_file_exists(param_file_path):
raise FileNotFoundError(f"File {param_file_path} does not exist!")
return File(param_file_path).read()
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0
def mount_debugfs():
if not is_mounted(DEBUGFS_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t debugfs none {DEBUGFS_MOUNT_POINT}")
def reload_kernel_module(module_name, module_args: {str, str}=None,
unload_method: ModuleRemoveMethod = ModuleRemoveMethod.rmmod):
if is_kernel_module_loaded(module_name):
unload_kernel_module(module_name, unload_method)
Retry.run_while_false(
lambda: load_kernel_module(module_name, module_args).exit_code == 0,
timeout=timedelta(seconds=5)
)
def get_module_path(module_name):
cmd = f"modinfo {module_name}"
# module path is in second column of first line of `modinfo` output
module_info = TestRun.executor.run_expect_success(cmd).stdout
module_path = module_info.splitlines()[0].split()[1]
return module_path
def get_executable_path(exec_name):
cmd = f"which {exec_name}"
path = TestRun.executor.run_expect_success(cmd).stdout
return path
def get_udev_service_path(unit_name):
cmd = f"systemctl cat {unit_name}"
# path is in second column of first line of output
info = TestRun.executor.run_expect_success(cmd).stdout
path = info.splitlines()[0].split()[1]
return path
def kill_all_io():
# TERM signal should be used in preference to the KILL signal, since a
# process may install a handler for the TERM signal in order to perform
# clean-up steps before terminating in an orderly fashion.
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal KILL dd fio blktrace")
TestRun.executor.run("kill -9 `ps aux | grep -i vdbench.* | awk '{ print $2 }'`")
if TestRun.executor.run("pgrep -x dd").exit_code == 0:
raise Exception(f"Failed to stop dd!")
if TestRun.executor.run("pgrep -x fio").exit_code == 0:
raise Exception(f"Failed to stop fio!")
if TestRun.executor.run("pgrep -x blktrace").exit_code == 0:
raise Exception(f"Failed to stop blktrace!")
if TestRun.executor.run("pgrep vdbench").exit_code == 0:
raise Exception(f"Failed to stop vdbench!")
def wait(predicate, timeout: timedelta, interval: timedelta = None):
start_time = datetime.now()
result = False
while start_time + timeout > datetime.now():
result = predicate()
if result:
break
if interval is not None:
time.sleep(interval.total_seconds())
return result
def sync():
TestRun.executor.run_expect_success("sync")
def get_dut_cpu_number():
return int(TestRun.executor.run_expect_success("nproc").stdout)
def get_dut_cpu_physical_cores():
""" Get list of CPU numbers that don't share physical cores """
output = TestRun.executor.run_expect_success("lscpu --all --parse").stdout
core_list = []
visited_phys_cores = []
for line in output.split("\n"):
if "#" in line:
continue
cpu_no, phys_core_no = line.split(",")[:2]
if phys_core_no not in visited_phys_cores:
core_list.append(cpu_no)
visited_phys_cores.append(phys_core_no)
return core_list
def set_wbt_lat(device: Device, value: int):
if value < 0:
raise ValueError("Write back latency can't be negative number")
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return TestRun.executor.run_expect_success(f"echo {value} > {wbt_lat_config_path}")
def get_wbt_lat(device: Device):
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return int(TestRun.executor.run_expect_success(f"cat {wbt_lat_config_path}").stdout)
def get_cores_ids_range(numa_node: int):
output = TestRun.executor.run_expect_success(f"lscpu --all --parse").stdout
parse_output = re.findall(r'(\d+),(\d+),(?:\d+),(\d+),,', output, re.I)
return [element[0] for element in parse_output if int(element[2]) == numa_node]

4
type_def/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#