tests: Embed test framework within OCL repository

Signed-off-by: Robert Baldyga <robert.baldyga@intel.com>
Author: Robert Baldyga
Date: 2022-12-23 12:50:17 +01:00
Parent: bc0c8c1bf5
Commit: 849f59855c

91 changed files with 9930 additions and 2 deletions


@@ -0,0 +1,107 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
# The MIT License (MIT)
#
# Copyright (c) 2004-2020 Holger Krekel and others
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from itertools import product, combinations
import random
from core.test_run import TestRun
def testcase_id(param_set):
if len(param_set.values) == 1:
return param_set.values[0]
return "-".join([str(value) for value in param_set.values])
def generate_pair_testing_testcases(*argvals):
"""
Generate test_cases from provided argument values lists in such way that each possible
(argX, argY) pair will be used.
"""
    # if only one argument was provided, yield its values directly and stop
    if len(argvals) == 1:
        for val in argvals[0]:
            yield (val,)
        return
    # tag each value with its argument index so that equal values coming from
    # different arguments remain distinguishable
    for i, arg in enumerate(argvals):
        for j, val in enumerate(arg):
            arg[j] = (i, val)
# generate all possible test cases
all_test_cases = list(product(*argvals))
random.seed(TestRun.random_seed)
random.shuffle(all_test_cases)
used_pairs = set()
for tc in all_test_cases:
current_pairs = set(combinations(tc, 2))
        # if the intersection with used_pairs is smaller than current_pairs,
        # this test case contains at least one not-yet-used argument pair -
        # record its pairs and yield it
        if len(current_pairs & used_pairs) != len(current_pairs):
            used_pairs.update(current_pairs)
            # strip the argument indices before yielding the test case values
            yield list(list(zip(*tc))[1])
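
# A hedged usage sketch: for the three made-up argument lists below, full
# parametrization would produce 3 * 4 * 2 = 24 test cases, while pairwise
# generation yields only enough cases to use every two-argument value pair
# at least once (note that the input lists are modified in place):
#
#   modes = ["WT", "WB", "WA"]
#   sizes = [4, 8, 16, 64]
#   flags = [True, False]
#   cases = list(generate_pair_testing_testcases(modes, sizes, flags))
#   print(len(cases))  # typically well below the 24 full-product cases
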
def register_testcases(metafunc, argnames, argvals):
"""
Add custom parametrization test cases. Based on metafunc's parametrize method.
"""
from _pytest.python import CallSpec2, _find_parametrized_scope
from _pytest.mark import ParameterSet
from _pytest.fixtures import scope2index
parameter_sets = [ParameterSet(values=val, marks=[], id=None) for val in argvals]
metafunc._validate_if_using_arg_names(argnames, False)
arg_value_types = metafunc._resolve_arg_value_types(argnames, False)
ids = [testcase_id(param_set) for param_set in parameter_sets]
scope = _find_parametrized_scope(argnames, metafunc._arg2fixturedefs, False)
scopenum = scope2index(scope, descr=f"parametrizex() call in {metafunc.function.__name__}")
calls = []
for callspec in metafunc._calls or [CallSpec2(metafunc)]:
for param_index, (param_id, param_set) in enumerate(zip(ids, parameter_sets)):
newcallspec = callspec.copy()
newcallspec.setmulti2(
arg_value_types,
argnames,
param_set.values,
param_id,
param_set.marks,
scopenum,
param_index,
)
calls.append(newcallspec)
metafunc._calls = calls
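
# A hedged sketch of how these helpers combine (this mirrors
# TestRun.generate_tests elsewhere in this commit; the argument names and
# value lists are made up):
#
#   test_cases = generate_pair_testing_testcases(cache_modes, cache_line_sizes)
#   register_testcases(metafunc, ["cache_mode", "cache_line_size"], test_cases)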


@@ -0,0 +1,124 @@
#
# Copyright(c) 2020-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import pytest
import sys
import importlib
import signal
from core.test_run import TestRun
class PluginManager:
def __init__(self, item, config):
if 'plugins_dir' in config:
sys.path.append(config['plugins_dir'])
self.plugins = {}
self.plugins_config = config.get('plugins', {})
self.req_plugins = config.get('req_plugins', {})
self.opt_plugins = config.get('opt_plugins', {})
self.req_plugins.update(dict(map(lambda mark: (mark.args[0], mark.kwargs),
item.iter_markers(name="require_plugin"))))
req_plugin_mod = {}
opt_plugin_mod = {}
for name in self.req_plugins:
try:
req_plugin_mod[name] = self.__import_plugin(name)
            except ModuleNotFoundError:
                pytest.skip(f"Unable to find requested plugin '{name}'!")
for name in self.opt_plugins:
try:
opt_plugin_mod[name] = self.__import_plugin(name)
except ModuleNotFoundError as e:
                TestRun.LOGGER.debug(
                    f"Failed to import optional plugin '{name}'. Reason: {e}"
                )
continue
for name, mod in req_plugin_mod.items():
try:
self.plugins[name] = mod.plugin_class(
self.req_plugins[name],
self.plugins_config.get(name, {}).get("config", {}))
except Exception:
pytest.skip(f"Unable to initialize plugin '{name}'")
for name, mod in opt_plugin_mod.items():
try:
self.plugins[name] = mod.plugin_class(
self.opt_plugins[name],
self.plugins_config.get(name, {}).get("config", {}))
except Exception as e:
                TestRun.LOGGER.debug(
                    f"Failed to initialize optional plugin '{name}'. Reason: {e}"
                )
continue
def __import_plugin(self, name):
provided_by = self.plugins_config.get(name, {}).get("provided_by")
if provided_by:
return importlib.import_module(provided_by)
try:
return importlib.import_module(f"internal_plugins.{name}")
except ModuleNotFoundError:
pass
return importlib.import_module(f"external_plugins.{name}")
def hook_pre_setup(self):
for plugin in self.plugins.values():
plugin.pre_setup()
def hook_post_setup(self):
for plugin in self.plugins.values():
plugin.post_setup()
def hook_teardown(self):
for plugin in self.plugins.values():
plugin.teardown()
def get_plugin(self, name):
if name not in self.plugins:
            raise KeyError(f"Requested plugin '{name}' does not exist")
return self.plugins[name]
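
    # A minimal plugin module sketch, assuming only the interface this class
    # exercises: a module-level `plugin_class` constructed with the marker
    # params and the plugin config, plus the three hooks called above (the
    # module path and class name are hypothetical):
    #
    #   # external_plugins/example_plugin/__init__.py
    #   class ExamplePlugin:
    #       def __init__(self, params, config):
    #           self.params = params
    #           self.config = config
    #
    #       def pre_setup(self):
    #           pass
    #
    #       def post_setup(self):
    #           pass
    #
    #       def teardown(self):
    #           pass
    #
    #   plugin_class = ExamplePlugin
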
def teardown_on_signal(self, sig_id, plugin_name):
try:
plugin = self.get_plugin(plugin_name)
except Exception as e:
TestRun.LOGGER.warning(
f"Failed to setup teardown on signal for {plugin_name}. Reason: {e}")
return
old_sig_handler = None
def signal_handler(sig, frame):
plugin.teardown()
if old_sig_handler is not None:
if old_sig_handler == signal.SIG_DFL:
# In case of SIG_DFL the function pointer points to address 0,
# which is not a valid address.
# We have to reset the handler and raise the signal again
signal.signal(sig, signal.SIG_DFL)
signal.raise_signal(sig)
signal.signal(sig, signal_handler)
elif old_sig_handler == signal.SIG_IGN:
# SIG_IGN has value 1 (also an invalid address).
# Here we can just return (do nothing)
return
                else:
                    # old_sig_handler is neither SIG_IGN nor SIG_DFL, so it is
                    # a regular Python handler - call it with the standard
                    # (signum, frame) arguments, then restore it
                    old_sig_handler(sig, frame)
                    signal.signal(sig, old_sig_handler)
old_sig_handler = signal.signal(sig_id, signal_handler)
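
    # Usage sketch (hedged): make a plugin's teardown also run on Ctrl+C; the
    # plugin name "power_control" is hypothetical:
    #
    #   import signal
    #   TestRun.plugin_manager.teardown_on_signal(signal.SIGINT, "power_control")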


@@ -0,0 +1,65 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from contextlib import contextmanager
import pytest
from log.logger import Log
class Blocked(Exception):
pass
class TestRun:
dut = None
executor = None
LOGGER: Log = None
plugin_manager = None
duts = None
disks = None
@classmethod
@contextmanager
def use_dut(cls, dut):
cls.dut = dut
cls.config = cls.dut.config
cls.executor = cls.dut.executor
cls.plugin_manager = cls.dut.plugin_manager
cls.disks = cls.dut.req_disks
yield cls.executor
cls.disks = None
cls.plugin_manager = None
cls.executor = None
        # cls.config is deliberately left set - resetting it to None breaks the teardown stage
cls.dut = None
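
    # Usage sketch: multi-DUT tests iterate over TestRun.duts and switch the
    # active context; this mirrors TestRun.teardown elsewhere in this commit
    # (the executor.run() call is an assumption about the executor interface):
    #
    #   for dut in TestRun.duts:
    #       with TestRun.use_dut(dut):
    #           TestRun.executor.run("uname -r")
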
@classmethod
def step(cls, message):
return cls.LOGGER.step(message)
@classmethod
def group(cls, message):
return cls.LOGGER.group(message)
@classmethod
    def iteration(cls, iterable, group_name=None):
        cls.LOGGER.start_group(f"{group_name}" if group_name is not None else "Iteration list")
        items = list(iterable)
        for i, item in enumerate(items, start=1):
            cls.LOGGER.start_iteration(f"Iteration {i}/{len(items)}")
            yield item
            cls.LOGGER.end_iteration()
        cls.LOGGER.end_group()
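
    # Usage sketch: wrap a loop so that each pass is logged as a separate
    # iteration inside a named group (the values are made up):
    #
    #   for size in TestRun.iteration([4, 8, 64], group_name="Cache line sizes"):
    #       ...
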
@classmethod
def fail(cls, message):
pytest.fail(message)
@classmethod
def block(cls, message):
raise Blocked(message)


@@ -0,0 +1,272 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
import random
import sys
import traceback
import pytest
from IPy import IP
import core.test_run
from connection.local_executor import LocalExecutor
from connection.ssh_executor import SshExecutor
from core.pair_testing import generate_pair_testing_testcases, register_testcases
from core.plugins import PluginManager
from log.base_log import BaseLogResult
from storage_devices.disk import Disk
from test_utils import disk_finder
from test_utils.dut import Dut
TestRun = core.test_run.TestRun
@classmethod
def __configure(cls, config):
    config.addinivalue_line(
        "markers",
        "require_disk(name, type): require a disk of the specified type, otherwise skip"
    )
    config.addinivalue_line(
        "markers",
        "require_plugin(name, **kwargs): require a specific plugin, otherwise skip"
    )
    config.addinivalue_line(
        "markers",
        "remote_only: run test only in case of remote execution, otherwise skip"
    )
    config.addinivalue_line(
        "markers",
        "os_dependent: run test only if it is OS-dependent, otherwise skip"
    )
    config.addinivalue_line(
        "markers",
        "multidut(number): test requires the given number of DUTs to execute"
    )
    config.addinivalue_line(
        "markers",
        "parametrizex(argname, argvalues): sparse parametrized testing"
    )
    config.addinivalue_line(
        "markers",
        "CI: marks test for continuous integration pipeline"
    )
cls.random_seed = config.getoption("--random-seed") or random.randrange(sys.maxsize)
random.seed(cls.random_seed)
TestRun.configure = __configure
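
# A hedged sketch of these markers applied to a test; the DiskType/DiskTypeSet
# import path and the plugin name are assumptions:
#
#   from storage_devices.disk import DiskType, DiskTypeSet
#
#   @pytest.mark.require_disk("cache", DiskTypeSet([DiskType.optane, DiskType.nand]))
#   @pytest.mark.require_plugin("power_control")
#   @pytest.mark.remote_only
#   def test_example():
#       cache_dev = TestRun.disks["cache"]
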
@classmethod
def __prepare(cls, item, config):
if not config:
raise Exception("You need to specify DUT config!")
cls.item = item
cls.config = config
req_disks = list(map(lambda mark: mark.args, cls.item.iter_markers(name="require_disk")))
cls.req_disks = dict(req_disks)
if len(req_disks) != len(cls.req_disks):
raise Exception("Disk name specified more than once!")
TestRun.prepare = __prepare
@classmethod
def __attach_log(cls, log_path, target_name=None):
if target_name is None:
target_name = posixpath.basename(log_path)
    cls.config.setdefault("extra_logs", {})[target_name] = log_path
TestRun.attach_log = __attach_log
@classmethod
def __setup_disk(cls, disk_name, disk_type):
cls.disks[disk_name] = next(filter(
lambda disk: disk.disk_type in disk_type.types() and disk not in cls.disks.values(),
cls.dut.disks
), None)
if not cls.disks[disk_name]:
pytest.skip("Unable to find requested disk!")
TestRun.__setup_disk = __setup_disk
@classmethod
def __setup_disks(cls):
cls.disks = {}
items = list(cls.req_disks.items())
    while items:
        resolved, unresolved = [], []
        # partition the requirements into those whose disk type is already
        # resolved and those that still need to wait for other assignments
        for disk_name, disk_type in items:
            (resolved if disk_type.resolved() else unresolved).append((disk_name, disk_type))
        # assign disks for the resolved requirements in disk-type order
        resolved.sort(key=lambda req: req[1])
        for disk_name, disk_type in resolved:
            cls.__setup_disk(disk_name, disk_type)
        items = unresolved
cls.dut.req_disks = cls.disks
TestRun.__setup_disks = __setup_disks
@classmethod
def __presetup(cls):
cls.plugin_manager = PluginManager(cls.item, cls.config)
cls.plugin_manager.hook_pre_setup()
if cls.config['type'] == 'ssh':
try:
IP(cls.config['ip'])
except ValueError:
TestRun.block("IP address from config is in invalid format.")
port = cls.config.get('port', 22)
if 'user' in cls.config:
cls.executor = SshExecutor(
cls.config['ip'],
cls.config['user'],
port
)
else:
TestRun.block("There is no user given in config.")
elif cls.config['type'] == 'local':
cls.executor = LocalExecutor()
else:
TestRun.block("Execution type (local/ssh) is missing in DUT config!")
TestRun.presetup = __presetup
@classmethod
def __setup(cls):
if list(cls.item.iter_markers(name="remote_only")):
if not cls.executor.is_remote():
pytest.skip()
Disk.plug_all_disks()
if cls.config.get('allow_disk_autoselect', False):
cls.config["disks"] = disk_finder.find_disks()
try:
cls.dut = Dut(cls.config)
except Exception as ex:
raise Exception(f"Failed to setup DUT instance:\n"
f"{str(ex)}\n{traceback.format_exc()}")
cls.__setup_disks()
TestRun.LOGGER.info(f"Re-seeding random number generator with seed: {cls.random_seed}")
random.seed(cls.random_seed)
cls.plugin_manager.hook_post_setup()
TestRun.setup = __setup
@classmethod
def __makereport(cls, item, call, res):
cls.outcome = res.outcome
step_info = {
'result': res.outcome,
'exception': str(call.excinfo.value) if call.excinfo else None
}
setattr(item, "rep_" + res.when, step_info)
from _pytest.outcomes import Failed
from core.test_run import Blocked
if res.when == "call" and res.failed:
msg = f"{call.excinfo.type.__name__}: {call.excinfo.value}"
if call.excinfo.type is Failed:
cls.LOGGER.error(msg)
elif call.excinfo.type is Blocked:
cls.LOGGER.blocked(msg)
else:
cls.LOGGER.exception(msg)
elif res.when == "setup" and res.failed:
msg = f"{call.excinfo.type.__name__}: {call.excinfo.value}"
cls.LOGGER.exception(msg)
res.outcome = "failed"
if res.outcome == "skipped":
cls.LOGGER.skip("Test skipped.")
if res.when == "call" and cls.LOGGER.get_result() == BaseLogResult.FAILED:
res.outcome = "failed"
# To print additional message in final test report, assign it to res.longrepr
cls.LOGGER.generate_summary(item, cls.config.get('meta'))
TestRun.makereport = __makereport
@classmethod
def __generate_tests(cls, metafunc):
marks = getattr(metafunc.function, "pytestmark", [])
parametrizex_marks = [
mark for mark in marks if mark.name == "parametrizex"
]
if not parametrizex_marks:
random.seed(TestRun.random_seed)
return
argnames = []
argvals = []
for mark in parametrizex_marks:
argnames.append(mark.args[0])
argvals.append(list(mark.args[1]))
if metafunc.config.getoption("--parametrization-type") == "full":
for name, values in zip(argnames, argvals):
metafunc.parametrize(name, values)
elif metafunc.config.getoption("--parametrization-type") == "pair":
test_cases = generate_pair_testing_testcases(*argvals)
register_testcases(metafunc, argnames, test_cases)
else:
raise Exception("Not supported parametrization type")
random.seed(TestRun.random_seed)
TestRun.generate_tests = __generate_tests
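
# A hedged parametrizex example: with the three made-up marks below, "full"
# mode produces the 3 * 4 * 2 = 24-case cross product, while the default
# "pair" mode generates only enough cases to cover every two-argument value
# pair at least once:
#
#   @pytest.mark.parametrizex("cache_mode", ["WT", "WB", "WA"])
#   @pytest.mark.parametrizex("cache_line_size", [4, 8, 16, 64])
#   @pytest.mark.parametrizex("cleaning_policy", ["alru", "nop"])
#   def test_example(cache_mode, cache_line_size, cleaning_policy):
#       ...
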
@classmethod
def __addoption(cls, parser):
parser.addoption("--parametrization-type", choices=["pair", "full"], default="pair")
parser.addoption("--random-seed", type=int, default=None)
TestRun.addoption = __addoption
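
# The options registered above are passed on the pytest command line, e.g.:
#
#   pytest --parametrization-type=full --random-seed=42
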
@classmethod
def __teardown(cls):
for dut in cls.duts:
with cls.use_dut(dut):
if cls.plugin_manager:
cls.plugin_manager.hook_teardown()
TestRun.teardown = __teardown
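
# A hedged sketch of how a test suite's conftest.py is expected to delegate to
# the TestRun hooks defined above (the makereport hookwrapper wiring is an
# assumption based on its (item, call, res) signature):
#
#   import pytest
#   from core.test_run import TestRun  # assumes this module has been imported
#
#   def pytest_addoption(parser):
#       TestRun.addoption(parser)
#
#   def pytest_configure(config):
#       TestRun.configure(config)
#
#   def pytest_generate_tests(metafunc):
#       TestRun.generate_tests(metafunc)
#
#   @pytest.hookimpl(tryfirst=True, hookwrapper=True)
#   def pytest_runtest_makereport(item, call):
#       res = (yield).get_result()
#       TestRun.makereport(item, call, res)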