Compare commits

..

No commits in common. "ace1ff49d454f224e56b7ea21eb0c61ae1d59156" and "aea753275625e144550484a4ebdec297dcd80899" have entirely different histories.

8 changed files with 32 additions and 66 deletions

View File

@ -22,7 +22,6 @@ class TestRun:
plugin_manager = None plugin_manager = None
duts = None duts = None
disks = None disks = None
TEST_RUN_DATA_PATH = None
@classmethod @classmethod
@contextmanager @contextmanager

View File

@ -1,6 +1,6 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd. # Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -23,7 +23,6 @@ from test_tools import disk_finder
from test_utils.dut import Dut from test_utils.dut import Dut
TestRun = core.test_run.TestRun TestRun = core.test_run.TestRun
TestRun.TEST_RUN_DATA_PATH = "/tmp/test_data"
@classmethod @classmethod
@ -133,7 +132,6 @@ def __presetup(cls):
if cls.config['type'] == 'ssh': if cls.config['type'] == 'ssh':
try: try:
IP(cls.config['ip']) IP(cls.config['ip'])
if not cls.config['host']:
cls.config['host'] = cls.config['ip'] cls.config['host'] = cls.config['ip']
except ValueError: except ValueError:
TestRun.block("IP address from config is in invalid format.") TestRun.block("IP address from config is in invalid format.")

View File

@ -1,6 +1,6 @@
# #
# Copyright(c) 2020-2021 Intel Corporation # Copyright(c) 2020-2021 Intel Corporation
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd. # Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -46,25 +46,18 @@ class PowerControlPlugin:
def teardown(self): def teardown(self):
pass pass
def power_cycle(self, wait_for_connection: bool = False, delay_until_reboot: int = 0) -> None: def power_cycle(self):
self.executor.run_expect_success(f"sudo virsh destroy {TestRun.dut.virsh['vm_name']}") self.executor.run_expect_success(f"sudo virsh reset {TestRun.dut.virsh['vm_name']}")
TestRun.executor.disconnect() TestRun.executor.disconnect()
self.executor.run_expect_success( TestRun.executor.wait_for_connection(timedelta(seconds=TestRun.dut.virsh["reboot_timeout"]))
f"(sleep {delay_until_reboot} && sudo virsh start {TestRun.dut.virsh['vm_name']}) &"
)
if wait_for_connection:
TestRun.executor.wait_for_connection(
timedelta(seconds=TestRun.dut.virsh["reboot_timeout"])
)
def check_if_vm_exists(self, vm_name) -> bool: def check_if_vm_exists(self, vm_name) -> bool:
return self.executor.run(f"sudo virsh list|grep -w {vm_name}").exit_code == 0 return self.executor.run(f"sudo virsh list|grep -w {vm_name}").exit_code == 0
def parse_virsh_config(self, vm_name, reboot_timeout=DEFAULT_REBOOT_TIMEOUT) -> dict | None: def parse_virsh_config(self, vm_name, reboot_timeout=DEFAULT_REBOOT_TIMEOUT) -> dict | None:
if not self.check_if_vm_exists(vm_name=vm_name): if not self.check_if_vm_exists(vm_name=vm_name):
raise ValueError( raise ValueError(f"Virsh power plugin error: couldn't find VM {vm_name} on host "
f"Virsh power plugin error: couldn't find VM {vm_name} on host {self.host}" f"{self.host}")
)
return { return {
"vm_name": vm_name, "vm_name": vm_name,
"reboot_timeout": reboot_timeout, "reboot_timeout": reboot_timeout,

View File

@ -1,13 +1,10 @@
# #
# Copyright(c) 2019-2021 Intel Corporation # Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
import logging import logging
import os import os
import posixpath
import re
import sys import sys
from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime from datetime import datetime
@ -48,7 +45,6 @@ class Log(HtmlLogManager, metaclass=Singleton):
logger = None logger = None
LOG_FORMAT = '%(asctime)s %(levelname)s:\t%(message)s' LOG_FORMAT = '%(asctime)s %(levelname)s:\t%(message)s'
DATE_FORMAT = "%Y/%m/%d %H:%M:%S" DATE_FORMAT = "%Y/%m/%d %H:%M:%S"
unique_test_identifier = ""
command_id = 0 command_id = 0
lock = Lock() lock = Lock()
@ -193,23 +189,15 @@ class Log(HtmlLogManager, metaclass=Singleton):
def get_additional_logs(self): def get_additional_logs(self):
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.fs_tools import check_if_file_exists from test_tools.fs_tools import check_if_file_exists
messages_log = "/var/log/messages" messages_log = "/var/log/messages"
if not check_if_file_exists(messages_log): if not check_if_file_exists(messages_log):
messages_log = "/var/log/syslog" messages_log = "/var/log/syslog"
log_files = {"messages.log": messages_log,
log_files = {"messages.log": posixpath.join(TestRun.TEST_RUN_DATA_PATH, "messages"), "dmesg.log": "/tmp/dmesg"}
"dmesg.log": posixpath.join(TestRun.TEST_RUN_DATA_PATH, "dmesg")}
extra_logs = TestRun.config.get("extra_logs", {}) extra_logs = TestRun.config.get("extra_logs", {})
log_files.update(extra_logs) log_files.update(extra_logs)
# Escape special characters from test identifier to be properly processed by awk TestRun.executor.run(f"dmesg > {log_files['dmesg.log']}")
test_identifier = re.escape(TestRun.LOGGER.unique_test_identifier)
TestRun.executor.run(
f"dmesg | awk '/{test_identifier}/,0' > {log_files['dmesg.log']}")
TestRun.executor.run(
f"awk '/{test_identifier}/,0' {messages_log} > {log_files['messages.log']}")
dut_identifier = TestRun.dut.ip if TestRun.dut.ip else TestRun.dut.config["host"] dut_identifier = TestRun.dut.ip if TestRun.dut.ip else TestRun.dut.config["host"]
for log_name, log_source_path in log_files.items(): for log_name, log_source_path in log_files.items():
@ -240,11 +228,3 @@ class Log(HtmlLogManager, metaclass=Singleton):
} }
} }
json.dump(data, summary) json.dump(data, summary)
def print_test_identifier_to_logs(self):
from core.test_run import TestRun
# Add test identifier to dmesg
TestRun.executor.run(f"echo {self.unique_test_identifier} > /dev/kmsg")
# Add test identifier to messages log
TestRun.executor.run(f"logger {self.unique_test_identifier}")

