Move test-framework to its own repository
Signed-off-by: Robert Baldyga <baldyga.r@gmail.com>
This commit is contained in:
0
test_utils/filesystem/__init__.py
Normal file
0
test_utils/filesystem/__init__.py
Normal file
31
test_utils/filesystem/directory.py
Normal file
31
test_utils/filesystem/directory.py
Normal file
@@ -0,0 +1,31 @@
|
||||
#
|
||||
# Copyright(c) 2019-2021 Intel Corporation
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
from core.test_run import TestRun
|
||||
from test_tools import fs_utils
|
||||
from test_tools.fs_utils import check_if_directory_exists
|
||||
from test_utils.filesystem.fs_item import FsItem
|
||||
|
||||
|
||||
class Directory(FsItem):
|
||||
def __init__(self, full_path):
|
||||
FsItem.__init__(self, full_path)
|
||||
|
||||
def ls(self):
|
||||
output = fs_utils.ls(f"{self.full_path}")
|
||||
return fs_utils.parse_ls_output(output, self.full_path)
|
||||
|
||||
@staticmethod
|
||||
def create_directory(path: str, parents: bool = False):
|
||||
fs_utils.create_directory(path, parents)
|
||||
output = fs_utils.ls_item(path)
|
||||
return fs_utils.parse_ls_output(output)[0]
|
||||
|
||||
@staticmethod
|
||||
def create_temp_directory(parent_dir_path: str = "/tmp"):
|
||||
command = f"mktemp --directory --tmpdir={parent_dir_path}"
|
||||
output = TestRun.executor.run_expect_success(command)
|
||||
if not check_if_directory_exists(output.stdout):
|
||||
TestRun.LOGGER.exception("'mktemp' succeeded, but created directory does not exist")
|
||||
return Directory(output.stdout)
|
||||
83
test_utils/filesystem/file.py
Normal file
83
test_utils/filesystem/file.py
Normal file
@@ -0,0 +1,83 @@
|
||||
#
|
||||
# Copyright(c) 2019-2021 Intel Corporation
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
from core.test_run import TestRun
|
||||
from test_tools import fs_utils
|
||||
from test_tools.dd import Dd
|
||||
from test_utils.filesystem.fs_item import FsItem
|
||||
from test_utils.size import Size
|
||||
|
||||
|
||||
class File(FsItem):
|
||||
def __init__(self, full_path):
|
||||
FsItem.__init__(self, full_path)
|
||||
|
||||
def compare(self, other_file):
|
||||
return fs_utils.compare(str(self), str(other_file))
|
||||
|
||||
def diff(self, other_file):
|
||||
return fs_utils.diff(str(self), str(other_file))
|
||||
|
||||
def md5sum(self, binary=True):
|
||||
output = TestRun.executor.run(
|
||||
f"md5sum {'-b' if binary else ''} {self.full_path}")
|
||||
if output.exit_code != 0:
|
||||
raise Exception(f"Md5sum command execution failed! {output.stdout}\n{output.stderr}")
|
||||
return output.stdout.split()[0]
|
||||
|
||||
def read(self):
|
||||
return fs_utils.read_file(str(self))
|
||||
|
||||
def write(self, content, overwrite: bool = True):
|
||||
fs_utils.write_file(str(self), content, overwrite)
|
||||
self.refresh_item()
|
||||
|
||||
def get_properties(self):
|
||||
return FileProperties(self)
|
||||
|
||||
@staticmethod
|
||||
def create_file(path: str):
|
||||
fs_utils.create_file(path)
|
||||
output = fs_utils.ls_item(path)
|
||||
return fs_utils.parse_ls_output(output)[0]
|
||||
|
||||
def padding(self, size: Size):
|
||||
dd = Dd().input("/dev/zero").output(self).count(1).block_size(size)
|
||||
dd.run()
|
||||
self.refresh_item()
|
||||
|
||||
def remove(self, force: bool = False, ignore_errors: bool = False):
|
||||
fs_utils.remove(str(self), force=force, ignore_errors=ignore_errors)
|
||||
|
||||
def copy(self,
|
||||
destination,
|
||||
force: bool = False,
|
||||
recursive: bool = False,
|
||||
dereference: bool = False):
|
||||
fs_utils.copy(str(self), destination, force, recursive, dereference)
|
||||
if fs_utils.check_if_directory_exists(destination):
|
||||
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
|
||||
else:
|
||||
path = destination
|
||||
output = fs_utils.ls_item(path)
|
||||
return fs_utils.parse_ls_output(output)[0]
|
||||
|
||||
|
||||
class FileProperties:
|
||||
def __init__(self, file):
|
||||
file = fs_utils.parse_ls_output(fs_utils.ls_item(file.full_path))[0]
|
||||
self.full_path = file.full_path
|
||||
self.parent_dir = FsItem.get_parent_dir(self.full_path)
|
||||
self.name = FsItem.get_name(self.full_path)
|
||||
self.modification_time = file.modification_time
|
||||
self.owner = file.owner
|
||||
self.group = file.group
|
||||
self.permissions = file.permissions
|
||||
self.size = file.size
|
||||
|
||||
def __eq__(self, other):
|
||||
return (self.permissions == other.permissions and self.size == other.size
|
||||
and self.owner == other.owner and self.group == other.group
|
||||
and self.name == other.name)
|
||||
102
test_utils/filesystem/fs_item.py
Normal file
102
test_utils/filesystem/fs_item.py
Normal file
@@ -0,0 +1,102 @@
|
||||
#
|
||||
# Copyright(c) 2019-2021 Intel Corporation
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
import posixpath
|
||||
|
||||
from test_tools import fs_utils
|
||||
|
||||
|
||||
class FsItem:
|
||||
def __init__(self, full_path):
|
||||
self.full_path = full_path
|
||||
# all below values must be refreshed in refresh_item()
|
||||
self.parent_dir = self.get_parent_dir(self.full_path)
|
||||
self.name = self.get_name(self.full_path)
|
||||
self.modification_time = None
|
||||
self.owner = None
|
||||
self.group = None
|
||||
self.permissions = FsPermissions()
|
||||
self.size = None
|
||||
|
||||
@staticmethod
|
||||
def get_name(path):
|
||||
head, tail = posixpath.split(path)
|
||||
return tail or posixpath.basename(head)
|
||||
|
||||
@staticmethod
|
||||
def get_parent_dir(path):
|
||||
head, tail = posixpath.split(path)
|
||||
if tail:
|
||||
return head
|
||||
else:
|
||||
head, tail = posixpath.split(head)
|
||||
return head
|
||||
|
||||
def __str__(self):
|
||||
return self.full_path
|
||||
|
||||
def chmod_numerical(self, permissions: int, recursive: bool = False):
|
||||
fs_utils.chmod_numerical(self.full_path, permissions, recursive)
|
||||
self.refresh_item()
|
||||
|
||||
def chmod(self,
|
||||
permissions: fs_utils.Permissions,
|
||||
users: fs_utils.PermissionsUsers,
|
||||
sign: fs_utils.PermissionSign = fs_utils.PermissionSign.set,
|
||||
recursive: bool = False):
|
||||
fs_utils.chmod(self.full_path, permissions, users, sign=sign, recursive=recursive)
|
||||
self.refresh_item()
|
||||
|
||||
def chown(self, owner, group, recursive: bool = False):
|
||||
fs_utils.chown(self.full_path, owner, group, recursive)
|
||||
self.refresh_item()
|
||||
|
||||
def copy(self,
|
||||
destination,
|
||||
force: bool = False,
|
||||
recursive: bool = False,
|
||||
dereference: bool = False):
|
||||
target_dir_exists = fs_utils.check_if_directory_exists(destination)
|
||||
fs_utils.copy(str(self), destination, force, recursive, dereference)
|
||||
if target_dir_exists:
|
||||
path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
|
||||
else:
|
||||
path = destination
|
||||
output = fs_utils.ls_item(f"{path}")
|
||||
return fs_utils.parse_ls_output(output)[0]
|
||||
|
||||
def move(self,
|
||||
destination,
|
||||
force: bool = False):
|
||||
target_dir_exists = fs_utils.check_if_directory_exists(destination)
|
||||
fs_utils.move(str(self), destination, force)
|
||||
if target_dir_exists:
|
||||
self.full_path = f"{destination}{'/' if destination[-1] != '/' else ''}{self.name}"
|
||||
else:
|
||||
self.full_path = destination
|
||||
self.refresh_item()
|
||||
return self
|
||||
|
||||
def refresh_item(self):
|
||||
updated_file = fs_utils.parse_ls_output(fs_utils.ls_item(self.full_path))[0]
|
||||
# keep order the same as in __init__()
|
||||
self.parent_dir = updated_file.parent_dir
|
||||
self.name = updated_file.name
|
||||
self.modification_time = updated_file.modification_time
|
||||
self.owner = updated_file.owner
|
||||
self.group = updated_file.group
|
||||
self.permissions = updated_file.permissions
|
||||
self.size = updated_file.size
|
||||
return self
|
||||
|
||||
|
||||
class FsPermissions:
|
||||
def __init__(self, user=None, group=None, other=None):
|
||||
self.user = user
|
||||
self.group = group
|
||||
self.other = other
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.user == other.user and self.group == other.group and self.other == other.other
|
||||
91
test_utils/filesystem/symlink.py
Normal file
91
test_utils/filesystem/symlink.py
Normal file
@@ -0,0 +1,91 @@
|
||||
#
|
||||
# Copyright(c) 2019-2021 Intel Corporation
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
from core.test_run import TestRun
|
||||
from test_tools.fs_utils import (
|
||||
readlink,
|
||||
create_directory,
|
||||
check_if_symlink_exists,
|
||||
check_if_directory_exists,
|
||||
)
|
||||
from test_utils.filesystem.file import File
|
||||
|
||||
|
||||
class Symlink(File):
|
||||
def __init__(self, full_path):
|
||||
File.__init__(self, full_path)
|
||||
|
||||
def md5sum(self, binary=True):
|
||||
output = TestRun.executor.run_expect_success(
|
||||
f"md5sum {'-b' if binary else ''} {self.get_target()}"
|
||||
)
|
||||
return output.stdout.split()[0]
|
||||
|
||||
def get_target(self):
|
||||
return readlink(self.full_path)
|
||||
|
||||
def get_symlink_path(self):
|
||||
return self.full_path
|
||||
|
||||
def remove_symlink(self):
|
||||
path = self.get_symlink_path()
|
||||
TestRun.executor.run_expect_success(f"rm -f {path}")
|
||||
|
||||
@classmethod
|
||||
def create_symlink(cls, link_path: str, target: str, force: bool = False):
|
||||
"""
|
||||
Creates a Symlink - new or overwrites existing one if force parameter is True
|
||||
:param link_path: path to the place where we want to create a symlink
|
||||
:param target: the path of an object that the requested Symlink points to
|
||||
:param force: determines if the existing symlink with the same name should be overridden
|
||||
return: Symlink object located under link_path
|
||||
"""
|
||||
cmd = f"ln --symbolic {target} {link_path}"
|
||||
is_dir = check_if_directory_exists(link_path)
|
||||
parent_dir = cls.get_parent_dir(link_path)
|
||||
if is_dir:
|
||||
raise IsADirectoryError(f"'{link_path}' is an existing directory.")
|
||||
if force:
|
||||
if not check_if_directory_exists(parent_dir):
|
||||
create_directory(parent_dir, True)
|
||||
TestRun.executor.run_expect_success(f"rm -f {link_path}")
|
||||
TestRun.executor.run_expect_success(cmd)
|
||||
return cls(link_path)
|
||||
|
||||
@classmethod
|
||||
def get_symlink(cls, link_path: str, target: str = None, create: bool = False):
|
||||
"""
|
||||
Request a Symlink (create new or identify existing)
|
||||
:param link_path: full path of the requested Symlink
|
||||
:param target: path of an object that the requested Symlink points to
|
||||
(required if create is True)
|
||||
:param create: determines if the requested Symlink should be created if it does not exist
|
||||
:return: Symlink object located under link_path
|
||||
"""
|
||||
if create and not target:
|
||||
raise AttributeError("Target is required for symlink creation.")
|
||||
|
||||
is_symlink = check_if_symlink_exists(link_path)
|
||||
if is_symlink:
|
||||
if not target or readlink(link_path) == readlink(target):
|
||||
return cls(link_path)
|
||||
else:
|
||||
raise FileExistsError("Existing symlink points to a different target.")
|
||||
elif not create:
|
||||
raise FileNotFoundError("Requested symlink does not exist.")
|
||||
|
||||
is_dir = check_if_directory_exists(link_path)
|
||||
if is_dir:
|
||||
raise IsADirectoryError(
|
||||
f"'{link_path}' is an existing directory." "\nUse a full path for symlink creation."
|
||||
)
|
||||
|
||||
parent_dir = cls.get_parent_dir(link_path)
|
||||
if not check_if_directory_exists(parent_dir):
|
||||
create_directory(parent_dir, True)
|
||||
|
||||
cmd = f"ln --symbolic {target} {link_path}"
|
||||
TestRun.executor.run_expect_success(cmd)
|
||||
return cls(link_path)
|
||||
Reference in New Issue
Block a user