opencas-test-framework/test_tools/fs_tools.py
Katarzyna Treder 5bd6a656c5 Fix imports
Signed-off-by: Katarzyna Treder <katarzyna.treder@h-partners.com>
2024-12-31 11:51:05 +01:00

456 lines
15 KiB
Python

#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2023-2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import base64
import math
import re
import textwrap
from collections import namedtuple
from datetime import datetime, timedelta
from enum import Enum
from aenum import IntFlag # IntFlag from enum is not able to correctly parse string like "x|y|z"
from connection.utils.output import CmdException
from core.test_run import TestRun
from test_tools.dd import Dd
from type_def.size import Size, Unit
class Filesystem(Enum):
xfs = 0
ext3 = 1
ext4 = 2
class Permissions(IntFlag):
r = 4
w = 2
x = 1
def __str__(self):
ret_string = ""
for p in Permissions:
if p in self:
ret_string += p.name
return ret_string
class PermissionsUsers(IntFlag):
u = 4
g = 2
o = 1
def __str__(self):
ret_string = ""
for p in PermissionsUsers:
if p in self:
ret_string += p.name
return ret_string
class PermissionSign(Enum):
add = '+'
remove = '-'
set = '='
class FilesPermissions:
perms_exceptions = {}
def __init__(self, files_list: list):
self.files_list = files_list
def add_exceptions(self, perms: dict):
self.perms_exceptions.update(perms)
def check(self, file_perm: int = 644, dir_perm: int = 755):
failed_perms = []
FailedPerm = namedtuple("FailedPerm", ["file", "current_perm", "expected_perm"])
for file in self.files_list:
perm = get_permissions(file)
if file in self.perms_exceptions:
if perm != self.perms_exceptions[file]:
failed_perms.append(FailedPerm(file, perm, self.perms_exceptions[file]))
continue
if check_if_regular_file_exists(file):
if perm != file_perm:
failed_perms.append(FailedPerm(file, perm, file_perm))
elif check_if_directory_exists(file):
if perm != dir_perm:
failed_perms.append(FailedPerm(file, perm, dir_perm))
else:
raise Exception(f"{file}: File type not recognized.")
return failed_perms
def create_directory(path, parents: bool = False):
cmd = f"mkdir {'--parents ' if parents else ''}\"{path}\""
return TestRun.executor.run_expect_success(cmd)
def check_if_directory_exists(path):
return TestRun.executor.run(f"test -d \"{path}\"").exit_code == 0
def check_if_file_exists(path):
return TestRun.executor.run(f"test -e \"{path}\"").exit_code == 0
def check_if_regular_file_exists(path):
return TestRun.executor.run(f"test -f \"{path}\"").exit_code == 0
def check_if_symlink_exists(path):
return TestRun.executor.run(f"test -L \"{path}\"").exit_code == 0
def copy(source: str,
destination: str,
force: bool = False,
recursive: bool = False,
dereference: bool = False,
timeout: timedelta = timedelta(minutes=30)):
cmd = f"cp{' --force' if force else ''}" \
f"{' --recursive' if recursive else ''}" \
f"{' --dereference' if dereference else ''} " \
f"{source} {destination}"
return TestRun.executor.run_expect_success(cmd, timeout)
def move(source, destination, force: bool = False, timeout: timedelta = timedelta(minutes=30)):
cmd = f"mv{' --force' if force else ''} \"{source}\" \"{destination}\""
return TestRun.executor.run_expect_success(cmd, timeout)
def remove(path: str, force: bool = False, recursive: bool = False, ignore_errors: bool = False):
cmd = "rm"
cmd += " --force" if force else ""
cmd += " --recursive" if recursive else ""
cmd += f" \"{path}\"" if not path.endswith("*") else f" \"{path[:-1]}\"*"
output = TestRun.executor.run(cmd)
if output.exit_code != 0 and not ignore_errors:
raise Exception(f"Could not remove file {path}."
f"\nstdout: {output.stdout}\nstderr: {output.stderr}")
return output
def get_permissions(path, dereference: bool = True):
cmd = f"stat --format='%a' {'--dereference' if dereference else ''} \"{path}\""
return int(TestRun.executor.run_expect_success(cmd).stdout)
def chmod(path, permissions: Permissions, users: PermissionsUsers,
sign: PermissionSign = PermissionSign.set, recursive: bool = False):
cmd = f"chmod{' --recursive' if recursive else ''} " \
f"{str(users)}{sign.value}{str(permissions)} \"{path}\""
output = TestRun.executor.run(cmd)
return output
def chmod_numerical(path, permissions: int, recursive: bool = False):
cmd = f"chmod{' --recursive' if recursive else ''} {permissions} \"{path}\""
return TestRun.executor.run_expect_success(cmd)
def chown(path, owner, group, recursive):
cmd = f"chown {'--recursive ' if recursive else ''}{owner}:{group} \"{path}\""
return TestRun.executor.run_expect_success(cmd)
def create_file(path):
if not path.strip():
raise ValueError("Path cannot be empty or whitespaces.")
cmd = f"touch \"{path}\""
return TestRun.executor.run_expect_success(cmd)
def compare(file, other_file, timeout: timedelta = timedelta(minutes=30)):
output = TestRun.executor.run(f"cmp --silent \"{file}\" \"{other_file}\"", timeout)
if output.exit_code == 0:
return True
elif output.exit_code > 1:
raise Exception(f"Compare command execution failed. {output.stdout}\n{output.stderr}")
else:
return False
def diff(file, other_file, timeout: timedelta = timedelta(minutes=30)):
output = TestRun.executor.run(f"diff \"{file}\" \"{other_file}\"", timeout)
if output.exit_code == 0:
return None
elif output.exit_code > 1:
raise Exception(f"Diff command execution failed. {output.stdout}\n{output.stderr}")
else:
return output.stderr
def md5sum(file, binary=True, timeout: timedelta = timedelta(minutes=30)):
output = TestRun.executor.run(f"md5sum {'-b' if binary else ''} {file}", timeout)
if output.exit_code != 0:
raise Exception(f"Md5sum command execution failed! {output.stdout}\n{output.stderr}")
return output.stdout.split()[0]
def crc32sum(file, timeout: timedelta = timedelta(minutes=30)):
output = TestRun.executor.run(f"crc32 {file}", timeout)
if output.exit_code != 0:
raise Exception(f"crc32 command execution failed! {output.stdout}\n{output.stderr}")
return output.stdout
# For some reason separators other than '/' don't work when using sed on system paths
# This requires escaping '/' in pattern and target string
def escape_sed_string(string: str, sed_replace: bool = False):
string = string.replace("'", r"\x27").replace("/", r"\/")
# '&' has special meaning in sed replace and needs to be escaped
if sed_replace:
string = string.replace("&", r"\&")
return string
def insert_line_before_pattern(file, pattern, new_line):
pattern = escape_sed_string(pattern)
new_line = escape_sed_string(new_line)
cmd = f"sed -i '/{pattern}/i {new_line}' \"{file}\""
return TestRun.executor.run_expect_success(cmd)
def replace_first_pattern_occurrence(file, pattern, new_string):
pattern = escape_sed_string(pattern)
new_string = escape_sed_string(new_string, sed_replace=True)
cmd = f"sed -i '0,/{pattern}/s//{new_string}/' \"{file}\""
return TestRun.executor.run_expect_success(cmd)
def replace_in_lines(file, pattern, new_string, regexp=False):
pattern = escape_sed_string(pattern)
new_string = escape_sed_string(new_string, sed_replace=True)
cmd = f"sed -i{' -r' if regexp else ''} 's/{pattern}/{new_string}/g' \"{file}\""
return TestRun.executor.run_expect_success(cmd)
def append_line(file, string):
cmd = f"echo '{string}' >> \"{file}\""
return TestRun.executor.run_expect_success(cmd)
def remove_lines(file, pattern, regexp=False):
pattern = escape_sed_string(pattern)
cmd = f"sed -i{' -r' if regexp else ''} '/{pattern}/d' \"{file}\""
return TestRun.executor.run_expect_success(cmd)
def read_file(file):
if not file.strip():
raise ValueError("File path cannot be empty or whitespace.")
output = TestRun.executor.run_expect_success(f"cat \"{file}\"")
return output.stdout
def write_file(file, content, overwrite: bool = True, unix_line_end: bool = True):
if not file.strip():
raise ValueError("File path cannot be empty or whitespace.")
if not content:
raise ValueError("Content cannot be empty.")
if unix_line_end:
content.replace('\r', '')
content += '\n'
max_length = 60000
split_content = textwrap.TextWrapper(width=max_length, replace_whitespace=False).wrap(content)
split_content[-1] += '\n'
for s in split_content:
redirection_char = '>' if overwrite else '>>'
overwrite = False
encoded_content = base64.b64encode(s.encode("utf-8"))
cmd = f"printf '{encoded_content.decode('utf-8')}' " \
f"| base64 --decode {redirection_char} \"{file}\""
TestRun.executor.run_expect_success(cmd)
def uncompress_archive(file, destination=None):
from test_utils.filesystem.file import File
if not isinstance(file, File):
file = File(file)
if not destination:
destination = file.parent_dir
command = (f"unzip -u {file.full_path} -d {destination}"
if str(file).endswith(".zip")
else f"tar --extract --file={file.full_path} --directory={destination}")
TestRun.executor.run_expect_success(command)
def ls(path, options=''):
default_options = "-lA --time-style=+'%Y-%m-%d %H:%M:%S'"
output = TestRun.executor.run(
f"ls {default_options} {options} {path}")
return output.stdout
def ls_item(path):
output = ls(path, '-d')
return output.splitlines()[0] if output else None
def parse_ls_output(ls_output, dir_path=''):
split_output = ls_output.split('\n')
fs_items = []
for line in split_output:
if not line.strip():
continue
line_fields = line.split()
if len(line_fields) < 8:
continue
file_type = line[0]
if file_type not in ['-', 'd', 'l', 'b', 'c', 'p', 's']:
continue
permissions = line_fields[0][1:].replace('.', '')
owner = line_fields[2]
group = line_fields[3]
has_size = ',' not in line_fields[4]
if has_size:
size = Size(float(line_fields[4]), Unit.Byte)
else:
size = None
line_fields.pop(4)
split_date = line_fields[5].split('-')
split_time = line_fields[6].split(':')
modification_time = datetime(int(split_date[0]), int(split_date[1]), int(split_date[2]),
int(split_time[0]), int(split_time[1]), int(split_time[2]))
if dir_path and file_type != 'l':
full_path = '/'.join([dir_path, line_fields[7]])
else:
full_path = line_fields[7]
from test_utils.filesystem.file import File, FsItem
from test_utils.filesystem.directory import Directory
from test_utils.filesystem.symlink import Symlink
if file_type == '-':
fs_item = File(full_path)
elif file_type == 'd':
fs_item = Directory(full_path)
elif file_type == 'l':
fs_item = Symlink(full_path)
else:
fs_item = FsItem(full_path)
fs_item.permissions.user = Permissions['|'.join(list(permissions[:3].replace('-', '')))] \
if permissions[:3] != '---' else Permissions(0)
fs_item.permissions.group = Permissions['|'.join(list(permissions[3:6].replace('-', '')))] \
if permissions[3:6] != '---' else Permissions(0)
fs_item.permissions.other = Permissions['|'.join(list(permissions[6:].replace('-', '')))] \
if permissions[6:] != '---' else Permissions(0)
fs_item.owner = owner
fs_item.group = group
fs_item.size = size
fs_item.modification_time = modification_time
fs_items.append(fs_item)
return fs_items
def find_all_files(path: str, recursive: bool = True):
if not path.strip():
raise ValueError("No path given.")
output = TestRun.executor.run_expect_success(f"find \"{path}\" {'-maxdepth 1' if not recursive else ''} \( -type f -o -type l \) -print")
return output.stdout.splitlines()
def find_all_dirs(path: str, recursive: bool = True):
if not path.strip():
raise ValueError("No path given.")
output = TestRun.executor.run_expect_success(f"find \"{path}\" {'-maxdepth 1' if not recursive else ''} -type d -print")
return output.stdout.splitlines()
def find_all_items(path: str, recursive: bool = True):
return [*find_all_files(path, recursive), *find_all_dirs(path, recursive)]
def readlink(link: str, options="--canonicalize-existing"):
return TestRun.executor.run_expect_success(
f"readlink {options} \"{link}\""
).stdout
def create_random_test_file(target_file_path: str,
file_size: Size = Size(1, Unit.MebiByte),
random: bool = True):
from test_utils.filesystem.file import File
bs = Size(512, Unit.KibiByte)
cnt = math.ceil(file_size.value / bs.value)
file = File.create_file(target_file_path)
dd = Dd().output(target_file_path) \
.input("/dev/urandom" if random else "/dev/zero") \
.block_size(bs) \
.count(cnt) \
.oflag("direct")
dd.run()
file.refresh_item()
return file
def mkfs(device, filesystem: Filesystem, force=True, blocksize=None):
TestRun.LOGGER.info(
f"Creating filesystem ({filesystem.name}) on device: {device.path}")
force_param = ' -f ' if filesystem == Filesystem.xfs else ' -F '
force_param = force_param if force else ''
block_size_param = f' -b size={blocksize}' if filesystem == Filesystem.xfs \
else f' -b {blocksize}'
block_size_param = block_size_param if blocksize else ''
cmd = f'mkfs.{filesystem.name} {force_param} {device.path} {block_size_param}'
cmd = re.sub(' +', ' ', cmd)
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully created filesystem on device: {device.path}")
def wipefs(device, force=True):
TestRun.LOGGER.info(f"Erasing the device: {device.path}")
force_param = ' -f' if force else ''
cmd = f'wipefs -a{force_param} {device.path}'
TestRun.executor.run_expect_success(cmd)
TestRun.LOGGER.info(
f"Successfully wiped device: {device.path}")
def get_device_filesystem_type(device_id):
cmd = f'lsblk -l -o NAME,FSTYPE | sort | uniq | grep "{device_id} "'
try:
stdout = TestRun.executor.run_expect_success(cmd).stdout
except CmdException:
# unusual devices might not be listed in output (i.e. RAID containers)
if TestRun.executor.run(f"test -b /dev/{device_id}").exit_code != 0:
raise
else:
return None
split_stdout = stdout.strip().split()
if len(split_stdout) <= 1:
return None
else:
try:
return Filesystem[split_stdout[1]]
except KeyError:
TestRun.LOGGER.warning(f"Unrecognized filesystem: {split_stdout[1]}")
return None
def is_mounted(path: str):
if path is None or path.isspace():
raise Exception("Checked path cannot be empty")
command = f"mount | grep --fixed-strings '{path.rstrip('/')} '"
return TestRun.executor.run(command).exit_code == 0