Merge pull request #34 from katlapinka/kasiat/tf-refactor

TF refactor
This commit is contained in:
Katarzyna Treder 2024-12-31 12:44:20 +01:00 committed by GitHub
commit aea7532756
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
68 changed files with 987 additions and 1152 deletions

View File

@ -8,7 +8,7 @@ import time
from datetime import timedelta from datetime import timedelta
from core.test_run import TestRun from core.test_run import TestRun
from test_utils.output import CmdException from connection.utils.output import CmdException
class BaseExecutor: class BaseExecutor:

View File

@ -9,8 +9,8 @@ from datetime import timedelta
from connection.base_executor import BaseExecutor from connection.base_executor import BaseExecutor
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.fs_utils import copy from test_tools.fs_tools import copy
from test_utils.output import Output, CmdException from connection.utils.output import Output, CmdException
class LocalExecutor(BaseExecutor): class LocalExecutor(BaseExecutor):

View File

@ -3,17 +3,18 @@
# Copyright(c) 2024 Huawei Technologies Co., Ltd. # Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import os import os
import re import re
import paramiko
import socket import socket
import subprocess import subprocess
from datetime import timedelta, datetime
import paramiko from datetime import timedelta, datetime
from connection.base_executor import BaseExecutor from connection.base_executor import BaseExecutor
from core.test_run import TestRun, Blocked from core.test_run import TestRun, Blocked
from test_utils.output import Output from connection.utils.output import Output
class SshExecutor(BaseExecutor): class SshExecutor(BaseExecutor):
@ -46,7 +47,7 @@ class SshExecutor(BaseExecutor):
hostname = target["hostname"] hostname = target["hostname"]
key_filename = target.get("identityfile", None) key_filename = target.get("identityfile", None)
user = target.get("user", user) user = target.get("user", user)
port = target.get("port", port) port = int(target.get("port", port))
if target.get("proxyjump", None) is not None: if target.get("proxyjump", None) is not None:
proxy = config.lookup(target["proxyjump"]) proxy = config.lookup(target["proxyjump"])
jump = paramiko.SSHClient() jump = paramiko.SSHClient()

View File

@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@ -1,9 +1,10 @@
# #
# Copyright(c) 2020-2021 Intel Corporation # Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import concurrent from concurrent.futures import ThreadPoolExecutor
def start_async_func(func, *args): def start_async_func(func, *args):
@ -14,5 +15,5 @@ def start_async_func(func, *args):
- done() method returns True when task ended (have a result or ended with an exception) - done() method returns True when task ended (have a result or ended with an exception)
otherwise returns False otherwise returns False
""" """
executor = concurrent.futures.ThreadPoolExecutor() executor = ThreadPoolExecutor()
return executor.submit(func, *args) return executor.submit(func, *args)

View File

@ -1,5 +1,6 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #

View File

@ -1,5 +1,6 @@
# #
# Copyright(c) 2021 Intel Corporation # Copyright(c) 2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -33,7 +34,7 @@ class Retry:
try: try:
result = func() result = func()
return True return True
except: except Exception:
return False return False
cls.run_while_false(wrapped_func, retries=retries, timeout=timeout) cls.run_while_false(wrapped_func, retries=retries, timeout=timeout)

View File

@ -19,7 +19,7 @@ from core.pair_testing import generate_pair_testing_testcases, register_testcase
from core.plugins import PluginManager from core.plugins import PluginManager
from log.base_log import BaseLogResult from log.base_log import BaseLogResult
from storage_devices.disk import Disk from storage_devices.disk import Disk
from test_utils import disk_finder from test_tools import disk_finder
from test_utils.dut import Dut from test_utils.dut import Dut
TestRun = core.test_run.TestRun TestRun = core.test_run.TestRun

View File

@ -6,8 +6,8 @@ from time import sleep
from core.test_run_utils import TestRun from core.test_run_utils import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_utils import os_utils from connection.utils.output import CmdException
from test_utils.output import CmdException from test_tools.os_tools import load_kernel_module, is_kernel_module_loaded, unload_kernel_module
class ScsiDebug: class ScsiDebug:
@ -24,7 +24,7 @@ class ScsiDebug:
def reload(self): def reload(self):
self.teardown() self.teardown()
sleep(1) sleep(1)
load_output = os_utils.load_kernel_module(self.module_name, self.params) load_output = load_kernel_module(self.module_name, self.params)
if load_output.exit_code != 0: if load_output.exit_code != 0:
raise CmdException(f"Failed to load {self.module_name} module", load_output) raise CmdException(f"Failed to load {self.module_name} module", load_output)
TestRun.LOGGER.info(f"{self.module_name} loaded successfully.") TestRun.LOGGER.info(f"{self.module_name} loaded successfully.")
@ -32,8 +32,8 @@ class ScsiDebug:
TestRun.scsi_debug_devices = Device.get_scsi_debug_devices() TestRun.scsi_debug_devices = Device.get_scsi_debug_devices()
def teardown(self): def teardown(self):
if os_utils.is_kernel_module_loaded(self.module_name): if is_kernel_module_loaded(self.module_name):
os_utils.unload_kernel_module(self.module_name) unload_kernel_module(self.module_name)
plugin_class = ScsiDebug plugin_class = ScsiDebug

View File

@ -1,98 +0,0 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
import posixpath
from datetime import timedelta
from core.test_run import TestRun
from test_tools import fs_utils
class Vdbench:
def __init__(self, params, config):
print("VDBench plugin initialization")
self.run_time = timedelta(seconds=60)
try:
self.working_dir = config["working_dir"]
self.reinstall = config["reinstall"]
self.source_dir = config["source_dir"]
except Exception:
raise Exception("Missing fields in config! ('working_dir', 'source_dir' and "
"'reinstall' required)")
self.result_dir = posixpath.join(self.working_dir, 'result.tod')
def pre_setup(self):
pass
def post_setup(self):
print("VDBench plugin post setup")
if not self.reinstall and fs_utils.check_if_directory_exists(self.working_dir):
return
if fs_utils.check_if_directory_exists(self.working_dir):
fs_utils.remove(self.working_dir, True, True)
fs_utils.create_directory(self.working_dir)
TestRun.LOGGER.info("Copying vdbench to working dir.")
fs_utils.copy(
source=self.source_dir, destination=self.working_dir, force=True, recursive=True
)
def teardown(self):
pass
def create_config(self, config, run_time: timedelta):
self.run_time = run_time
if config[-1] != ",":
config += ","
config += f"elapsed={int(run_time.total_seconds())}"
TestRun.LOGGER.info(f"Vdbench config:\n{config}")
fs_utils.write_file(posixpath.join(self.working_dir, "param.ini"), config)
def run(self):
cmd = f"{posixpath.join(self.working_dir, 'vdbench')} " \
f"-f {posixpath.join(self.working_dir, 'param.ini')} " \
f"-vr -o {self.result_dir}"
full_cmd = f"screen -dmS vdbench {cmd}"
TestRun.executor.run(full_cmd)
start_time = time.time()
timeout = self.run_time * 1.5
while True:
if not TestRun.executor.run(f"ps aux | grep '{cmd}' | grep -v grep").exit_code == 0:
return self.analyze_log()
if time.time() - start_time > timeout.total_seconds():
TestRun.LOGGER.error("Vdbench timeout.")
return False
time.sleep(1)
def analyze_log(self):
output = TestRun.executor.run(
f"ls -1td {self.result_dir[0:len(self.result_dir) - 3]}* | head -1")
log_path = posixpath.join(output.stdout if output.exit_code == 0 else self.result_dir,
"logfile.html")
log_file = fs_utils.read_file(log_path)
if "Vdbench execution completed successfully" in log_file:
TestRun.LOGGER.info("Vdbench execution completed successfully.")
return True
if "Data Validation error" in log_file or "data_errors=1" in log_file:
TestRun.LOGGER.error("Data corruption occurred!")
elif "Heartbeat monitor:" in log_file:
TestRun.LOGGER.error("Vdbench: heartbeat.")
else:
TestRun.LOGGER.error("Vdbench unknown result.")
return False
plugin_class = Vdbench

View File

@ -15,8 +15,8 @@ import portalocker
from log.html_log_config import HtmlLogConfig from log.html_log_config import HtmlLogConfig
from log.html_log_manager import HtmlLogManager from log.html_log_manager import HtmlLogManager
from log.html_presentation_policy import html_policy from log.html_presentation_policy import html_policy
from test_utils.output import Output from connection.utils.output import Output
from test_utils.singleton import Singleton from test_utils.common.singleton import Singleton
def create_log(log_base_path, test_module, additional_args=None): def create_log(log_base_path, test_module, additional_args=None):
@ -188,7 +188,7 @@ class Log(HtmlLogManager, metaclass=Singleton):
def get_additional_logs(self): def get_additional_logs(self):
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.fs_utils import check_if_file_exists from test_tools.fs_tools import check_if_file_exists
messages_log = "/var/log/messages" messages_log = "/var/log/messages"
if not check_if_file_exists(messages_log): if not check_if_file_exists(messages_log):
messages_log = "/var/log/syslog" messages_log = "/var/log/syslog"

4
scripts/__init__.py Normal file
View File

