From e52d34c1c886b5181b4a17ba95e64910c079c3c1 Mon Sep 17 00:00:00 2001 From: Kamil Lepek Date: Mon, 10 Jun 2019 15:49:15 +0200 Subject: [PATCH] Adapt all python code to PEP8 style standards Signed-off-by: Kamil Lepek --- .pep8speaks.yml | 1 + tests/functional/pyocf/types/cache.py | 8 +- tests/functional/pyocf/types/data.py | 4 +- tests/functional/pyocf/types/io.py | 2 +- tests/functional/pyocf/types/queue.py | 2 +- tests/functional/pyocf/types/shared.py | 2 +- tests/functional/pyocf/types/volume.py | 14 +- tests/functional/pyocf/utils.py | 22 +- tests/functional/tests/engine/test_wo.py | 60 +- .../tests/management/test_start_stop.py | 52 +- .../security/test_management_start_fuzzy.py | 21 +- tests/unit/framework/add_new_test_file.py | 252 ++-- .../framework/prepare_sources_for_testing.py | 1088 +++++++++-------- tests/unit/framework/run_unit_tests.py | 32 +- tests/unit/framework/tests_config.py | 27 +- 15 files changed, 822 insertions(+), 765 deletions(-) diff --git a/.pep8speaks.yml b/.pep8speaks.yml index e5ee75c..93ede8a 100644 --- a/.pep8speaks.yml +++ b/.pep8speaks.yml @@ -8,5 +8,6 @@ pycodestyle: max-line-length: 100 ignore: - E402 # module level import not at top of file + - W503 # line break after binary operator no_blank_comment: True diff --git a/tests/functional/pyocf/types/cache.py b/tests/functional/pyocf/types/cache.py index afa87a8..35c3d84 100644 --- a/tests/functional/pyocf/types/cache.py +++ b/tests/functional/pyocf/types/cache.py @@ -86,6 +86,7 @@ class CacheMode(IntEnum): def read_insert(self): return self.value not in [CacheMode.PT, CacheMode.WO] + class EvictionPolicy(IntEnum): LRU = 0 DEFAULT = LRU @@ -306,7 +307,7 @@ class Cache: c.start_cache() try: c.load_cache(device) - except: + except: # noqa E722 c.stop() raise @@ -319,7 +320,7 @@ class Cache: c.start_cache() try: c.attach_device(device, force=True) - except: + except: # noqa E722 c.stop() raise @@ -529,13 +530,12 @@ class Cache: if c.results["error"]: raise OcfError("Couldn't flush cache", c.results["error"]) - def get_name(self): self.read_lock() try: return str(self.owner.lib.ocf_cache_get_name(self), encoding="ascii") - except: + except: # noqa E722 raise OcfError("Couldn't get cache name") finally: self.read_unlock() diff --git a/tests/functional/pyocf/types/data.py b/tests/functional/pyocf/types/data.py index 3c49f46..b032cf3 100644 --- a/tests/functional/pyocf/types/data.py +++ b/tests/functional/pyocf/types/data.py @@ -56,7 +56,7 @@ class DataOps(Structure): class Data: - DATA_POISON=0xA5 + DATA_POISON = 0xA5 PAGE_SIZE = 4096 _instances_ = {} @@ -109,7 +109,7 @@ class Data: def from_string(cls, source: str, encoding: str = "ascii"): b = bytes(source, encoding) # duplicate string to fill space up to sector boundary - padding_len = S.from_B(len(b), sector_aligned = True).B - len(b) + padding_len = S.from_B(len(b), sector_aligned=True).B - len(b) padding = b * (padding_len // len(b) + 1) padding = padding[:padding_len] b = b + padding diff --git a/tests/functional/pyocf/types/io.py b/tests/functional/pyocf/types/io.py index 8da63d5..16c8a2a 100644 --- a/tests/functional/pyocf/types/io.py +++ b/tests/functional/pyocf/types/io.py @@ -92,7 +92,7 @@ class Io(Structure): def end(self, err): try: self.callback(err) - except: + except: # noqa E722 pass self.put() diff --git a/tests/functional/pyocf/types/queue.py b/tests/functional/pyocf/types/queue.py index b7747f2..da09639 100644 --- a/tests/functional/pyocf/types/queue.py +++ b/tests/functional/pyocf/types/queue.py @@ -36,6 +36,7 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue): break + class Queue: _instances_ = {} @@ -102,4 +103,3 @@ class Queue: self.kick_condition.notify_all() self.thread.join() - diff --git a/tests/functional/pyocf/types/shared.py b/tests/functional/pyocf/types/shared.py index 28677be..6f53dda 100644 --- a/tests/functional/pyocf/types/shared.py +++ b/tests/functional/pyocf/types/shared.py @@ -102,7 +102,7 @@ class SharedOcfObject(Structure): def get_instance(cls, ref: int): try: return cls._instances_[ref] - except: + except: # noqa E722 logging.getLogger("pyocf").error( "OcfSharedObject corruption. wanted: {} instances: {}".format( ref, cls._instances_ diff --git a/tests/functional/pyocf/types/volume.py b/tests/functional/pyocf/types/volume.py index 29f4b9d..168e4af 100644 --- a/tests/functional/pyocf/types/volume.py +++ b/tests/functional/pyocf/types/volume.py @@ -74,7 +74,7 @@ class VolumeIoPriv(Structure): class Volume(Structure): - VOLUME_POISON=0x13 + VOLUME_POISON = 0x13 _fields_ = [("_storage", c_void_p)] _instances_ = {} @@ -184,7 +184,7 @@ class Volume(Structure): uuid = str(uuid_ptr.contents._data, encoding="ascii") try: volume = Volume.get_by_uuid(uuid) - except: + except: # noqa E722 TODO:Investigate whether this really should be so broad print("Tried to access unallocated volume {}".format(uuid)) print("{}".format(Volume._uuid_)) return -1 @@ -255,7 +255,7 @@ class Volume(Structure): memset(dst, 0, discard.contents._bytes) discard.contents._end(discard, 0) - except: + except: # noqa E722 discard.contents._end(discard, -5) def get_stats(self): @@ -269,8 +269,7 @@ class Volume(Structure): self.stats[IoDir(io.contents._dir)] += 1 io_priv = cast( - OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv) - ) + OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)) offset = io_priv.contents._offset if io.contents._dir == IoDir.WRITE: @@ -286,7 +285,7 @@ class Volume(Structure): io_priv.contents._offset += io.contents._bytes io.contents._end(io, 0) - except: + except: # noqa E722 io.contents._end(io, -5) def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs): @@ -325,10 +324,11 @@ class ErrorDevice(Volume): super().reset_stats() self.stats["errors"] = {IoDir.WRITE: 0, IoDir.READ: 0} + class TraceDevice(Volume): def __init__(self, size, trace_fcn=None, uuid=None): super().__init__(size, uuid) - self.trace_fcn=trace_fcn + self.trace_fcn = trace_fcn def submit_io(self, io): submit = True diff --git a/tests/functional/pyocf/utils.py b/tests/functional/pyocf/utils.py index a0710af..a308ca2 100644 --- a/tests/functional/pyocf/utils.py +++ b/tests/functional/pyocf/utils.py @@ -6,7 +6,8 @@ from ctypes import string_at -def print_buffer(buf, length, offset=0, width=16, ignore=0, stop_after_count_ignored=0, print_fcn=print): +def print_buffer(buf, length, offset=0, width=16, ignore=0, + stop_after_count_ignored=0, print_fcn=print): end = int(offset) + int(length) offset = int(offset) ignored_lines = 0 @@ -15,16 +16,13 @@ def print_buffer(buf, length, offset=0, width=16, ignore=0, stop_after_count_ign stop_after_count_ignored = int(stop_after_count_ignored / width) for addr in range(offset, end, width): - cur_line = buf[addr : min(end, addr + width)] + cur_line = buf[addr: min(end, addr + width)] byteline = "" asciiline = "" if not any(x != ignore for x in cur_line): if stop_after_count_ignored and ignored_lines > stop_after_count_ignored: - print_fcn( - "<{} bytes of '0x{:02X}' encountered, stopping>".format( - stop_after_count_ignored * width, ignore - ) - ) + print_fcn("<{} bytes of '0x{:02X}' encountered, stopping>". + format(stop_after_count_ignored * width, ignore)) return ignored_lines += 1 continue @@ -71,23 +69,23 @@ class Size: return self.bytes @classmethod - def from_B(cls, value, sector_aligned = False): + def from_B(cls, value, sector_aligned=False): return cls(value, sector_aligned) @classmethod - def from_KiB(cls, value, sector_aligned = False): + def from_KiB(cls, value, sector_aligned=False): return cls(value * cls._KiB, sector_aligned) @classmethod - def from_MiB(cls, value, sector_aligned = False): + def from_MiB(cls, value, sector_aligned=False): return cls(value * cls._MiB, sector_aligned) @classmethod - def from_GiB(cls, value, sector_aligned = False): + def from_GiB(cls, value, sector_aligned=False): return cls(value * cls._GiB, sector_aligned) @classmethod - def from_TiB(cls, value, sector_aligned = False): + def from_TiB(cls, value, sector_aligned=False): return cls(value * cls._TiB, sector_aligned) @classmethod diff --git a/tests/functional/tests/engine/test_wo.py b/tests/functional/tests/engine/test_wo.py index 2ea0b60..db2862f 100644 --- a/tests/functional/tests/engine/test_wo.py +++ b/tests/functional/tests/engine/test_wo.py @@ -3,7 +3,6 @@ # SPDX-License-Identifier: BSD-3-Clause-Clear # -import pytest from ctypes import c_int, memmove, cast, c_void_p from enum import IntEnum from itertools import product @@ -11,11 +10,12 @@ import random from pyocf.types.cache import Cache, CacheMode from pyocf.types.core import Core -from pyocf.types.volume import Volume, ErrorDevice +from pyocf.types.volume import Volume from pyocf.types.data import Data from pyocf.types.io import IoDir from pyocf.utils import Size -from pyocf.types.shared import OcfError, OcfCompletion +from pyocf.types.shared import OcfCompletion + def __io(io, queue, address, size, data, direction): io.set_data(data, 0) @@ -38,13 +38,16 @@ def _io(io, queue, address, size, data, offset, direction): memmove(cast(data, c_void_p).value + offset, _data.handle, size) return ret + def io_to_core(core, address, size, data, offset, direction): return _io(core.new_core_io(), core.cache.get_default_queue(), address, size, - data, offset, direction) + data, offset, direction) + def io_to_exp_obj(core, address, size, data, offset, direction): return _io(core.new_io(), core.cache.get_default_queue(), address, size, data, - offset, direction) + offset, direction) + def sector_to_region(sector, region_start): i = 0 @@ -52,10 +55,12 @@ def sector_to_region(sector, region_start): i += 1 return i + class SectorStatus(IntEnum): - DIRTY = 0, - CLEAN = 1, - INVALID = 2, + DIRTY = 0, + CLEAN = 1, + INVALID = 2, + I = SectorStatus.INVALID D = SectorStatus.DIRTY @@ -85,6 +90,8 @@ C = SectorStatus.CLEAN # - if clean, exported object sector no @n is filled with 100 + @n # - if dirty, exported object sector no @n is filled with 200 + @n # + + def test_wo_read_data_consistency(pyocf_ctx): # start sector for each region region_start = [0, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17] @@ -114,11 +121,11 @@ def test_wo_read_data_consistency(pyocf_ctx): data = {} # memset n-th sector of core data with n - data[SectorStatus.INVALID] = bytes([x // SECTOR_SIZE for x in range (WORKSET_SIZE)]) + data[SectorStatus.INVALID] = bytes([x // SECTOR_SIZE for x in range(WORKSET_SIZE)]) # memset n-th sector of clean data with n + 100 - data[SectorStatus.CLEAN] = bytes([100 + x // SECTOR_SIZE for x in range (WORKSET_SIZE)]) + data[SectorStatus.CLEAN] = bytes([100 + x // SECTOR_SIZE for x in range(WORKSET_SIZE)]) # memset n-th sector of dirty data with n + 200 - data[SectorStatus.DIRTY] = bytes([200 + x // SECTOR_SIZE for x in range (WORKSET_SIZE)]) + data[SectorStatus.DIRTY] = bytes([200 + x // SECTOR_SIZE for x in range(WORKSET_SIZE)]) result_b = bytes(WORKSET_SIZE) @@ -137,30 +144,30 @@ def test_wo_read_data_consistency(pyocf_ctx): combinations.append(S) random.shuffle(combinations) - # add fixed test cases at the beginnning + # add fixed test cases at the beginning combinations = fixed_combinations + combinations for S in combinations[:ITRATION_COUNT]: # write data to core and invalidate all CL - cache.change_cache_mode(cache_mode = CacheMode.PT) - io_to_exp_obj(core, WORKSET_OFFSET, len(data[SectorStatus.INVALID]), \ - data[SectorStatus.INVALID], 0, IoDir.WRITE) + cache.change_cache_mode(cache_mode=CacheMode.PT) + io_to_exp_obj(core, WORKSET_OFFSET, len(data[SectorStatus.INVALID]), + data[SectorStatus.INVALID], 0, IoDir.WRITE) # insert clean sectors - cache.change_cache_mode(cache_mode = CacheMode.WT) + cache.change_cache_mode(cache_mode=CacheMode.WT) for sec in range(SECTOR_COUNT): region = sector_to_region(sec, region_start) if S[region] == SectorStatus.CLEAN: - io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE, \ - data[SectorStatus.CLEAN], sec * SECTOR_SIZE, IoDir.WRITE) + io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE, + data[SectorStatus.CLEAN], sec * SECTOR_SIZE, IoDir.WRITE) # write dirty sectors - cache.change_cache_mode(cache_mode = CacheMode.WO) + cache.change_cache_mode(cache_mode=CacheMode.WO) for sec in range(SECTOR_COUNT): region = sector_to_region(sec, region_start) if S[region] == SectorStatus.DIRTY: - io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE, \ - data[SectorStatus.DIRTY], sec * SECTOR_SIZE, IoDir.WRITE) + io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE, + data[SectorStatus.DIRTY], sec * SECTOR_SIZE, IoDir.WRITE) for s in start_sec: for e in end_sec: @@ -171,10 +178,9 @@ def test_wo_read_data_consistency(pyocf_ctx): START = s * SECTOR_SIZE END = e * SECTOR_SIZE size = (e - s + 1) * SECTOR_SIZE - assert(0 == io_to_exp_obj(core, WORKSET_OFFSET + START, size, \ - result_b, START, IoDir.READ)), \ - "error reading in WO mode: S={}, start={}, end={}".format( \ - S, s, e) + assert(0 == io_to_exp_obj(core, WORKSET_OFFSET + START, size, + result_b, START, IoDir.READ)),\ + "error reading in WO mode: S={}, start={}, end={}".format(S, s, e) # verify read data for sec in range(s, e + 1): @@ -182,6 +188,4 @@ def test_wo_read_data_consistency(pyocf_ctx): region = sector_to_region(sec, region_start) check_byte = sec * SECTOR_SIZE assert(result_b[check_byte] == data[S[region]][check_byte]), \ - "unexpected data in sector {}, S={}, s={}, e={}\n".format( \ - sec, S, s, e) - + "unexpected data in sector {}, S={}, s={}, e={}\n".format(sec, S, s, e) diff --git a/tests/functional/tests/management/test_start_stop.py b/tests/functional/tests/management/test_start_stop.py index 648f3e0..29f04ae 100644 --- a/tests/functional/tests/management/test_start_stop.py +++ b/tests/functional/tests/management/test_start_stop.py @@ -111,8 +111,9 @@ def test_start_read_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: CacheL core_device.reset_stats() test_data = Data.from_string("Changed test data") - + io_to_core(core_exported, test_data, Size.from_sector(1).B) + check_stats_write_after_read(core_exported, mode, cls, True) logger.info("[STAGE] Read from exported object after write") @@ -159,7 +160,8 @@ def test_start_params(pyocf_ctx, mode: CacheMode, cls: CacheLineSize, layout: Me assert stats["conf"]["eviction_policy"] == EvictionPolicy.DEFAULT, "Eviction policy" assert stats["conf"]["cache_id"] == cache_id, "Cache id" assert cache.get_name() == name, "Cache name" - # TODO: metadata_layout, metadata_volatile, max_queue_size, queue_unblock_size, pt_unaligned_io, use_submit_fast + # TODO: metadata_layout, metadata_volatile, max_queue_size, + # queue_unblock_size, pt_unaligned_io, use_submit_fast # TODO: test in functional tests @@ -254,8 +256,9 @@ def test_100_start_stop(pyocf_ctx): def test_start_stop_incrementally(pyocf_ctx): """Starting/stopping multiple caches incrementally. - Check whether OCF behaves correctly when few caches at a time are in turns added and removed (#added > #removed) - until their number reaches limit, and then proportions are reversed and number of caches gradually falls to 0. + Check whether OCF behaves correctly when few caches at a time are + in turns added and removed (#added > #removed) until their number reaches limit, + and then proportions are reversed and number of caches gradually falls to 0. """ caches = [] @@ -292,7 +295,8 @@ def test_start_stop_incrementally(pyocf_ctx): stats = cache.get_stats() cache_id = stats["conf"]["cache_id"] cache.stop() - assert get_cache_by_id(pyocf_ctx, cache_id) != 0, "Try getting cache after stopping it" + assert get_cache_by_id(pyocf_ctx, cache_id) !=\ + 0, "Try getting cache after stopping it" add = not add @@ -306,11 +310,17 @@ def test_start_cache_same_id(pyocf_ctx, mode, cls): cache_device1 = Volume(Size.from_MiB(20)) cache_device2 = Volume(Size.from_MiB(20)) cache_id = randrange(1, 16385) - cache = Cache.start_on_device(cache_device1, cache_mode=mode, cache_line_size=cls, cache_id=cache_id) + cache = Cache.start_on_device(cache_device1, + cache_mode=mode, + cache_line_size=cls, + cache_id=cache_id) cache.get_stats() with pytest.raises(OcfError, match="OCF_ERR_CACHE_EXIST"): - cache = Cache.start_on_device(cache_device2, cache_mode=mode, cache_line_size=cls, cache_id=cache_id) + cache = Cache.start_on_device(cache_device2, + cache_mode=mode, + cache_line_size=cls, + cache_id=cache_id) cache.get_stats() @@ -418,14 +428,20 @@ def check_stats_write_empty(exported_obj: Core, mode: CacheMode, cls: CacheLineS "Occupancy" -def check_stats_write_after_read(exported_obj: Core, mode: CacheMode, cls: CacheLineSize, read_from_empty=False): +def check_stats_write_after_read(exported_obj: Core, + mode: CacheMode, + cls: CacheLineSize, + read_from_empty=False): stats = exported_obj.cache.get_stats() assert exported_obj.cache.device.get_stats()[IoDir.WRITE] == \ - (0 if mode in {CacheMode.WI, CacheMode.PT} else (2 if read_from_empty and mode.lazy_write() else 1)), \ + (0 if mode in {CacheMode.WI, CacheMode.PT} else + (2 if read_from_empty and mode.lazy_write() else 1)), \ "Writes to cache device" assert exported_obj.device.get_stats()[IoDir.WRITE] == (0 if mode.lazy_write() else 1), \ "Writes to core device" - assert stats["req"]["wr_hits"]["value"] == (1 if (mode.read_insert() and mode != CacheMode.WI) or (mode.write_insert() and not read_from_empty) else 0), \ + assert stats["req"]["wr_hits"]["value"] == \ + (1 if (mode.read_insert() and mode != CacheMode.WI) + or (mode.write_insert() and not read_from_empty) else 0), \ "Write hits" assert stats["usage"]["occupancy"]["value"] == \ (0 if mode in {CacheMode.WI, CacheMode.PT} else (cls / CacheLineSize.LINE_4KiB)), \ @@ -438,16 +454,20 @@ def check_stats_read_after_write(exported_obj, mode, cls, write_to_empty=False): (2 if mode.lazy_write() else (0 if mode == CacheMode.PT else 1)), \ "Writes to cache device" assert exported_obj.cache.device.get_stats()[IoDir.READ] == \ - (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO} or (mode == CacheMode.WA and not write_to_empty) else 0), \ + (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO} + or (mode == CacheMode.WA and not write_to_empty) else 0), \ "Reads from cache device" assert exported_obj.device.get_stats()[IoDir.READ] == \ - (0 if mode in {CacheMode.WB, CacheMode.WO, CacheMode.WT} or (mode == CacheMode.WA and not write_to_empty) else 1), \ + (0 if mode in {CacheMode.WB, CacheMode.WO, CacheMode.WT} + or (mode == CacheMode.WA and not write_to_empty) else 1), \ "Reads from core device" - assert stats["req"]["rd_full_misses"]["value"] == (1 if mode in {CacheMode.WA, CacheMode.WI} else 0) \ + assert stats["req"]["rd_full_misses"]["value"] == \ + (1 if mode in {CacheMode.WA, CacheMode.WI} else 0) \ + (0 if write_to_empty or mode in {CacheMode.PT, CacheMode.WA} else 1), \ "Read full misses" assert stats["req"]["rd_hits"]["value"] == \ - (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO} or (mode == CacheMode.WA and not write_to_empty) else 0), \ + (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO} + or (mode == CacheMode.WA and not write_to_empty) else 0), \ "Read hits" assert stats["usage"]["occupancy"]["value"] == \ (0 if mode == CacheMode.PT else (cls / CacheLineSize.LINE_4KiB)), "Occupancy" @@ -467,4 +487,6 @@ def check_md5_sums(exported_obj: Core, mode: CacheMode): def get_cache_by_id(ctx, cache_id): cache_pointer = c_void_p() - return OcfLib.getInstance().ocf_mngt_cache_get_by_id(ctx.ctx_handle, cache_id, byref(cache_pointer)) + return OcfLib.getInstance().ocf_mngt_cache_get_by_id(ctx.ctx_handle, + cache_id, + byref(cache_pointer)) diff --git a/tests/functional/tests/security/test_management_start_fuzzy.py b/tests/functional/tests/security/test_management_start_fuzzy.py index 706485e..07db815 100644 --- a/tests/functional/tests/security/test_management_start_fuzzy.py +++ b/tests/functional/tests/security/test_management_start_fuzzy.py @@ -12,7 +12,6 @@ from pyocf.utils import Size from pyocf.types.shared import OcfError, CacheLineSize from ctypes import c_uint32 - logger = logging.getLogger(__name__) @@ -51,7 +50,8 @@ def test_fuzzy_start_cache_line_size(pyocf_ctx, c_uint64_randomize, cm): with pytest.raises(OcfError, match="OCF_ERR_INVALID_CACHE_LINE_SIZE"): try_start_cache(cache_mode=cm, cache_line_size=c_uint64_randomize) else: - logger.warning(f"Test skipped for valid cache line size enum value: '{c_uint64_randomize}'. ") + logger.warning( + f"Test skipped for valid cache line size enum value: '{c_uint64_randomize}'. ") @pytest.mark.security @@ -67,8 +67,9 @@ def test_fuzzy_start_name(pyocf_ctx, string_randomize, cm, cls): """ cache_device = Volume(Size.from_MiB(30)) try: - cache = Cache.start_on_device(cache_device, name=string_randomize, cache_mode=cm, cache_line_size=cls) - except: + cache = Cache.start_on_device(cache_device, name=string_randomize, cache_mode=cm, + cache_line_size=cls) + except OcfError: logger.error(f"Cache did not start properly with correct name value: {string_randomize}") cache.stop() @@ -107,7 +108,8 @@ def test_fuzzy_start_eviction_policy(pyocf_ctx, c_uint32_randomize, cm, cls): with pytest.raises(OcfError, match="OCF_ERR_INVAL"): try_start_cache(eviction_policy=c_uint32_randomize, cache_mode=cm, cache_line_size=cls) else: - logger.warning(f"Test skipped for valid eviction policy enum value: '{c_uint32_randomize}'. ") + logger.warning( + f"Test skipped for valid eviction policy enum value: '{c_uint32_randomize}'. ") @pytest.mark.security @@ -125,7 +127,8 @@ def test_fuzzy_start_metadata_layout(pyocf_ctx, c_uint32_randomize, cm, cls): with pytest.raises(OcfError, match="OCF_ERR_INVAL"): try_start_cache(metadata_layout=c_uint32_randomize, cache_mode=cm, cache_line_size=cls) else: - logger.warning(f"Test skipped for valid metadata layout enum value: '{c_uint32_randomize}'. ") + logger.warning( + f"Test skipped for valid metadata layout enum value: '{c_uint32_randomize}'. ") @pytest.mark.security @@ -133,7 +136,8 @@ def test_fuzzy_start_metadata_layout(pyocf_ctx, c_uint32_randomize, cm, cls): @pytest.mark.parametrize('max_wb_queue_size', generate_random_numbers(c_uint32, 10)) def test_fuzzy_start_max_queue_size(pyocf_ctx, max_wb_queue_size, c_uint32_randomize, cls): """ - Test whether it is impossible to start cache with invalid dependence between max queue size and queue unblock size. + Test whether it is impossible to start cache with invalid dependence between max queue size + and queue unblock size. :param pyocf_ctx: basic pyocf context fixture :param max_wb_queue_size: max queue size value to start cache with :param c_uint32_randomize: queue unblock size value to start cache with @@ -148,4 +152,5 @@ def test_fuzzy_start_max_queue_size(pyocf_ctx, max_wb_queue_size, c_uint32_rando cache_line_size=cls) else: logger.warning(f"Test skipped for valid values: " - f"'max_queue_size={max_wb_queue_size}, queue_unblock_size={c_uint32_randomize}'.") + f"'max_queue_size={max_wb_queue_size}, " + f"queue_unblock_size={c_uint32_randomize}'.") diff --git a/tests/unit/framework/add_new_test_file.py b/tests/unit/framework/add_new_test_file.py index 2058c01..92e8713 100755 --- a/tests/unit/framework/add_new_test_file.py +++ b/tests/unit/framework/add_new_test_file.py @@ -11,167 +11,169 @@ import os import sys import textwrap + class TestGenerator(object): - main_UT_dir = "" - main_tested_dir = "" - tested_file_path = "" - tested_function_name = "" + main_UT_dir = "" + main_tested_dir = "" + tested_file_path = "" + tested_function_name = "" - def __init__(self, main_UT_dir, main_tested_dir, file_path, func_name): - self.set_main_UT_dir(main_UT_dir) - self.set_main_tested_dir(main_tested_dir) - self.set_tested_file_path(file_path) - self.tested_function_name = func_name + def __init__(self, main_UT_dir, main_tested_dir, file_path, func_name): + self.set_main_UT_dir(main_UT_dir) + self.set_main_tested_dir(main_tested_dir) + self.set_tested_file_path(file_path) + self.tested_function_name = func_name - def create_empty_test_file(self): - dst_dir = os.path.dirname(self.get_tested_file_path()[::-1])[::-1] + def create_empty_test_file(self): + dst_dir = os.path.dirname(self.get_tested_file_path()[::-1])[::-1] - self.create_dir_if_not_exist(self.get_main_UT_dir() + dst_dir) - test_file_name = os.path.basename(self.get_tested_file_path()) + self.create_dir_if_not_exist(self.get_main_UT_dir() + dst_dir) + test_file_name = os.path.basename(self.get_tested_file_path()) - dst_path = self.get_main_UT_dir() + dst_dir + "/" + test_file_name + dst_path = self.get_main_UT_dir() + dst_dir + "/" + test_file_name - no_str = "" - no = 0 - while True: - if not os.path.isfile(dst_path.rsplit(".", 1)[0] + no_str + "." + dst_path.rsplit(".", 1)[1]): - break - no += 1 - no_str = str(no) + no_str = "" + no = 0 + while True: + if not os.path.isfile("{0}{1}.{2}".format(dst_path.rsplit(".", 1)[0], no_str, + dst_path.rsplit(".", 1)[1])): + break + no += 1 + no_str = str(no) - dst_path = dst_path.rsplit(".", 1)[0] + no_str + "." + dst_path.rsplit(".", 1)[1] - buf = self.get_markups() - buf += "#undef static\n\n" - buf += "#undef inline\n\n" - buf += self.get_UT_includes() - buf += self.get_includes(self.get_main_tested_dir() + self.get_tested_file_path()) - buf += self.get_autowrap_file_include(dst_path) - buf += self.get_empty_test_function() - buf += self.get_test_main() + dst_path = dst_path.rsplit(".", 1)[0] + no_str + "." + dst_path.rsplit(".", 1)[1] + buf = self.get_markups() + buf += "#undef static\n\n" + buf += "#undef inline\n\n" + buf += self.get_UT_includes() + buf += self.get_includes(self.get_main_tested_dir() + self.get_tested_file_path()) + buf += self.get_autowrap_file_include(dst_path) + buf += self.get_empty_test_function() + buf += self.get_test_main() - with open(dst_path, "w") as f: - f.writelines(buf) + with open(dst_path, "w") as f: + f.writelines(buf) - print(f"{dst_path} generated successfully!") + print(f"{dst_path} generated successfully!") - def get_markups(self): - ret = "/*\n" - ret += " * " + self.get_tested_file_path() + "\n" - ret += " * " + self.get_tested_function_name() + "\n" - ret += " * \n" - ret += " *\tINSERT HERE LIST OF FUNCTIONS YOU WANT TO LEAVE\n" - ret += " *\tONE FUNCTION PER LINE\n" - ret += " * \n" - ret += " */\n\n" + def get_markups(self): + ret = "/*\n" + ret += " * " + self.get_tested_file_path() + "\n" + ret += " * " + self.get_tested_function_name() + "\n" + ret += " * \n" + ret += " *\tINSERT HERE LIST OF FUNCTIONS YOU WANT TO LEAVE\n" + ret += " *\tONE FUNCTION PER LINE\n" + ret += " * \n" + ret += " */\n\n" - return ret + return ret - def create_dir_if_not_exist(self, path): - if not os.path.isdir(path): - try: - os.makedirs(path) - except Exception: - pass - return True - return None + def create_dir_if_not_exist(self, path): + if not os.path.isdir(path): + try: + os.makedirs(path) + except Exception: + pass + return True + return None + def get_UT_includes(self): + ret = ''' + #include + #include + #include + #include + #include "print_desc.h"\n\n''' - def get_UT_includes(self): - ret = ''' - #include - #include - #include - #include - #include "print_desc.h"\n\n''' + return textwrap.dedent(ret) - return textwrap.dedent(ret) + def get_autowrap_file_include(self, test_file_path): + autowrap_file = test_file_path.rsplit(".", 1)[0] + autowrap_file = autowrap_file.replace(self.main_UT_dir, "") + autowrap_file += "_generated_warps.c" + return "#include \"" + autowrap_file + "\"\n\n" - def get_autowrap_file_include(self, test_file_path): - autowrap_file = test_file_path.rsplit(".", 1)[0] - autowrap_file = autowrap_file.replace(self.main_UT_dir, "") - autowrap_file += "_generated_warps.c" - return "#include \"" + autowrap_file + "\"\n\n" + def get_includes(self, abs_path_to_tested_file): + with open(abs_path_to_tested_file, "r") as f: + code = f.readlines() - def get_includes(self, abs_path_to_tested_file): - with open(abs_path_to_tested_file, "r") as f: - code = f.readlines() + ret = [line for line in code if re.search(r'#include', line)] - ret = [line for line in code if re.search(r'#include', line)] + return "".join(ret) + "\n" - return "".join(ret) + "\n" + def get_empty_test_function(self): + ret = "static void " + self.get_tested_function_name() + "_test01(void **state)\n" + ret += "{\n" + ret += "\tprint_test_description(\"Put test description here\");\n" + ret += "\tassert_int_equal(1,1);\n" + ret += "}\n\n" - def get_empty_test_function(self): - ret = "static void " + self.get_tested_function_name() + "_test01(void **state)\n" - ret += "{\n" - ret += "\tprint_test_description(\"Put test description here\");\n" - ret += "\tassert_int_equal(1,1);\n" - ret += "}\n\n" + return ret - return ret + def get_test_main(self): + ret = "int main(void)\n" + ret += "{\n" + ret += "\tconst struct CMUnitTest tests[] = {\n" + ret += "\t\tcmocka_unit_test(" + self.get_tested_function_name() + "_test01)\n" + ret += "\t};\n\n" + ret += "\tprint_message(\"Unit test of " + self.get_tested_file_path() + "\");\n\n" + ret += "\treturn cmocka_run_group_tests(tests, NULL, NULL);\n" + ret += "}" - def get_test_main(self): - ret = "int main(void)\n" - ret += "{\n" - ret += "\tconst struct CMUnitTest tests[] = {\n" - ret += "\t\tcmocka_unit_test(" + self.get_tested_function_name() + "_test01)\n" - ret += "\t};\n\n" - ret += "\tprint_message(\"Unit test of " + self.get_tested_file_path() + "\");\n\n" - ret += "\treturn cmocka_run_group_tests(tests, NULL, NULL);\n" - ret += "}" + return ret - return ret + def set_tested_file_path(self, path): + call_dir = os.getcwd() + os.sep + p = os.path.normpath(call_dir + path) - def set_tested_file_path(self, path): - call_dir = os.getcwd() + os.sep - p = os.path.normpath(call_dir + path) + if os.path.isfile(p): + self.tested_file_path = p.split(self.get_main_tested_dir(), 1)[1] + return + elif os.path.isfile(self.get_main_tested_dir() + path): + self.tested_file_path = path + return - if os.path.isfile(p): - self.tested_file_path = p.split(self.get_main_tested_dir(), 1)[1] - return - elif os.path.isfile(self.get_main_tested_dir() + path): - self.tested_file_path = path - return + print(f"{os.path.join(self.get_main_tested_dir(), path)}") + print("Given path not exists!") + exit(1) - print(f"{os.path.join(self.get_main_tested_dir(), path)}") - print("Given path not exists!") - exit(1) + def set_main_UT_dir(self, path): + p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path + p = os.path.normpath(os.path.dirname(p)) + os.sep + self.main_UT_dir = p + def get_main_UT_dir(self): + return self.main_UT_dir - def set_main_UT_dir(self, path): - p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path - p = os.path.normpath(os.path.dirname(p)) + os.sep - self.main_UT_dir = p + def set_main_tested_dir(self, path): + p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path + p = os.path.normpath(os.path.dirname(p)) + os.sep + self.main_tested_dir = p - def get_main_UT_dir(self): - return self.main_UT_dir + def get_main_tested_dir(self): + return self.main_tested_dir - def set_main_tested_dir(self, path): - p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path - p = os.path.normpath(os.path.dirname(p)) + os.sep - self.main_tested_dir = p + def get_tested_file_path(self): + return self.tested_file_path - def get_main_tested_dir(self): - return self.main_tested_dir + def get_tested_function_name(self): + return self.tested_function_name - def get_tested_file_path(self): - return self.tested_file_path - - def get_tested_function_name(self): - return self.tested_function_name def __main__(): - if len(sys.argv) < 3: - print("No path to tested file or tested function name given !") - sys.exit(1) + if len(sys.argv) < 3: + print("No path to tested file or tested function name given !") + sys.exit(1) - tested_file_path = sys.argv[1] - tested_function_name = sys.argv[2] + tested_file_path = sys.argv[1] + tested_function_name = sys.argv[2] - generator = TestGenerator(tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS,\ - tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT,\ - tested_file_path, tested_function_name) + generator = TestGenerator(tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS, + tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT, + tested_file_path, tested_function_name) + + generator.create_empty_test_file() - generator.create_empty_test_file() if __name__ == "__main__": - __main__() + __main__() diff --git a/tests/unit/framework/prepare_sources_for_testing.py b/tests/unit/framework/prepare_sources_for_testing.py index 5cd46ba..06aa557 100755 --- a/tests/unit/framework/prepare_sources_for_testing.py +++ b/tests/unit/framework/prepare_sources_for_testing.py @@ -9,19 +9,20 @@ import shutil import sys import re import os.path -from collections import defaultdict import subprocess +import tests_config + def run_command(args, verbose=True): - result = subprocess.run(" ".join(args), shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - result.stdout = result.stdout.decode("ASCII", errors='ignore') - result.stderr = result.stderr.decode("ASCII", errors='ignore') - if verbose: - print(result.stderr) - return result + result = subprocess.run(" ".join(args), shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result.stdout = result.stdout.decode("ASCII", errors='ignore') + result.stderr = result.stderr.decode("ASCII", errors='ignore') + if verbose: + print(result.stderr) + return result + -import tests_config # # This script purpose is to remove unused functions definitions # It is giving the opportunity to unit test all functions from OCF. @@ -37,661 +38,674 @@ import tests_config # class UnitTestsSourcesGenerator(object): - script_file_abs_path = "" - script_dir_abs_path = "" + script_file_abs_path = "" + script_dir_abs_path = "" - main_UT_dir = "" - main_tested_dir = "" + main_UT_dir = "" + main_tested_dir = "" - ctags_path = "" + ctags_path = "" - test_catalouges_list = [] - dirs_to_include_list = [] + test_catalogues_list = [] + dirs_to_include_list = [] - tests_internal_includes_list = [] - framework_includes = [] + tests_internal_includes_list = [] + framework_includes = [] - dirs_with_tests_list = [] - test_files_paths_list = [] + dirs_with_tests_list = [] + test_files_paths_list = [] - tested_files_paths_list = [] + tested_files_paths_list = [] - includes_to_copy_dict = {} + includes_to_copy_dict = {} - preprocessing_repo = "" - sources_to_test_repo = "" + preprocessing_repo = "" + sources_to_test_repo = "" - def __init__(self): - self.script_file_abs_path = os.path.realpath(__file__) - self.script_dir_abs_path = os.path.normpath(os.path.dirname(self.script_file_abs_path) + os.sep) + def __init__(self): + self.script_file_abs_path = os.path.realpath(__file__) + self.script_dir_abs_path = os.path.normpath( + os.path.dirname(self.script_file_abs_path) + os.sep) - self.set_ctags_path() + self.set_ctags_path() - self.set_main_UT_dir() - self.set_main_tested_dir() + self.set_main_UT_dir() + self.set_main_tested_dir() - self.test_catalouges_list = tests_config.DIRECTORIES_WITH_TESTS_LIST - self.set_includes_to_copy_dict(tests_config.INCLUDES_TO_COPY_DICT) - self.set_dirs_to_include() + self.test_catalogues_list = tests_config.DIRECTORIES_WITH_TESTS_LIST + self.set_includes_to_copy_dict(tests_config.INCLUDES_TO_COPY_DICT) + self.set_dirs_to_include() - self.set_tests_internal_includes_list() - self.set_framework_includes() - self.set_files_with_tests_list() - self.set_tested_files_paths_list() + self.set_tests_internal_includes_list() + self.set_framework_includes() + self.set_files_with_tests_list() + self.set_tested_files_paths_list() - self.set_preprocessing_repo() - self.set_sources_to_test_repo() + self.set_preprocessing_repo() + self.set_sources_to_test_repo() - def preprocessing(self): - tested_files_list = self.get_tested_files_paths_list() - project_includes = self.get_dirs_to_include_list() - framework_includes = self.get_tests_internal_includes_list() + def preprocessing(self): + tested_files_list = self.get_tested_files_paths_list() + project_includes = self.get_dirs_to_include_list() + framework_includes = self.get_tests_internal_includes_list() - gcc_flags = " -fno-inline -Dstatic= -Dinline= -E " - gcc_command_template = "gcc " - for path in project_includes: - gcc_command_template += " -I " + path + " " + gcc_flags = " -fno-inline -Dstatic= -Dinline= -E " + gcc_command_template = "gcc " + for path in project_includes: + gcc_command_template += " -I " + path + " " - for path in framework_includes: - gcc_command_template += " -I " + path + for path in framework_includes: + gcc_command_template += " -I " + path - gcc_command_template += gcc_flags + gcc_command_template += gcc_flags - for path in tested_files_list: - preprocessing_dst = self.get_preprocessing_repo() +\ - self.get_relative_path(path, self.get_main_tested_dir()) - preprocessing_dst_dir = os.path.dirname(preprocessing_dst) - self.create_dir_if_not_exist(preprocessing_dst_dir) + for path in tested_files_list: + preprocessing_dst = self.get_preprocessing_repo() \ + + self.get_relative_path(path, self.get_main_tested_dir()) + preprocessing_dst_dir = os.path.dirname(preprocessing_dst) + self.create_dir_if_not_exist(preprocessing_dst_dir) - gcc_command = gcc_command_template +\ - path + " > " + preprocessing_dst + gcc_command = gcc_command_template + path + " > " + preprocessing_dst - result = run_command([gcc_command]) + result = run_command([gcc_command]) - if result.returncode != 0: - print(f"Generating preprocessing for {self.get_main_tested_dir() + path} failed!") - print(result.output) - run_command(["rm", "-f", preprocessing_dst]) - continue + if result.returncode != 0: + print(f"Generating preprocessing for {self.get_main_tested_dir() + path} failed!") + print(result.output) + run_command(["rm", "-f", preprocessing_dst]) + continue - self.remove_hashes(preprocessing_dst) + self.remove_hashes(preprocessing_dst) - print(f"Preprocessed file {path} saved to {preprocessing_dst}") + print(f"Preprocessed file {path} saved to {preprocessing_dst}") - def copy_includes(self): - includes_dict = self.get_includes_to_copy_dict() + def copy_includes(self): + includes_dict = self.get_includes_to_copy_dict() - for dst, src in includes_dict.items(): - src_path = os.path.normpath(self.get_main_tested_dir() + src) - if not os.path.isdir(src_path): - print(f"Directory {src_path} given to include does not exists!") - continue - dst_path = os.path.normpath(self.get_main_UT_dir() + dst) + for dst, src in includes_dict.items(): + src_path = os.path.normpath(self.get_main_tested_dir() + src) + if not os.path.isdir(src_path): + print(f"Directory {src_path} given to include does not exists!") + continue + dst_path = os.path.normpath(self.get_main_UT_dir() + dst) - shutil.rmtree(dst_path, ignore_errors=True) - shutil.copytree(src_path, dst_path) + shutil.rmtree(dst_path, ignore_errors=True) + shutil.copytree(src_path, dst_path) - def get_user_wraps(self, path): - functions_list = self.get_functions_list(path) - functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) \ - for line in functions_list if re.search("__wrap_", line)] + def get_user_wraps(self, path): + functions_list = self.get_functions_list(path) + functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) + for line in functions_list if re.search("__wrap_", line)] - return functions_list + return functions_list - def get_autowrap_file_path(self, test_file_path): - wrap_file_path = test_file_path.rsplit('.', 1)[0] - wrap_file_path = wrap_file_path + "_generated_warps.c" - return wrap_file_path + def get_autowrap_file_path(self, test_file_path): + wrap_file_path = test_file_path.rsplit('.', 1)[0] + wrap_file_path = wrap_file_path + "_generated_warps.c" + return wrap_file_path - def prepare_autowraps(self, test_file_path, preprocessed_file_path): - functions_to_wrap = self.get_functions_calls( - self.get_sources_to_test_repo() + test_file_path) - user_wraps = set(self.get_user_wraps(self.get_main_UT_dir() + test_file_path)) + def prepare_autowraps(self, test_file_path, preprocessed_file_path): + functions_to_wrap = self.get_functions_calls( + self.get_sources_to_test_repo() + test_file_path) + user_wraps = set(self.get_user_wraps(self.get_main_UT_dir() + test_file_path)) - functions_to_wrap = functions_to_wrap - user_wraps + functions_to_wrap = functions_to_wrap - user_wraps - tags_list = self.get_functions_list(preprocessed_file_path, prototypes=True) + tags_list = self.get_functions_list(preprocessed_file_path, prototypes=True) - wrap_list = [] + wrap_list = [] - with open(preprocessed_file_path) as f: - code = f.readlines() - for function in functions_to_wrap: - if function.startswith("env_") or function.startswith("bug"): - continue - for tag in tags_list: - if function in tag: - name, line = tag.split() - if name == function: - line = int(line) - wrap_list.append(self.get_function_wrap(code, line)) - break + with open(preprocessed_file_path) as f: + code = f.readlines() + for function in functions_to_wrap: + if function.startswith("env_") or function.startswith("bug"): + continue + for tag in tags_list: + if function in tag: + name, line = tag.split() + if name == function: + line = int(line) + wrap_list.append(self.get_function_wrap(code, line)) + break - wrap_file_path = self.get_main_UT_dir() + self.get_autowrap_file_path(test_file_path) + wrap_file_path = self.get_main_UT_dir() + self.get_autowrap_file_path(test_file_path) - with open(wrap_file_path, "w") as f: - f.write("/* This is file is generated by UT framework */\n") - for wrap in wrap_list: - f.write(wrap + "\n") + with open(wrap_file_path, "w") as f: + f.write("/* This is file is generated by UT framework */\n") + for wrap in wrap_list: + f.write(wrap + "\n") - def prepare_sources_for_testing(self): - test_files_paths = self.get_files_with_tests_list() + def prepare_sources_for_testing(self): + test_files_paths = self.get_files_with_tests_list() - for test_path in test_files_paths: - path = self.get_tested_file_path(self.get_main_UT_dir() + test_path) + for test_path in test_files_paths: + path = self.get_tested_file_path(self.get_main_UT_dir() + test_path) - preprocessed_tested_path = self.get_preprocessing_repo() + path - if not os.path.isfile(preprocessed_tested_path): - print(f"No preprocessed path for {test_path} test file.") - continue + preprocessed_tested_path = self.get_preprocessing_repo() + path + if not os.path.isfile(preprocessed_tested_path): + print(f"No preprocessed path for {test_path} test file.") + continue - tested_src = self.get_src_to_test(test_path, preprocessed_tested_path) + tested_src = self.get_src_to_test(test_path, preprocessed_tested_path) - self.create_dir_if_not_exist(self.get_sources_to_test_repo() + os.path.dirname(test_path)) + self.create_dir_if_not_exist( + self.get_sources_to_test_repo() + os.path.dirname(test_path)) - with open(self.get_sources_to_test_repo() + test_path, "w") as f: - f.writelines(tested_src) - print(f"Sources for {test_path} saved in {self.get_sources_to_test_repo() + test_path}") + with open(self.get_sources_to_test_repo() + test_path, "w") as f: + f.writelines(tested_src) + print( + f"Sources for {test_path} saved in + \ + {self.get_sources_to_test_repo() + test_path}") - self.prepare_autowraps(test_path, preprocessed_tested_path) + self.prepare_autowraps(test_path, preprocessed_tested_path) - def create_main_cmake_lists(self): - buf = "cmake_minimum_required(VERSION 2.6.0)\n\n" - buf += "project(OCF_unit_tests C)\n\n" + def create_main_cmake_lists(self): + buf = "cmake_minimum_required(VERSION 2.6.0)\n\n" + buf += "project(OCF_unit_tests C)\n\n" - buf += "enable_testing()\n\n" + buf += "enable_testing()\n\n" - buf += "include_directories(\n" - dirs_to_inc = self.get_dirs_to_include_list() + self.get_framework_includes()\ - + self.get_tests_internal_includes_list() - for path in dirs_to_inc: - buf += "\t" + path + "\n" - buf += ")\n\n" + buf += "include_directories(\n" + dirs_to_inc = self.get_dirs_to_include_list() + self.get_framework_includes() \ + + self.get_tests_internal_includes_list() + for path in dirs_to_inc: + buf += "\t" + path + "\n" + buf += ")\n\n" - includes = self.get_tests_internal_includes_list() - for path in includes: - buf += "\nadd_subdirectory(" + path + ")" - buf += "\n\n" + includes = self.get_tests_internal_includes_list() + for path in includes: + buf += "\nadd_subdirectory(" + path + ")" + buf += "\n\n" - test_files = self.get_files_with_tests_list() - test_dirs_to_include = [os.path.dirname(path) for path in test_files] + test_files = self.get_files_with_tests_list() + test_dirs_to_include = [os.path.dirname(path) for path in test_files] - test_dirs_to_include = self.remove_duplicates_from_list(test_dirs_to_include) + test_dirs_to_include = self.remove_duplicates_from_list(test_dirs_to_include) - for path in test_dirs_to_include: - buf += "\nadd_subdirectory(" + self.get_sources_to_test_repo() + path + ")" + for path in test_dirs_to_include: + buf += "\nadd_subdirectory(" + self.get_sources_to_test_repo() + path + ")" + with open(self.get_main_UT_dir() + "CMakeLists.txt", "w") as f: + f.writelines(buf) - with open(self.get_main_UT_dir() + "CMakeLists.txt", "w") as f: - f.writelines(buf) + print(f"Main CMakeLists.txt generated written to {self.get_main_UT_dir()} CMakeLists.txt") - print(f"Main CMakeLists.txt generated written to {self.get_main_UT_dir()} CMakeLists.txt") + def generate_cmakes_for_tests(self): + test_files_paths = self.get_files_with_tests_list() - def generate_cmakes_for_tests(self): - test_files_paths = self.get_files_with_tests_list() + for test_path in test_files_paths: + tested_file_path = self.get_sources_to_test_repo() + test_path + if not os.path.isfile(tested_file_path): + print(f"No source to test for {test_path} test") + continue - for test_path in test_files_paths: - tested_file_path = self.get_sources_to_test_repo() + test_path - if not os.path.isfile(tested_file_path): - print(f"No source to test for {test_path} test") - continue + test_file_path = self.get_main_UT_dir() + test_path - test_file_path = self.get_main_UT_dir() + test_path + cmake_buf = self.generate_test_cmake_buf(test_file_path, tested_file_path) - cmake_buf = self.generate_test_cmake_buf(test_file_path, tested_file_path) + cmake_path = self.get_sources_to_test_repo() + test_path + cmake_path = os.path.splitext(cmake_path)[0] + ".cmake" + with open(cmake_path, "w") as f: + f.writelines(cmake_buf) + print(f"cmake file for {test_path} written to {cmake_path}") - cmake_path = self.get_sources_to_test_repo() + test_path - cmake_path = os.path.splitext(cmake_path)[0] + ".cmake" - with open(cmake_path, "w") as f: - f.writelines(cmake_buf) - print(f"cmake file for {test_path} written to {cmake_path}") + cmake_lists_path = os.path.dirname(cmake_path) + os.sep + self.update_cmakelists(cmake_lists_path, cmake_path) - cmake_lists_path = os.path.dirname(cmake_path) + os.sep - self.update_cmakelists(cmake_lists_path, cmake_path) + def generate_test_cmake_buf(self, test_file_path, tested_file_path): + test_file_name = os.path.basename(test_file_path) + target_name = os.path.splitext(test_file_name)[0] - def generate_test_cmake_buf(self, test_file_path, tested_file_path): - test_file_name = os.path.basename(test_file_path) - target_name = os.path.splitext(test_file_name)[0] + add_executable = "add_executable(" + target_name + " " + test_file_path + " " + \ + tested_file_path + ")\n" - add_executable = "add_executable(" + target_name + " " + test_file_path + " " + tested_file_path + ")\n" + libraries = "target_link_libraries(" + target_name + " libcmocka.so ocf_env)\n" - libraries = "target_link_libraries(" + target_name + " libcmocka.so ocf_env)\n" + add_test = "add_test(" + target_name + " ${CMAKE_CURRENT_BINARY_DIR}/" + target_name + ")\n" - add_test = "add_test(" + target_name + " ${CMAKE_CURRENT_BINARY_DIR}/" + target_name + ")\n" + tgt_properties = "set_target_properties(" + target_name + "\n" + \ + "PROPERTIES\n" + \ + "COMPILE_FLAGS \"-fno-inline -Dstatic= -Dinline= -w \"\n" - tgt_properties = "set_target_properties(" + target_name + "\n" + \ - "PROPERTIES\n" + \ - "COMPILE_FLAGS \"-fno-inline -Dstatic= -Dinline= -w \"\n" + link_flags = self.generate_cmake_link_flags(test_file_path) + tgt_properties += link_flags + ")" - link_flags = self.generate_cmake_link_flags(test_file_path) - tgt_properties += link_flags + ")" + buf = add_executable + libraries + add_test + tgt_properties - buf = add_executable + libraries + add_test + tgt_properties + return buf - return buf + def generate_cmake_link_flags(self, path): + ret = "" - def generate_cmake_link_flags(self, path): - ret = "" + autowraps_path = self.get_autowrap_file_path(path) + functions_to_wrap = self.get_functions_to_wrap(path) + functions_to_wrap += self.get_functions_to_wrap(autowraps_path) - autowraps_path = self.get_autowrap_file_path(path) - functions_to_wrap = self.get_functions_to_wrap(path) - functions_to_wrap += self.get_functions_to_wrap(autowraps_path) + for function_name in functions_to_wrap: + ret += ",--wrap=" + function_name + if len(ret) > 0: + ret = "LINK_FLAGS \"-Wl" + ret + "\"\n" - for function_name in functions_to_wrap: - ret += ",--wrap=" + function_name - if len(ret) > 0: - ret = "LINK_FLAGS \"-Wl" + ret + "\"\n" + return ret - return ret + def update_cmakelists(self, cmake_lists_path, cmake_name): + with open(cmake_lists_path + "CMakeLists.txt", "a+") as f: + f.seek(0, os.SEEK_SET) + new_line = "include(" + os.path.basename(cmake_name) + ")\n" - def update_cmakelists(self, cmake_lists_path, cmake_name): - with open(cmake_lists_path + "CMakeLists.txt", "a+") as f: - f.seek(0, os.SEEK_SET) - new_line = "include(" + os.path.basename(cmake_name) + ")\n" + if new_line not in f.read(): + f.write(new_line) - if not new_line in f.read(): - f.write(new_line) + def get_functions_to_wrap(self, path): + functions_list = self.get_functions_list(path) + functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) for line in functions_list + if re.search("__wrap_", line)] - def get_functions_to_wrap(self, path): - functions_list = self.get_functions_list(path) - functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) for line in functions_list if re.search("__wrap_", line)] + return functions_list - return functions_list + def get_functions_to_leave(self, path): + with open(path) as f: + lines = f.readlines() + buf = ''.join(lines) - def get_functions_to_leave(self, path): - with open(path) as f: - l = f.readlines() - buf = ''.join(l) + tags_pattern = re.compile(r"[\s\S]*") - tags_pattern = re.compile("[\s\S]*") + buf = re.findall(tags_pattern, buf) + if not len(buf) > 0: + return [] - buf = re.findall(tags_pattern, buf) - if not len(buf) > 0: - return [] + buf = buf[0] - buf = buf[0] + buf = re.sub(r'<.*>', '', buf) + buf = re.sub(r'[^a-zA-Z0-9_\n]+', '', buf) - buf = re.sub(r'<.*>', '', buf) - buf = re.sub(r'[^a-zA-Z0-9_\n]+', '', buf) + ret = buf.split("\n") + ret = [name for name in ret if name] + return ret - ret = buf.split("\n") - ret = [name for name in ret if name] - return ret + def get_functions_list(self, file_path, prototypes=None): + ctags_path = self.get_ctags_path() - def get_functions_list(self, file_path, prototypes=None): - ctags_path = self.get_ctags_path() + ctags_args = "--c-types=f" + if prototypes: + ctags_args += " --c-kinds=+p" + # find all functions' definitions | put tabs instead of spaces | + # take only columns with function name and line number | sort in descending order + result = run_command([ctags_path, "-x", ctags_args, file_path, + "--language-force=c | sed \"s/ \\+/\t/g\" | cut -f 1,3 | sort -nsr " + "-k 2"]) - ctags_args = "--c-types=f" - if prototypes == True: - ctags_args += " --c-kinds=+p" - # find all functions' definitions | put tabs instead of spaces | - # take only columns with function name and line number | sort in descending order - result = run_command([ctags_path, "-x", ctags_args, file_path, - "--language-force=c | sed \"s/ \\+/\t/g\" | cut -f 1,3 | sort -nsr -k 2"]) + # 'output' is string, but it has to be changed to list + output = list(filter(None, result.stdout.split("\n"))) + return output - # 'output' is string, but it has to be changed to list - output = list(filter(None, result.stdout.split("\n"))) - return output + def remove_functions_from_list(self, functions_list, to_remove_list): + ret = functions_list[:] + for function_name in to_remove_list: + ret = [line for line in ret if not re.search(r'\b%s\b' % function_name, line)] + return ret - def remove_functions_from_list(self, functions_list, to_remove_list): - ret = functions_list[:] - for function_name in to_remove_list: - ret = [line for line in ret if not re.search(r'\b%s\b' % function_name, line)] - return ret + def get_src_to_test(self, test_path, preprocessed_tested_path): + functions_to_leave = self.get_functions_to_leave(self.get_main_UT_dir() + test_path) - def get_src_to_test(self, test_path, preprocessed_tested_path): - functions_to_leave = self.get_functions_to_leave(self.get_main_UT_dir() + test_path) + functions_to_leave.append(self.get_tested_function_name(self.get_main_UT_dir() + test_path)) + functions_list = self.get_functions_list(preprocessed_tested_path) - functions_to_leave.append(self.get_tested_function_name(self.get_main_UT_dir() + test_path)) - functions_list = self.get_functions_list(preprocessed_tested_path) + functions_list = self.remove_functions_from_list(functions_list, functions_to_leave) - functions_list = self.remove_functions_from_list(functions_list, functions_to_leave) + with open(preprocessed_tested_path) as f: + ret = f.readlines() + for function in functions_list: + line = function.split("\t")[1] + line = int(line) - with open(preprocessed_tested_path) as f: - ret = f.readlines() - for function in functions_list: - line = function.split("\t")[1] - line = int(line) + self.remove_function_body(ret, line) - self.remove_function_body(ret, line) + return ret - return ret + def set_tested_files_paths_list(self): + test_files_list = self.get_files_with_tests_list() - def set_tested_files_paths_list(self): - test_files_list = self.get_files_with_tests_list() + for f in test_files_list: + self.tested_files_paths_list.append(self.get_main_tested_dir() + + self.get_tested_file_path( + self.get_main_UT_dir() + f)) - for f in test_files_list: - self.tested_files_paths_list.append(self.get_main_tested_dir() +\ - self.get_tested_file_path(self.get_main_UT_dir() + f)) + self.tested_files_paths_list = self.remove_duplicates_from_list( + self.tested_files_paths_list) - self.tested_files_paths_list = self.remove_duplicates_from_list(self.tested_files_paths_list) + def get_tested_files_paths_list(self): + return self.tested_files_paths_list - def get_tested_files_paths_list(self): - return self.tested_files_paths_list + def get_files_with_tests_list(self): + return self.test_files_paths_list - def get_files_with_tests_list(self): - return self.test_files_paths_list + def set_files_with_tests_list(self): + test_catalogues_list = self.get_tests_catalouges_list() + for catalogue in test_catalogues_list: + dir_with_tests_path = self.get_main_UT_dir() + catalogue - def set_files_with_tests_list(self): - test_catalogues_list = self.get_tests_catalouges_list() - for catalogue in test_catalogues_list: - dir_with_tests_path = self.get_main_UT_dir() + catalogue + for path, dirs, files in os.walk(dir_with_tests_path): + test_files = self.get_test_files_from_dir(path + os.sep) - for path, dirs, files in os.walk(dir_with_tests_path): - test_files = self.get_test_files_from_dir(path + os.sep) + for test_file_name in test_files: + test_rel_path = os.path.relpath(path + os.sep + test_file_name, + self.get_main_UT_dir()) + self.test_files_paths_list.append(test_rel_path) - for test_file_name in test_files: - test_rel_path = os.path.relpath(path + os.sep + test_file_name, self.get_main_UT_dir()) - self.test_files_paths_list.append(test_rel_path) + def are_markups_valid(self, path): + file_path = self.get_tested_file_path(path) + function_name = self.get_tested_function_name(path) - def are_markups_valid(self, path): - file_path = self.get_tested_file_path(path) - function_name = self.get_tested_function_name(path) + if file_path is None: + print(f"{path} file has no tested_file tag!") + return None + elif not os.path.isfile(self.get_main_tested_dir() + file_path): + print(f"Tested file given in {path} does not exist!") + return None - if file_path is None: - print(f"{path} file has no tested_file tag!") - return None - elif not os.path.isfile(self.get_main_tested_dir() + file_path): - print(f"Tested file given in {path} does not exist!") - return None + if function_name is None: + print(f"{path} file has no tested_function_name tag!") + return None - if function_name is None: - print(f"{path} file has no tested_function_name tag!") - return None + return True - return True + def create_dir_if_not_exist(self, path): + if not os.path.isdir(path): + try: + os.makedirs(path) + except Exception: + pass + return True + return None - def create_dir_if_not_exist(self, path): - if not os.path.isdir(path): - try: - os.makedirs(path) - except Exception: - pass - return True - return None + def get_tested_file_path(self, test_file_path): + with open(test_file_path) as f: + buf = f.readlines() + buf = ''.join(buf) - def get_tested_file_path(self, test_file_path): - with open(test_file_path) as f: - buf = f.readlines() - buf = ''.join(buf) + tags_pattern = re.compile(r"[\s\S]*") + buf = re.findall(tags_pattern, buf) - tags_pattern = re.compile("[\s\S]*") - buf = re.findall(tags_pattern, buf) + if not len(buf) > 0: + return None - if not len(buf) > 0: - return None + buf = buf[0] - buf = buf[0] + buf = re.sub(r'<[^>]*>', '', buf) + buf = re.sub(r'\s+', '', buf) - buf = re.sub(r'<[^>]*>', '', buf) - buf = re.sub(r'\s+', '', buf) + if len(buf) > 0: + return buf - if len(buf) > 0: - return buf + return None - return None + def get_tested_function_name(self, test_file_path): + with open(test_file_path) as f: + buf = f.readlines() + buf = ''.join(buf) - def get_tested_function_name(self, test_file_path): - with open(test_file_path) as f: - buf = f.readlines() - buf = ''.join(buf) + tags_pattern = re.compile(r"[\s\S]*") + buf = re.findall(tags_pattern, buf) - tags_pattern = re.compile("[\s\S]*") - buf = re.findall(tags_pattern, buf) + if not len(buf) > 0: + return None - if not len(buf) > 0: - return None + buf = buf[0] - buf = buf[0] + buf = re.sub(r'<[^>]*>', '', buf) + buf = re.sub('//', '', buf) + buf = re.sub(r'\s+', '', buf) - buf = re.sub(r'<[^>]*>', '', buf) - buf = re.sub('//', '', buf) - buf = re.sub(r'\s+', '', buf) + if len(buf) > 0: + return buf - if len(buf) > 0: - return buf + return None + + def get_test_files_from_dir(self, path): + ret = os.listdir(path) + ret = [name for name in ret if os.path.isfile(path + os.sep + name) + and (name.endswith(".c") or name.endswith(".h"))] + ret = [name for name in ret if self.are_markups_valid(path + name)] + + return ret + + def get_list_of_directories(self, path): + if not os.path.isdir(path): + return [] + + ret = os.listdir(path) + ret = [name for name in ret if not os.path.isfile(path + os.sep + name)] + ret = [os.path.normpath(name) + os.sep for name in ret] + + return ret + + def remove_hashes(self, path): + with open(path) as f: + buf = f.readlines() + + buf = [l for l in buf if not re.search(r'.*#.*', l)] + + with open(path, "w") as f: + f.writelines(buf) + + return + for i in range(len(padding)): + try: + padding[i] = padding[i].split("#")[0] + except ValueError: + continue + + f = open(path, "w") + f.writelines(padding) + f.close() + + def find_function_end(self, code_lines_list, first_line_of_function_index): + brackets_counter = 0 + current_line_index = first_line_of_function_index + + while True: + if "{" in code_lines_list[current_line_index]: + brackets_counter += code_lines_list[current_line_index].count("{") + brackets_counter -= code_lines_list[current_line_index].count("}") + break + else: + current_line_index += 1 + + while brackets_counter > 0: + current_line_index += 1 + if "{" in code_lines_list[current_line_index]: + brackets_counter += code_lines_list[current_line_index].count("{") + brackets_counter -= code_lines_list[current_line_index].count("}") + elif "}" in code_lines_list[current_line_index]: + brackets_counter -= code_lines_list[current_line_index].count("}") + + return current_line_index + + def get_functions_calls(self, file_to_compile): + out_dir = "/tmp/ocf_ut" + out_file = out_dir + "/ocf_obj.o" + self.create_dir_if_not_exist(out_dir) + cmd = "/usr/bin/gcc -o " + out_file + " -c " + file_to_compile + " &> /dev/null" + run_command([cmd], verbose=None) + result = run_command(["/usr/bin/nm -u " + out_file + " | cut -f2 -d\'U\'"]) + return set(result.stdout.split()) + + def remove_function_body(self, code_lines_list, line_id): + try: + while "{" not in code_lines_list[line_id]: + if ";" in code_lines_list[line_id]: + return + line_id += 1 + except IndexError: + return + + last_line_id = self.find_function_end(code_lines_list, line_id) + + code_lines_list[line_id] = code_lines_list[line_id].split("{")[0] + code_lines_list[line_id] += ";" + + del code_lines_list[line_id + 1: last_line_id + 1] + + def get_function_wrap(self, code_lines_list, line_id): + ret = [] + # Line numbering starts with one, list indexing with zero + line_id -= 1 + + # If returned type is not present, it should be in line above + try: + code_lines_list[line_id].split("(")[0].rsplit()[1] + except IndexError: + line_id -= 1 + + while True: + ret.append(code_lines_list[line_id]) + if ")" in code_lines_list[line_id]: + break + line_id += 1 + + # Tags list contains both prototypes and definitions, here we recoginze + # with which one we deals + delimiter = "" + try: + if "{" in ret[-1] or "{" in ret[-2]: + delimter = "{" + else: + delimiter = ";" + except IndexError: + delimiter = ";" + + ret[-1] = ret[-1].split(delimiter)[0] + ret[-1] += "{}" + + function_name = "" + line_with_name = 0 + try: + function_name = ret[line_with_name].split("(")[0].rsplit(maxsplit=1)[1] + except IndexError: + line_with_name = 1 + function_name = ret[line_with_name].split("(")[0] + + function_new_name = "__wrap_" + function_name.replace("*", "") + ret[0] = ret[0].replace(function_name, function_new_name) + + return ''.join(ret) + + def set_ctags_path(self): + result = run_command(["/usr/bin/ctags --version &> /dev/null"]) + if result.returncode == 0: + path = "/usr/bin/ctags " + result = run_command([path, "--c-types=f"], verbose=None) + if not re.search("unrecognized option", result.stdout, re.IGNORECASE): + self.ctags_path = path + return + + result = run_command(["/usr/local/bin/ctags --version &> /dev/null"]) + if result.returncode == 0: + path = "/usr/local/bin/ctags " + result = run_command(["path", "--c-types=f"], verbose=None) + if not re.search("unrecognized option", result.stdout, re.IGNORECASE): + self.ctags_path = path + return + + print("ERROR: Current ctags version don't support \"--c-types=f\" parameter!") + exit(1) + + def get_ctags_path(self): + return self.ctags_path + + def get_tests_catalouges_list(self): + return self.test_catalogues_list + + def get_relative_path(self, original_path, part_to_remove): + return original_path.split(part_to_remove, 1)[1] + + def get_dirs_to_include_list(self): + return self.dirs_to_include_list + + def set_dirs_to_include(self): + self.dirs_to_include_list = [self.get_main_tested_dir() + name + for name in + tests_config.DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST] + + def set_tests_internal_includes_list(self): + self.tests_internal_includes_list = [self.get_main_UT_dir() + name + for name in + tests_config.DIRECTORIES_TO_INCLUDE_FROM_UT_LIST] + + def set_preprocessing_repo(self): + self.preprocessing_repo = self.get_main_UT_dir() \ + + tests_config.PREPROCESSED_SOURCES_REPOSITORY + + def set_sources_to_test_repo(self): + self.sources_to_test_repo = self.get_main_UT_dir() + tests_config.SOURCES_TO_TEST_REPOSITORY + + def get_sources_to_test_repo(self): + return self.sources_to_test_repo + + def get_preprocessing_repo(self): + return self.preprocessing_repo + + def get_tests_internal_includes_list(self): + return self.tests_internal_includes_list + + def get_script_dir_path(self): + return os.path.normpath(self.script_dir_abs_path) + os.sep + + def get_main_UT_dir(self): + return os.path.normpath(self.main_UT_dir) + os.sep + + def get_main_tested_dir(self): + return os.path.normpath(self.main_tested_dir) + os.sep + + def remove_duplicates_from_list(self, l): + return list(set(l)) + + def set_framework_includes(self): + self.framework_includes = tests_config.FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST + + def get_framework_includes(self): + return self.framework_includes + + def set_includes_to_copy_dict(self, files_to_copy_dict): + self.includes_to_copy_dict = files_to_copy_dict - return None + def get_includes_to_copy_dict(self): + return self.includes_to_copy_dict - def get_test_files_from_dir(self, path): - ret = os.listdir(path) - ret = [name for name in ret if os.path.isfile(path + os.sep + name) and (name.endswith(".c") or name.endswith(".h"))] - ret = [name for name in ret if self.are_markups_valid(path + name)] + def set_main_UT_dir(self): + main_UT_dir = os.path.normpath(os.path.normpath(self.get_script_dir_path() + + os.sep + + tests_config. + MAIN_DIRECTORY_OF_UNIT_TESTS)) + if not os.path.isdir(main_UT_dir): + print("Given path to main UT directory is wrong!") + sys.exit(1) - return ret + self.main_UT_dir = main_UT_dir - def get_list_of_directories(self, path): - if not os.path.isdir(path): - return [] + def set_main_tested_dir(self): + main_tested_dir = os.path.normpath(os.path.normpath(self.get_script_dir_path() + + os.sep + + tests_config. + MAIN_DIRECTORY_OF_TESTED_PROJECT)) + if not os.path.isdir(main_tested_dir): + print("Given path to main tested directory is wrong!") + sys.exit(1) - ret = os.listdir(path) - ret = [name for name in ret if not os.path.isfile(path + os.sep + name)] - ret = [os.path.normpath(name) + os.sep for name in ret] - - return ret - - def remove_hashes(self, path): - with open(path) as f: - buf = f.readlines() - - buf = [l for l in buf if not re.search(r'.*#.*', l)] - - with open(path, "w") as f: - f.writelines(buf) - - return - for i in range(len(padding)): - try: - padding[i] = padding[i].split("#")[0] - except ValueError: - continue - - f = open(path, "w") - f.writelines(padding) - f.close() - - def find_function_end(self,code_lines_list, first_line_of_function_index): - brackets_counter = 0 - current_line_index = first_line_of_function_index - - while True: - if "{" in code_lines_list[current_line_index]: - brackets_counter += code_lines_list[current_line_index].count("{") - brackets_counter -= code_lines_list[current_line_index].count("}") - break - else: - current_line_index += 1 - - while brackets_counter > 0: - current_line_index += 1 - if "{" in code_lines_list[current_line_index]: - brackets_counter += code_lines_list[current_line_index].count("{") - brackets_counter -= code_lines_list[current_line_index].count("}") - elif "}" in code_lines_list[current_line_index]: - brackets_counter -= code_lines_list[current_line_index].count("}") - - return current_line_index - - def get_functions_calls(self, file_to_compile): - out_dir = "/tmp/ocf_ut" - out_file = out_dir + "/ocf_obj.o" - self.create_dir_if_not_exist(out_dir) - cmd = "/usr/bin/gcc -o " + out_file + " -c " + file_to_compile + " &> /dev/null" - run_command([cmd], verbose=None) - result = run_command(["/usr/bin/nm -u " + out_file + " | cut -f2 -d\'U\'"]) - return set(result.stdout.split()) - - - def remove_function_body(self, code_lines_list, line_id): - try: - while "{" not in code_lines_list[line_id]: - if ";" in code_lines_list[line_id]: - return - line_id += 1 - except IndexError: - return - - last_line_id = self.find_function_end(code_lines_list, line_id) - - code_lines_list[line_id] = code_lines_list[line_id].split("{")[0] - code_lines_list[line_id] += ";" - - del code_lines_list[line_id + 1: last_line_id + 1] - - - def get_function_wrap(self, code_lines_list, line_id): - ret = [] - # Line numbering starts with one, list indexing with zero - line_id -= 1 - - # If returned type is not present, it should be in line above - try: - code_lines_list[line_id].split("(")[0].rsplit()[1] - except IndexError: - line_id -= 1 - - while True: - ret.append(code_lines_list[line_id]) - if ")" in code_lines_list[line_id]: - break - line_id += 1 - - # Tags list contains both prototypes and definitions, here we recoginze - # with which one we deals - delimiter = "" - try: - if "{" in ret[-1] or "{" in ret[-2]: - delimter = "{" - else: - delimiter =";" - except IndexError: - delimiter =";" - - ret[-1] = ret[-1].split(delimiter)[0] - ret[-1] += "{}" - - function_name = "" - line_with_name = 0 - try: - function_name = ret[line_with_name].split("(")[0].rsplit(maxsplit=1)[1] - except IndexError: - line_with_name = 1 - function_name = ret[line_with_name].split("(")[0] - - function_new_name = "__wrap_" + function_name.replace("*", "") - ret[0] = ret[0].replace(function_name, function_new_name) - - return ''.join(ret) - - def set_ctags_path(self): - result = run_command(["/usr/bin/ctags --version &> /dev/null"]) - if result.returncode == 0: - path = "/usr/bin/ctags " - result = run_command([path, "--c-types=f"], verbose=None) - if not re.search("unrecognized option", result.stdout, re.IGNORECASE): - self.ctags_path = path - return - - result = run_command(["/usr/local/bin/ctags --version &> /dev/null"]) - if result.returncode == 0: - path = "/usr/local/bin/ctags " - result = run_command(["path", "--c-types=f"], verbose=None) - if not re.search("unrecognized option", result.stdout, re.IGNORECASE): - self.ctags_path = path - return - - print("ERROR: Current ctags version don't support \"--c-types=f\" parameter!") - exit(1) - - def get_ctags_path(self): - return self.ctags_path - - def get_tests_catalouges_list(self): - return self.test_catalouges_list - - def get_relative_path(self, original_path, part_to_remove): - return original_path.split(part_to_remove, 1)[1] - - def get_dirs_to_include_list(self): - return self.dirs_to_include_list - - def set_dirs_to_include(self): - self.dirs_to_include_list = [self.get_main_tested_dir() + name\ - for name in tests_config.DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST] + self.main_tested_dir = main_tested_dir - def set_tests_internal_includes_list(self): - self.tests_internal_includes_list = [self.get_main_UT_dir() + name\ - for name in tests_config.DIRECTORIES_TO_INCLUDE_FROM_UT_LIST] - - def set_preprocessing_repo(self): - self.preprocessing_repo = self.get_main_UT_dir() +\ - tests_config.PREPROCESSED_SOURCES_REPOSITORY - - def set_sources_to_test_repo(self): - self.sources_to_test_repo = self.get_main_UT_dir() +\ - tests_config.SOURCES_TO_TEST_REPOSITORY - - def get_sources_to_test_repo(self): - return self.sources_to_test_repo - - def get_preprocessing_repo(self): - return self.preprocessing_repo - - def get_tests_internal_includes_list(self): - return self.tests_internal_includes_list - - def get_script_dir_path(self): - return os.path.normpath(self.script_dir_abs_path) + os.sep - - def get_main_UT_dir(self): - return os.path.normpath(self.main_UT_dir) + os.sep - - def get_main_tested_dir(self): - return os.path.normpath(self.main_tested_dir) + os.sep - - def remove_duplicates_from_list(self, l): - return list(set(l)) - - def set_framework_includes(self): - self.framework_includes = tests_config.FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST - - def get_framework_includes(self): - return self.framework_includes - - def set_includes_to_copy_dict(self, files_to_copy_dict): - self.includes_to_copy_dict = files_to_copy_dict - - def get_includes_to_copy_dict(self): - return self.includes_to_copy_dict - - def set_main_UT_dir(self): - main_UT_dir = os.path.normpath(os.path.normpath(self.get_script_dir_path()\ - + os.sep + tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS)) - if not os.path.isdir(main_UT_dir): - print("Given path to main UT directory is wrong!") - sys.exit(1) - - self.main_UT_dir = main_UT_dir - - def set_main_tested_dir(self): - main_tested_dir = os.path.normpath(os.path.normpath(self.get_script_dir_path()\ - + os.sep + tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT)) - if not os.path.isdir(main_tested_dir): - print("Given path to main tested directory is wrong!") - sys.exit(1) - - self.main_tested_dir = main_tested_dir def __main__(): + generator = UnitTestsSourcesGenerator() + generator.copy_includes() + generator.preprocessing() + generator.prepare_sources_for_testing() + generator.create_main_cmake_lists() + generator.generate_cmakes_for_tests() - generator = UnitTestsSourcesGenerator() - generator.copy_includes() - generator.preprocessing() - generator.prepare_sources_for_testing() - generator.create_main_cmake_lists() - generator.generate_cmakes_for_tests() + print("Files for testing generated!") - print("Files for testing generated!") if __name__ == "__main__": - __main__() + __main__() diff --git a/tests/unit/framework/run_unit_tests.py b/tests/unit/framework/run_unit_tests.py index 8e1aac2..d72269c 100755 --- a/tests/unit/framework/run_unit_tests.py +++ b/tests/unit/framework/run_unit_tests.py @@ -10,13 +10,15 @@ import os import sys import subprocess + def run_command(args): result = subprocess.run(" ".join(args), shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE) result.stdout = result.stdout.decode("ASCII", errors='ignore') result.stderr = result.stderr.decode("ASCII", errors='ignore') return result + script_path = os.path.dirname(os.path.realpath(__file__)) main_UT_dir = os.path.join(script_path, tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS) @@ -29,13 +31,13 @@ if not os.path.isdir(os.path.join(main_UT_dir, "ocf_env", "ocf")): except Exception: raise Exception("Cannot create ocf_env/ocf directory!") -result = run_command([ "cp", "-r", - os.path.join(main_tested_dir, "inc", "*"), - os.path.join(main_UT_dir, "ocf_env", "ocf") ]) +result = run_command(["cp", "-r", + os.path.join(main_tested_dir, "inc", "*"), + os.path.join(main_UT_dir, "ocf_env", "ocf")]) if result.returncode != 0: raise Exception("Preparing sources for testing failed!") -result = run_command([ os.path.join(script_path, "prepare_sources_for_testing.py") ]) +result = run_command([os.path.join(script_path, "prepare_sources_for_testing.py")]) if result.returncode != 0: raise Exception("Preparing sources for testing failed!") @@ -52,7 +54,7 @@ except Exception: os.chdir(build_dir) -cmake_result = run_command([ "cmake", ".." ]) +cmake_result = run_command(["cmake", ".."]) print(cmake_result.stdout) with open(os.path.join(logs_dir, "cmake.output"), "w") as f: @@ -64,20 +66,20 @@ if cmake_result.returncode != 0: f.write("Cmake step failed! More details in cmake.output.") sys.exit(1) -make_result = run_command([ "make", "-j" ]) +make_result = run_command(["make", "-j"]) print(make_result.stdout) with open(os.path.join(logs_dir, "make.output"), "w") as f: - f.write(make_result.stdout) - f.write(make_result.stderr) + f.write(make_result.stdout) + f.write(make_result.stderr) if make_result.returncode != 0: - with open(os.path.join(logs_dir, "tests.output"), "w") as f: - f.write("Make step failed! More details in make.output.") - sys.exit(1) + with open(os.path.join(logs_dir, "tests.output"), "w") as f: + f.write("Make step failed! More details in make.output.") + sys.exit(1) -test_result = run_command([ "make", "test" ]) +test_result = run_command(["make", "test"]) print(test_result.stdout) -with open(os.path.join(logs_dir , "tests.output"), "w") as f: - f.write(test_result.stdout) +with open(os.path.join(logs_dir, "tests.output"), "w") as f: + f.write(test_result.stdout) diff --git a/tests/unit/framework/tests_config.py b/tests/unit/framework/tests_config.py index 19a1761..2dd3130 100644 --- a/tests/unit/framework/tests_config.py +++ b/tests/unit/framework/tests_config.py @@ -11,25 +11,34 @@ MAIN_DIRECTORY_OF_TESTED_PROJECT = "../../../" MAIN_DIRECTORY_OF_UNIT_TESTS = "../tests/" -# Paths to all directories, in which tests are stored. All paths should be relative to MAIN_DIRECTORY_OF_UNIT_TESTS -DIRECTORIES_WITH_TESTS_LIST = ["cleaning/", "metadata/", "mngt/", "concurrency/", "engine/", "eviction/", "utils/"] +# Paths to all directories, in which tests are stored. All paths should be relative to +# MAIN_DIRECTORY_OF_UNIT_TESTS +DIRECTORIES_WITH_TESTS_LIST = ["cleaning/", "metadata/", "mngt/", "concurrency/", "engine/", + "eviction/", "utils/"] -# Paths to all directories containing files with sources. All paths should be relative to MAIN_DIRECTORY_OF_TESTED_PROJECT -DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST = ["src/", "src/cleaning/", "src/engine/", "src/metadata/", "src/eviction/", "src/mngt/", "src/concurrency/", "src/utils/", "inc/"] +# Paths to all directories containing files with sources. All paths should be relative to +# MAIN_DIRECTORY_OF_TESTED_PROJECT +DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST = ["src/", "src/cleaning/", "src/engine/", "src/metadata/", + "src/eviction/", "src/mngt/", "src/concurrency/", + "src/utils/", "inc/"] # Paths to all directories from directory with tests, which should also be included DIRECTORIES_TO_INCLUDE_FROM_UT_LIST = ["ocf_env/"] # Paths to include, required by cmake, cmocka, cunit -FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST = ["${CMOCKA_PUBLIC_INCLUDE_DIRS}" ,"${CMAKE_BINARY_DIR}", "${CMAKE_CURRENT_SOURCE_DIR}"] +FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST = ["${CMOCKA_PUBLIC_INCLUDE_DIRS}", "${CMAKE_BINARY_DIR}", + "${CMAKE_CURRENT_SOURCE_DIR}"] -# Path to directory containing all sources after preprocessing. Should be relative to MAIN_DIRECTORY_OF_UNIT_TESTS +# Path to directory containing all sources after preprocessing. Should be relative to +# MAIN_DIRECTORY_OF_UNIT_TESTS PREPROCESSED_SOURCES_REPOSITORY = "preprocessed_sources_repository/" -# Path to directory containing all sources after removing unneeded functions and cmake files for tests +# Path to directory containing all sources after removing unneeded functions and cmake files for +# tests SOURCES_TO_TEST_REPOSITORY = "sources_to_test_repository/" -# List of includes. Directories will be recursively copied to given destinations in directory with tests. +# List of includes. +# Directories will be recursively copied to given destinations in directory with tests. # key - destination in dir with tests # value - path in tested project to dir which should be copied -INCLUDES_TO_COPY_DICT = { 'ocf_env/ocf/' : "inc/" } +INCLUDES_TO_COPY_DICT = {'ocf_env/ocf/': "inc/"}