Refactor disk tools and fs tools

Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
This commit is contained in:
Katarzyna Treder 2024-12-11 18:56:23 +01:00
parent 6dd9c9ca8c
commit fd869a0afc
25 changed files with 179 additions and 176 deletions

View File

@ -9,7 +9,7 @@ from datetime import timedelta
from connection.base_executor import BaseExecutor
from core.test_run import TestRun
from test_tools.fs_utils import copy
from test_tools.fs_tools import copy
from connection.utils.output import Output, CmdException

View File

@ -9,7 +9,7 @@ import posixpath
from datetime import timedelta
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools import fs_tools
class Vdbench:

View File

@ -188,7 +188,7 @@ class Log(HtmlLogManager, metaclass=Singleton):
def get_additional_logs(self):
from core.test_run import TestRun
from test_tools.fs_utils import check_if_file_exists
from test_tools.fs_tools import check_if_file_exists
messages_log = "/var/log/messages"
if not check_if_file_exists(messages_log):
messages_log = "/var/log/syslog"

View File

@ -5,27 +5,29 @@
#
import posixpath
import test_tools.fs_tools
from core.test_run import TestRun
from test_tools import disk_utils, fs_utils
from test_tools.disk_utils import get_device_filesystem_type, get_sysfs_path
from test_tools import disk_tools, fs_tools
from test_tools.disk_tools import get_sysfs_path
from test_tools.fs_tools import get_device_filesystem_type
from test_utils.io_stats import IoStats
from type_def.size import Size, Unit
class Device:
def __init__(self, path):
disk_utils.validate_dev_path(path)
disk_tools.validate_dev_path(path)
self.path = path
self.size = Size(disk_utils.get_size(self.get_device_id()), Unit.Byte)
self.size = Size(disk_tools.get_size(self.get_device_id()), Unit.Byte)
self.filesystem = get_device_filesystem_type(self.get_device_id())
self.mount_point = None
def create_filesystem(self, fs_type: disk_utils.Filesystem, force=True, blocksize=None):
disk_utils.create_filesystem(self, fs_type, force, blocksize)
def create_filesystem(self, fs_type: test_tools.fs_tools.Filesystem, force=True, blocksize=None):
test_tools.fs_tools.create_filesystem(self, fs_type, force, blocksize)
self.filesystem = fs_type
def wipe_filesystem(self, force=True):
disk_utils.wipe_filesystem(self, force)
test_tools.fs_tools.wipe_filesystem(self, force)
self.filesystem = None
def is_mounted(self):
@ -34,13 +36,13 @@ class Device:
return False
else:
mount_point_line = output.stdout.split('\n')[1]
device_path = fs_utils.readlink(self.path)
device_path = fs_tools.readlink(self.path)
self.mount_point = mount_point_line[0:mount_point_line.find(device_path)].strip()
return True
def mount(self, mount_point, options: [str] = None):
if not self.is_mounted():
if disk_utils.mount(self, mount_point, options):
if disk_tools.mount(self, mount_point, options):
self.mount_point = mount_point
else:
raise Exception(f"Device is already mounted! Actual mount point: {self.mount_point}")
@ -48,7 +50,7 @@ class Device:
def unmount(self):
if not self.is_mounted():
TestRun.LOGGER.info("Device is not mounted.")
elif disk_utils.unmount(self):
elif disk_tools.unmount(self):
self.mount_point = None
def get_device_link(self, directory: str):
@ -56,27 +58,27 @@ class Device:
return next(i for i in items if i.full_path.startswith(directory))
def get_device_id(self):
return fs_utils.readlink(self.path).split('/')[-1]
return fs_tools.readlink(self.path).split('/')[-1]
def get_all_device_links(self, directory: str):
from test_tools import fs_utils
output = fs_utils.ls(f"$(find -L {directory} -samefile {self.path})")
return fs_utils.parse_ls_output(output, self.path)
from test_tools import fs_tools
output = fs_tools.ls(f"$(find -L {directory} -samefile {self.path})")
return fs_tools.parse_ls_output(output, self.path)
def get_io_stats(self):
return IoStats.get_io_stats(self.get_device_id())
def get_sysfs_property(self, property_name):
path = posixpath.join(disk_utils.get_sysfs_path(self.get_device_id()),
path = posixpath.join(disk_tools.get_sysfs_path(self.get_device_id()),
"queue", property_name)
return TestRun.executor.run_expect_success(f"cat {path}").stdout
def set_sysfs_property(self, property_name, value):
TestRun.LOGGER.info(
f"Setting {property_name} for device {self.get_device_id()} to {value}.")
path = posixpath.join(disk_utils.get_sysfs_path(self.get_device_id()), "queue",
path = posixpath.join(disk_tools.get_sysfs_path(self.get_device_id()), "queue",
property_name)
fs_utils.write_file(path, str(value))
fs_tools.write_file(path, str(value))
def set_max_io_size(self, new_max_io_size: Size):
self.set_sysfs_property("max_sectors_kb",

View File

@ -10,9 +10,10 @@ import re
from datetime import timedelta
from enum import IntEnum
import test_tools.fs_tools
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools import disk_utils, fs_utils, nvme_cli
from test_tools import disk_tools, fs_tools, nvme_cli
from test_tools.common.wait import wait
from connection.utils.output import Output
from test_tools.disk_finder import get_block_devices_list, resolve_to_by_id_link
@ -137,24 +138,24 @@ class Disk(Device):
)
return recognized_types[0]
def create_partitions(self, sizes: [], partition_table_type=disk_utils.PartitionTable.gpt):
disk_utils.create_partitions(self, sizes, partition_table_type)
def create_partitions(self, sizes: [], partition_table_type=disk_tools.PartitionTable.gpt):
disk_tools.create_partitions(self, sizes, partition_table_type)
def remove_partition(self, part):
part_number = int(part.path.split("part")[1])
disk_utils.remove_parition(self, part_number)
disk_tools.remove_parition(self, part_number)
self.partitions.remove(part)
def umount_all_partitions(self):
TestRun.LOGGER.info(f"Unmounting all partitions from: {self.path}")
cmd = f"umount -l {fs_utils.readlink(self.path)}*?"
cmd = f"umount -l {fs_tools.readlink(self.path)}*?"
TestRun.executor.run(cmd)
def remove_partitions(self):
for part in self.partitions:
if part.is_mounted():
if test_tools.fs_tools.is_mounted(part.path):
part.unmount()
if disk_utils.remove_partitions(self):
if disk_tools.remove_partitions(self):
self.partitions.clear()
def is_detected(self):
@ -162,8 +163,8 @@ class Disk(Device):
serial_numbers = Disk.get_all_serial_numbers()
return self.serial_number in serial_numbers
elif self.path:
output = fs_utils.ls_item(f"{self.path}")
return fs_utils.parse_ls_output(output)[0] is not None
output = fs_tools.ls_item(f"{self.path}")
return fs_tools.parse_ls_output(output)[0] is not None
raise Exception("Couldn't check if device is detected by the system")
def wait_for_plug_status(self, should_be_visible):
@ -289,8 +290,8 @@ class NvmeDisk(Disk):
base = f"/sys/block/{device_id}/device"
for suffix in ["/remove", "/device/remove"]:
try:
output = fs_utils.ls_item(base + suffix)
fs_utils.parse_ls_output(output)[0]
output = fs_tools.ls_item(base + suffix)
fs_tools.parse_ls_output(output)[0]
except TypeError:
continue
return base + suffix
@ -345,8 +346,8 @@ class SataDisk(Disk):
@staticmethod
def get_sysfs_addr(device_id):
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0]
output = fs_tools.ls_item(f"{ls_command}")
sysfs_addr = fs_tools.parse_ls_output(output)[0]
if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")
return sysfs_addr.full_path
@ -412,8 +413,8 @@ class VirtioDisk(Disk):
@staticmethod
def get_sysfs_addr(device_id: str) -> str:
ls_command = f"$(find -H /sys/devices/ -name {device_id} -type d)"
output = fs_utils.ls_item(f"{ls_command}")
sysfs_addr = fs_utils.parse_ls_output(output)[0]
output = fs_tools.ls_item(f"{ls_command}")
sysfs_addr = fs_tools.parse_ls_output(output)[0]
if not sysfs_addr:
raise Exception(f"Failed to find sysfs address: ls -l {ls_command}")

View File

@ -10,7 +10,7 @@ from typing import Union
from core.test_run import TestRun
from storage_devices.device import Device
from storage_devices.disk import Disk
from test_tools.fs_utils import readlink
from test_tools.fs_tools import readlink
from test_tools.disk_finder import resolve_to_by_id_link, get_system_disks
from test_utils.filesystem.symlink import Symlink
from type_def.size import Size

View File

@ -5,7 +5,7 @@
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.fs_utils import ls, parse_ls_output
from test_tools.fs_tools import ls, parse_ls_output
from test_tools.os_tools import (
unload_kernel_module,
is_kernel_module_loaded,

View File

@ -4,13 +4,13 @@
#
from storage_devices.device import Device
from test_tools import disk_utils
from test_tools import disk_tools
from type_def.size import Size
class Partition(Device):
def __init__(self, parent_dev, type, number, begin: Size, end: Size):
Device.__init__(self, disk_utils.get_partition_path(parent_dev.path, number))
Device.__init__(self, disk_tools.get_partition_path(parent_dev.path, number))
self.number = number
self.parent_device = parent_dev
self.type = type

View File

@ -8,7 +8,7 @@ from enum import IntEnum, Enum
from core.test_run import TestRun
from storage_devices.device import Device
from storage_devices.disk import Disk
from test_tools.fs_utils import readlink
from test_tools.fs_tools import readlink
from test_tools.mdadm import Mdadm
from test_tools.disk_finder import resolve_to_by_id_link
from type_def.size import Size, Unit

View File

@ -7,8 +7,8 @@ import posixpath
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools import disk_utils
from test_tools.fs_utils import ls, parse_ls_output
from test_tools import disk_tools
from test_tools.fs_tools import ls, parse_ls_output
from test_utils.filesystem.symlink import Symlink
from test_tools.os_tools import reload_kernel_module, unload_kernel_module, is_kernel_module_loaded
from type_def.size import Size, Unit
@ -68,7 +68,7 @@ class RamDisk(Device):
ram_disks = cls._list_devices()
return (
len(ram_disks) >= disk_count
and Size(disk_utils.get_size(ram_disks[0].name), Unit.Byte).align_down(Unit.MiB.value)
and Size(disk_tools.get_size(ram_disks[0].name), Unit.Byte).align_down(Unit.MiB.value)
== disk_size.align_down(Unit.MiB.value)
)

View File

@ -14,7 +14,8 @@ from datetime import timedelta
from core.test_run import TestRun
from storage_devices.device import Device
from test_utils.filesystem.directory import Directory
from test_tools.os_tools import is_mounted, drop_caches, DropCachesMode
from test_tools.os_tools import drop_caches, DropCachesMode
from test_tools.fs_tools import is_mounted
from type_def.size import Size, Unit
DEBUGFS_MOUNT_POINT = "/sys/kernel/debug"

View File

@ -7,9 +7,9 @@ import os
import posixpath
from core.test_run import TestRun
from test_tools import disk_utils
from test_tools.disk_utils import get_sysfs_path
from test_tools.fs_utils import check_if_file_exists, readlink
from test_tools import disk_tools
from test_tools.disk_tools import get_sysfs_path
from test_tools.fs_tools import check_if_file_exists, readlink
from connection.utils.output import CmdException
@ -50,7 +50,7 @@ def discover_hdd_devices(block_devices, devices_res):
for dev in block_devices:
if TestRun.executor.run_expect_success(f"cat /sys/block/{dev}/removable").stdout == "1":
continue # skip removable drives
block_size = disk_utils.get_block_size(dev)
block_size = disk_tools.get_block_size(dev)
if int(block_size) == 4096:
disk_type = 'hdd4k'
else:
@ -62,7 +62,7 @@ def discover_hdd_devices(block_devices, devices_res):
f"sg_inq /dev/{dev} | grep -i 'serial number'"
).stdout.split(': ')[1].strip(),
"blocksize": block_size,
"size": disk_utils.get_size(dev)})
"size": disk_tools.get_size(dev)})
block_devices.clear()
@ -99,8 +99,8 @@ def discover_ssd_devices(block_devices, devices_res):
"type": disk_type,
"path": resolve_to_by_id_link(device_path),
"serial": serial_number,
"blocksize": disk_utils.get_block_size(dev),
"size": disk_utils.get_size(dev)})
"blocksize": disk_tools.get_block_size(dev),
"size": disk_tools.get_size(dev)})
block_devices.remove(dev)

