OS tools refactor:
- make udev, runlevel, memory and wget separate files - rename os_utils to os_tools and move it to test_tools - remove ModuleRemoveMethod and set modprobe as default - fix regex in is_kernel_module_loaded method - remove get_sys_block_path - move get_block_device_names method to disk tools Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
This commit is contained in:
236
test_tools/os_tools.py
Normal file
236
test_tools/os_tools.py
Normal file
@@ -0,0 +1,236 @@
|
||||
#
|
||||
# Copyright(c) 2019-2022 Intel Corporation
|
||||
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
import posixpath
|
||||
import re
|
||||
import time
|
||||
|
||||
from datetime import timedelta
|
||||
from enum import IntFlag, Enum, StrEnum
|
||||
from packaging import version
|
||||
|
||||
from core.test_run import TestRun
|
||||
from storage_devices.device import Device
|
||||
from test_tools.disk_utils import get_sysfs_path
|
||||
from test_tools.fs_utils import check_if_file_exists
|
||||
from test_utils.filesystem.file import File
|
||||
from connection.utils.retry import Retry
|
||||
|
||||
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"
|
||||
MEMORY_MOUNT_POINT = "/mnt/memspace"
|
||||
|
||||
|
||||
class Distro(StrEnum):
|
||||
UBUNTU = "ubuntu"
|
||||
DEBIAN = "debian"
|
||||
REDHAT = "rhel"
|
||||
OPENEULER = "openeuler"
|
||||
CENTOS = "centos"
|
||||
|
||||
|
||||
class DropCachesMode(IntFlag):
|
||||
PAGECACHE = 1
|
||||
SLAB = 2
|
||||
ALL = PAGECACHE | SLAB
|
||||
|
||||
|
||||
class OvercommitMemoryMode(Enum):
|
||||
DEFAULT = 0
|
||||
ALWAYS = 1
|
||||
NEVER = 2
|
||||
|
||||
|
||||
class SystemManagerType(Enum):
|
||||
sysv = 0
|
||||
systemd = 1
|
||||
|
||||
|
||||
def get_distro():
|
||||
output = TestRun.executor.run(
|
||||
"cat /etc/os-release | grep -e \"^ID=\" | awk -F= '{print$2}' | tr -d '\"'"
|
||||
).stdout.lower()
|
||||
|
||||
try:
|
||||
return Distro(output)
|
||||
except ValueError:
|
||||
raise ValueError(f"Could not resolve distro name. Command output: {output}")
|
||||
|
||||
|
||||
def drop_caches(level: DropCachesMode = DropCachesMode.ALL):
|
||||
TestRun.executor.run_expect_success(
|
||||
f"echo {level.value} > /proc/sys/vm/drop_caches")
|
||||
|
||||
|
||||
def get_number_of_processors_from_cpuinfo():
|
||||
"""Returns number of processors (count) which are listed out in /proc/cpuinfo"""
|
||||
cmd = f"cat /proc/cpuinfo | grep processor | wc -l"
|
||||
output = TestRun.executor.run(cmd).stdout
|
||||
|
||||
return int(output)
|
||||
|
||||
|
||||
def get_number_of_processes(process_name):
|
||||
cmd = f"ps aux | grep {process_name} | grep -v grep | wc -l"
|
||||
output = TestRun.executor.run(cmd).stdout
|
||||
|
||||
return int(output)
|
||||
|
||||
|
||||
def get_kernel_version():
|
||||
version_string = TestRun.executor.run_expect_success("uname -r").stdout
|
||||
version_string = version_string.split('-')[0]
|
||||
return version.Version(version_string)
|
||||
|
||||
|
||||
def is_kernel_module_loaded(module_name):
|
||||
output = TestRun.executor.run(f"lsmod | grep ^{module_name}$")
|
||||
return output.exit_code == 0
|
||||
|
||||
|
||||
def load_kernel_module(module_name, module_args: {str, str}=None):
|
||||
cmd = f"modprobe {module_name}"
|
||||
if module_args is not None:
|
||||
for key, value in module_args.items():
|
||||
cmd += f" {key}={value}"
|
||||
return TestRun.executor.run(cmd)
|
||||
|
||||
|
||||
def unload_kernel_module(module_name):
|
||||
cmd = f"modprobe -r {module_name}"
|
||||
return TestRun.executor.run_expect_success(cmd)
|
||||
|
||||
|
||||
def get_kernel_module_parameter(module_name, parameter):
|
||||
param_file_path = f"/sys/module/{module_name}/parameters/{parameter}"
|
||||
if not check_if_file_exists(param_file_path):
|
||||
raise FileNotFoundError(f"File {param_file_path} does not exist!")
|
||||
return File(param_file_path).read()
|
||||
|
||||
|
||||
def is_mounted(path: str):
|
||||
if path is None or path.isspace():
|
||||
raise Exception("Checked path cannot be empty")
|
||||
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
|
||||
return TestRun.executor.run(command).exit_code == 0
|
||||
|
||||
|
||||
def mount_debugfs():
|
||||
if not is_mounted(DEBUGFS_MOUNT_POINT):
|
||||
TestRun.executor.run_expect_success(f"mount -t debugfs none {DEBUGFS_MOUNT_POINT}")
|
||||
|
||||
|
||||
def reload_kernel_module(module_name, module_args: {str, str}=None):
|
||||
if is_kernel_module_loaded(module_name):
|
||||
unload_kernel_module(module_name)
|
||||
|
||||
Retry.run_while_false(
|
||||
lambda: load_kernel_module(module_name, module_args).exit_code == 0,
|
||||
timeout=timedelta(seconds=5)
|
||||
)
|
||||
|
||||
|
||||
def get_module_path(module_name):
|
||||
cmd = f"modinfo {module_name}"
|
||||
|
||||
# module path is in second column of first line of `modinfo` output
|
||||
module_info = TestRun.executor.run_expect_success(cmd).stdout
|
||||
module_path = module_info.splitlines()[0].split()[1]
|
||||
|
||||
return module_path
|
||||
|
||||
|
||||
def get_executable_path(exec_name):
|
||||
cmd = f"which {exec_name}"
|
||||
|
||||
path = TestRun.executor.run_expect_success(cmd).stdout
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def kill_all_io(graceful=True):
|
||||
if graceful:
|
||||
# TERM signal should be used in preference to the KILL signal, since a
|
||||
# process may install a handler for the TERM signal in order to perform
|
||||
# clean-up steps before terminating in an orderly fashion.
|
||||
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
|
||||
time.sleep(3)
|
||||
TestRun.executor.run("killall -q --signal TERM dd fio blktrace")
|
||||
time.sleep(3)
|
||||
TestRun.executor.run("killall -q --signal KILL dd fio blktrace")
|
||||
TestRun.executor.run("kill -9 `ps aux | grep -i vdbench.* | awk '{ print $2 }'`")
|
||||
|
||||
if TestRun.executor.run("pgrep -x dd").exit_code == 0:
|
||||
raise Exception(f"Failed to stop dd!")
|
||||
if TestRun.executor.run("pgrep -x fio").exit_code == 0:
|
||||
raise Exception(f"Failed to stop fio!")
|
||||
if TestRun.executor.run("pgrep -x blktrace").exit_code == 0:
|
||||
raise Exception(f"Failed to stop blktrace!")
|
||||
if TestRun.executor.run("pgrep vdbench").exit_code == 0:
|
||||
raise Exception(f"Failed to stop vdbench!")
|
||||
|
||||
|
||||
def sync():
|
||||
TestRun.executor.run_expect_success("sync")
|
||||
|
||||
|
||||
def get_dut_cpu_number():
|
||||
return int(TestRun.executor.run_expect_success("nproc").stdout)
|
||||
|
||||
|
||||
def get_dut_cpu_physical_cores():
|
||||
""" Get list of CPU numbers that don't share physical cores """
|
||||
output = TestRun.executor.run_expect_success("lscpu --all --parse").stdout
|
||||
|
||||
core_list = []
|
||||
visited_phys_cores = []
|
||||
for line in output.split("\n"):
|
||||
if "#" in line:
|
||||
continue
|
||||
|
||||
cpu_no, phys_core_no = line.split(",")[:2]
|
||||
if phys_core_no not in visited_phys_cores:
|
||||
core_list.append(cpu_no)
|
||||
visited_phys_cores.append(phys_core_no)
|
||||
|
||||
return core_list
|
||||
|
||||
|
||||
def set_wbt_lat(device: Device, value: int):
|
||||
if value < 0:
|
||||
raise ValueError("Write back latency can't be negative number")
|
||||
|
||||
wbt_lat_config_path = posixpath.join(
|
||||
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
|
||||
)
|
||||
|
||||
return TestRun.executor.run_expect_success(f"echo {value} > {wbt_lat_config_path}")
|
||||
|
||||
|
||||
def get_wbt_lat(device: Device):
|
||||
wbt_lat_config_path = posixpath.join(
|
||||
get_sysfs_path(device.get_device_id()), "queue/wbt_lat_usec"
|
||||
)
|
||||
|
||||
return int(TestRun.executor.run_expect_success(f"cat {wbt_lat_config_path}").stdout)
|
||||
|
||||
|
||||
def get_cores_ids_range(numa_node: int):
|
||||
output = TestRun.executor.run_expect_success(f"lscpu --all --parse").stdout
|
||||
parse_output = re.findall(r'(\d+),(\d+),(?:\d+),(\d+),,', output, re.I)
|
||||
|
||||
return [element[0] for element in parse_output if int(element[2]) == numa_node]
|
||||
|
||||
|
||||
def create_user(username, additional_params=None):
|
||||
command = "useradd "
|
||||
if additional_params:
|
||||
command += "".join([f"-{p} " for p in additional_params])
|
||||
command += username
|
||||
return TestRun.executor.run_expect_success(command)
|
||||
|
||||
|
||||
def check_if_user_exists(username):
|
||||
return TestRun.executor.run(f"id {username}").exit_code == 0
|
||||
Reference in New Issue
Block a user