View File

@ -1,6 +1,6 @@
# #
# Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd. # Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -102,11 +102,11 @@ def create_partition(
TestRun.executor.run_expect_success("udevadm settle") TestRun.executor.run_expect_success("udevadm settle")
if not check_partition_after_create( if not check_partition_after_create(
size=part_size, part_size,
part_number=part_number, part_number,
parent_dev_path=device.path, device.path,
part_type=part_type, part_type,
aligned=aligned): aligned):
raise Exception("Could not create partition!") raise Exception("Could not create partition!")
if part_type != PartitionType.extended: if part_type != PartitionType.extended:
@ -147,7 +147,7 @@ def create_partitions(device, sizes: [], partition_table_type=PartitionTable.gpt
for s in sizes: for s in sizes:
size = Size( size = Size(
s.get_value(device.block_size) - 1, device.block_size) s.get_value(device.block_size) - device.block_size.value, device.block_size)
if partition_table_type == PartitionTable.msdos and \ if partition_table_type == PartitionTable.msdos and \
len(sizes) > 4 and len(device.partitions) == 3: len(sizes) > 4 and len(device.partitions) == 3:
if available_disk_size(device) > msdos_part_max_size: if available_disk_size(device) > msdos_part_max_size:
@ -162,12 +162,12 @@ def create_partitions(device, sizes: [], partition_table_type=PartitionTable.gpt
partition_number_offset = 1 partition_number_offset = 1
partition_number = len(device.partitions) + 1 + partition_number_offset partition_number = len(device.partitions) + 1 + partition_number_offset
create_partition(device=device, create_partition(device,
part_size=size, size,
part_number=partition_number, partition_number,
part_type=partition_type, partition_type,
unit=device.block_size, Unit.MebiByte,
aligned=True) True)
def get_block_size(device): def get_block_size(device):
@ -197,8 +197,7 @@ def get_pci_address(device):
return pci_address return pci_address
def check_partition_after_create(size: Size, part_number: int, parent_dev_path: str, def check_partition_after_create(size, part_number, parent_dev_path, part_type, aligned):
part_type: PartitionType, aligned: bool):
partition_path = get_partition_path(parent_dev_path, part_number) partition_path = get_partition_path(parent_dev_path, part_number)
if "dev/cas" not in partition_path: if "dev/cas" not in partition_path:
cmd = f"find {partition_path} -type l" cmd = f"find {partition_path} -type l"

View File

@ -27,7 +27,7 @@ class PeachFuzzer:
peach_fuzzer_3_0_url = "https://sourceforge.net/projects/peachfuzz/files/Peach/3.0/" \ peach_fuzzer_3_0_url = "https://sourceforge.net/projects/peachfuzz/files/Peach/3.0/" \
"peach-3.0.202-linux-x86_64-release.zip" "peach-3.0.202-linux-x86_64-release.zip"
base_dir = posixpath.join(TestRun.TEST_RUN_DATA_PATH, "Fuzzy") base_dir = "/root/Fuzzy"
peach_dir = "peach-3.0.202-linux-x86_64-release" peach_dir = "peach-3.0.202-linux-x86_64-release"
xml_config_template = os.path.join(os.path.dirname(__file__), "config_template.xml") xml_config_template = os.path.join(os.path.dirname(__file__), "config_template.xml")
xml_config_file = posixpath.join(base_dir, "fuzzerConfig.xml") xml_config_file = posixpath.join(base_dir, "fuzzerConfig.xml")

View File

@ -1,15 +1,13 @@
# #
# Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024-2025 Huawei Technologies Co., Ltd. # Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
from pathlib import PurePosixPath from pathlib import Path
from core.test_run import TestRun from core.test_run import TestRun
systemd_service_directory = PurePosixPath("/usr/lib/systemd/system/") systemd_service_directory = Path("/usr/lib/systemd/system/")
def enable_service(name): def enable_service(name):
TestRun.executor.run_expect_success(f"systemctl enable {name}") TestRun.executor.run_expect_success(f"systemctl enable {name}")

View File

@ -1,12 +1,11 @@
# #
# Copyright(c) 2022 Intel Corporation # Copyright(c) 2022 Intel Corporation
# Copyright(c) 2025 Huawei Corporation
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
from textwrap import dedent from textwrap import dedent
from string import Template from string import Template
from pathlib import PurePosixPath from pathlib import Path
from test_tools.systemctl import enable_service, reload_daemon, systemd_service_directory from test_tools.systemctl import enable_service, reload_daemon, systemd_service_directory
from test_tools.fs_tools import ( from test_tools.fs_tools import (
@ -18,7 +17,7 @@ from test_tools.fs_tools import (
class EmergencyEscape: class EmergencyEscape:
escape_marker = "EMERGENCY_ESCAPE" escape_marker = "EMERGENCY_ESCAPE"
escape_service = PurePosixPath("emergency-escape.service") escape_service = Path("emergency-escape.service")
escape_service_template = Template( escape_service_template = Template(
dedent( dedent(
f""" f"""
@ -39,7 +38,7 @@ class EmergencyEscape:
""" """
).strip() ).strip()
) )
cleanup_service = PurePosixPath("emergency-escape-cleanup.service") cleanup_service = Path("emergency-escape-cleanup.service")
cleanup_service_template = Template( cleanup_service_template = Template(
dedent( dedent(
""" """