@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@ -3,29 +3,32 @@
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd. # Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import posixpath import posixpath
from core.test_run import TestRun from core.test_run import TestRun
from test_tools import disk_utils, fs_utils from test_tools import disk_tools
from test_tools.disk_utils import get_device_filesystem_type, get_sysfs_path from test_tools.disk_tools import get_sysfs_path, validate_dev_path, get_size
from test_tools.fs_tools import (get_device_filesystem_type, Filesystem, wipefs,
readlink, write_file, mkfs, ls, parse_ls_output)
from test_utils.io_stats import IoStats from test_utils.io_stats import IoStats
from test_utils.size import Size, Unit from type_def.size import Size, Unit
class Device: class Device:
def __init__(self, path): def __init__(self, path):
disk_utils.validate_dev_path(path) validate_dev_path(path)
self.path = path self.path = path
self.size = Size(disk_utils.get_size(self.get_device_id()), Unit.Byte) self.size = Size(get_size(self.get_device_id()), Unit.Byte)
self.filesystem = get_device_filesystem_type(self.get_device_id()) self.filesystem = get_device_filesystem_type(self.get_device_id())
self.mount_point = None self.mount_point = None
def create_filesystem(self, fs_type: disk_utils.Filesystem, force=True, blocksize=None): def create_filesystem(self, fs_type: Filesystem, force=True, blocksize=None):
disk_utils.create_filesystem(self, fs_type, force, blocksize) mkfs(self, fs_type, force, blocksize)
self.filesystem = fs_type self.filesystem = fs_type
def wipe_filesystem(self, force=True): def wipe_filesystem(self, force=True):
disk_utils.wipe_filesystem(self, force) wipefs(self, force)
self.filesystem = None self.filesystem = None
def is_mounted(self): def is_mounted(self):
@ -34,13 +37,13 @@ class Device:
return False return False
else: else:
mount_point_line = output.stdout.split('\n')[1] mount_point_line = output.stdout.split('\n')[1]
device_path = fs_utils.readlink(self.path) device_path = readlink(self.path)
self.mount_point = mount_point_line[0:mount_point_line.find(device_path)].strip() self.mount_point = mount_point_line[0:mount_point_line.find(device_path)].strip()
return True return True
def mount(self, mount_point, options: [str] = None): def mount(self, mount_point, options: [str] = None):
if not self.is_mounted(): if not self.is_mounted():
if disk_utils.mount(self, mount_point, options): if disk_tools.mount(self, mount_point, options):
self.mount_point = mount_point self.mount_point = mount_point
else: else:
raise Exception(f"Device is already mounted! Actual mount point: {self.mount_point}") raise Exception(f"Device is already mounted! Actual mount point: {self.mount_point}")
@ -48,7 +51,7 @@ class Device:
def unmount(self): def unmount(self):
if not self.is_mounted(): if not self.is_mounted():
TestRun.LOGGER.info("Device is not mounted.") TestRun.LOGGER.info("Device is not mounted.")
elif disk_utils.unmount(self): elif disk_tools.unmount(self):
self.mount_point = None self.mount_point = None
def get_device_link(self, directory: str): def get_device_link(self, directory: str):
@ -56,27 +59,26 @@ class Device:
return next(i for i in items if i.full_path.startswith(directory)) return next(i for i in items if i.full_path.startswith(directory))
def get_device_id(self): def get_device_id(self):
return fs_utils.readlink(self.path).split('/')[-1] return readlink(self.path).split('/')[-1]
def get_all_device_links(self, directory: str): def get_all_device_links(self, directory: str):
from test_tools import fs_utils output = ls(f"$(find -L {directory} -samefile {self.path})")
output = fs_utils.ls(f"$(find -L {directory} -samefile {self.path})") return parse_ls_output(output, self.path)
return fs_utils.parse_ls_output(output, self.path)
def get_io_stats(self): def get_io_stats(self):
return IoStats.get_io_stats(self.get_device_id()) return IoStats.get_io_stats(self.get_device_id())
def get_sysfs_property(self, property_name): def get_sysfs_property(self, property_name):
path = posixpath.join(disk_utils.get_sysfs_path(self.get_device_id()), path = posixpath.join(get_sysfs_path(self.get_device_id()),
"queue", property_name) "queue", property_name)
return TestRun.executor.run_expect_success(f"cat {path}").stdout return TestRun.executor.run_expect_success(f"cat {path}").stdout
def set_sysfs_property(self, property_name, value): def set_sysfs_property(self, property_name, value):
TestRun.LOGGER.info( TestRun.LOGGER.info(
f"Setting {property_name} for device {self.get_device_id()} to {value}.") f"Setting {property_name} for device {self.get_device_id()} to {value}.")
path = posixpath.join(disk_utils.get_sysfs_path(self.get_device_id()), "queue", path = posixpath.join(get_sysfs_path(self.get_device_id()), "queue",
property_name) property_name)
fs_utils.write_file(path, str(value)) write_file(path, str(value))
def set_max_io_size(self, new_max_io_size: Size): def set_max_io_size(self, new_max_io_size: Size):
self.set_sysfs_property("max_sectors_kb", self.set_sysfs_property("max_sectors_kb",

View File

@ -11,12 +11,14 @@ from datetime import timedelta
from enum import IntEnum from enum import IntEnum
from core.test_run import TestRun from core.test_run import TestRun
from connection.utils.output import Output
from storage_devices.device import Device from storage_devices.device import Device
from test_tools import disk_utils, fs_utils, nvme_cli from test_tools import disk_tools, nvme_cli
from test_utils import disk_finder from test_tools.common.wait import wait
from test_utils.os_utils import wait from test_tools.disk_finder import get_block_devices_list, resolve_to_by_id_link
from test_utils.output import Output from test_tools.disk_tools import PartitionTable
from test_utils.size import Unit from test_tools.fs_tools import readlink, is_mounted, ls_item, parse_ls_output
from type_def.size import Unit
class DiskType(IntEnum): class DiskType(IntEnum):
@ -137,33 +139,33 @@ class Disk(Device):
) )
return recognized_types[0] return recognized_types[0]
def create_partitions(self, sizes: [], partition_table_type=disk_utils.PartitionTable.gpt): def create_partitions(self, sizes: [], partition_table_type=PartitionTable.gpt):
disk_utils.create_partitions(self, sizes, partition_table_type) disk_tools.create_partitions(self, sizes, partition_table_type)
def remove_partition(self, part): def remove_partition(self, part):
part_number = int(part.path.split("part")[1]) part_number = int(part.path.split("part")[1])
disk_utils.remove_parition(self, part_number) disk_tools.remove_parition(self, part_number)
self.partitions.remove(part) self.partitions.remove(part)
def umount_all_partitions(self): def umount_all_partitions(self):
TestRun.LOGGER.info(f"Unmounting all partitions from: {self.path}") TestRun.LOGGER.info(f"Unmounting all partitions from: {self.path}")
cmd = f"umount -l {fs_utils.readlink(self.path)}*?" cmd = f"umount -l {readlink(self.path)}*?"
TestRun.executor.run(cmd) TestRun.executor.run(cmd)
def remove_partitions(self): def remove_partitions(self):
for part in self.partitions: for part in self.partitions:
if part.is_mounted(): if is_mounted(part.path):
part.unmount() part.unmount()
if disk_utils.remove_partitions(self): if disk_tools.remove_partitions(self):
self.partitions.clear() self.partitions.clear()
def is_detected(self): def is_detected(self):
if self.serial_number: if self.serial_number:
serial_numbers = disk_finder.get_all_serial_numbers() serial_numbers = Disk.get_all_serial_numbers()
return self.serial_number in serial_numbers return self.serial_number in serial_numbers
elif self.path: elif self.path:
output = fs_utils.ls_item(f"{self.path}") output = ls_item(f"{self.path}")
return fs_utils.parse_ls_output(output)[0] is not None return parse_ls_output(output)[0] is not None
raise Exception("Couldn't check if device is detected by the system") raise Exception("Couldn't check if device is detected by the system")
def wait_for_plug_status(self, should_be_visible): def wait_for_plug_status(self, should_be_visible):
@ -214,6 +216,40 @@ class Disk(Device):
for disk_type in cls.types_registry: for disk_type in cls.types_registry:
disk_type.plug_all() disk_type.plug_all()
@staticmethod
def get_all_serial_numbers():
serial_numbers = {}
block_devices = get_block_devices_list()
for dev in block_devices:
serial = Disk.get_disk_serial_number(dev)
try:
path = resolve_to_by_id_link(dev)
except Exception:
continue
if serial:
serial_numbers[serial] = path
else:
TestRun.LOGGER.warning(f"Device {path} ({dev}) does not have a serial number.")
serial_numbers[path] = path
return serial_numbers
@staticmethod
def get_disk_serial_number(dev_path):
commands = [
f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || "
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | "
"awk -F '=' '{print $NF}'",
f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | "
"awk '{print $NF}'",
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | "
"awk -F '=' '{print $NF}'"
]
for command in commands:
serial = TestRun.executor.run(command).stdout
if serial:
return serial.split('\n')[0]
return None
@static_init @static_init
class NvmeDisk(Disk): class NvmeDisk(Disk):
@ -255,8 +291,8 @@ class NvmeDisk(Disk):
base = f"/sys/block/{device_id}/device" base = f"/sys/block/{device_id}/device"
for suffix in ["/remove", "/device/remove"]: for suffix in ["/remove", "/device/remove"]:
try: try:
output = fs_utils.ls_item(base + suffix) output = ls_item(base + suffix)
fs_utils.parse_ls_output(output)[0] parse_ls_output(output)[0]
except TypeError: except TypeError:
continue continue
return base + suffix return base + suffix
@ -311,8 +347,8 @@ class SataDisk(Disk):
@staticmethod @staticmethod
def get_sysfs_addr(device_id): def get_sysfs_addr(device_id):
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)" ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}") output = ls_item(ls_command)
sysfs_addr = fs_utils.parse_ls_output(output)[0] sysfs_addr = parse_ls_output(output)[0]
if not sysfs_addr: if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}") raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")
return sysfs_addr.full_path return sysfs_addr.full_path
@ -378,8 +414,8 @@ class VirtioDisk(Disk):
@staticmethod @staticmethod
def get_sysfs_addr(device_id: str) -> str: def get_sysfs_addr(device_id: str) -> str:
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)" ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}") output = ls_item(ls_command)
sysfs_addr = fs_utils.parse_ls_output(output)[0] sysfs_addr = parse_ls_output(output)[0]
if not sysfs_addr: if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}") raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")

View File

@ -3,14 +3,13 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
import os
import posixpath import posixpath
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_tools.drbdadm import Drbdadm from test_tools.drbdadm import Drbdadm
from test_utils.filesystem.symlink import Symlink from test_utils.filesystem.symlink import Symlink
from test_utils.output import CmdException from connection.utils.output import CmdException
class Drbd(Device): class Drbd(Device):

View File

@ -0,0 +1,87 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.device_mapper import DmTable, DeviceMapper
from test_tools.disk_finder import resolve_to_by_id_link
class ErrorDevice(Device):
def __init__(self, name: str, base_device: Device, table: DmTable = None):
self.device = base_device
self.mapper = DeviceMapper(name)
self.name = name
self.table = DmTable.passthrough_table(base_device) if not table else table
self.active = False
self.start()
self.path = resolve_to_by_id_link(self.mapper.get_path().replace('/dev/', ''))
@property
def system_path(self):
if self.active:
output = TestRun.executor.run_expect_success(f"realpath {self.mapper.get_path()}")
return output.stdout
return None
@property
def size(self):
if self.active:
return self.table.get_size()
return None
def start(self):
self.mapper.create(self.table)
self.active = True
def stop(self):
self.mapper.remove()
self.active = False
def change_table(self, table: DmTable, permanent=True):
if self.active:
self.mapper.suspend()
self.mapper.reload(table)
self.mapper.resume()
if permanent:
self.table = table
def suspend_errors(self):
empty_table = DmTable.passthrough_table(self.device)
TestRun.LOGGER.info(f"Suspending issuing errors for error device '{self.name}'")
self.change_table(empty_table, False)
def resume_errors(self):
TestRun.LOGGER.info(f"Resuming issuing errors for error device '{self.name}'")
self.change_table(self.table, False)
def suspend(self):
if not self.active:
TestRun.LOGGER.warning(
f"cannot suspend error device '{self.name}'! It's already running"
)
self.mapper.suspend()
self.active = False
def resume(self):
if self.active:
TestRun.LOGGER.warning(
f"cannot resume error device '{self.name}'! It's already running"
)
self.mapper.resume()
self.active = True

View File

@ -10,10 +10,10 @@ from typing import Union
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from storage_devices.disk import Disk from storage_devices.disk import Disk
from test_tools.fs_utils import readlink from test_tools.fs_tools import readlink
from test_utils.disk_finder import resolve_to_by_id_link, get_system_disks from test_tools.disk_finder import resolve_to_by_id_link, get_system_disks
from test_utils.filesystem.symlink import Symlink from test_utils.filesystem.symlink import Symlink
from test_utils.size import Size from type_def.size import Size
lvm_config_path = "/etc/lvm/lvm.conf" lvm_config_path = "/etc/lvm/lvm.conf"
filter_prototype_regex = r"^\sfilter\s=\s\[" filter_prototype_regex = r"^\sfilter\s=\s\["

View File

@ -5,11 +5,10 @@
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_tools.fs_utils import ls, parse_ls_output from test_tools.fs_tools import ls, parse_ls_output
from test_utils.os_utils import ( from test_tools.os_tools import (
unload_kernel_module, unload_kernel_module,
is_kernel_module_loaded, is_kernel_module_loaded,
ModuleRemoveMethod,
reload_kernel_module, reload_kernel_module,
) )
@ -37,7 +36,7 @@ class NullBlk(Device):
if not is_kernel_module_loaded(cls._module): if not is_kernel_module_loaded(cls._module):
return return
TestRun.LOGGER.info("Removing null_blk ") TestRun.LOGGER.info("Removing null_blk ")
unload_kernel_module(module_name=cls._module, unload_method=ModuleRemoveMethod.modprobe) unload_kernel_module(module_name=cls._module)
@classmethod @classmethod
def list(cls): def list(cls):

View File

@ -4,13 +4,13 @@
# #
from storage_devices.device import Device from storage_devices.device import Device
from test_tools import disk_utils from test_tools.disk_tools import get_partition_path
from test_utils.size import Size from type_def.size import Size
class Partition(Device): class Partition(Device):
def __init__(self, parent_dev, type, number, begin: Size, end: Size): def __init__(self, parent_dev, type, number, begin: Size, end: Size):
Device.__init__(self, disk_utils.get_partition_path(parent_dev.path, number)) Device.__init__(self, get_partition_path(parent_dev.path, number))
self.number = number self.number = number
self.parent_device = parent_dev self.parent_device = parent_dev
self.type = type self.type = type

View File

@ -2,16 +2,17 @@
# Copyright(c) 2020-2021 Intel Corporation # Copyright(c) 2020-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import threading import threading
from enum import IntEnum, Enum from enum import IntEnum, Enum
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from storage_devices.disk import Disk from storage_devices.disk import Disk
from test_tools.fs_utils import readlink from test_tools.fs_tools import readlink
from test_tools.mdadm import Mdadm from test_tools.mdadm import Mdadm
from test_utils.disk_finder import resolve_to_by_id_link from test_tools.disk_finder import resolve_to_by_id_link
from test_utils.size import Size, Unit from type_def.size import Size, Unit
def get_devices_paths_string(devices: [Device]): def get_devices_paths_string(devices: [Device]):