View File

@ -10,22 +10,17 @@ import time
from enum import Enum
from typing import List
import test_tools.fs_tools
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.dd import Dd
from test_tools.fs_utils import readlink, parse_ls_output, ls
from connection.utils.output import CmdException
from test_tools.fs_tools import readlink, parse_ls_output, ls, check_if_directory_exists, \
create_directory, wipe_filesystem
from test_tools.udev import Udev
from type_def.size import Size, Unit
SECTOR_SIZE = 512
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class PartitionTable(Enum):
msdos = 0
gpt = 1
@ -43,21 +38,6 @@ class PartitionType(Enum):
unknown = 8
def create_filesystem(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def create_partition_table(device, partition_table_type: PartitionTable = PartitionTable.gpt):
TestRun.LOGGER.info(
f"Creating partition table ({partition_table_type.name}) for device: {device.path}")
@ -267,8 +247,7 @@ def get_first_partition_offset(device, aligned: bool):
def remove_partitions(device):
from test_tools.udev import Udev
if device.is_mounted():
if test_tools.fs_tools.is_mounted(device.path):
device.unmount()
for partition in device.partitions:
@ -276,7 +255,7 @@ def remove_partitions(device):
TestRun.LOGGER.info(f"Removing partitions from device: {device.path} "
f"({device.get_device_id()}).")
device.wipe_filesystem()
wipe_filesystem(device)
Udev.trigger()
Udev.settle()
output = TestRun.executor.run(f"ls {device.path}* -1")
@ -287,8 +266,8 @@ def remove_partitions(device):
def mount(device, mount_point, options: [str] = None):
if not fs_utils.check_if_directory_exists(mount_point):
fs_utils.create_directory(mount_point, True)
if not check_if_directory_exists(mount_point):
create_directory(mount_point, True)
TestRun.LOGGER.info(f"Mounting device {device.path} ({device.get_device_id()}) "
f"to {mount_point}.")
cmd = f"mount {device.path} {mount_point}"
@ -330,15 +309,6 @@ def unit_to_string(unit):
return unit_string.get(unit, "Invalid unit.")
def wipe_filesystem(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def check_if_device_supports_trim(device):
if device.get_device_id().startswith("nvme"):
return True
@ -351,27 +321,6 @@ def check_if_device_supports_trim(device):
return int(command_output.stdout) > 0
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def _is_by_id_path(path: str):
"""check if given path already is proper by-id path"""
dev_by_id_dir = "/dev/disk/by-id"

View File

@ -9,10 +9,10 @@ import uuid
from packaging.version import Version
import test_tools.fio.fio_param
import test_tools.fs_utils
import test_tools.fs_tools
import test_tools.wget
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools import fs_tools
from connection.utils.output import CmdException
@ -51,7 +51,7 @@ class Fio:
def install(self):
fio_url = f"http://brick.kernel.dk/snaps/fio-{self.min_fio_version}.tar.bz2"
fio_package = test_tools.wget.download_file(fio_url)
fs_utils.uncompress_archive(fio_package)
fs_tools.uncompress_archive(fio_package)
TestRun.executor.run_expect_success(
f"cd {fio_package.parent_dir}/fio-{self.min_fio_version}"
f" && ./configure && make -j && make install"

View File

@ -7,17 +7,24 @@
import base64
import math
import re
import textwrap
from collections import namedtuple
from datetime import datetime, timedelta
from enum import Enum, IntFlag
from aenum import IntFlag, Enum
from connection.utils.output import CmdException
from core.test_run import TestRun
from test_tools.dd import Dd
from type_def.size import Size, Unit
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class Permissions(IntFlag):
r = 4
w = 2
@ -50,7 +57,7 @@ class PermissionSign(Enum):
set = '='
class FilesPermissions():
class FilesPermissions:
perms_exceptions = {}
def __init__(self, files_list: list):
@ -393,3 +400,55 @@ def create_random_test_file(target_file_path: str,
dd.run()
file.refresh_item()
return file
def create_filesystem(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def wipe_filesystem(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0

View File

@ -4,11 +4,11 @@
# SPDX-License-Identifier: BSD-3-Clause
#
from test_tools import fs_utils, systemctl
from test_tools import fs_tools, systemctl
def add_mountpoint(device, mount_point, fs_type, mount_now=True):
fs_utils.append_line("/etc/fstab",
fs_tools.append_line("/etc/fstab",
f"{device.path} {mount_point} {fs_type.name} defaults 0 0")
systemctl.reload_daemon()
if mount_now:
@ -16,5 +16,5 @@ def add_mountpoint(device, mount_point, fs_type, mount_now=True):
def remove_mountpoint(device):
fs_utils.remove_lines("/etc/fstab", device.path)
fs_tools.remove_lines("/etc/fstab", device.path)
systemctl.reload_daemon()

View File

@ -3,9 +3,9 @@ import math
from connection.utils.output import CmdException
from core.test_run import TestRun
from test_tools.dd import Dd
from test_tools.fs_utils import check_if_directory_exists, create_directory
from test_tools.fs_tools import check_if_directory_exists, create_directory, is_mounted
from test_tools.os_tools import OvercommitMemoryMode, drop_caches, DropCachesMode, \
MEMORY_MOUNT_POINT, is_mounted
MEMORY_MOUNT_POINT
from type_def.size import Size, Unit

View File

@ -14,8 +14,8 @@ from packaging import version
from core.test_run import TestRun
from storage_devices.device import Device
from test_tools.disk_utils import get_sysfs_path
from test_tools.fs_utils import check_if_file_exists
from test_tools.disk_tools import get_sysfs_path
from test_tools.fs_tools import check_if_file_exists, is_mounted
from test_utils.filesystem.file import File
from connection.utils.retry import Retry
@ -110,13 +110,6 @@ def get_kernel_module_parameter(module_name, parameter):
return File(param_file_path).read()
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0
def mount_debugfs():
if not is_mounted(DEBUGFS_MOUNT_POINT):
TestRun.executor.run_expect_success(f"mount -t debugfs none {DEBUGFS_MOUNT_POINT}")

View File

@ -14,8 +14,8 @@ from collections import namedtuple
import test_tools.wget
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.fs_utils import create_directory, check_if_file_exists, write_file
from test_tools import fs_tools
from test_tools.fs_tools import create_directory, check_if_file_exists, write_file
class PeachFuzzer:
@ -75,7 +75,7 @@ class PeachFuzzer:
cls._install()
if not cls._is_xml_config_prepared():
TestRun.block("No Peach Fuzzer XML config needed to generate fuzzed values was found!")
fs_utils.remove(cls.fuzzy_output_file, force=True, ignore_errors=True)
fs_tools.remove(cls.fuzzy_output_file, force=True, ignore_errors=True)
TestRun.LOGGER.info(f"Generate {count} unique fuzzed values")
cmd = f"cd {cls.base_dir}; {cls.peach_dir}/peach --range 0,{count - 1} " \
f"--seed {random.randrange(2 ** 32)} {cls.xml_config_file} > " \
@ -172,7 +172,7 @@ class PeachFuzzer:
"""
if not cls._is_mono_installed():
TestRun.block("Mono is not installed, can't continue with Peach Fuzzer!")
if fs_utils.check_if_directory_exists(posixpath.join(cls.base_dir, cls.peach_dir)):
if fs_tools.check_if_directory_exists(posixpath.join(cls.base_dir, cls.peach_dir)):
return "Peach" in TestRun.executor.run(
f"cd {cls.base_dir} && {cls.peach_dir}/peach --version").stdout.strip()
else:
@ -197,7 +197,7 @@ class PeachFuzzer:
"""
Check if Peach Fuzzer XML config is present on the DUT
"""
if fs_utils.check_if_file_exists(cls.xml_config_file):
if fs_tools.check_if_file_exists(cls.xml_config_file):
return True
else:
return False

View File

@ -3,8 +3,6 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
import os
from test_utils.filesystem.file import File

View File

@ -8,7 +8,7 @@ from string import Template
from pathlib import Path
from test_tools.systemctl import enable_service, reload_daemon, systemd_service_directory
from test_tools.fs_utils import (
from test_tools.fs_tools import (
create_file,
write_file,
remove,

View File

@ -3,8 +3,8 @@
# SPDX-License-Identifier: BSD-3-Clause
#
from core.test_run import TestRun
from test_tools import fs_utils
from test_tools.fs_utils import check_if_directory_exists
from test_tools import fs_tools
from test_tools.fs_tools import check_if_directory_exists
from test_utils.filesystem.fs_item import FsItem
@ -13,14 +13,14 @@ class Directory(FsItem):
FsItem.__init__(self, full_path)
def ls(self):
output = fs_utils.ls(f"{self.full_path}")
return fs_utils.parse_ls_output(output, self.full_path)
output = fs_tools.ls(f"{self.full_path}")
return fs_tools.parse_ls_output(output, self.full_path)
@staticmethod
def create_directory(path: str, parents: bool = False):
fs_utils.create_directory(path, parents)
output = fs_utils.ls_item(path)
return fs_utils.parse_ls_output(output)[0]
fs_tools.create_directory(path, parents)
output = fs_tools.ls_item(path)
return fs_tools.parse_ls_output(output)[0]
@staticmethod
def create_temp_directory(parent_dir_path: str = "/tmp"):

View File

@ -5,7 +5,7 @@
#
from datetime import timedelta
from test_tools import fs_utils
from test_tools import fs_tools
from test_tools.dd import Dd
from test_utils.filesystem.fs_item import FsItem
from type_def.size import Size
@ -16,22 +16,22 @@ class File(FsItem):
FsItem.__init__(self, full_path)
def compare(self, other_file, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.compare(str(self), str(other_file), timeout)
return fs_tools.compare(str(self), str(other_file), timeout)
def diff(self, other_file, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.diff(str(self), str(other_file), timeout)
return fs_tools.diff(str(self), str(other_file), timeout)
def md5sum(self, binary=True, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.md5sum(str(self), binary, timeout)
return fs_tools.md5sum(str(self), binary, timeout)
def crc32sum(self, timeout: timedelta = timedelta(minutes=30)):
return fs_utils.crc32sum(str(self), timeout)
return fs_tools.crc32sum(str(self), timeout)
def read(self):
return fs_utils.read_file(str(self))
return fs_tools.read_file(str(self))
def write(self, content, overwrite: bool = True):
fs_utils.write_file(str(self), content, overwrite)
fs_tools.write_file(str(self), content, overwrite)
self.refresh_item()
def get_properties(self):
@ -39,9 +39,9 @@ class File(FsItem):
@staticmethod
def create_file(path: str):
fs_utils.create_file(path)
output = fs_utils.ls_item(path)
return fs_utils.parse_ls_output(output)[0]
fs_tools.create_file(path)
output = fs_tools.ls_item(path)
return fs_tools.parse_ls_output(output)[0]
def padding(self, size: Size):
dd = Dd().input("/dev/zero").output(self).count(1).block_size(size)
@ -49,7 +49,7 @@ class File(FsItem):
self.refresh_item()
def remove(self, force: bool = False, ignore_errors: bool = False):
fs_utils.remove(str(self), force=force, ignore_errors=ignore_errors)
fs_tools.remove(str(self), force=force, ignore_errors=ignore_errors)
def copy(self,
destination,
@ -57,18 +57,18 @@ class File(FsItem):
recursive: bool = False,
dereference: bool = False,
timeout: timedelta = timedelta(minutes=30)):
fs_utils.copy(str(self), destination, force, recursive, dereference, timeout)
if fs_utils.check_if_directory_exists(destination):
fs_tools.copy(str(self), destination, force, recursive, dereference, timeout)
if fs_tools.check_if_directory_exists(destination):
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else:
path = destination
output = fs_utils.ls_item(path)
return fs_utils.parse_ls_output(output)[0]
output = fs_tools.ls_item(path)
return fs_tools.parse_ls_output(output)[0]
class FileProperties:
def __init__(self, file):
file = fs_utils.parse_ls_output(fs_utils.ls_item(file.full_path))[0]
file = fs_tools.parse_ls_output(fs_tools.ls_item(file.full_path))[0]
self.full_path = file.full_path
self.parent_dir = FsItem.get_parent_dir(self.full_path)
self.name = FsItem.get_name(self.full_path)

View File

@ -5,7 +5,7 @@
import posixpath
from test_tools import fs_utils
from test_tools import fs_tools
class FsItem:
@ -38,19 +38,19 @@ class FsItem:
return self.full_path
def chmod_numerical(self, permissions: int, recursive: bool = False):
fs_utils.chmod_numerical(self.full_path, permissions, recursive)
fs_tools.chmod_numerical(self.full_path, permissions, recursive)
self.refresh_item()
def chmod(self,
permissions: fs_utils.Permissions,
users: fs_utils.PermissionsUsers,
sign: fs_utils.PermissionSign = fs_utils.PermissionSign.set,
permissions: fs_tools.Permissions,
users: fs_tools.PermissionsUsers,
sign: fs_tools.PermissionSign = fs_tools.PermissionSign.set,
recursive: bool = False):
fs_utils.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive)
fs_tools.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive)
self.refresh_item()
def chown(self, owner, group, recursive: bool = False):
fs_utils.chown(self.full_path, owner, group, recursive)
fs_tools.chown(self.full_path, owner, group, recursive)
self.refresh_item()
def copy(self,
@ -58,20 +58,20 @@ class FsItem:
force: bool = False,
recursive: bool = False,
dereference: bool = False):
target_dir_exists = fs_utils.check_if_directory_exists(destination)
fs_utils.copy(str(self), destination, force, recursive, dereference)
target_dir_exists = fs_tools.check_if_directory_exists(destination)
fs_tools.copy(str(self), destination, force, recursive, dereference)
if target_dir_exists:
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else:
path = destination
output = fs_utils.ls_item(f"{path}")
return fs_utils.parse_ls_output(output)[0]
output = fs_tools.ls_item(f"{path}")
return fs_tools.parse_ls_output(output)[0]
def move(self,
destination,
force: bool = False):
target_dir_exists = fs_utils.check_if_directory_exists(destination)
fs_utils.move(str(self), destination, force)
target_dir_exists = fs_tools.check_if_directory_exists(destination)
fs_tools.move(str(self), destination, force)
if target_dir_exists:
self.full_path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
else:
@ -80,7 +80,7 @@ class FsItem:
return self
def refresh_item(self):
updated_file = fs_utils.parse_ls_output(fs_utils.ls_item(self.full_path))[0]
updated_file = fs_tools.parse_ls_output(fs_tools.ls_item(self.full_path))[0]
# keep order the same as in __init__()
self.parent_dir = updated_file.parent_dir
self.name = updated_file.name

View File

@ -5,7 +5,7 @@
#
from core.test_run import TestRun
from test_tools.fs_utils import (
from test_tools.fs_tools import (
readlink,
create_directory,
check_if_symlink_exists,