View File

@ -7,11 +7,11 @@ import posixpath
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_tools import disk_utils from test_tools.disk_tools import get_size
from test_tools.fs_utils import ls, parse_ls_output from test_tools.fs_tools import ls, parse_ls_output
from test_utils.filesystem.symlink import Symlink from test_utils.filesystem.symlink import Symlink
from test_utils.os_utils import reload_kernel_module, unload_kernel_module, is_kernel_module_loaded from test_tools.os_tools import reload_kernel_module, unload_kernel_module, is_kernel_module_loaded
from test_utils.size import Size, Unit from type_def.size import Size, Unit
class RamDisk(Device): class RamDisk(Device):
@ -68,7 +68,7 @@ class RamDisk(Device):
ram_disks = cls._list_devices() ram_disks = cls._list_devices()
return ( return (
len(ram_disks) >= disk_count len(ram_disks) >= disk_count
and Size(disk_utils.get_size(ram_disks[0].name), Unit.Byte).align_down(Unit.MiB.value) and Size(get_size(ram_disks[0].name), Unit.Byte).align_down(Unit.MiB.value)
== disk_size.align_down(Unit.MiB.value) == disk_size.align_down(Unit.MiB.value)
) )

View File

@ -14,8 +14,9 @@ from datetime import timedelta
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_utils.filesystem.directory import Directory from test_utils.filesystem.directory import Directory
from test_utils.os_utils import is_mounted, drop_caches, DropCachesMode from test_tools.os_tools import drop_caches, DropCachesMode
from test_utils.size import Size, Unit from test_tools.fs_tools import is_mounted
from type_def.size import Size, Unit
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug" DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
PREFIX = "trace_" PREFIX = "trace_"

View File

@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@ -1,5 +1,6 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #

20
test_tools/common/wait.py Normal file
View File

@ -0,0 +1,20 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import time
from datetime import timedelta, datetime
def wait(predicate, timeout: timedelta, interval: timedelta = None):
start_time = datetime.now()
result = False
while start_time + timeout > datetime.now():
result = predicate()
if result:
break
if interval is not None:
time.sleep(interval.total_seconds())
return result

View File

@ -1,16 +1,17 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import test_utils.linux_command as linux_comm import type_def.size as size
import test_utils.size as size
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.common.linux_command import LinuxCommand
class Dd(linux_comm.LinuxCommand): class Dd(LinuxCommand):
def __init__(self): def __init__(self):
linux_comm.LinuxCommand.__init__(self, TestRun.executor, 'dd') LinuxCommand.__init__(self, TestRun.executor, 'dd')
def block_size(self, value: size.Size): def block_size(self, value: size.Size):
return self.set_param('bs', int(value.get_value())) return self.set_param('bs', int(value.get_value()))

View File

@ -1,16 +1,17 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import test_utils.linux_command as linux_comm
import test_utils.size as size
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.common.linux_command import LinuxCommand
from type_def.size import Size
class Ddrescue(linux_comm.LinuxCommand): class Ddrescue(LinuxCommand):
def __init__(self): def __init__(self):
linux_comm.LinuxCommand.__init__(self, TestRun.executor, 'ddrescue') LinuxCommand.__init__(self, TestRun.executor, 'ddrescue')
self.source_path = None self.source_path = None
self.destination_path = None self.destination_path = None
self.param_name_prefix = "--" self.param_name_prefix = "--"
@ -35,13 +36,13 @@ class Ddrescue(linux_comm.LinuxCommand):
def force(self): def force(self):
return self.set_flags("force") return self.set_flags("force")
def block_size(self, value: size.Size): def block_size(self, value: Size):
return self.set_param('sector-size', int(value.get_value())) return self.set_param('sector-size', int(value.get_value()))
def size(self, value: size.Size): def size(self, value: Size):
return self.set_param('size', int(value.get_value())) return self.set_param('size', int(value.get_value()))
def __str__(self): def __str__(self):
command = linux_comm.LinuxCommand.__str__(self) command = LinuxCommand.__str__(self)
command += f" {self.source_path} {self.destination_path}" command += f" {self.source_path} {self.destination_path}"
return command return command

View File

@ -1,5 +1,6 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -7,9 +8,8 @@ from enum import Enum
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_utils.disk_finder import resolve_to_by_id_link from test_tools.common.linux_command import LinuxCommand
from test_utils.linux_command import LinuxCommand from type_def.size import Size, Unit
from test_utils.size import Size, Unit
class DmTarget(Enum): class DmTarget(Enum):
@ -30,11 +30,6 @@ class DmTarget(Enum):
return self.name.lower() return self.name.lower()
class DmTable:
class TableEntry:
pass
class DmTable: class DmTable:
class TableEntry: class TableEntry:
def __init__(self, offset: int, length: int, target: DmTarget, *params): def __init__(self, offset: int, length: int, target: DmTarget, *params):
@ -131,7 +126,7 @@ class DmTable:
return self return self
def add_entry(self, entry: DmTable.TableEntry): def add_entry(self, entry: TableEntry):
self.table.append(entry) self.table.append(entry)
return self return self
@ -250,80 +245,3 @@ class DeviceMapper(LinuxCommand):
return TestRun.executor.run_expect_success( return TestRun.executor.run_expect_success(
f"{self.command_name} reload {self.name} {self.wrap_table(table)}" f"{self.command_name} reload {self.name} {self.wrap_table(table)}"
) )
class ErrorDevice(Device):
def __init__(self, name: str, base_device: Device, table: DmTable = None):
self.device = base_device
self.mapper = DeviceMapper(name)
self.name = name
self.table = DmTable.passthrough_table(base_device) if not table else table
self.active = False
self.start()
self.path = resolve_to_by_id_link(self.mapper.get_path().replace('/dev/', ''))
@property
def system_path(self):
if self.active:
output = TestRun.executor.run_expect_success(f"realpath {self.mapper.get_path()}")
return output.stdout
return None
@property
def size(self):
if self.active:
return self.table.get_size()
return None
def start(self):
self.mapper.create(self.table)
self.active = True
def stop(self):
self.mapper.remove()
self.active = False
def change_table(self, table: DmTable, permanent=True):
if self.active:
self.mapper.suspend()
self.mapper.reload(table)
self.mapper.resume()
if permanent:
self.table = table
def suspend_errors(self):
empty_table = DmTable.passthrough_table(self.device)
TestRun.LOGGER.info(f"Suspending issuing errors for error device '{self.name}'")
self.change_table(empty_table, False)
def resume_errors(self):
TestRun.LOGGER.info(f"Resuming issuing errors for error device '{self.name}'")
self.change_table(self.table, False)
def suspend(self):
if not self.active:
TestRun.LOGGER.warning(
f"cannot suspend error device '{self.name}'! It's already running"
)
self.mapper.suspend()
self.active = False
def resume(self):
if self.active:
TestRun.LOGGER.warning(
f"cannot resume error device '{self.name}'! It's already running"
)
self.mapper.resume()
self.active = True

View File

@ -1,15 +1,15 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import os
import posixpath import posixpath
from core.test_run import TestRun from core.test_run import TestRun
from test_tools import disk_utils from test_tools.disk_tools import get_sysfs_path, get_block_size, get_size
from test_tools.fs_utils import check_if_file_exists, readlink from test_tools.fs_tools import check_if_file_exists, readlink
from test_utils import os_utils from connection.utils.output import CmdException
from test_utils.output import CmdException
def find_disks(): def find_disks():
@ -49,7 +49,7 @@ def discover_hdd_devices(block_devices, devices_res):
for dev in block_devices: for dev in block_devices:
if TestRun.executor.run_expect_success(f"cat /sys/block/{dev}/removable").stdout == "1": if TestRun.executor.run_expect_success(f"cat /sys/block/{dev}/removable").stdout == "1":
continue # skip removable drives continue # skip removable drives
block_size = disk_utils.get_block_size(dev) block_size = get_block_size(dev)
if int(block_size) == 4096: if int(block_size) == 4096:
disk_type = 'hdd4k' disk_type = 'hdd4k'
else: else:
@ -61,7 +61,7 @@ def discover_hdd_devices(block_devices, devices_res):
f"sg_inq /dev/{dev} | grep -i 'serial number'" f"sg_inq /dev/{dev} | grep -i 'serial number'"
).stdout.split(': ')[1].strip(), ).stdout.split(': ')[1].strip(),
"blocksize": block_size, "blocksize": block_size,
"size": disk_utils.get_size(dev)}) "size": get_size(dev)})
block_devices.clear() block_devices.clear()
@ -98,57 +98,22 @@ def discover_ssd_devices(block_devices, devices_res):
"type": disk_type, "type": disk_type,
"path": resolve_to_by_id_link(device_path), "path": resolve_to_by_id_link(device_path),
"serial": serial_number, "serial": serial_number,
"blocksize": disk_utils.get_block_size(dev), "blocksize": get_block_size(dev),
"size": disk_utils.get_size(dev)}) "size": get_size(dev)})
block_devices.remove(dev) block_devices.remove(dev)
def get_disk_serial_number(dev_path):
commands = [
f"(udevadm info --query=all --name={dev_path} | grep 'SCSI.*_SERIAL' || "
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL_SHORT') | "
"awk -F '=' '{print $NF}'",
f"sg_inq {dev_path} 2> /dev/null | grep '[Ss]erial number:' | "
"awk '{print $NF}'",
f"udevadm info --query=all --name={dev_path} | grep 'ID_SERIAL' | "
"awk -F '=' '{print $NF}'"
]
for command in commands:
serial = TestRun.executor.run(command).stdout
if serial:
return serial.split('\n')[0]
return None
def get_all_serial_numbers():
serial_numbers = {}
block_devices = get_block_devices_list()
for dev in block_devices:
serial = get_disk_serial_number(dev)
try:
path = resolve_to_by_id_link(dev)
except Exception:
continue
if serial:
serial_numbers[serial] = path
else:
TestRun.LOGGER.warning(f"Device {path} ({dev}) does not have a serial number.")
serial_numbers[path] = path
return serial_numbers
def get_system_disks(): def get_system_disks():
system_device = TestRun.executor.run_expect_success('mount | grep " / "').stdout.split()[0] system_device = TestRun.executor.run_expect_success('mount | grep " / "').stdout.split()[0]
readlink_output = readlink(system_device) readlink_output = readlink(system_device)
device_name = readlink_output.split('/')[-1] device_name = readlink_output.split('/')[-1]
sys_block_path = os_utils.get_sys_block_path()
used_device_names = __get_slaves(device_name) used_device_names = __get_slaves(device_name)
if not used_device_names: if not used_device_names:
used_device_names = [device_name] used_device_names = [device_name]
disk_names = [] disk_names = []
for device_name in used_device_names: for device_name in used_device_names:
if check_if_file_exists(f'{sys_block_path}/{device_name}/partition'): if check_if_file_exists(os.path.join(get_sysfs_path(device_name), "partition")):
parent_device = readlink(f'{sys_block_path}/{device_name}/..').split('/')[-1] parent_device = readlink(os.path.join(get_sysfs_path(device_name), "..")).split('/')[-1]
disk_names.append(parent_device) disk_names.append(parent_device)
else: else:
disk_names.append(device_name) disk_names.append(device_name)
@ -159,7 +124,7 @@ def get_system_disks():
def __get_slaves(device_name: str): def __get_slaves(device_name: str):
try: try:
device_names = TestRun.executor.run_expect_success( device_names = TestRun.executor.run_expect_success(
f'ls {os_utils.get_sys_block_path()}/{device_name}/slaves').stdout.splitlines() f"ls {os.path.join(get_sysfs_path(device_name), 'slaves')}").stdout.splitlines()
except CmdException as e: except CmdException as e:
if "No such file or directory" not in e.output.stderr: if "No such file or directory" not in e.output.stderr:
raise raise
@ -177,7 +142,9 @@ def __get_slaves(device_name: str):
def resolve_to_by_id_link(path): def resolve_to_by_id_link(path):
by_id_paths = TestRun.executor.run_expect_success("ls /dev/disk/by-id -1").stdout.splitlines() by_id_paths = TestRun.executor.run_expect_success("ls /dev/disk/by-id -1").stdout.splitlines()
dev_full_paths = [posixpath.join("/dev/disk/by-id", by_id_path) for by_id_path in by_id_paths] dev_full_paths = [
posixpath.join("/dev/disk/by-id", by_id_path) for by_id_path in by_id_paths
]
for full_path in dev_full_paths: for full_path in dev_full_paths:
# handle exception for broken links # handle exception for broken links

View File

@ -8,23 +8,18 @@ import posixpath
import re import re
import time import time
from enum import Enum from enum import Enum
from typing import List
from core.test_run import TestRun from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.dd import Dd from test_tools.dd import Dd
from test_tools.fs_utils import readlink, parse_ls_output, ls from test_tools.fs_tools import readlink, parse_ls_output, ls, check_if_directory_exists, \
from test_utils.output import CmdException create_directory, wipefs, is_mounted
from test_utils.size import Size, Unit from test_tools.udev import Udev
from type_def.size import Size, Unit
SECTOR_SIZE = 512 SECTOR_SIZE = 512
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class PartitionTable(Enum): class PartitionTable(Enum):
msdos = 0 msdos = 0
gpt = 1 gpt = 1
@ -42,21 +37,6 @@ class PartitionType(Enum):
unknown = 8 unknown = 8
def create_filesystem(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def create_partition_table(device, partition_table_type: PartitionTable = PartitionTable.gpt): def create_partition_table(device, partition_table_type: PartitionTable = PartitionTable.gpt):
TestRun.LOGGER.info( TestRun.LOGGER.info(
f"Creating partition table ({partition_table_type.name}) for device: {device.path}") f"Creating partition table ({partition_table_type.name}) for device: {device.path}")
@ -266,8 +246,7 @@ def get_first_partition_offset(device, aligned: bool):
def remove_partitions(device): def remove_partitions(device):
from test_utils.os_utils import Udev if is_mounted(device.path):
if device.is_mounted():
device.unmount() device.unmount()
for partition in device.partitions: for partition in device.partitions:
@ -275,7 +254,7 @@ def remove_partitions(device):
TestRun.LOGGER.info(f"Removing partitions from device: {device.path} " TestRun.LOGGER.info(f"Removing partitions from device: {device.path} "
f"({device.get_device_id()}).") f"({device.get_device_id()}).")
device.wipe_filesystem() wipefs(device)
Udev.trigger() Udev.trigger()
Udev.settle() Udev.settle()
output = TestRun.executor.run(f"ls {device.path}* -1") output = TestRun.executor.run(f"ls {device.path}* -1")
@ -286,8 +265,8 @@ def remove_partitions(device):
def mount(device, mount_point, options: [str] = None): def mount(device, mount_point, options: [str] = None):
if not fs_utils.check_if_directory_exists(mount_point): if not check_if_directory_exists(mount_point):
fs_utils.create_directory(mount_point, True) create_directory(mount_point, True)
TestRun.LOGGER.info(f"Mounting device {device.path} ({device.get_device_id()}) " TestRun.LOGGER.info(f"Mounting device {device.path} ({device.get_device_id()}) "
f"to {mount_point}.") f"to {mount_point}.")
cmd = f"mount {device.path} {mount_point}" cmd = f"mount {device.path} {mount_point}"
@ -329,15 +308,6 @@ def unit_to_string(unit):
return unit_string.get(unit, "Invalid unit.") return unit_string.get(unit, "Invalid unit.")
def wipe_filesystem(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def check_if_device_supports_trim(device): def check_if_device_supports_trim(device):
if device.get_device_id().startswith("nvme"): if device.get_device_id().startswith("nvme"):
return True return True
@ -350,27 +320,6 @@ def check_if_device_supports_trim(device):
return int(command_output.stdout) > 0 return int(command_output.stdout) > 0
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def _is_by_id_path(path: str): def _is_by_id_path(path: str):
"""check if given path already is proper by-id path""" """check if given path already is proper by-id path"""
dev_by_id_dir = "/dev/disk/by-id" dev_by_id_dir = "/dev/disk/by-id"
@ -406,3 +355,13 @@ def validate_dev_path(path: str):
return path return path
raise ValueError(f'By-id device link {path} is broken.') raise ValueError(f'By-id device link {path} is broken.')
def get_block_device_names_list(exclude_list: List[int] = None) -> List[str]:
cmd = "lsblk -lo NAME"
if exclude_list is not None:
cmd += f" -e {','.join(str(type_id) for type_id in exclude_list)}"
devices = TestRun.executor.run_expect_success(cmd).stdout
devices_list = devices.splitlines()
devices_list.sort()
return devices_list

View File

@ -4,7 +4,7 @@
# #
from core.test_run import TestRun from core.test_run import TestRun
from test_utils.output import Output from connection.utils.output import Output
def get_dmesg() -> str: def get_dmesg() -> str:

View File

@ -8,12 +8,12 @@ import datetime
import uuid import uuid
from packaging.version import Version from packaging.version import Version
import test_tools.fio.fio_param
import test_tools.fs_utils
from core.test_run import TestRun from core.test_run import TestRun
from test_tools import fs_utils from connection.utils.output import CmdException
from test_utils import os_utils from test_tools import wget
from test_utils.output import CmdException from test_tools.fio.fio_param import FioParam, FioParamCmd, FioOutput, FioParamConfig
from test_tools.fs_tools import uncompress_archive
class Fio: class Fio:
@ -22,12 +22,12 @@ class Fio:
self.default_run_time = datetime.timedelta(hours=1) self.default_run_time = datetime.timedelta(hours=1)
self.jobs = [] self.jobs = []
self.executor = executor_obj if executor_obj is not None else TestRun.executor self.executor = executor_obj if executor_obj is not None else TestRun.executor
self.base_cmd_parameters: test_tools.fio.fio_param.FioParam = None self.base_cmd_parameters: FioParam = None
self.global_cmd_parameters: test_tools.fio.fio_param.FioParam = None self.global_cmd_parameters: FioParam = None
def create_command(self, output_type=test_tools.fio.fio_param.FioOutput.json): def create_command(self, output_type=FioOutput.json):
self.base_cmd_parameters = test_tools.fio.fio_param.FioParamCmd(self, self.executor) self.base_cmd_parameters = FioParamCmd(self, self.executor)
self.global_cmd_parameters = test_tools.fio.fio_param.FioParamConfig(self, self.executor) self.global_cmd_parameters = FioParamConfig(self, self.executor)
self.fio_file = \ self.fio_file = \
f'fio_run_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_{uuid.uuid4().hex}' f'fio_run_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_{uuid.uuid4().hex}'
@ -50,8 +50,8 @@ class Fio:
def install(self): def install(self):
fio_url = f"http://brick.kernel.dk/snaps/fio-{self.min_fio_version}.tar.bz2" fio_url = f"http://brick.kernel.dk/snaps/fio-{self.min_fio_version}.tar.bz2"
fio_package = os_utils.download_file(fio_url) fio_package = wget.download_file(fio_url)
fs_utils.uncompress_archive(fio_package) uncompress_archive(fio_package)
TestRun.executor.run_expect_success( TestRun.executor.run_expect_success(
f"cd {fio_package.parent_dir}/fio-{self.min_fio_version}" f"cd {fio_package.parent_dir}/fio-{self.min_fio_version}"
f" && ./configure && make -j && make install" f" && ./configure && make -j && make install"
@ -106,7 +106,7 @@ class Fio:
command = f"echo '{self.execution_cmd_parameters()}' |" \ command = f"echo '{self.execution_cmd_parameters()}' |" \
f" {str(self.base_cmd_parameters)} -" f" {str(self.base_cmd_parameters)} -"
else: else:
fio_parameters = test_tools.fio.fio_param.FioParamCmd(self, self.executor) fio_parameters = FioParamCmd(self, self.executor)
fio_parameters.command_env_var.update(self.base_cmd_parameters.command_env_var) fio_parameters.command_env_var.update(self.base_cmd_parameters.command_env_var)
fio_parameters.command_param.update(self.base_cmd_parameters.command_param) fio_parameters.command_param.update(self.base_cmd_parameters.command_param)
fio_parameters.command_param.update(self.global_cmd_parameters.command_param) fio_parameters.command_param.update(self.global_cmd_parameters.command_param)

View File

@ -13,8 +13,8 @@ from connection.base_executor import BaseExecutor
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.device import Device from storage_devices.device import Device
from test_tools.fio.fio_result import FioResult from test_tools.fio.fio_result import FioResult
from test_utils.linux_command import LinuxCommand from test_tools.common.linux_command import LinuxCommand
from test_utils.size import Size from type_def.size import Size
class CpusAllowedPolicy(Enum): class CpusAllowedPolicy(Enum):
@ -316,6 +316,9 @@ class FioParam(LinuxCommand):
def verify_only(self, value: bool = True): def verify_only(self, value: bool = True):
return self.set_flags('verify_only') if value else self.remove_param('verify_only') return self.set_flags('verify_only') if value else self.remove_param('verify_only')
def trim_verify_zero(self, value: bool = True):
return self.set_param('trim_verify_zero', int(value))
def write_hint(self, value: str): def write_hint(self, value: str):
return self.set_param('write_hint', value) return self.set_param('write_hint', value)

View File

@ -4,7 +4,7 @@
# #
import secrets import secrets
from aenum import Enum from enum import Enum
class Pattern(Enum): class Pattern(Enum):

View File

@ -4,8 +4,8 @@
# #
from test_utils.size import Size, Unit, UnitPerSecond from type_def.size import Size, Unit, UnitPerSecond
from test_utils.time import Time from type_def.time import Time
class FioResult: class FioResult:
@ -103,9 +103,6 @@ class FioResult:
def write_runtime(self): def write_runtime(self):
return Time(microseconds=self.job.write.runtime) return Time(microseconds=self.job.write.runtime)
def write_completion_latency_average(self):
return Time(nanoseconds=self.job.write.lat_ns.mean)
def write_completion_latency_min(self): def write_completion_latency_min(self):
return Time(nanoseconds=self.job.write.lat_ns.min) return Time(nanoseconds=self.job.write.lat_ns.min)
@ -139,9 +136,6 @@ class FioResult:
def trim_runtime(self): def trim_runtime(self):
return Time(microseconds=self.job.trim.runtime) return Time(microseconds=self.job.trim.runtime)
def trim_completion_latency_average(self):
return Time(nanoseconds=self.job.trim.lat_ns.mean)
def trim_completion_latency_min(self): def trim_completion_latency_min(self):
return Time(nanoseconds=self.job.trim.lat_ns.min) return Time(nanoseconds=self.job.trim.lat_ns.min)

View File

@ -7,15 +7,23 @@
import base64 import base64
import math import math
import re
import textwrap import textwrap
from collections import namedtuple from collections import namedtuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
from enum import Enum
from aenum import IntFlag # IntFlag from enum is not able to correctly parse string like "x|y|z"
from aenum import IntFlag, Enum from connection.utils.output import CmdException
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.dd import Dd from test_tools.dd import Dd
from test_utils.size import Size, Unit from type_def.size import Size, Unit
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class Permissions(IntFlag): class Permissions(IntFlag):
@ -50,7 +58,7 @@ class PermissionSign(Enum):
set = '=' set = '='
class FilesPermissions(): class FilesPermissions:
perms_exceptions = {} perms_exceptions = {}
def __init__(self, files_list: list): def __init__(self, files_list: list):
@ -393,3 +401,55 @@ def create_random_test_file(target_file_path: str,
dd.run() dd.run()
file.refresh_item() file.refresh_item()
return file return file
def mkfs(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def wipefs(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0

21
test_tools/fstab.py Normal file
View File

@ -0,0 +1,21 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from test_tools.fs_tools import append_line, remove_lines
from test_tools.systemctl import reload_daemon, restart_service
def add_mountpoint(device, mount_point, fs_type, mount_now=True):
append_line("/etc/fstab",
f"{device.path} {mount_point} {fs_type.name} defaults 0 0")
reload_daemon()
if mount_now:
restart_service("local-fs.target")
def remove_mountpoint(device):
remove_lines("/etc/fstab", device.path)
reload_daemon()

View File

@ -3,13 +3,14 @@
# Copyright(c) 2024 Huawei Technologies Co., Ltd. # Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import itertools import itertools
import os import os
import posixpath import posixpath
from core.test_run import TestRun from core.test_run import TestRun
from connection.local_executor import LocalExecutor from connection.local_executor import LocalExecutor
from test_utils.output import CmdException from connection.utils.output import CmdException
def __get_executor_and_repo_path(from_dut): def __get_executor_and_repo_path(from_dut):
@ -97,14 +98,10 @@ def get_commit_hash(version, from_dut: bool = False):
return output.stdout return output.stdout
def get_release_tags(forbidden_characters: list = None): def get_tags():
repo_path = os.path.join(TestRun.usr.working_dir, ".git") repo_path = os.path.join(TestRun.usr.working_dir, ".git")
output = TestRun.executor.run_expect_success(f"git --git-dir={repo_path} tag").stdout output = TestRun.executor.run_expect_success(f"git --git-dir={repo_path} tag").stdout
if not forbidden_characters:
return output.splitlines() return output.splitlines()
else:
return [v for v in output.splitlines() if all(c not in v for c in forbidden_characters)]
def checkout_version(version): def checkout_version(version):

View File

@ -4,7 +4,7 @@
# #
from core.test_run import TestRun from core.test_run import TestRun
from test_utils.os_utils import get_distro, Distro from test_tools.os_tools import get_distro, Distro
def update(): def update():

View File

@ -3,12 +3,13 @@
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
from core.test_run import TestRun
from storage_devices.device import Device
from test_utils.size import Size, Unit, UnitPerSecond
from test_utils.time import Time
import csv import csv
from core.test_run import TestRun
from type_def.size import Size, Unit, UnitPerSecond
from type_def.time import Time
class IOstatExtended: class IOstatExtended:
iostat_option = "x" iostat_option = "x"
@ -84,7 +85,7 @@ class IOstatExtended:
@classmethod @classmethod
def get_iostat_list( def get_iostat_list(
cls, cls,
devices_list: [Device], devices_list: [str],
since_boot: bool = True, since_boot: bool = True,
interval: int = 1, interval: int = 1,
): ):
@ -138,7 +139,7 @@ class IOstatBasic:
@classmethod @classmethod
def get_iostat_list( def get_iostat_list(
cls, cls,
devices_list: [Device], devices_list: [str],
since_boot: bool = True, since_boot: bool = True,
interval: int = 1, interval: int = 1,
): ):
@ -150,8 +151,8 @@ class IOstatBasic:
def _get_iostat_list( def _get_iostat_list(
class_type: type, class_type: type(IOstatExtended) | type(IOstatBasic),
devices_list: [Device], devices_list: [str],
since_boot: bool, since_boot: bool,
interval: int, interval: int,
): ):
@ -163,7 +164,7 @@ def _get_iostat_list(
if not since_boot: if not since_boot:
iostat_cmd += f"-y {interval} 1 " iostat_cmd += f"-y {interval} 1 "
iostat_cmd += " ".join([name.get_device_id() for name in devices_list]) iostat_cmd += " ".join(devices_list)
sed_cmd = "sed -n '/^$/d;s/\s\+/,/g;/^Device/,$p'" sed_cmd = "sed -n '/^$/d;s/\s\+/,/g;/^Device/,$p'"

View File

@ -1,129 +0,0 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import wget
import os
from enum import Enum
from core.test_run import TestRun
from test_tools import fs_utils
from test_utils.os_utils import DEBUGFS_MOUNT_POINT
KEDR_0_6_URL = "https://github.com/euspectre/kedr/archive/v0.6.tar.gz"
BUILD_DIR = "build"
LEAKS_LOGS_PATH = f"{DEBUGFS_MOUNT_POINT}/kedr_leak_check"
KMALLOC_FAULT_SIMULATION_PATH = "/sys/kernel/debug/kedr_fault_simulation"
class KedrProfile(Enum):
MEM_LEAK_CHECK = "leak_check.conf"
FAULT_SIM = "fsim.conf"
class Kedr:
@staticmethod
def is_installed():
return "KEDR version" in TestRun.executor.run("kedr --version").stdout.strip()
@classmethod
def install(cls):
if cls.is_installed():
TestRun.LOGGER.info("Kedr is already installed!")
return
# TODO check if cmake is installed before
# TODO consider using os_utils.download_file()
kedr_archive = wget.download(KEDR_0_6_URL)
TestRun.executor.rsync_to(
f"\"{kedr_archive}\"",
f"{TestRun.config['working_dir']}")
kedr_dir = TestRun.executor.run_expect_success(
f"cd {TestRun.config['working_dir']} && "
f"tar -ztf \"{kedr_archive}\" | sed -e 's@/.*@@' | uniq"
).stdout
TestRun.executor.run_expect_success(
f"cd {TestRun.config['working_dir']} && "
f"tar -xf \"{kedr_archive}\" && "
f"cd {kedr_dir} && "
f"mkdir -p {BUILD_DIR} && "
f"cd {BUILD_DIR} && "
f"cmake ../sources/ && "
f"make && "
f"make install"
)
os.remove(kedr_archive)
TestRun.LOGGER.info("Kedr installed succesfully")
@classmethod
def is_loaded(cls):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
if "KEDR status: loaded" in TestRun.executor.run_expect_success("kedr status").stdout:
return True
else:
return False
@classmethod
def start(cls, module, profile: KedrProfile = KedrProfile.MEM_LEAK_CHECK):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
TestRun.LOGGER.info(f"Starting kedr with {profile} profile")
start_cmd = f"kedr start {module} -f {profile.value}"
TestRun.executor.run_expect_success(start_cmd)
# TODO extend to scenarios other than kmalloc
def setup_fault_injections(condition: str = "1"):
TestRun.executor.run_expect_success(
f'echo "kmalloc" > {KMALLOC_FAULT_SIMULATION_PATH}/points/kmalloc/current_indicator')
TestRun.executor.run_expect_success(
f'echo "{condition}" > {KMALLOC_FAULT_SIMULATION_PATH}/points/kmalloc/expression')
@classmethod
def fsim_show_last_fault(cls):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
if not cls.is_loaded():
raise Exception("Kedr is not loaded!")
return fs_utils.read_file(f"{KMALLOC_FAULT_SIMULATION_PATH}/last_fault")
@classmethod
def stop(cls):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
TestRun.executor.run_expect_success("kedr stop")
@classmethod
def check_for_mem_leaks(cls, module):
if not cls.is_installed():
raise Exception("Kedr is not installed!")
if not cls.is_loaded():
raise Exception("Kedr is not loaded!")
if fs_utils.check_if_directory_exists(f"{LEAKS_LOGS_PATH}/{module}"):
logs_path = f"{LEAKS_LOGS_PATH}/{module}"
elif fs_utils.check_if_directory_exists(f"{DEBUGFS_MOUNT_POINT}"):
logs_path = f"{LEAKS_LOGS_PATH}"
else:
raise Exception("Couldn't find kedr logs dir!")
leaks = fs_utils.read_file(f"{logs_path}/possible_leaks")
frees = fs_utils.read_file(f"{logs_path}/unallocated_frees")
summary = fs_utils.read_file(f"{logs_path}/info")
if leaks or frees:
raise Exception("Memory leaks found!\n"
f"Kedr summary: {summary}\n"
f"Possible memory leaks: {leaks}\n"
f"Unallocated frees: {frees}\n")

View File

@ -4,12 +4,11 @@
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import os import os
import re import re
from core.test_run import TestRun from core.test_run import TestRun
from test_utils.output import CmdException from connection.utils.output import CmdException
class RpmSet: class RpmSet:

View File

@ -7,8 +7,8 @@
import re import re
from core.test_run import TestRun from core.test_run import TestRun
from test_utils.size import Unit from type_def.size import Unit
from test_utils.os_utils import Udev from test_tools.udev import Udev
class Mdadm: class Mdadm:

109
test_tools/memory.py Normal file
View File

@ -0,0 +1,109 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import math
from connection.utils.output import CmdException
from core.test_run import TestRun
from test_tools.dd import Dd
from test_tools.fs_tools import check_if_directory_exists, create_directory, is_mounted
from test_tools.os_tools import OvercommitMemoryMode, drop_caches, DropCachesMode, \
MEMORY_MOUNT_POINT
from type_def.size import Size, Unit
def disable_memory_affecting_functions():
"""Disables system functions affecting memory"""
# Don't allow sshd to be killed in case of out-of-memory:
TestRun.executor.run(
"echo '-1000' > /proc/`cat /var/run/sshd.pid`/oom_score_adj"
)
TestRun.executor.run(
"echo -17 > /proc/`cat /var/run/sshd.pid`/oom_adj"
) # deprecated
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.NEVER.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo '100' > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '64 64 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapoff --all")
drop_caches(DropCachesMode.SLAB)
def defaultize_memory_affecting_functions():
"""Sets default values to system functions affecting memory"""
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.DEFAULT.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo 50 > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '256 256 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapon --all")
def get_mem_free():
"""Returns free amount of memory in bytes"""
output = TestRun.executor.run_expect_success("free -b")
output = output.stdout.splitlines()
for line in output:
if 'free' in line:
index = line.split().index('free') + 1 # 1st row has 1 element less than following rows
if 'Mem' in line:
mem_line = line.split()
return Size(int(mem_line[index]))
def get_mem_available():
"""Returns amount of available memory from /proc/meminfo"""
cmd = "cat /proc/meminfo | grep MemAvailable | awk '{ print $2 }'"
mem_available = TestRun.executor.run(cmd).stdout
return Size(int(mem_available), Unit.KibiByte)
def get_module_mem_footprint(module_name):
"""Returns allocated size of specific module's metadata from /proc/vmallocinfo"""
cmd = f"cat /proc/vmallocinfo | grep {module_name} | awk '{{ print $2 }}' "
output_lines = TestRun.executor.run(cmd).stdout.splitlines()
memory_used = 0
for line in output_lines:
memory_used += int(line)
return Size(memory_used)
def allocate_memory(size: Size):
"""Allocates given amount of memory"""
mount_ramfs()
TestRun.LOGGER.info(f"Allocating {size.get_value(Unit.MiB):0.2f} MiB of memory.")
bs = Size(1, Unit.Blocks512)
dd = (
Dd()
.block_size(bs)
.count(math.ceil(size / bs))
.input("/dev/zero")
.output(f"{MEMORY_MOUNT_POINT}/data")
)
output = dd.run()
if output.exit_code != 0:
raise CmdException("Allocating memory failed.", output)
def mount_ramfs():
"""Mounts ramfs to enable allocating memory space"""
if not check_if_directory_exists(MEMORY_MOUNT_POINT):
create_directory(MEMORY_MOUNT_POINT)
if not is_mounted(MEMORY_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t ramfs ramfs {MEMORY_MOUNT_POINT}")
def unmount_ramfs():
"""Unmounts ramfs and releases whole space allocated by it in memory"""
TestRun.executor.run_expect_success(f"umount {MEMORY_MOUNT_POINT}")

View File

@ -1,7 +1,9 @@
# #
# Copyright(c) 2021 Intel Corporation # Copyright(c) 2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import json import json
from core.test_run import TestRun from core.test_run import TestRun

229
test_tools/os_tools.py Normal file
View File

@ -0,0 +1,229 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
import re
import time
from datetime import timedelta
from enum import IntFlag, Enum, StrEnum
from packaging import version
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.disk_tools import get_sysfs_path
from test_tools.fs_tools import check_if_file_exists, is_mounted
from connection.utils.retry import Retry
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
MEMORY_MOUNT_POINT = "/mnt/memspace"
class Distro(StrEnum):
UBUNTU = "ubuntu"
DEBIAN = "debian"
REDHAT = "rhel"
OPENEULER = "openeuler"
CENTOS = "centos"
class DropCachesMode(IntFlag):
PAGECACHE = 1
SLAB = 2
ALL = PAGECACHE | SLAB
class OvercommitMemoryMode(Enum):
DEFAULT = 0
ALWAYS = 1
NEVER = 2
class SystemManagerType(Enum):
sysv = 0
systemd = 1
def get_distro():
output = TestRun.executor.run(
"cat /etc/os-release | grep -e \"^ID=\" | awk -F= '{print$2}' | tr -d '\"'"
).stdout.lower()
try:
return Distro(output)
except ValueError:
raise ValueError(f"Could not resolve distro name. Command output: {output}")
def drop_caches(level: DropCachesMode = DropCachesMode.ALL):
TestRun.executor.run_expect_success(
f"echo {level.value} > /proc/sys/vm/drop_caches")
def get_number_of_processors_from_cpuinfo():
"""Returns number of processors (count) which are listed out in /proc/cpuinfo"""
cmd = f"cat /proc/cpuinfo | grep processor | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def get_number_of_processes(process_name):
cmd = f"ps aux | grep {process_name} | grep -v grep | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def get_kernel_version():
version_string = TestRun.executor.run_expect_success("uname -r").stdout
version_string = version_string.split('-')[0]
return version.Version(version_string)
def is_kernel_module_loaded(module_name):
command = f"lsmod | grep -E '^{module_name}\\b'"
output = TestRun.executor.run(command)
return output.exit_code == 0
def load_kernel_module(module_name, module_args: {str, str}=None):
cmd = f"modprobe {module_name}"
if module_args is not None:
for key, value in module_args.items():
cmd += f" {key}={value}"
return TestRun.executor.run(cmd)
def unload_kernel_module(module_name):
cmd = f"modprobe -r {module_name}"
return TestRun.executor.run_expect_success(cmd)
def get_kernel_module_parameter(module_name, parameter):
param_file_path = f"/sys/module/{module_name}/parameters/{parameter}"
if not check_if_file_exists(param_file_path):
raise FileNotFoundError(f"File {param_file_path} does not exist!")
return TestRun.executor.run(f"cat {param_file_path}").stdout
def mount_debugfs():
if not is_mounted(DEBUGFS_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t debugfs none {DEBUGFS_MOUNT_POINT}")
def reload_kernel_module(module_name, module_args: {str, str}=None):
if is_kernel_module_loaded(module_name):
unload_kernel_module(module_name)
Retry.run_while_false(
lambda: load_kernel_module(module_name, module_args).exit_code == 0,
timeout=timedelta(seconds=5)
)
def get_module_path(module_name):
cmd = f"modinfo {module_name}"
# module path is in second column of first line of `modinfo` output
module_info = TestRun.executor.run_expect_success(cmd).stdout
module_path = module_info.splitlines()[0].split()[1]
return module_path
def get_executable_path(exec_name):
cmd = f"which {exec_name}"
path = TestRun.executor.run_expect_success(cmd).stdout
return path
def kill_all_io(graceful=True):
if graceful:
# TERM signal should be used in preference to the KILL signal, since a
# process may install a handler for the TERM signal in order to perform
# clean-up steps before terminating in an orderly fashion.
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal KILL dd fio blktrace")
TestRun.executor.run("kill -9 `ps aux | grep -i vdbench.* | awk '{ print $2 }'`")
if TestRun.executor.run("pgrep -x dd").exit_code == 0:
raise Exception(f"Failed to stop dd!")
if TestRun.executor.run("pgrep -x fio").exit_code == 0:
raise Exception(f"Failed to stop fio!")
if TestRun.executor.run("pgrep -x blktrace").exit_code == 0:
raise Exception(f"Failed to stop blktrace!")
if TestRun.executor.run("pgrep vdbench").exit_code == 0:
raise Exception(f"Failed to stop vdbench!")
def sync():
TestRun.executor.run_expect_success("sync")
def get_dut_cpu_number():
return int(TestRun.executor.run_expect_success("nproc").stdout)
def get_dut_cpu_physical_cores():
""" Get list of CPU numbers that don't share physical cores """
output = TestRun.executor.run_expect_success("lscpu --all --parse").stdout
core_list = []
visited_phys_cores = []
for line in output.split("\n"):
if "#" in line:
continue
cpu_no, phys_core_no = line.split(",")[:2]
if phys_core_no not in visited_phys_cores:
core_list.append(cpu_no)
visited_phys_cores.append(phys_core_no)
return core_list
def set_wbt_lat(device: Device, value: int):
if value < 0:
raise ValueError("Write back latency can't be negative number")
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return TestRun.executor.run_expect_success(f"echo {value} > {wbt_lat_config_path}")
def get_wbt_lat(device: Device):
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return int(TestRun.executor.run_expect_success(f"cat {wbt_lat_config_path}").stdout)
def get_cores_ids_range(numa_node: int):
output = TestRun.executor.run_expect_success(f"lscpu --all --parse").stdout
parse_output = re.findall(r'(\d+),(\d+),(?:\d+),(\d+),,', output, re.I)
return [element[0] for element in parse_output if int(element[2]) == numa_node]
def create_user(username, additional_params=None):
command = "useradd "
if additional_params:
command += "".join([f"-{p} " for p in additional_params])
command += username
return TestRun.executor.run_expect_success(command)
def check_if_user_exists(username):
return TestRun.executor.run(f"id {username}").exit_code == 0

View File

@ -12,10 +12,10 @@ import tempfile
import lxml.etree as etree import lxml.etree as etree
from collections import namedtuple from collections import namedtuple
from test_tools import wget
from core.test_run import TestRun from core.test_run import TestRun
from test_tools import fs_utils from test_tools.fs_tools import create_directory, check_if_file_exists, write_file, remove, \
from test_tools.fs_utils import create_directory, check_if_file_exists, write_file check_if_directory_exists
from test_utils import os_utils
class PeachFuzzer: class PeachFuzzer:
@ -75,7 +75,7 @@ class PeachFuzzer:
cls._install() cls._install()
if not cls._is_xml_config_prepared(): if not cls._is_xml_config_prepared():
TestRun.block("No Peach Fuzzer XML config needed to generate fuzzed values was found!") TestRun.block("No Peach Fuzzer XML config needed to generate fuzzed values was found!")
fs_utils.remove(cls.fuzzy_output_file, force=True, ignore_errors=True) remove(cls.fuzzy_output_file, force=True, ignore_errors=True)
TestRun.LOGGER.info(f"Generate {count} unique fuzzed values") TestRun.LOGGER.info(f"Generate {count} unique fuzzed values")
cmd = f"cd {cls.base_dir}; {cls.peach_dir}/peach --range 0,{count - 1} " \ cmd = f"cd {cls.base_dir}; {cls.peach_dir}/peach --range 0,{count - 1} " \
f"--seed {random.randrange(2 ** 32)} {cls.xml_config_file} > " \ f"--seed {random.randrange(2 ** 32)} {cls.xml_config_file} > " \
@ -155,7 +155,7 @@ class PeachFuzzer:
Install Peach Fuzzer on the DUT Install Peach Fuzzer on the DUT
""" """
create_directory(cls.base_dir, True) create_directory(cls.base_dir, True)
peach_archive = os_utils.download_file( peach_archive = wget.download_file(
cls.peach_fuzzer_3_0_url, destination_dir=cls.base_dir cls.peach_fuzzer_3_0_url, destination_dir=cls.base_dir
) )
TestRun.executor.run_expect_success( TestRun.executor.run_expect_success(
@ -172,7 +172,7 @@ class PeachFuzzer:
""" """
if not cls._is_mono_installed(): if not cls._is_mono_installed():
TestRun.block("Mono is not installed, can't continue with Peach Fuzzer!") TestRun.block("Mono is not installed, can't continue with Peach Fuzzer!")
if fs_utils.check_if_directory_exists(posixpath.join(cls.base_dir, cls.peach_dir)): if check_if_directory_exists(posixpath.join(cls.base_dir, cls.peach_dir)):
return "Peach" in TestRun.executor.run( return "Peach" in TestRun.executor.run(
f"cd {cls.base_dir} && {cls.peach_dir}/peach --version").stdout.strip() f"cd {cls.base_dir} && {cls.peach_dir}/peach --version").stdout.strip()
else: else:
@ -197,7 +197,7 @@ class PeachFuzzer:
""" """
Check if Peach Fuzzer XML config is present on the DUT Check if Peach Fuzzer XML config is present on the DUT
""" """
if fs_utils.check_if_file_exists(cls.xml_config_file): if check_if_file_exists(cls.xml_config_file):
return True return True
else: else:
return False return False

109
test_tools/runlevel.py Normal file
View File

@ -0,0 +1,109 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from enum import IntEnum
from core.test_run import TestRun
from test_tools.os_tools import SystemManagerType
class Runlevel(IntEnum):
"""
Halt the system.
SysV Runlevel: 0
systemd Target: runlevel0.target, poweroff.target
"""
runlevel0 = 0
poweroff = runlevel0
"""
Single user mode.
SysV Runlevel: 1, s, single
systemd Target: runlevel1.target, rescue.target
"""
runlevel1 = 1
rescue = runlevel1
"""
User-defined/Site-specific runlevels. By default, identical to 3.
SysV Runlevel: 2, 4
systemd Target: runlevel2.target, runlevel4.target, multi-user.target
"""
runlevel2 = 2
"""
Multi-user, non-graphical. Users can usually login via multiple consoles or via the network.
SysV Runlevel: 3
systemd Target: runlevel3.target, multi-user.target
"""
runlevel3 = 3
multi_user = runlevel3
"""
Multi-user, graphical. Usually has all the services of runlevel 3 plus a graphical login.
SysV Runlevel: 5
systemd Target: runlevel5.target, graphical.target
"""
runlevel5 = 5
graphical = runlevel5
"""
Reboot
SysV Runlevel: 6
systemd Target: runlevel6.target, reboot.target
"""
runlevel6 = 6
reboot = runlevel6
"""
Emergency shell
SysV Runlevel: emergency
systemd Target: emergency.target
"""
runlevel7 = 7
emergency = runlevel7
def get_system_manager():
output = TestRun.executor.run_expect_success("ps -p 1").stdout
type = output.split('\n')[1].split()[3]
if type == "init":
return SystemManagerType.sysv
elif type == "systemd":
return SystemManagerType.systemd
raise Exception(f"Unknown system manager type ({type}).")
def change_runlevel(runlevel: Runlevel):
if runlevel == get_runlevel():
return
if Runlevel.runlevel0 < runlevel < Runlevel.runlevel6:
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
TestRun.executor.run_expect_success(f"systemctl set-default {runlevel.name}.target")
else:
TestRun.executor.run_expect_success(
f"sed -i 's/^.*id:.*$/id:{runlevel.value}:initdefault: /' /etc/inittab")
TestRun.executor.run_expect_success(f"init {runlevel.value}")
def get_runlevel():
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
result = TestRun.executor.run_expect_success("systemctl get-default")
try:
name = result.stdout.split(".")[0].replace("-", "_")
return Runlevel[name]
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")
else:
result = TestRun.executor.run_expect_success("runlevel")
try:
split_output = result.stdout.split()
runlevel = Runlevel(int(split_output[1]))
return runlevel
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")

View File

@ -1,10 +1,10 @@
# #
# Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
from pathlib import Path from pathlib import Path
from core.test_run import TestRun from core.test_run import TestRun
systemd_service_directory = Path("/usr/lib/systemd/system/") systemd_service_directory = Path("/usr/lib/systemd/system/")
@ -23,3 +23,13 @@ def reload_daemon():
def restart_service(name): def restart_service(name):
TestRun.executor.run_expect_success(f"systemctl restart {name}") TestRun.executor.run_expect_success(f"systemctl restart {name}")
def get_service_path(unit_name):
cmd = f"systemctl cat {unit_name}"
# path is in second column of first line of output
info = TestRun.executor.run_expect_success(cmd).stdout
path = info.splitlines()[0].split()[1]
return path

27
test_tools/udev.py Normal file
View File

@ -0,0 +1,27 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
class Udev(object):
@staticmethod
def enable():
TestRun.LOGGER.info("Enabling udev")
TestRun.executor.run_expect_success("udevadm control --start-exec-queue")
@staticmethod
def disable():
TestRun.LOGGER.info("Disabling udev")
TestRun.executor.run_expect_success("udevadm control --stop-exec-queue")
@staticmethod
def trigger():
TestRun.executor.run_expect_success("udevadm trigger")
@staticmethod
def settle():
TestRun.executor.run_expect_success("udevadm settle")

17
test_tools/wget.py Normal file
View File

@ -0,0 +1,17 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from test_utils.filesystem.file import File
def download_file(url, destination_dir="/tmp"):
# TODO use wget module instead
command = ("wget --tries=3 --timeout=5 --continue --quiet "
f"--directory-prefix={destination_dir} {url}")
TestRun.executor.run_expect_success(command)
path = f"{destination_dir.rstrip('/')}/{File.get_name(url)}"
return File(path)

View File

@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@ -1,5 +1,6 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #

View File

@ -3,8 +3,6 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
import os
from test_utils.filesystem.file import File from test_utils.filesystem.file import File

View File

@ -7,8 +7,8 @@ from textwrap import dedent
from string import Template from string import Template
from pathlib import Path from pathlib import Path
from .systemd import enable_service, reload_daemon, systemd_service_directory, disable_service from test_tools.systemctl import enable_service, reload_daemon, systemd_service_directory
from test_tools.fs_utils import ( from test_tools.fs_tools import (
create_file, create_file,
write_file, write_file,
remove, remove,

View File

@ -1,10 +1,12 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
from core.test_run import TestRun from core.test_run import TestRun
from test_tools import fs_utils from test_tools import fs_tools
from test_tools.fs_utils import check_if_directory_exists from test_tools.fs_tools import check_if_directory_exists, parse_ls_output, ls_item, ls
from test_utils.filesystem.fs_item import FsItem from test_utils.filesystem.fs_item import FsItem
@ -13,14 +15,14 @@ class Directory(FsItem):
FsItem.__init__(self, full_path) FsItem.__init__(self, full_path)
def ls(self): def ls(self):
output = fs_utils.ls(f"{self.full_path}") output = ls(self.full_path)
return fs_utils.parse_ls_output(output, self.full_path) return parse_ls_output(output, self.full_path)
@staticmethod @staticmethod
def create_directory(path: str, parents: bool = False): def create_directory(path: str, parents: bool = False):
fs_utils.create_directory(path, parents) fs_tools.create_directory(path, parents)
output = fs_utils.ls_item(path) output = ls_item(path)
return fs_utils.parse_ls_output(output)[0] return parse_ls_output(output)[0]
@staticmethod @staticmethod
def create_temp_directory(parent_dir_path: str = "/tmp"): def create_temp_directory(parent_dir_path: str = "/tmp"):

View File

@ -3,12 +3,15 @@
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd. # Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
from datetime import timedelta from datetime import timedelta
from test_tools import fs_utils from test_tools import fs_tools
from test_tools.dd import Dd from test_tools.dd import Dd
from test_tools.fs_tools import read_file, write_file, ls_item, parse_ls_output, remove, \
check_if_directory_exists
from test_utils.filesystem.fs_item import FsItem from test_utils.filesystem.fs_item import FsItem
from test_utils.size import Size from type_def.size import Size
class File(FsItem): class File(FsItem):
@ -16,22 +19,22 @@ class File(FsItem):
FsItem.__init__(self, full_path) FsItem.__init__(self, full_path)
def compare(self, other_file, timeout: timedelta = timedelta(minutes=30)): def compare(self, other_file, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.compare(str(self), str(other_file), timeout) return fs_tools.compare(str(self), str(other_file), timeout)
def diff(self, other_file, timeout: timedelta = timedelta(minutes=30)): def diff(self, other_file, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.diff(str(self), str(other_file), timeout) return fs_tools.diff(str(self), str(other_file), timeout)
def md5sum(self, binary=True, timeout: timedelta = timedelta(minutes=30)): def md5sum(self, binary=True, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.md5sum(str(self), binary, timeout) return fs_tools.md5sum(str(self), binary, timeout)
def crc32sum(self, timeout: timedelta = timedelta(minutes=30)): def crc32sum(self, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.crc32sum(str(self), timeout) return fs_tools.crc32sum(str(self), timeout)
def read(self): def read(self):
return fs_utils.read_file(str(self)) return read_file(str(self))
def write(self, content, overwrite: bool = True): def write(self, content, overwrite: bool = True):
fs_utils.write_file(str(self), content, overwrite) write_file(str(self), content, overwrite)
self.refresh_item() self.refresh_item()
def get_properties(self): def get_properties(self):
@ -39,9 +42,9 @@ class File(FsItem):
@staticmethod @staticmethod
def create_file(path: str): def create_file(path: str):
fs_utils.create_file(path) fs_tools.create_file(path)
output = fs_utils.ls_item(path) output = ls_item(path)
return fs_utils.parse_ls_output(output)[0] return parse_ls_output(output)[0]
def padding(self, size: Size): def padding(self, size: Size):
dd = Dd().input("/dev/zero").output(self).count(1).block_size(size) dd = Dd().input("/dev/zero").output(self).count(1).block_size(size)
@ -49,7 +52,7 @@ class File(FsItem):
self.refresh_item() self.refresh_item()
def remove(self, force: bool = False, ignore_errors: bool = False): def remove(self, force: bool = False, ignore_errors: bool = False):
fs_utils.remove(str(self), force=force, ignore_errors=ignore_errors) remove(str(self), force=force, ignore_errors=ignore_errors)
def copy(self, def copy(self,
destination, destination,
@ -57,18 +60,18 @@ class File(FsItem):
recursive: bool = False, recursive: bool = False,
dereference: bool = False, dereference: bool = False,
timeout: timedelta = timedelta(minutes=30)): timeout: timedelta = timedelta(minutes=30)):
fs_utils.copy(str(self), destination, force, recursive, dereference, timeout) fs_tools.copy(str(self), destination, force, recursive, dereference, timeout)
if fs_utils.check_if_directory_exists(destination): if check_if_directory_exists(destination):
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}" path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else: else:
path = destination path = destination
output = fs_utils.ls_item(path) output = ls_item(path)
return fs_utils.parse_ls_output(output)[0] return parse_ls_output(output)[0]
class FileProperties: class FileProperties:
def __init__(self, file): def __init__(self, file):
file = fs_utils.parse_ls_output(fs_utils.ls_item(file.full_path))[0] file = parse_ls_output(ls_item(file.full_path))[0]
self.full_path = file.full_path self.full_path = file.full_path
self.parent_dir = FsItem.get_parent_dir(self.full_path) self.parent_dir = FsItem.get_parent_dir(self.full_path)
self.name = FsItem.get_name(self.full_path) self.name = FsItem.get_name(self.full_path)

View File

@ -1,11 +1,14 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import posixpath import posixpath
from test_tools import fs_utils from test_tools import fs_tools
from test_tools.fs_tools import Permissions, PermissionsUsers, PermissionSign, \
check_if_directory_exists, ls_item, parse_ls_output
class FsItem: class FsItem:
@ -38,19 +41,19 @@ class FsItem:
return self.full_path return self.full_path
def chmod_numerical(self, permissions: int, recursive: bool = False): def chmod_numerical(self, permissions: int, recursive: bool = False):
fs_utils.chmod_numerical(self.full_path, permissions, recursive) fs_tools.chmod_numerical(self.full_path, permissions, recursive)
self.refresh_item() self.refresh_item()
def chmod(self, def chmod(self,
permissions: fs_utils.Permissions, permissions: Permissions,
users: fs_utils.PermissionsUsers, users: PermissionsUsers,
sign: fs_utils.PermissionSign = fs_utils.PermissionSign.set, sign: PermissionSign = PermissionSign.set,
recursive: bool = False): recursive: bool = False):
fs_utils.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive) fs_tools.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive)
self.refresh_item() self.refresh_item()
def chown(self, owner, group, recursive: bool = False): def chown(self, owner, group, recursive: bool = False):
fs_utils.chown(self.full_path, owner, group, recursive) fs_tools.chown(self.full_path, owner, group, recursive)
self.refresh_item() self.refresh_item()
def copy(self, def copy(self,
@ -58,20 +61,20 @@ class FsItem:
force: bool = False, force: bool = False,
recursive: bool = False, recursive: bool = False,
dereference: bool = False): dereference: bool = False):
target_dir_exists = fs_utils.check_if_directory_exists(destination) target_dir_exists = check_if_directory_exists(destination)
fs_utils.copy(str(self), destination, force, recursive, dereference) fs_tools.copy(str(self), destination, force, recursive, dereference)
if target_dir_exists: if target_dir_exists:
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}" path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else: else:
path = destination path = destination
output = fs_utils.ls_item(f"{path}") output = ls_item(f"{path}")
return fs_utils.parse_ls_output(output)[0] return parse_ls_output(output)[0]
def move(self, def move(self,
destination, destination,
force: bool = False): force: bool = False):
target_dir_exists = fs_utils.check_if_directory_exists(destination) target_dir_exists = check_if_directory_exists(destination)
fs_utils.move(str(self), destination, force) fs_tools.move(str(self), destination, force)
if target_dir_exists: if target_dir_exists:
self.full_path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}" self.full_path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else: else:
@ -80,7 +83,7 @@ class FsItem:
return self return self
def refresh_item(self): def refresh_item(self):
updated_file = fs_utils.parse_ls_output(fs_utils.ls_item(self.full_path))[0] updated_file = parse_ls_output(ls_item(self.full_path))[0]
# keep order the same as in __init__() # keep order the same as in __init__()
self.parent_dir = updated_file.parent_dir self.parent_dir = updated_file.parent_dir
self.name = updated_file.name self.name = updated_file.name

View File

@ -5,7 +5,7 @@
# #
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.fs_utils import ( from test_tools.fs_tools import (
readlink, readlink,
create_directory, create_directory,
check_if_symlink_exists, check_if_symlink_exists,

View File

@ -1,20 +0,0 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from test_tools import fs_utils
from test_utils import systemd
def add_mountpoint(device, mount_point, fs_type, mount_now=True):
fs_utils.append_line("/etc/fstab",
f"{device.path} {mount_point} {fs_type.name} defaults 0 0")
systemd.reload_daemon()
if mount_now:
systemd.restart_service("local-fs.target")
def remove_mountpoint(device):
fs_utils.remove_lines("/etc/fstab", device.path)
systemd.reload_daemon()

View File

@ -1,11 +0,0 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import random
import string
def random_string(length: int, chars=string.ascii_letters + string.digits):
return ''.join(random.choice(chars) for i in range(length))

View File

@ -5,7 +5,7 @@
import re import re
from core.test_run import TestRun from core.test_run import TestRun
from test_utils.output import CmdException from connection.utils.output import CmdException
SYSFS_LINE_FORMAT = r"^(\d+\s+){10,}\d+$" SYSFS_LINE_FORMAT = r"^(\d+\s+){10,}\d+$"
PROCFS_LINE_FORMAT = r"^\d+\s+\d+\s+[\w-]+\s+" + SYSFS_LINE_FORMAT[1:] PROCFS_LINE_FORMAT = r"^\d+\s+\d+\s+[\w-]+\s+" + SYSFS_LINE_FORMAT[1:]

View File

@ -1,508 +0,0 @@
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import math
import posixpath
import re
import time
from datetime import timedelta, datetime
from enum import IntFlag, Enum, IntEnum, StrEnum
from packaging import version
from typing import List
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.dd import Dd
from test_tools.disk_utils import get_sysfs_path
from test_tools.fs_utils import check_if_directory_exists, create_directory, check_if_file_exists
from test_utils.filesystem.file import File
from test_utils.output import CmdException
from test_utils.retry import Retry
from test_utils.size import Size, Unit
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
MEMORY_MOUNT_POINT = "/mnt/memspace"
class Distro(StrEnum):
UBUNTU = "ubuntu"
DEBIAN = "debian"
REDHAT = "rhel"
OPENEULER = "openeuler"
CENTOS = "centos"
class DropCachesMode(IntFlag):
PAGECACHE = 1
SLAB = 2
ALL = PAGECACHE | SLAB
class OvercommitMemoryMode(Enum):
DEFAULT = 0
ALWAYS = 1
NEVER = 2
class Runlevel(IntEnum):
"""
Halt the system.
SysV Runlevel: 0
systemd Target: runlevel0.target, poweroff.target
"""
runlevel0 = 0
poweroff = runlevel0
"""
Single user mode.
SysV Runlevel: 1, s, single
systemd Target: runlevel1.target, rescue.target
"""
runlevel1 = 1
rescue = runlevel1
"""
User-defined/Site-specific runlevels. By default, identical to 3.
SysV Runlevel: 2, 4
systemd Target: runlevel2.target, runlevel4.target, multi-user.target
"""
runlevel2 = 2
"""
Multi-user, non-graphical. Users can usually login via multiple consoles or via the network.
SysV Runlevel: 3
systemd Target: runlevel3.target, multi-user.target
"""
runlevel3 = 3
multi_user = runlevel3
"""
Multi-user, graphical. Usually has all the services of runlevel 3 plus a graphical login.
SysV Runlevel: 5
systemd Target: runlevel5.target, graphical.target
"""
runlevel5 = 5
graphical = runlevel5
"""
Reboot
SysV Runlevel: 6
systemd Target: runlevel6.target, reboot.target
"""
runlevel6 = 6
reboot = runlevel6
"""
Emergency shell
SysV Runlevel: emergency
systemd Target: emergency.target
"""
runlevel7 = 7
emergency = runlevel7
class SystemManagerType(Enum):
sysv = 0
systemd = 1
def get_distro():
output = TestRun.executor.run(
"cat /etc/os-release | grep -e \"^ID=\" | awk -F= '{print$2}' | tr -d '\"'"
).stdout.lower()
try:
return Distro(output)
except ValueError:
raise ValueError(f"Could not resolve distro name. Command output: {output}")
def get_system_manager():
output = TestRun.executor.run_expect_success("ps -p 1").stdout
type = output.split('\n')[1].split()[3]
if type == "init":
return SystemManagerType.sysv
elif type == "systemd":
return SystemManagerType.systemd
raise Exception(f"Unknown system manager type ({type}).")
def change_runlevel(runlevel: Runlevel):
if runlevel == get_runlevel():
return
if Runlevel.runlevel0 < runlevel < Runlevel.runlevel6:
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
TestRun.executor.run_expect_success(f"systemctl set-default {runlevel.name}.target")
else:
TestRun.executor.run_expect_success(
f"sed -i 's/^.*id:.*$/id:{runlevel.value}:initdefault: /' /etc/inittab")
TestRun.executor.run_expect_success(f"init {runlevel.value}")
def get_runlevel():
system_manager = get_system_manager()
if system_manager == SystemManagerType.systemd:
result = TestRun.executor.run_expect_success("systemctl get-default")
try:
name = result.stdout.split(".")[0].replace("-", "_")
return Runlevel[name]
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")
else:
result = TestRun.executor.run_expect_success("runlevel")
try:
split_output = result.stdout.split()
runlevel = Runlevel(int(split_output[1]))
return runlevel
except Exception:
raise Exception(f"Cannot parse '{result.output}' to runlevel.")
class Udev(object):
@staticmethod
def enable():
TestRun.LOGGER.info("Enabling udev")
TestRun.executor.run_expect_success("udevadm control --start-exec-queue")
@staticmethod
def disable():
TestRun.LOGGER.info("Disabling udev")
TestRun.executor.run_expect_success("udevadm control --stop-exec-queue")
@staticmethod
def trigger():
TestRun.executor.run_expect_success("udevadm trigger")
@staticmethod
def settle():
TestRun.executor.run_expect_success("udevadm settle")
def drop_caches(level: DropCachesMode = DropCachesMode.ALL):
TestRun.executor.run_expect_success(
f"echo {level.value} > /proc/sys/vm/drop_caches")
def disable_memory_affecting_functions():
"""Disables system functions affecting memory"""
# Don't allow sshd to be killed in case of out-of-memory:
TestRun.executor.run(
"echo '-1000' > /proc/`cat /var/run/sshd.pid`/oom_score_adj"
)
TestRun.executor.run(
"echo -17 > /proc/`cat /var/run/sshd.pid`/oom_adj"
) # deprecated
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.NEVER.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo '100' > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '64 64 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapoff --all")
drop_caches(DropCachesMode.SLAB)
def defaultize_memory_affecting_functions():
"""Sets default values to system functions affecting memory"""
TestRun.executor.run_expect_success(
f"echo {OvercommitMemoryMode.DEFAULT.value} > /proc/sys/vm/overcommit_memory"
)
TestRun.executor.run_expect_success("echo 50 > /proc/sys/vm/overcommit_ratio")
TestRun.executor.run_expect_success(
"echo '256 256 32' > /proc/sys/vm/lowmem_reserve_ratio"
)
TestRun.executor.run_expect_success("swapon --all")
def get_mem_free():
"""Returns free amount of memory in bytes"""
output = TestRun.executor.run_expect_success("free -b")
output = output.stdout.splitlines()
for line in output:
if 'free' in line:
index = line.split().index('free') + 1 # 1st row has 1 element less than following rows
if 'Mem' in line:
mem_line = line.split()
return Size(int(mem_line[index]))
def get_mem_available():
"""Returns amount of available memory from /proc/meminfo"""
cmd = "cat /proc/meminfo | grep MemAvailable | awk '{ print $2 }'"
mem_available = TestRun.executor.run(cmd).stdout
return Size(int(mem_available), Unit.KibiByte)
def get_module_mem_footprint(module_name):
"""Returns allocated size of specific module's metadata from /proc/vmallocinfo"""
cmd = f"cat /proc/vmallocinfo | grep {module_name} | awk '{{ print $2 }}' "
output_lines = TestRun.executor.run(cmd).stdout.splitlines()
memory_used = 0
for line in output_lines:
memory_used += int(line)
return Size(memory_used)
def allocate_memory(size: Size):
"""Allocates given amount of memory"""
mount_ramfs()
TestRun.LOGGER.info(f"Allocating {size.get_value(Unit.MiB):0.2f} MiB of memory.")
bs = Size(1, Unit.Blocks512)
dd = (
Dd()
.block_size(bs)
.count(math.ceil(size / bs))
.input("/dev/zero")
.output(f"{MEMORY_MOUNT_POINT}/data")
)
output = dd.run()
if output.exit_code != 0:
raise CmdException("Allocating memory failed.", output)
def get_number_of_processors_from_cpuinfo():
"""Returns number of processors (count) which are listed out in /proc/cpuinfo"""
cmd = f"cat /proc/cpuinfo | grep processor | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def get_number_of_processes(process_name):
cmd = f"ps aux | grep {process_name} | grep -v grep | wc -l"
output = TestRun.executor.run(cmd).stdout
return int(output)
def mount_ramfs():
"""Mounts ramfs to enable allocating memory space"""
if not check_if_directory_exists(MEMORY_MOUNT_POINT):
create_directory(MEMORY_MOUNT_POINT)
if not is_mounted(MEMORY_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t ramfs ramfs {MEMORY_MOUNT_POINT}")
def unmount_ramfs():
"""Unmounts ramfs and releases whole space allocated by it in memory"""
TestRun.executor.run_expect_success(f"umount {MEMORY_MOUNT_POINT}")
def download_file(url, destination_dir="/tmp"):
# TODO use wget module instead
command = ("wget --tries=3 --timeout=5 --continue --quiet "
f"--directory-prefix={destination_dir} {url}")
TestRun.executor.run_expect_success(command)
path = f"{destination_dir.rstrip('/')}/{File.get_name(url)}"
return File(path)
def get_kernel_version():
version_string = TestRun.executor.run_expect_success("uname -r").stdout
version_string = version_string.split('-')[0]
return version.Version(version_string)
class ModuleRemoveMethod(Enum):
rmmod = "rmmod"
modprobe = "modprobe -r"
def is_kernel_module_loaded(module_name):
output = TestRun.executor.run(f"lsmod | grep ^{module_name}")
return output.exit_code == 0
def get_sys_block_path():
sys_block = "/sys/class/block"
if not check_if_directory_exists(sys_block):
sys_block = "/sys/block"
return sys_block
def load_kernel_module(module_name, module_args: {str, str}=None):
cmd = f"modprobe {module_name}"
if module_args is not None:
for key, value in module_args.items():
cmd += f" {key}={value}"
return TestRun.executor.run(cmd)
def unload_kernel_module(module_name, unload_method: ModuleRemoveMethod = ModuleRemoveMethod.rmmod):
cmd = f"{unload_method.value} {module_name}"
return TestRun.executor.run_expect_success(cmd)
def get_kernel_module_parameter(module_name, parameter):
param_file_path = f"/sys/module/{module_name}/parameters/{parameter}"
if not check_if_file_exists(param_file_path):
raise FileNotFoundError(f"File {param_file_path} does not exist!")
return File(param_file_path).read()
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0
def mount_debugfs():
if not is_mounted(DEBUGFS_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t debugfs none {DEBUGFS_MOUNT_POINT}")
def reload_kernel_module(module_name, module_args: {str, str}=None,
unload_method: ModuleRemoveMethod = ModuleRemoveMethod.rmmod):
if is_kernel_module_loaded(module_name):
unload_kernel_module(module_name, unload_method)
Retry.run_while_false(
lambda: load_kernel_module(module_name, module_args).exit_code == 0,
timeout=timedelta(seconds=5)
)
def get_module_path(module_name):
cmd = f"modinfo {module_name}"
# module path is in second column of first line of `modinfo` output
module_info = TestRun.executor.run_expect_success(cmd).stdout
module_path = module_info.splitlines()[0].split()[1]
return module_path
def get_executable_path(exec_name):
cmd = f"which {exec_name}"
path = TestRun.executor.run_expect_success(cmd).stdout
return path
def get_udev_service_path(unit_name):
cmd = f"systemctl cat {unit_name}"
# path is in second column of first line of output
info = TestRun.executor.run_expect_success(cmd).stdout
path = info.splitlines()[0].split()[1]
return path
def kill_all_io(graceful=True):
if graceful:
# TERM signal should be used in preference to the KILL signal, since a
# process may install a handler for the TERM signal in order to perform
# clean-up steps before terminating in an orderly fashion.
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
time.sleep(3)
TestRun.executor.run("killall -q --signal KILL dd fio blktrace")
TestRun.executor.run("kill -9 `ps aux | grep -i vdbench.* | awk '{ print $2 }'`")
if TestRun.executor.run("pgrep -x dd").exit_code == 0:
raise Exception(f"Failed to stop dd!")
if TestRun.executor.run("pgrep -x fio").exit_code == 0:
raise Exception(f"Failed to stop fio!")
if TestRun.executor.run("pgrep -x blktrace").exit_code == 0:
raise Exception(f"Failed to stop blktrace!")
if TestRun.executor.run("pgrep vdbench").exit_code == 0:
raise Exception(f"Failed to stop vdbench!")
def wait(predicate, timeout: timedelta, interval: timedelta = None):
start_time = datetime.now()
result = False
while start_time + timeout > datetime.now():
result = predicate()
if result:
break
if interval is not None:
time.sleep(interval.total_seconds())
return result
def sync():
TestRun.executor.run_expect_success("sync")
def get_dut_cpu_number():
return int(TestRun.executor.run_expect_success("nproc").stdout)
def get_dut_cpu_physical_cores():
""" Get list of CPU numbers that don't share physical cores """
output = TestRun.executor.run_expect_success("lscpu --all --parse").stdout
core_list = []
visited_phys_cores = []
for line in output.split("\n"):
if "#" in line:
continue
cpu_no, phys_core_no = line.split(",")[:2]
if phys_core_no not in visited_phys_cores:
core_list.append(cpu_no)
visited_phys_cores.append(phys_core_no)
return core_list
def set_wbt_lat(device: Device, value: int):
if value < 0:
raise ValueError("Write back latency can't be negative number")
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return TestRun.executor.run_expect_success(f"echo {value} > {wbt_lat_config_path}")
def get_wbt_lat(device: Device):
wbt_lat_config_path = posixpath.join(
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
)
return int(TestRun.executor.run_expect_success(f"cat {wbt_lat_config_path}").stdout)
def get_cores_ids_range(numa_node: int):
output = TestRun.executor.run_expect_success(f"lscpu --all --parse").stdout
parse_output = re.findall(r'(\d+),(\d+),(?:\d+),(\d+),,', output, re.I)
return [element[0] for element in parse_output if int(element[2]) == numa_node]
def create_user(username, additional_params=None):
command = "useradd "
if additional_params:
command += "".join([f"-{p} " for p in additional_params])
command += username
return TestRun.executor.run_expect_success(command)
def check_if_user_exists(username):
return TestRun.executor.run(f"id {username}").exit_code == 0
def get_block_device_names_list(exclude_list: List[int] = None) -> List[str]:
cmd = "lsblk -lo NAME"
if exclude_list is not None:
cmd += f" -e {','.join(str(type_id) for type_id in exclude_list)}"
devices = TestRun.executor.run_expect_success(cmd).stdout
devices_list = devices.splitlines()
devices_list.sort()
return devices_list

4
type_def/__init__.py Normal file
View File

@ -0,0 +1,4 @@
#
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#