diff --git a/.pep8speaks.yml b/.pep8speaks.yml
index e5ee75c..93ede8a 100644
--- a/.pep8speaks.yml
+++ b/.pep8speaks.yml
@@ -8,5 +8,6 @@ pycodestyle:
max-line-length: 100
ignore:
- E402 # module level import not at top of file
+ - W503 # line break before binary operator
no_blank_comment: True
diff --git a/tests/functional/pyocf/types/cache.py b/tests/functional/pyocf/types/cache.py
index afa87a8..35c3d84 100644
--- a/tests/functional/pyocf/types/cache.py
+++ b/tests/functional/pyocf/types/cache.py
@@ -86,6 +86,7 @@ class CacheMode(IntEnum):
def read_insert(self):
return self.value not in [CacheMode.PT, CacheMode.WO]
+
class EvictionPolicy(IntEnum):
LRU = 0
DEFAULT = LRU
@@ -306,7 +307,7 @@ class Cache:
c.start_cache()
try:
c.load_cache(device)
- except:
+ except: # noqa: E722
c.stop()
raise
@@ -319,7 +320,7 @@ class Cache:
c.start_cache()
try:
c.attach_device(device, force=True)
- except:
+ except: # noqa: E722
c.stop()
raise
@@ -529,13 +530,12 @@ class Cache:
if c.results["error"]:
raise OcfError("Couldn't flush cache", c.results["error"])
-
def get_name(self):
self.read_lock()
try:
return str(self.owner.lib.ocf_cache_get_name(self), encoding="ascii")
- except:
+ except: # noqa: E722
raise OcfError("Couldn't get cache name")
finally:
self.read_unlock()
diff --git a/tests/functional/pyocf/types/data.py b/tests/functional/pyocf/types/data.py
index 3c49f46..b032cf3 100644
--- a/tests/functional/pyocf/types/data.py
+++ b/tests/functional/pyocf/types/data.py
@@ -56,7 +56,7 @@ class DataOps(Structure):
class Data:
- DATA_POISON=0xA5
+ DATA_POISON = 0xA5
PAGE_SIZE = 4096
_instances_ = {}
@@ -109,7 +109,7 @@ class Data:
def from_string(cls, source: str, encoding: str = "ascii"):
b = bytes(source, encoding)
# duplicate string to fill space up to sector boundary
- padding_len = S.from_B(len(b), sector_aligned = True).B - len(b)
+ padding_len = S.from_B(len(b), sector_aligned=True).B - len(b)
padding = b * (padding_len // len(b) + 1)
padding = padding[:padding_len]
b = b + padding
diff --git a/tests/functional/pyocf/types/io.py b/tests/functional/pyocf/types/io.py
index 8da63d5..16c8a2a 100644
--- a/tests/functional/pyocf/types/io.py
+++ b/tests/functional/pyocf/types/io.py
@@ -92,7 +92,7 @@ class Io(Structure):
def end(self, err):
try:
self.callback(err)
- except:
+ except: # noqa: E722
pass
self.put()
diff --git a/tests/functional/pyocf/types/queue.py b/tests/functional/pyocf/types/queue.py
index b7747f2..da09639 100644
--- a/tests/functional/pyocf/types/queue.py
+++ b/tests/functional/pyocf/types/queue.py
@@ -36,6 +36,7 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue):
break
+
class Queue:
_instances_ = {}
@@ -102,4 +103,3 @@ class Queue:
self.kick_condition.notify_all()
self.thread.join()
-
diff --git a/tests/functional/pyocf/types/shared.py b/tests/functional/pyocf/types/shared.py
index 28677be..6f53dda 100644
--- a/tests/functional/pyocf/types/shared.py
+++ b/tests/functional/pyocf/types/shared.py
@@ -102,7 +102,7 @@ class SharedOcfObject(Structure):
def get_instance(cls, ref: int):
try:
return cls._instances_[ref]
- except:
+ except: # noqa: E722
logging.getLogger("pyocf").error(
"OcfSharedObject corruption. wanted: {} instances: {}".format(
ref, cls._instances_
diff --git a/tests/functional/pyocf/types/volume.py b/tests/functional/pyocf/types/volume.py
index 29f4b9d..168e4af 100644
--- a/tests/functional/pyocf/types/volume.py
+++ b/tests/functional/pyocf/types/volume.py
@@ -74,7 +74,7 @@ class VolumeIoPriv(Structure):
class Volume(Structure):
- VOLUME_POISON=0x13
+ VOLUME_POISON = 0x13
_fields_ = [("_storage", c_void_p)]
_instances_ = {}
@@ -184,7 +184,7 @@ class Volume(Structure):
uuid = str(uuid_ptr.contents._data, encoding="ascii")
try:
volume = Volume.get_by_uuid(uuid)
- except:
+ except: # noqa: E722 TODO: Investigate whether this really should be so broad
print("Tried to access unallocated volume {}".format(uuid))
print("{}".format(Volume._uuid_))
return -1
@@ -255,7 +255,7 @@ class Volume(Structure):
memset(dst, 0, discard.contents._bytes)
discard.contents._end(discard, 0)
- except:
+ except: # noqa: E722
discard.contents._end(discard, -5)
def get_stats(self):
@@ -269,8 +269,7 @@ class Volume(Structure):
self.stats[IoDir(io.contents._dir)] += 1
io_priv = cast(
- OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)
- )
+ OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv))
offset = io_priv.contents._offset
if io.contents._dir == IoDir.WRITE:
@@ -286,7 +285,7 @@ class Volume(Structure):
io_priv.contents._offset += io.contents._bytes
io.contents._end(io, 0)
- except:
+ except: # noqa: E722
io.contents._end(io, -5)
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
@@ -325,10 +324,11 @@ class ErrorDevice(Volume):
super().reset_stats()
self.stats["errors"] = {IoDir.WRITE: 0, IoDir.READ: 0}
+
class TraceDevice(Volume):
def __init__(self, size, trace_fcn=None, uuid=None):
super().__init__(size, uuid)
- self.trace_fcn=trace_fcn
+ self.trace_fcn = trace_fcn
def submit_io(self, io):
submit = True
diff --git a/tests/functional/pyocf/utils.py b/tests/functional/pyocf/utils.py
index a0710af..a308ca2 100644
--- a/tests/functional/pyocf/utils.py
+++ b/tests/functional/pyocf/utils.py
@@ -6,7 +6,8 @@
from ctypes import string_at
-def print_buffer(buf, length, offset=0, width=16, ignore=0, stop_after_count_ignored=0, print_fcn=print):
+def print_buffer(buf, length, offset=0, width=16, ignore=0,
+ stop_after_count_ignored=0, print_fcn=print):
end = int(offset) + int(length)
offset = int(offset)
ignored_lines = 0
@@ -15,16 +16,13 @@ def print_buffer(buf, length, offset=0, width=16, ignore=0, stop_after_count_ign
stop_after_count_ignored = int(stop_after_count_ignored / width)
for addr in range(offset, end, width):
- cur_line = buf[addr : min(end, addr + width)]
+ cur_line = buf[addr: min(end, addr + width)]
byteline = ""
asciiline = ""
if not any(x != ignore for x in cur_line):
if stop_after_count_ignored and ignored_lines > stop_after_count_ignored:
- print_fcn(
- "<{} bytes of '0x{:02X}' encountered, stopping>".format(
- stop_after_count_ignored * width, ignore
- )
- )
+ print_fcn("<{} bytes of '0x{:02X}' encountered, stopping>".
+ format(stop_after_count_ignored * width, ignore))
return
ignored_lines += 1
continue
@@ -71,23 +69,23 @@ class Size:
return self.bytes
@classmethod
- def from_B(cls, value, sector_aligned = False):
+ def from_B(cls, value, sector_aligned=False):
return cls(value, sector_aligned)
@classmethod
- def from_KiB(cls, value, sector_aligned = False):
+ def from_KiB(cls, value, sector_aligned=False):
return cls(value * cls._KiB, sector_aligned)
@classmethod
- def from_MiB(cls, value, sector_aligned = False):
+ def from_MiB(cls, value, sector_aligned=False):
return cls(value * cls._MiB, sector_aligned)
@classmethod
- def from_GiB(cls, value, sector_aligned = False):
+ def from_GiB(cls, value, sector_aligned=False):
return cls(value * cls._GiB, sector_aligned)
@classmethod
- def from_TiB(cls, value, sector_aligned = False):
+ def from_TiB(cls, value, sector_aligned=False):
return cls(value * cls._TiB, sector_aligned)
@classmethod
diff --git a/tests/functional/tests/engine/test_wo.py b/tests/functional/tests/engine/test_wo.py
index 2ea0b60..db2862f 100644
--- a/tests/functional/tests/engine/test_wo.py
+++ b/tests/functional/tests/engine/test_wo.py
@@ -3,7 +3,6 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
-import pytest
from ctypes import c_int, memmove, cast, c_void_p
from enum import IntEnum
from itertools import product
@@ -11,11 +10,12 @@ import random
from pyocf.types.cache import Cache, CacheMode
from pyocf.types.core import Core
-from pyocf.types.volume import Volume, ErrorDevice
+from pyocf.types.volume import Volume
from pyocf.types.data import Data
from pyocf.types.io import IoDir
from pyocf.utils import Size
-from pyocf.types.shared import OcfError, OcfCompletion
+from pyocf.types.shared import OcfCompletion
+
def __io(io, queue, address, size, data, direction):
io.set_data(data, 0)
@@ -38,13 +38,16 @@ def _io(io, queue, address, size, data, offset, direction):
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
return ret
+
def io_to_core(core, address, size, data, offset, direction):
return _io(core.new_core_io(), core.cache.get_default_queue(), address, size,
- data, offset, direction)
+ data, offset, direction)
+
def io_to_exp_obj(core, address, size, data, offset, direction):
return _io(core.new_io(), core.cache.get_default_queue(), address, size, data,
- offset, direction)
+ offset, direction)
+
def sector_to_region(sector, region_start):
i = 0
@@ -52,10 +55,12 @@ def sector_to_region(sector, region_start):
i += 1
return i
+
class SectorStatus(IntEnum):
- DIRTY = 0,
- CLEAN = 1,
- INVALID = 2,
+ DIRTY = 0,
+ CLEAN = 1,
+ INVALID = 2,
+
I = SectorStatus.INVALID
D = SectorStatus.DIRTY
@@ -85,6 +90,8 @@ C = SectorStatus.CLEAN
# - if clean, exported object sector no @n is filled with 100 + @n
# - if dirty, exported object sector no @n is filled with 200 + @n
#
+
+
def test_wo_read_data_consistency(pyocf_ctx):
# start sector for each region
region_start = [0, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
@@ -114,11 +121,11 @@ def test_wo_read_data_consistency(pyocf_ctx):
data = {}
# memset n-th sector of core data with n
- data[SectorStatus.INVALID] = bytes([x // SECTOR_SIZE for x in range (WORKSET_SIZE)])
+ data[SectorStatus.INVALID] = bytes([x // SECTOR_SIZE for x in range(WORKSET_SIZE)])
# memset n-th sector of clean data with n + 100
- data[SectorStatus.CLEAN] = bytes([100 + x // SECTOR_SIZE for x in range (WORKSET_SIZE)])
+ data[SectorStatus.CLEAN] = bytes([100 + x // SECTOR_SIZE for x in range(WORKSET_SIZE)])
# memset n-th sector of dirty data with n + 200
- data[SectorStatus.DIRTY] = bytes([200 + x // SECTOR_SIZE for x in range (WORKSET_SIZE)])
+ data[SectorStatus.DIRTY] = bytes([200 + x // SECTOR_SIZE for x in range(WORKSET_SIZE)])
result_b = bytes(WORKSET_SIZE)
@@ -137,30 +144,30 @@ def test_wo_read_data_consistency(pyocf_ctx):
combinations.append(S)
random.shuffle(combinations)
- # add fixed test cases at the beginnning
+ # add fixed test cases at the beginning
combinations = fixed_combinations + combinations
for S in combinations[:ITRATION_COUNT]:
# write data to core and invalidate all CL
- cache.change_cache_mode(cache_mode = CacheMode.PT)
- io_to_exp_obj(core, WORKSET_OFFSET, len(data[SectorStatus.INVALID]), \
- data[SectorStatus.INVALID], 0, IoDir.WRITE)
+ cache.change_cache_mode(cache_mode=CacheMode.PT)
+ io_to_exp_obj(core, WORKSET_OFFSET, len(data[SectorStatus.INVALID]),
+ data[SectorStatus.INVALID], 0, IoDir.WRITE)
# insert clean sectors
- cache.change_cache_mode(cache_mode = CacheMode.WT)
+ cache.change_cache_mode(cache_mode=CacheMode.WT)
for sec in range(SECTOR_COUNT):
region = sector_to_region(sec, region_start)
if S[region] == SectorStatus.CLEAN:
- io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE, \
- data[SectorStatus.CLEAN], sec * SECTOR_SIZE, IoDir.WRITE)
+ io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE,
+ data[SectorStatus.CLEAN], sec * SECTOR_SIZE, IoDir.WRITE)
# write dirty sectors
- cache.change_cache_mode(cache_mode = CacheMode.WO)
+ cache.change_cache_mode(cache_mode=CacheMode.WO)
for sec in range(SECTOR_COUNT):
region = sector_to_region(sec, region_start)
if S[region] == SectorStatus.DIRTY:
- io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE, \
- data[SectorStatus.DIRTY], sec * SECTOR_SIZE, IoDir.WRITE)
+ io_to_exp_obj(core, WORKSET_OFFSET + SECTOR_SIZE * sec, SECTOR_SIZE,
+ data[SectorStatus.DIRTY], sec * SECTOR_SIZE, IoDir.WRITE)
for s in start_sec:
for e in end_sec:
@@ -171,10 +178,9 @@ def test_wo_read_data_consistency(pyocf_ctx):
START = s * SECTOR_SIZE
END = e * SECTOR_SIZE
size = (e - s + 1) * SECTOR_SIZE
- assert(0 == io_to_exp_obj(core, WORKSET_OFFSET + START, size, \
- result_b, START, IoDir.READ)), \
- "error reading in WO mode: S={}, start={}, end={}".format( \
- S, s, e)
+ assert(0 == io_to_exp_obj(core, WORKSET_OFFSET + START, size,
+ result_b, START, IoDir.READ)),\
+ "error reading in WO mode: S={}, start={}, end={}".format(S, s, e)
# verify read data
for sec in range(s, e + 1):
@@ -182,6 +188,4 @@ def test_wo_read_data_consistency(pyocf_ctx):
region = sector_to_region(sec, region_start)
check_byte = sec * SECTOR_SIZE
assert(result_b[check_byte] == data[S[region]][check_byte]), \
- "unexpected data in sector {}, S={}, s={}, e={}\n".format( \
- sec, S, s, e)
-
+ "unexpected data in sector {}, S={}, s={}, e={}\n".format(sec, S, s, e)
diff --git a/tests/functional/tests/management/test_start_stop.py b/tests/functional/tests/management/test_start_stop.py
index 648f3e0..29f04ae 100644
--- a/tests/functional/tests/management/test_start_stop.py
+++ b/tests/functional/tests/management/test_start_stop.py
@@ -111,8 +111,9 @@ def test_start_read_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: CacheL
core_device.reset_stats()
test_data = Data.from_string("Changed test data")
-
+
io_to_core(core_exported, test_data, Size.from_sector(1).B)
+
check_stats_write_after_read(core_exported, mode, cls, True)
logger.info("[STAGE] Read from exported object after write")
@@ -159,7 +160,8 @@ def test_start_params(pyocf_ctx, mode: CacheMode, cls: CacheLineSize, layout: Me
assert stats["conf"]["eviction_policy"] == EvictionPolicy.DEFAULT, "Eviction policy"
assert stats["conf"]["cache_id"] == cache_id, "Cache id"
assert cache.get_name() == name, "Cache name"
- # TODO: metadata_layout, metadata_volatile, max_queue_size, queue_unblock_size, pt_unaligned_io, use_submit_fast
+ # TODO: metadata_layout, metadata_volatile, max_queue_size,
+ # queue_unblock_size, pt_unaligned_io, use_submit_fast
# TODO: test in functional tests
@@ -254,8 +256,9 @@ def test_100_start_stop(pyocf_ctx):
def test_start_stop_incrementally(pyocf_ctx):
"""Starting/stopping multiple caches incrementally.
- Check whether OCF behaves correctly when few caches at a time are in turns added and removed (#added > #removed)
- until their number reaches limit, and then proportions are reversed and number of caches gradually falls to 0.
+ Check whether OCF behaves correctly when few caches at a time are
+ in turns added and removed (#added > #removed) until their number reaches limit,
+ and then proportions are reversed and number of caches gradually falls to 0.
"""
caches = []
@@ -292,7 +295,8 @@ def test_start_stop_incrementally(pyocf_ctx):
stats = cache.get_stats()
cache_id = stats["conf"]["cache_id"]
cache.stop()
- assert get_cache_by_id(pyocf_ctx, cache_id) != 0, "Try getting cache after stopping it"
+ assert get_cache_by_id(pyocf_ctx, cache_id) !=\
+ 0, "Try getting cache after stopping it"
add = not add
@@ -306,11 +310,17 @@ def test_start_cache_same_id(pyocf_ctx, mode, cls):
cache_device1 = Volume(Size.from_MiB(20))
cache_device2 = Volume(Size.from_MiB(20))
cache_id = randrange(1, 16385)
- cache = Cache.start_on_device(cache_device1, cache_mode=mode, cache_line_size=cls, cache_id=cache_id)
+ cache = Cache.start_on_device(cache_device1,
+ cache_mode=mode,
+ cache_line_size=cls,
+ cache_id=cache_id)
cache.get_stats()
with pytest.raises(OcfError, match="OCF_ERR_CACHE_EXIST"):
- cache = Cache.start_on_device(cache_device2, cache_mode=mode, cache_line_size=cls, cache_id=cache_id)
+ cache = Cache.start_on_device(cache_device2,
+ cache_mode=mode,
+ cache_line_size=cls,
+ cache_id=cache_id)
cache.get_stats()
@@ -418,14 +428,20 @@ def check_stats_write_empty(exported_obj: Core, mode: CacheMode, cls: CacheLineS
"Occupancy"
-def check_stats_write_after_read(exported_obj: Core, mode: CacheMode, cls: CacheLineSize, read_from_empty=False):
+def check_stats_write_after_read(exported_obj: Core,
+ mode: CacheMode,
+ cls: CacheLineSize,
+ read_from_empty=False):
stats = exported_obj.cache.get_stats()
assert exported_obj.cache.device.get_stats()[IoDir.WRITE] == \
- (0 if mode in {CacheMode.WI, CacheMode.PT} else (2 if read_from_empty and mode.lazy_write() else 1)), \
+ (0 if mode in {CacheMode.WI, CacheMode.PT} else
+ (2 if read_from_empty and mode.lazy_write() else 1)), \
"Writes to cache device"
assert exported_obj.device.get_stats()[IoDir.WRITE] == (0 if mode.lazy_write() else 1), \
"Writes to core device"
- assert stats["req"]["wr_hits"]["value"] == (1 if (mode.read_insert() and mode != CacheMode.WI) or (mode.write_insert() and not read_from_empty) else 0), \
+ assert stats["req"]["wr_hits"]["value"] == \
+ (1 if (mode.read_insert() and mode != CacheMode.WI)
+ or (mode.write_insert() and not read_from_empty) else 0), \
"Write hits"
assert stats["usage"]["occupancy"]["value"] == \
(0 if mode in {CacheMode.WI, CacheMode.PT} else (cls / CacheLineSize.LINE_4KiB)), \
@@ -438,16 +454,20 @@ def check_stats_read_after_write(exported_obj, mode, cls, write_to_empty=False):
(2 if mode.lazy_write() else (0 if mode == CacheMode.PT else 1)), \
"Writes to cache device"
assert exported_obj.cache.device.get_stats()[IoDir.READ] == \
- (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO} or (mode == CacheMode.WA and not write_to_empty) else 0), \
+ (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO}
+ or (mode == CacheMode.WA and not write_to_empty) else 0), \
"Reads from cache device"
assert exported_obj.device.get_stats()[IoDir.READ] == \
- (0 if mode in {CacheMode.WB, CacheMode.WO, CacheMode.WT} or (mode == CacheMode.WA and not write_to_empty) else 1), \
+ (0 if mode in {CacheMode.WB, CacheMode.WO, CacheMode.WT}
+ or (mode == CacheMode.WA and not write_to_empty) else 1), \
"Reads from core device"
- assert stats["req"]["rd_full_misses"]["value"] == (1 if mode in {CacheMode.WA, CacheMode.WI} else 0) \
+ assert stats["req"]["rd_full_misses"]["value"] == \
+ (1 if mode in {CacheMode.WA, CacheMode.WI} else 0) \
+ (0 if write_to_empty or mode in {CacheMode.PT, CacheMode.WA} else 1), \
"Read full misses"
assert stats["req"]["rd_hits"]["value"] == \
- (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO} or (mode == CacheMode.WA and not write_to_empty) else 0), \
+ (1 if mode in {CacheMode.WT, CacheMode.WB, CacheMode.WO}
+ or (mode == CacheMode.WA and not write_to_empty) else 0), \
"Read hits"
assert stats["usage"]["occupancy"]["value"] == \
(0 if mode == CacheMode.PT else (cls / CacheLineSize.LINE_4KiB)), "Occupancy"
@@ -467,4 +487,6 @@ def check_md5_sums(exported_obj: Core, mode: CacheMode):
def get_cache_by_id(ctx, cache_id):
cache_pointer = c_void_p()
- return OcfLib.getInstance().ocf_mngt_cache_get_by_id(ctx.ctx_handle, cache_id, byref(cache_pointer))
+ return OcfLib.getInstance().ocf_mngt_cache_get_by_id(ctx.ctx_handle,
+ cache_id,
+ byref(cache_pointer))
diff --git a/tests/functional/tests/security/test_management_start_fuzzy.py b/tests/functional/tests/security/test_management_start_fuzzy.py
index 706485e..07db815 100644
--- a/tests/functional/tests/security/test_management_start_fuzzy.py
+++ b/tests/functional/tests/security/test_management_start_fuzzy.py
@@ -12,7 +12,6 @@ from pyocf.utils import Size
from pyocf.types.shared import OcfError, CacheLineSize
from ctypes import c_uint32
-
logger = logging.getLogger(__name__)
@@ -51,7 +50,8 @@ def test_fuzzy_start_cache_line_size(pyocf_ctx, c_uint64_randomize, cm):
with pytest.raises(OcfError, match="OCF_ERR_INVALID_CACHE_LINE_SIZE"):
try_start_cache(cache_mode=cm, cache_line_size=c_uint64_randomize)
else:
- logger.warning(f"Test skipped for valid cache line size enum value: '{c_uint64_randomize}'. ")
+ logger.warning(
+ f"Test skipped for valid cache line size enum value: '{c_uint64_randomize}'. ")
@pytest.mark.security
@@ -67,8 +67,9 @@ def test_fuzzy_start_name(pyocf_ctx, string_randomize, cm, cls):
"""
cache_device = Volume(Size.from_MiB(30))
try:
- cache = Cache.start_on_device(cache_device, name=string_randomize, cache_mode=cm, cache_line_size=cls)
- except:
+ cache = Cache.start_on_device(cache_device, name=string_randomize, cache_mode=cm,
+ cache_line_size=cls)
+ except OcfError:
logger.error(f"Cache did not start properly with correct name value: {string_randomize}")
cache.stop()
@@ -107,7 +108,8 @@ def test_fuzzy_start_eviction_policy(pyocf_ctx, c_uint32_randomize, cm, cls):
with pytest.raises(OcfError, match="OCF_ERR_INVAL"):
try_start_cache(eviction_policy=c_uint32_randomize, cache_mode=cm, cache_line_size=cls)
else:
- logger.warning(f"Test skipped for valid eviction policy enum value: '{c_uint32_randomize}'. ")
+ logger.warning(
+ f"Test skipped for valid eviction policy enum value: '{c_uint32_randomize}'. ")
@pytest.mark.security
@@ -125,7 +127,8 @@ def test_fuzzy_start_metadata_layout(pyocf_ctx, c_uint32_randomize, cm, cls):
with pytest.raises(OcfError, match="OCF_ERR_INVAL"):
try_start_cache(metadata_layout=c_uint32_randomize, cache_mode=cm, cache_line_size=cls)
else:
- logger.warning(f"Test skipped for valid metadata layout enum value: '{c_uint32_randomize}'. ")
+ logger.warning(
+ f"Test skipped for valid metadata layout enum value: '{c_uint32_randomize}'. ")
@pytest.mark.security
@@ -133,7 +136,8 @@ def test_fuzzy_start_metadata_layout(pyocf_ctx, c_uint32_randomize, cm, cls):
@pytest.mark.parametrize('max_wb_queue_size', generate_random_numbers(c_uint32, 10))
def test_fuzzy_start_max_queue_size(pyocf_ctx, max_wb_queue_size, c_uint32_randomize, cls):
"""
- Test whether it is impossible to start cache with invalid dependence between max queue size and queue unblock size.
+ Test whether it is impossible to start cache with invalid dependence between max queue size
+ and queue unblock size.
:param pyocf_ctx: basic pyocf context fixture
:param max_wb_queue_size: max queue size value to start cache with
:param c_uint32_randomize: queue unblock size value to start cache with
@@ -148,4 +152,5 @@ def test_fuzzy_start_max_queue_size(pyocf_ctx, max_wb_queue_size, c_uint32_rando
cache_line_size=cls)
else:
logger.warning(f"Test skipped for valid values: "
- f"'max_queue_size={max_wb_queue_size}, queue_unblock_size={c_uint32_randomize}'.")
+ f"'max_queue_size={max_wb_queue_size}, "
+ f"queue_unblock_size={c_uint32_randomize}'.")
diff --git a/tests/unit/framework/add_new_test_file.py b/tests/unit/framework/add_new_test_file.py
index 2058c01..92e8713 100755
--- a/tests/unit/framework/add_new_test_file.py
+++ b/tests/unit/framework/add_new_test_file.py
@@ -11,167 +11,169 @@ import os
import sys
import textwrap
+
class TestGenerator(object):
- main_UT_dir = ""
- main_tested_dir = ""
- tested_file_path = ""
- tested_function_name = ""
+ main_UT_dir = ""
+ main_tested_dir = ""
+ tested_file_path = ""
+ tested_function_name = ""
- def __init__(self, main_UT_dir, main_tested_dir, file_path, func_name):
- self.set_main_UT_dir(main_UT_dir)
- self.set_main_tested_dir(main_tested_dir)
- self.set_tested_file_path(file_path)
- self.tested_function_name = func_name
+ def __init__(self, main_UT_dir, main_tested_dir, file_path, func_name):
+ self.set_main_UT_dir(main_UT_dir)
+ self.set_main_tested_dir(main_tested_dir)
+ self.set_tested_file_path(file_path)
+ self.tested_function_name = func_name
- def create_empty_test_file(self):
- dst_dir = os.path.dirname(self.get_tested_file_path()[::-1])[::-1]
+ def create_empty_test_file(self):
+ dst_dir = os.path.dirname(self.get_tested_file_path()[::-1])[::-1]
- self.create_dir_if_not_exist(self.get_main_UT_dir() + dst_dir)
- test_file_name = os.path.basename(self.get_tested_file_path())
+ self.create_dir_if_not_exist(self.get_main_UT_dir() + dst_dir)
+ test_file_name = os.path.basename(self.get_tested_file_path())
- dst_path = self.get_main_UT_dir() + dst_dir + "/" + test_file_name
+ dst_path = self.get_main_UT_dir() + dst_dir + "/" + test_file_name
- no_str = ""
- no = 0
- while True:
- if not os.path.isfile(dst_path.rsplit(".", 1)[0] + no_str + "." + dst_path.rsplit(".", 1)[1]):
- break
- no += 1
- no_str = str(no)
+ no_str = ""
+ no = 0
+ while True:
+ if not os.path.isfile("{0}{1}.{2}".format(dst_path.rsplit(".", 1)[0], no_str,
+ dst_path.rsplit(".", 1)[1])):
+ break
+ no += 1
+ no_str = str(no)
- dst_path = dst_path.rsplit(".", 1)[0] + no_str + "." + dst_path.rsplit(".", 1)[1]
- buf = self.get_markups()
- buf += "#undef static\n\n"
- buf += "#undef inline\n\n"
- buf += self.get_UT_includes()
- buf += self.get_includes(self.get_main_tested_dir() + self.get_tested_file_path())
- buf += self.get_autowrap_file_include(dst_path)
- buf += self.get_empty_test_function()
- buf += self.get_test_main()
+ dst_path = dst_path.rsplit(".", 1)[0] + no_str + "." + dst_path.rsplit(".", 1)[1]
+ buf = self.get_markups()
+ buf += "#undef static\n\n"
+ buf += "#undef inline\n\n"
+ buf += self.get_UT_includes()
+ buf += self.get_includes(self.get_main_tested_dir() + self.get_tested_file_path())
+ buf += self.get_autowrap_file_include(dst_path)
+ buf += self.get_empty_test_function()
+ buf += self.get_test_main()
- with open(dst_path, "w") as f:
- f.writelines(buf)
+ with open(dst_path, "w") as f:
+ f.writelines(buf)
- print(f"{dst_path} generated successfully!")
+ print(f"{dst_path} generated successfully!")
- def get_markups(self):
- ret = "/*\n"
- ret += " * " + self.get_tested_file_path() + "\n"
- ret += " * " + self.get_tested_function_name() + "\n"
- ret += " * \n"
- ret += " *\tINSERT HERE LIST OF FUNCTIONS YOU WANT TO LEAVE\n"
- ret += " *\tONE FUNCTION PER LINE\n"
- ret += " * \n"
- ret += " */\n\n"
+ def get_markups(self):
+ ret = "/*\n"
+ ret += " * " + self.get_tested_file_path() + "\n"
+ ret += " * " + self.get_tested_function_name() + "\n"
+ ret += " * \n"
+ ret += " *\tINSERT HERE LIST OF FUNCTIONS YOU WANT TO LEAVE\n"
+ ret += " *\tONE FUNCTION PER LINE\n"
+ ret += " * \n"
+ ret += " */\n\n"
- return ret
+ return ret
- def create_dir_if_not_exist(self, path):
- if not os.path.isdir(path):
- try:
- os.makedirs(path)
- except Exception:
- pass
- return True
- return None
+ def create_dir_if_not_exist(self, path):
+ if not os.path.isdir(path):
+ try:
+ os.makedirs(path)
+ except Exception:
+ pass
+ return True
+ return None
+ def get_UT_includes(self):
+ ret = '''
+ #include
+ #include
+ #include
+ #include
+ #include "print_desc.h"\n\n'''
- def get_UT_includes(self):
- ret = '''
- #include
- #include
- #include
- #include
- #include "print_desc.h"\n\n'''
+ return textwrap.dedent(ret)
- return textwrap.dedent(ret)
+ def get_autowrap_file_include(self, test_file_path):
+ autowrap_file = test_file_path.rsplit(".", 1)[0]
+ autowrap_file = autowrap_file.replace(self.main_UT_dir, "")
+ autowrap_file += "_generated_warps.c"
+ return "#include \"" + autowrap_file + "\"\n\n"
- def get_autowrap_file_include(self, test_file_path):
- autowrap_file = test_file_path.rsplit(".", 1)[0]
- autowrap_file = autowrap_file.replace(self.main_UT_dir, "")
- autowrap_file += "_generated_warps.c"
- return "#include \"" + autowrap_file + "\"\n\n"
+ def get_includes(self, abs_path_to_tested_file):
+ with open(abs_path_to_tested_file, "r") as f:
+ code = f.readlines()
- def get_includes(self, abs_path_to_tested_file):
- with open(abs_path_to_tested_file, "r") as f:
- code = f.readlines()
+ ret = [line for line in code if re.search(r'#include', line)]
- ret = [line for line in code if re.search(r'#include', line)]
+ return "".join(ret) + "\n"
- return "".join(ret) + "\n"
+ def get_empty_test_function(self):
+ ret = "static void " + self.get_tested_function_name() + "_test01(void **state)\n"
+ ret += "{\n"
+ ret += "\tprint_test_description(\"Put test description here\");\n"
+ ret += "\tassert_int_equal(1,1);\n"
+ ret += "}\n\n"
- def get_empty_test_function(self):
- ret = "static void " + self.get_tested_function_name() + "_test01(void **state)\n"
- ret += "{\n"
- ret += "\tprint_test_description(\"Put test description here\");\n"
- ret += "\tassert_int_equal(1,1);\n"
- ret += "}\n\n"
+ return ret
- return ret
+ def get_test_main(self):
+ ret = "int main(void)\n"
+ ret += "{\n"
+ ret += "\tconst struct CMUnitTest tests[] = {\n"
+ ret += "\t\tcmocka_unit_test(" + self.get_tested_function_name() + "_test01)\n"
+ ret += "\t};\n\n"
+ ret += "\tprint_message(\"Unit test of " + self.get_tested_file_path() + "\");\n\n"
+ ret += "\treturn cmocka_run_group_tests(tests, NULL, NULL);\n"
+ ret += "}"
- def get_test_main(self):
- ret = "int main(void)\n"
- ret += "{\n"
- ret += "\tconst struct CMUnitTest tests[] = {\n"
- ret += "\t\tcmocka_unit_test(" + self.get_tested_function_name() + "_test01)\n"
- ret += "\t};\n\n"
- ret += "\tprint_message(\"Unit test of " + self.get_tested_file_path() + "\");\n\n"
- ret += "\treturn cmocka_run_group_tests(tests, NULL, NULL);\n"
- ret += "}"
+ return ret
- return ret
+ def set_tested_file_path(self, path):
+ call_dir = os.getcwd() + os.sep
+ p = os.path.normpath(call_dir + path)
- def set_tested_file_path(self, path):
- call_dir = os.getcwd() + os.sep
- p = os.path.normpath(call_dir + path)
+ if os.path.isfile(p):
+ self.tested_file_path = p.split(self.get_main_tested_dir(), 1)[1]
+ return
+ elif os.path.isfile(self.get_main_tested_dir() + path):
+ self.tested_file_path = path
+ return
- if os.path.isfile(p):
- self.tested_file_path = p.split(self.get_main_tested_dir(), 1)[1]
- return
- elif os.path.isfile(self.get_main_tested_dir() + path):
- self.tested_file_path = path
- return
+ print(f"{os.path.join(self.get_main_tested_dir(), path)}")
+ print("Given path not exists!")
+ exit(1)
- print(f"{os.path.join(self.get_main_tested_dir(), path)}")
- print("Given path not exists!")
- exit(1)
+ def set_main_UT_dir(self, path):
+ p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path
+ p = os.path.normpath(os.path.dirname(p)) + os.sep
+ self.main_UT_dir = p
+ def get_main_UT_dir(self):
+ return self.main_UT_dir
- def set_main_UT_dir(self, path):
- p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path
- p = os.path.normpath(os.path.dirname(p)) + os.sep
- self.main_UT_dir = p
+ def set_main_tested_dir(self, path):
+ p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path
+ p = os.path.normpath(os.path.dirname(p)) + os.sep
+ self.main_tested_dir = p
- def get_main_UT_dir(self):
- return self.main_UT_dir
+ def get_main_tested_dir(self):
+ return self.main_tested_dir
- def set_main_tested_dir(self, path):
- p = os.path.dirname(os.path.realpath(__file__)) + os.sep + path
- p = os.path.normpath(os.path.dirname(p)) + os.sep
- self.main_tested_dir = p
+ def get_tested_file_path(self):
+ return self.tested_file_path
- def get_main_tested_dir(self):
- return self.main_tested_dir
+ def get_tested_function_name(self):
+ return self.tested_function_name
- def get_tested_file_path(self):
- return self.tested_file_path
-
- def get_tested_function_name(self):
- return self.tested_function_name
def __main__():
- if len(sys.argv) < 3:
- print("No path to tested file or tested function name given !")
- sys.exit(1)
+ if len(sys.argv) < 3:
+ print("No path to tested file or tested function name given !")
+ sys.exit(1)
- tested_file_path = sys.argv[1]
- tested_function_name = sys.argv[2]
+ tested_file_path = sys.argv[1]
+ tested_function_name = sys.argv[2]
- generator = TestGenerator(tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS,\
- tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT,\
- tested_file_path, tested_function_name)
+ generator = TestGenerator(tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS,
+ tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT,
+ tested_file_path, tested_function_name)
+
+ generator.create_empty_test_file()
- generator.create_empty_test_file()
if __name__ == "__main__":
- __main__()
+ __main__()
diff --git a/tests/unit/framework/prepare_sources_for_testing.py b/tests/unit/framework/prepare_sources_for_testing.py
index 5cd46ba..06aa557 100755
--- a/tests/unit/framework/prepare_sources_for_testing.py
+++ b/tests/unit/framework/prepare_sources_for_testing.py
@@ -9,19 +9,20 @@ import shutil
import sys
import re
import os.path
-from collections import defaultdict
import subprocess
+import tests_config
+
def run_command(args, verbose=True):
- result = subprocess.run(" ".join(args), shell=True,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- result.stdout = result.stdout.decode("ASCII", errors='ignore')
- result.stderr = result.stderr.decode("ASCII", errors='ignore')
- if verbose:
- print(result.stderr)
- return result
+ result = subprocess.run(" ".join(args), shell=True,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ result.stdout = result.stdout.decode("ASCII", errors='ignore')
+ result.stderr = result.stderr.decode("ASCII", errors='ignore')
+ if verbose:
+ print(result.stderr)
+ return result
+
-import tests_config
#
# This script purpose is to remove unused functions definitions
# It is giving the opportunity to unit test all functions from OCF.
@@ -37,661 +38,674 @@ import tests_config
#
class UnitTestsSourcesGenerator(object):
- script_file_abs_path = ""
- script_dir_abs_path = ""
+ script_file_abs_path = ""
+ script_dir_abs_path = ""
- main_UT_dir = ""
- main_tested_dir = ""
+ main_UT_dir = ""
+ main_tested_dir = ""
- ctags_path = ""
+ ctags_path = ""
- test_catalouges_list = []
- dirs_to_include_list = []
+ test_catalogues_list = []
+ dirs_to_include_list = []
- tests_internal_includes_list = []
- framework_includes = []
+ tests_internal_includes_list = []
+ framework_includes = []
- dirs_with_tests_list = []
- test_files_paths_list = []
+ dirs_with_tests_list = []
+ test_files_paths_list = []
- tested_files_paths_list = []
+ tested_files_paths_list = []
- includes_to_copy_dict = {}
+ includes_to_copy_dict = {}
- preprocessing_repo = ""
- sources_to_test_repo = ""
+ preprocessing_repo = ""
+ sources_to_test_repo = ""
- def __init__(self):
- self.script_file_abs_path = os.path.realpath(__file__)
- self.script_dir_abs_path = os.path.normpath(os.path.dirname(self.script_file_abs_path) + os.sep)
+ def __init__(self):
+ self.script_file_abs_path = os.path.realpath(__file__)
+ self.script_dir_abs_path = os.path.normpath(
+ os.path.dirname(self.script_file_abs_path) + os.sep)
- self.set_ctags_path()
+ self.set_ctags_path()
- self.set_main_UT_dir()
- self.set_main_tested_dir()
+ self.set_main_UT_dir()
+ self.set_main_tested_dir()
- self.test_catalouges_list = tests_config.DIRECTORIES_WITH_TESTS_LIST
- self.set_includes_to_copy_dict(tests_config.INCLUDES_TO_COPY_DICT)
- self.set_dirs_to_include()
+ self.test_catalogues_list = tests_config.DIRECTORIES_WITH_TESTS_LIST
+ self.set_includes_to_copy_dict(tests_config.INCLUDES_TO_COPY_DICT)
+ self.set_dirs_to_include()
- self.set_tests_internal_includes_list()
- self.set_framework_includes()
- self.set_files_with_tests_list()
- self.set_tested_files_paths_list()
+ self.set_tests_internal_includes_list()
+ self.set_framework_includes()
+ self.set_files_with_tests_list()
+ self.set_tested_files_paths_list()
- self.set_preprocessing_repo()
- self.set_sources_to_test_repo()
+ self.set_preprocessing_repo()
+ self.set_sources_to_test_repo()
- def preprocessing(self):
- tested_files_list = self.get_tested_files_paths_list()
- project_includes = self.get_dirs_to_include_list()
- framework_includes = self.get_tests_internal_includes_list()
+ def preprocessing(self):
+ tested_files_list = self.get_tested_files_paths_list()
+ project_includes = self.get_dirs_to_include_list()
+ framework_includes = self.get_tests_internal_includes_list()
- gcc_flags = " -fno-inline -Dstatic= -Dinline= -E "
- gcc_command_template = "gcc "
- for path in project_includes:
- gcc_command_template += " -I " + path + " "
+ gcc_flags = " -fno-inline -Dstatic= -Dinline= -E "
+ gcc_command_template = "gcc "
+ for path in project_includes:
+ gcc_command_template += " -I " + path + " "
- for path in framework_includes:
- gcc_command_template += " -I " + path
+ for path in framework_includes:
+ gcc_command_template += " -I " + path
- gcc_command_template += gcc_flags
+ gcc_command_template += gcc_flags
- for path in tested_files_list:
- preprocessing_dst = self.get_preprocessing_repo() +\
- self.get_relative_path(path, self.get_main_tested_dir())
- preprocessing_dst_dir = os.path.dirname(preprocessing_dst)
- self.create_dir_if_not_exist(preprocessing_dst_dir)
+ for path in tested_files_list:
+ preprocessing_dst = self.get_preprocessing_repo() \
+ + self.get_relative_path(path, self.get_main_tested_dir())
+ preprocessing_dst_dir = os.path.dirname(preprocessing_dst)
+ self.create_dir_if_not_exist(preprocessing_dst_dir)
- gcc_command = gcc_command_template +\
- path + " > " + preprocessing_dst
+ gcc_command = gcc_command_template + path + " > " + preprocessing_dst
- result = run_command([gcc_command])
+ result = run_command([gcc_command])
- if result.returncode != 0:
- print(f"Generating preprocessing for {self.get_main_tested_dir() + path} failed!")
- print(result.output)
- run_command(["rm", "-f", preprocessing_dst])
- continue
+ if result.returncode != 0:
+ print(f"Generating preprocessing for {self.get_main_tested_dir() + path} failed!")
+ print(result.output)
+ run_command(["rm", "-f", preprocessing_dst])
+ continue
- self.remove_hashes(preprocessing_dst)
+ self.remove_hashes(preprocessing_dst)
- print(f"Preprocessed file {path} saved to {preprocessing_dst}")
+ print(f"Preprocessed file {path} saved to {preprocessing_dst}")
- def copy_includes(self):
- includes_dict = self.get_includes_to_copy_dict()
+ def copy_includes(self):
+ includes_dict = self.get_includes_to_copy_dict()
- for dst, src in includes_dict.items():
- src_path = os.path.normpath(self.get_main_tested_dir() + src)
- if not os.path.isdir(src_path):
- print(f"Directory {src_path} given to include does not exists!")
- continue
- dst_path = os.path.normpath(self.get_main_UT_dir() + dst)
+ for dst, src in includes_dict.items():
+ src_path = os.path.normpath(self.get_main_tested_dir() + src)
+ if not os.path.isdir(src_path):
+ print(f"Directory {src_path} given to include does not exists!")
+ continue
+ dst_path = os.path.normpath(self.get_main_UT_dir() + dst)
- shutil.rmtree(dst_path, ignore_errors=True)
- shutil.copytree(src_path, dst_path)
+ shutil.rmtree(dst_path, ignore_errors=True)
+ shutil.copytree(src_path, dst_path)
- def get_user_wraps(self, path):
- functions_list = self.get_functions_list(path)
- functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) \
- for line in functions_list if re.search("__wrap_", line)]
+ def get_user_wraps(self, path):
+ functions_list = self.get_functions_list(path)
+ functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line)
+ for line in functions_list if re.search("__wrap_", line)]
- return functions_list
+ return functions_list
- def get_autowrap_file_path(self, test_file_path):
- wrap_file_path = test_file_path.rsplit('.', 1)[0]
- wrap_file_path = wrap_file_path + "_generated_warps.c"
- return wrap_file_path
+ def get_autowrap_file_path(self, test_file_path):
+ wrap_file_path = test_file_path.rsplit('.', 1)[0]
+ wrap_file_path = wrap_file_path + "_generated_warps.c"
+ return wrap_file_path
- def prepare_autowraps(self, test_file_path, preprocessed_file_path):
- functions_to_wrap = self.get_functions_calls(
- self.get_sources_to_test_repo() + test_file_path)
- user_wraps = set(self.get_user_wraps(self.get_main_UT_dir() + test_file_path))
+ def prepare_autowraps(self, test_file_path, preprocessed_file_path):
+ functions_to_wrap = self.get_functions_calls(
+ self.get_sources_to_test_repo() + test_file_path)
+ user_wraps = set(self.get_user_wraps(self.get_main_UT_dir() + test_file_path))
- functions_to_wrap = functions_to_wrap - user_wraps
+ functions_to_wrap = functions_to_wrap - user_wraps
- tags_list = self.get_functions_list(preprocessed_file_path, prototypes=True)
+ tags_list = self.get_functions_list(preprocessed_file_path, prototypes=True)
- wrap_list = []
+ wrap_list = []
- with open(preprocessed_file_path) as f:
- code = f.readlines()
- for function in functions_to_wrap:
- if function.startswith("env_") or function.startswith("bug"):
- continue
- for tag in tags_list:
- if function in tag:
- name, line = tag.split()
- if name == function:
- line = int(line)
- wrap_list.append(self.get_function_wrap(code, line))
- break
+ with open(preprocessed_file_path) as f:
+ code = f.readlines()
+ for function in functions_to_wrap:
+ if function.startswith("env_") or function.startswith("bug"):
+ continue
+ for tag in tags_list:
+ if function in tag:
+ name, line = tag.split()
+ if name == function:
+ line = int(line)
+ wrap_list.append(self.get_function_wrap(code, line))
+ break
- wrap_file_path = self.get_main_UT_dir() + self.get_autowrap_file_path(test_file_path)
+ wrap_file_path = self.get_main_UT_dir() + self.get_autowrap_file_path(test_file_path)
- with open(wrap_file_path, "w") as f:
- f.write("/* This is file is generated by UT framework */\n")
- for wrap in wrap_list:
- f.write(wrap + "\n")
+ with open(wrap_file_path, "w") as f:
+ f.write("/* This is file is generated by UT framework */\n")
+ for wrap in wrap_list:
+ f.write(wrap + "\n")
- def prepare_sources_for_testing(self):
- test_files_paths = self.get_files_with_tests_list()
+ def prepare_sources_for_testing(self):
+ test_files_paths = self.get_files_with_tests_list()
- for test_path in test_files_paths:
- path = self.get_tested_file_path(self.get_main_UT_dir() + test_path)
+ for test_path in test_files_paths:
+ path = self.get_tested_file_path(self.get_main_UT_dir() + test_path)
- preprocessed_tested_path = self.get_preprocessing_repo() + path
- if not os.path.isfile(preprocessed_tested_path):
- print(f"No preprocessed path for {test_path} test file.")
- continue
+ preprocessed_tested_path = self.get_preprocessing_repo() + path
+ if not os.path.isfile(preprocessed_tested_path):
+ print(f"No preprocessed path for {test_path} test file.")
+ continue
- tested_src = self.get_src_to_test(test_path, preprocessed_tested_path)
+ tested_src = self.get_src_to_test(test_path, preprocessed_tested_path)
- self.create_dir_if_not_exist(self.get_sources_to_test_repo() + os.path.dirname(test_path))
+ self.create_dir_if_not_exist(
+ self.get_sources_to_test_repo() + os.path.dirname(test_path))
- with open(self.get_sources_to_test_repo() + test_path, "w") as f:
- f.writelines(tested_src)
- print(f"Sources for {test_path} saved in {self.get_sources_to_test_repo() + test_path}")
+ with open(self.get_sources_to_test_repo() + test_path, "w") as f:
+ f.writelines(tested_src)
+ print(
+ f"Sources for {test_path} saved in + \
+ {self.get_sources_to_test_repo() + test_path}")
- self.prepare_autowraps(test_path, preprocessed_tested_path)
+ self.prepare_autowraps(test_path, preprocessed_tested_path)
- def create_main_cmake_lists(self):
- buf = "cmake_minimum_required(VERSION 2.6.0)\n\n"
- buf += "project(OCF_unit_tests C)\n\n"
+ def create_main_cmake_lists(self):
+ buf = "cmake_minimum_required(VERSION 2.6.0)\n\n"
+ buf += "project(OCF_unit_tests C)\n\n"
- buf += "enable_testing()\n\n"
+ buf += "enable_testing()\n\n"
- buf += "include_directories(\n"
- dirs_to_inc = self.get_dirs_to_include_list() + self.get_framework_includes()\
- + self.get_tests_internal_includes_list()
- for path in dirs_to_inc:
- buf += "\t" + path + "\n"
- buf += ")\n\n"
+ buf += "include_directories(\n"
+ dirs_to_inc = self.get_dirs_to_include_list() + self.get_framework_includes() \
+ + self.get_tests_internal_includes_list()
+ for path in dirs_to_inc:
+ buf += "\t" + path + "\n"
+ buf += ")\n\n"
- includes = self.get_tests_internal_includes_list()
- for path in includes:
- buf += "\nadd_subdirectory(" + path + ")"
- buf += "\n\n"
+ includes = self.get_tests_internal_includes_list()
+ for path in includes:
+ buf += "\nadd_subdirectory(" + path + ")"
+ buf += "\n\n"
- test_files = self.get_files_with_tests_list()
- test_dirs_to_include = [os.path.dirname(path) for path in test_files]
+ test_files = self.get_files_with_tests_list()
+ test_dirs_to_include = [os.path.dirname(path) for path in test_files]
- test_dirs_to_include = self.remove_duplicates_from_list(test_dirs_to_include)
+ test_dirs_to_include = self.remove_duplicates_from_list(test_dirs_to_include)
- for path in test_dirs_to_include:
- buf += "\nadd_subdirectory(" + self.get_sources_to_test_repo() + path + ")"
+ for path in test_dirs_to_include:
+ buf += "\nadd_subdirectory(" + self.get_sources_to_test_repo() + path + ")"
+ with open(self.get_main_UT_dir() + "CMakeLists.txt", "w") as f:
+ f.writelines(buf)
- with open(self.get_main_UT_dir() + "CMakeLists.txt", "w") as f:
- f.writelines(buf)
+ print(f"Main CMakeLists.txt generated written to {self.get_main_UT_dir()} CMakeLists.txt")
- print(f"Main CMakeLists.txt generated written to {self.get_main_UT_dir()} CMakeLists.txt")
+ def generate_cmakes_for_tests(self):
+ test_files_paths = self.get_files_with_tests_list()
- def generate_cmakes_for_tests(self):
- test_files_paths = self.get_files_with_tests_list()
+ for test_path in test_files_paths:
+ tested_file_path = self.get_sources_to_test_repo() + test_path
+ if not os.path.isfile(tested_file_path):
+ print(f"No source to test for {test_path} test")
+ continue
- for test_path in test_files_paths:
- tested_file_path = self.get_sources_to_test_repo() + test_path
- if not os.path.isfile(tested_file_path):
- print(f"No source to test for {test_path} test")
- continue
+ test_file_path = self.get_main_UT_dir() + test_path
- test_file_path = self.get_main_UT_dir() + test_path
+ cmake_buf = self.generate_test_cmake_buf(test_file_path, tested_file_path)
- cmake_buf = self.generate_test_cmake_buf(test_file_path, tested_file_path)
+ cmake_path = self.get_sources_to_test_repo() + test_path
+ cmake_path = os.path.splitext(cmake_path)[0] + ".cmake"
+ with open(cmake_path, "w") as f:
+ f.writelines(cmake_buf)
+ print(f"cmake file for {test_path} written to {cmake_path}")
- cmake_path = self.get_sources_to_test_repo() + test_path
- cmake_path = os.path.splitext(cmake_path)[0] + ".cmake"
- with open(cmake_path, "w") as f:
- f.writelines(cmake_buf)
- print(f"cmake file for {test_path} written to {cmake_path}")
+ cmake_lists_path = os.path.dirname(cmake_path) + os.sep
+ self.update_cmakelists(cmake_lists_path, cmake_path)
- cmake_lists_path = os.path.dirname(cmake_path) + os.sep
- self.update_cmakelists(cmake_lists_path, cmake_path)
+ def generate_test_cmake_buf(self, test_file_path, tested_file_path):
+ test_file_name = os.path.basename(test_file_path)
+ target_name = os.path.splitext(test_file_name)[0]
- def generate_test_cmake_buf(self, test_file_path, tested_file_path):
- test_file_name = os.path.basename(test_file_path)
- target_name = os.path.splitext(test_file_name)[0]
+ add_executable = "add_executable(" + target_name + " " + test_file_path + " " + \
+ tested_file_path + ")\n"
- add_executable = "add_executable(" + target_name + " " + test_file_path + " " + tested_file_path + ")\n"
+ libraries = "target_link_libraries(" + target_name + " libcmocka.so ocf_env)\n"
- libraries = "target_link_libraries(" + target_name + " libcmocka.so ocf_env)\n"
+ add_test = "add_test(" + target_name + " ${CMAKE_CURRENT_BINARY_DIR}/" + target_name + ")\n"
- add_test = "add_test(" + target_name + " ${CMAKE_CURRENT_BINARY_DIR}/" + target_name + ")\n"
+ tgt_properties = "set_target_properties(" + target_name + "\n" + \
+ "PROPERTIES\n" + \
+ "COMPILE_FLAGS \"-fno-inline -Dstatic= -Dinline= -w \"\n"
- tgt_properties = "set_target_properties(" + target_name + "\n" + \
- "PROPERTIES\n" + \
- "COMPILE_FLAGS \"-fno-inline -Dstatic= -Dinline= -w \"\n"
+ link_flags = self.generate_cmake_link_flags(test_file_path)
+ tgt_properties += link_flags + ")"
- link_flags = self.generate_cmake_link_flags(test_file_path)
- tgt_properties += link_flags + ")"
+ buf = add_executable + libraries + add_test + tgt_properties
- buf = add_executable + libraries + add_test + tgt_properties
+ return buf
- return buf
+ def generate_cmake_link_flags(self, path):
+ ret = ""
- def generate_cmake_link_flags(self, path):
- ret = ""
+ autowraps_path = self.get_autowrap_file_path(path)
+ functions_to_wrap = self.get_functions_to_wrap(path)
+ functions_to_wrap += self.get_functions_to_wrap(autowraps_path)
- autowraps_path = self.get_autowrap_file_path(path)
- functions_to_wrap = self.get_functions_to_wrap(path)
- functions_to_wrap += self.get_functions_to_wrap(autowraps_path)
+ for function_name in functions_to_wrap:
+ ret += ",--wrap=" + function_name
+ if len(ret) > 0:
+ ret = "LINK_FLAGS \"-Wl" + ret + "\"\n"
- for function_name in functions_to_wrap:
- ret += ",--wrap=" + function_name
- if len(ret) > 0:
- ret = "LINK_FLAGS \"-Wl" + ret + "\"\n"
+ return ret
- return ret
+ def update_cmakelists(self, cmake_lists_path, cmake_name):
+ with open(cmake_lists_path + "CMakeLists.txt", "a+") as f:
+ f.seek(0, os.SEEK_SET)
+ new_line = "include(" + os.path.basename(cmake_name) + ")\n"
- def update_cmakelists(self, cmake_lists_path, cmake_name):
- with open(cmake_lists_path + "CMakeLists.txt", "a+") as f:
- f.seek(0, os.SEEK_SET)
- new_line = "include(" + os.path.basename(cmake_name) + ")\n"
+ if new_line not in f.read():
+ f.write(new_line)
- if not new_line in f.read():
- f.write(new_line)
+ def get_functions_to_wrap(self, path):
+ functions_list = self.get_functions_list(path)
+ functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) for line in functions_list
+ if re.search("__wrap_", line)]
- def get_functions_to_wrap(self, path):
- functions_list = self.get_functions_list(path)
- functions_list = [re.sub(r'__wrap_([\S]+)\s*[\d]+', r'\1', line) for line in functions_list if re.search("__wrap_", line)]
+ return functions_list
- return functions_list
+ def get_functions_to_leave(self, path):
+ with open(path) as f:
+ lines = f.readlines()
+ buf = ''.join(lines)
- def get_functions_to_leave(self, path):
- with open(path) as f:
- l = f.readlines()
- buf = ''.join(l)
+ tags_pattern = re.compile(r"[\s\S]*")
- tags_pattern = re.compile("[\s\S]*")
+ buf = re.findall(tags_pattern, buf)
+ if not len(buf) > 0:
+ return []
- buf = re.findall(tags_pattern, buf)
- if not len(buf) > 0:
- return []
+ buf = buf[0]
- buf = buf[0]
+ buf = re.sub(r'<.*>', '', buf)
+ buf = re.sub(r'[^a-zA-Z0-9_\n]+', '', buf)
- buf = re.sub(r'<.*>', '', buf)
- buf = re.sub(r'[^a-zA-Z0-9_\n]+', '', buf)
+ ret = buf.split("\n")
+ ret = [name for name in ret if name]
+ return ret
- ret = buf.split("\n")
- ret = [name for name in ret if name]
- return ret
+ def get_functions_list(self, file_path, prototypes=None):
+ ctags_path = self.get_ctags_path()
- def get_functions_list(self, file_path, prototypes=None):
- ctags_path = self.get_ctags_path()
+ ctags_args = "--c-types=f"
+ if prototypes:
+ ctags_args += " --c-kinds=+p"
+ # find all functions' definitions | put tabs instead of spaces |
+ # take only columns with function name and line number | sort in descending order
+ result = run_command([ctags_path, "-x", ctags_args, file_path,
+ "--language-force=c | sed \"s/ \\+/\t/g\" | cut -f 1,3 | sort -nsr "
+ "-k 2"])
- ctags_args = "--c-types=f"
- if prototypes == True:
- ctags_args += " --c-kinds=+p"
- # find all functions' definitions | put tabs instead of spaces |
- # take only columns with function name and line number | sort in descending order
- result = run_command([ctags_path, "-x", ctags_args, file_path,
- "--language-force=c | sed \"s/ \\+/\t/g\" | cut -f 1,3 | sort -nsr -k 2"])
+ # 'output' is string, but it has to be changed to list
+ output = list(filter(None, result.stdout.split("\n")))
+ return output
- # 'output' is string, but it has to be changed to list
- output = list(filter(None, result.stdout.split("\n")))
- return output
+ def remove_functions_from_list(self, functions_list, to_remove_list):
+ ret = functions_list[:]
+ for function_name in to_remove_list:
+ ret = [line for line in ret if not re.search(r'\b%s\b' % function_name, line)]
+ return ret
- def remove_functions_from_list(self, functions_list, to_remove_list):
- ret = functions_list[:]
- for function_name in to_remove_list:
- ret = [line for line in ret if not re.search(r'\b%s\b' % function_name, line)]
- return ret
+ def get_src_to_test(self, test_path, preprocessed_tested_path):
+ functions_to_leave = self.get_functions_to_leave(self.get_main_UT_dir() + test_path)
- def get_src_to_test(self, test_path, preprocessed_tested_path):
- functions_to_leave = self.get_functions_to_leave(self.get_main_UT_dir() + test_path)
+ functions_to_leave.append(self.get_tested_function_name(self.get_main_UT_dir() + test_path))
+ functions_list = self.get_functions_list(preprocessed_tested_path)
- functions_to_leave.append(self.get_tested_function_name(self.get_main_UT_dir() + test_path))
- functions_list = self.get_functions_list(preprocessed_tested_path)
+ functions_list = self.remove_functions_from_list(functions_list, functions_to_leave)
- functions_list = self.remove_functions_from_list(functions_list, functions_to_leave)
+ with open(preprocessed_tested_path) as f:
+ ret = f.readlines()
+ for function in functions_list:
+ line = function.split("\t")[1]
+ line = int(line)
- with open(preprocessed_tested_path) as f:
- ret = f.readlines()
- for function in functions_list:
- line = function.split("\t")[1]
- line = int(line)
+ self.remove_function_body(ret, line)
- self.remove_function_body(ret, line)
+ return ret
- return ret
+ def set_tested_files_paths_list(self):
+ test_files_list = self.get_files_with_tests_list()
- def set_tested_files_paths_list(self):
- test_files_list = self.get_files_with_tests_list()
+ for f in test_files_list:
+ self.tested_files_paths_list.append(self.get_main_tested_dir()
+ + self.get_tested_file_path(
+ self.get_main_UT_dir() + f))
- for f in test_files_list:
- self.tested_files_paths_list.append(self.get_main_tested_dir() +\
- self.get_tested_file_path(self.get_main_UT_dir() + f))
+ self.tested_files_paths_list = self.remove_duplicates_from_list(
+ self.tested_files_paths_list)
- self.tested_files_paths_list = self.remove_duplicates_from_list(self.tested_files_paths_list)
+ def get_tested_files_paths_list(self):
+ return self.tested_files_paths_list
- def get_tested_files_paths_list(self):
- return self.tested_files_paths_list
+ def get_files_with_tests_list(self):
+ return self.test_files_paths_list
- def get_files_with_tests_list(self):
- return self.test_files_paths_list
+ def set_files_with_tests_list(self):
+ test_catalogues_list = self.get_tests_catalouges_list()
+ for catalogue in test_catalogues_list:
+ dir_with_tests_path = self.get_main_UT_dir() + catalogue
- def set_files_with_tests_list(self):
- test_catalogues_list = self.get_tests_catalouges_list()
- for catalogue in test_catalogues_list:
- dir_with_tests_path = self.get_main_UT_dir() + catalogue
+ for path, dirs, files in os.walk(dir_with_tests_path):
+ test_files = self.get_test_files_from_dir(path + os.sep)
- for path, dirs, files in os.walk(dir_with_tests_path):
- test_files = self.get_test_files_from_dir(path + os.sep)
+ for test_file_name in test_files:
+ test_rel_path = os.path.relpath(path + os.sep + test_file_name,
+ self.get_main_UT_dir())
+ self.test_files_paths_list.append(test_rel_path)
- for test_file_name in test_files:
- test_rel_path = os.path.relpath(path + os.sep + test_file_name, self.get_main_UT_dir())
- self.test_files_paths_list.append(test_rel_path)
+ def are_markups_valid(self, path):
+ file_path = self.get_tested_file_path(path)
+ function_name = self.get_tested_function_name(path)
- def are_markups_valid(self, path):
- file_path = self.get_tested_file_path(path)
- function_name = self.get_tested_function_name(path)
+ if file_path is None:
+ print(f"{path} file has no tested_file tag!")
+ return None
+ elif not os.path.isfile(self.get_main_tested_dir() + file_path):
+ print(f"Tested file given in {path} does not exist!")
+ return None
- if file_path is None:
- print(f"{path} file has no tested_file tag!")
- return None
- elif not os.path.isfile(self.get_main_tested_dir() + file_path):
- print(f"Tested file given in {path} does not exist!")
- return None
+ if function_name is None:
+ print(f"{path} file has no tested_function_name tag!")
+ return None
- if function_name is None:
- print(f"{path} file has no tested_function_name tag!")
- return None
+ return True
- return True
+ def create_dir_if_not_exist(self, path):
+ if not os.path.isdir(path):
+ try:
+ os.makedirs(path)
+ except Exception:
+ pass
+ return True
+ return None
- def create_dir_if_not_exist(self, path):
- if not os.path.isdir(path):
- try:
- os.makedirs(path)
- except Exception:
- pass
- return True
- return None
+ def get_tested_file_path(self, test_file_path):
+ with open(test_file_path) as f:
+ buf = f.readlines()
+ buf = ''.join(buf)
- def get_tested_file_path(self, test_file_path):
- with open(test_file_path) as f:
- buf = f.readlines()
- buf = ''.join(buf)
+ tags_pattern = re.compile(r"[\s\S]*")
+ buf = re.findall(tags_pattern, buf)
- tags_pattern = re.compile("[\s\S]*")
- buf = re.findall(tags_pattern, buf)
+ if not len(buf) > 0:
+ return None
- if not len(buf) > 0:
- return None
+ buf = buf[0]
- buf = buf[0]
+ buf = re.sub(r'<[^>]*>', '', buf)
+ buf = re.sub(r'\s+', '', buf)
- buf = re.sub(r'<[^>]*>', '', buf)
- buf = re.sub(r'\s+', '', buf)
+ if len(buf) > 0:
+ return buf
- if len(buf) > 0:
- return buf
+ return None
- return None
+ def get_tested_function_name(self, test_file_path):
+ with open(test_file_path) as f:
+ buf = f.readlines()
+ buf = ''.join(buf)
- def get_tested_function_name(self, test_file_path):
- with open(test_file_path) as f:
- buf = f.readlines()
- buf = ''.join(buf)
+ tags_pattern = re.compile(r"[\s\S]*")
+ buf = re.findall(tags_pattern, buf)
- tags_pattern = re.compile("[\s\S]*")
- buf = re.findall(tags_pattern, buf)
+ if not len(buf) > 0:
+ return None
- if not len(buf) > 0:
- return None
+ buf = buf[0]
- buf = buf[0]
+ buf = re.sub(r'<[^>]*>', '', buf)
+ buf = re.sub('//', '', buf)
+ buf = re.sub(r'\s+', '', buf)
- buf = re.sub(r'<[^>]*>', '', buf)
- buf = re.sub('//', '', buf)
- buf = re.sub(r'\s+', '', buf)
+ if len(buf) > 0:
+ return buf
- if len(buf) > 0:
- return buf
+ return None
+
+ def get_test_files_from_dir(self, path):
+ ret = os.listdir(path)
+ ret = [name for name in ret if os.path.isfile(path + os.sep + name)
+ and (name.endswith(".c") or name.endswith(".h"))]
+ ret = [name for name in ret if self.are_markups_valid(path + name)]
+
+ return ret
+
+ def get_list_of_directories(self, path):
+ if not os.path.isdir(path):
+ return []
+
+ ret = os.listdir(path)
+ ret = [name for name in ret if not os.path.isfile(path + os.sep + name)]
+ ret = [os.path.normpath(name) + os.sep for name in ret]
+
+ return ret
+
+ def remove_hashes(self, path):
+ with open(path) as f:
+ buf = f.readlines()
+
+ buf = [l for l in buf if not re.search(r'.*#.*', l)]
+
+ with open(path, "w") as f:
+ f.writelines(buf)
+
+ return
+ for i in range(len(padding)):
+ try:
+ padding[i] = padding[i].split("#")[0]
+ except ValueError:
+ continue
+
+ f = open(path, "w")
+ f.writelines(padding)
+ f.close()
+
+ def find_function_end(self, code_lines_list, first_line_of_function_index):
+ brackets_counter = 0
+ current_line_index = first_line_of_function_index
+
+ while True:
+ if "{" in code_lines_list[current_line_index]:
+ brackets_counter += code_lines_list[current_line_index].count("{")
+ brackets_counter -= code_lines_list[current_line_index].count("}")
+ break
+ else:
+ current_line_index += 1
+
+ while brackets_counter > 0:
+ current_line_index += 1
+ if "{" in code_lines_list[current_line_index]:
+ brackets_counter += code_lines_list[current_line_index].count("{")
+ brackets_counter -= code_lines_list[current_line_index].count("}")
+ elif "}" in code_lines_list[current_line_index]:
+ brackets_counter -= code_lines_list[current_line_index].count("}")
+
+ return current_line_index
+
+ def get_functions_calls(self, file_to_compile):
+ out_dir = "/tmp/ocf_ut"
+ out_file = out_dir + "/ocf_obj.o"
+ self.create_dir_if_not_exist(out_dir)
+ cmd = "/usr/bin/gcc -o " + out_file + " -c " + file_to_compile + " &> /dev/null"
+ run_command([cmd], verbose=None)
+ result = run_command(["/usr/bin/nm -u " + out_file + " | cut -f2 -d\'U\'"])
+ return set(result.stdout.split())
+
+ def remove_function_body(self, code_lines_list, line_id):
+ try:
+ while "{" not in code_lines_list[line_id]:
+ if ";" in code_lines_list[line_id]:
+ return
+ line_id += 1
+ except IndexError:
+ return
+
+ last_line_id = self.find_function_end(code_lines_list, line_id)
+
+ code_lines_list[line_id] = code_lines_list[line_id].split("{")[0]
+ code_lines_list[line_id] += ";"
+
+ del code_lines_list[line_id + 1: last_line_id + 1]
+
+ def get_function_wrap(self, code_lines_list, line_id):
+ ret = []
+ # Line numbering starts with one, list indexing with zero
+ line_id -= 1
+
+ # If returned type is not present, it should be in line above
+ try:
+ code_lines_list[line_id].split("(")[0].rsplit()[1]
+ except IndexError:
+ line_id -= 1
+
+ while True:
+ ret.append(code_lines_list[line_id])
+ if ")" in code_lines_list[line_id]:
+ break
+ line_id += 1
+
+ # Tags list contains both prototypes and definitions, here we recoginze
+ # with which one we deals
+ delimiter = ""
+ try:
+ if "{" in ret[-1] or "{" in ret[-2]:
+ delimter = "{"
+ else:
+ delimiter = ";"
+ except IndexError:
+ delimiter = ";"
+
+ ret[-1] = ret[-1].split(delimiter)[0]
+ ret[-1] += "{}"
+
+ function_name = ""
+ line_with_name = 0
+ try:
+ function_name = ret[line_with_name].split("(")[0].rsplit(maxsplit=1)[1]
+ except IndexError:
+ line_with_name = 1
+ function_name = ret[line_with_name].split("(")[0]
+
+ function_new_name = "__wrap_" + function_name.replace("*", "")
+ ret[0] = ret[0].replace(function_name, function_new_name)
+
+ return ''.join(ret)
+
+ def set_ctags_path(self):
+ result = run_command(["/usr/bin/ctags --version &> /dev/null"])
+ if result.returncode == 0:
+ path = "/usr/bin/ctags "
+ result = run_command([path, "--c-types=f"], verbose=None)
+ if not re.search("unrecognized option", result.stdout, re.IGNORECASE):
+ self.ctags_path = path
+ return
+
+ result = run_command(["/usr/local/bin/ctags --version &> /dev/null"])
+ if result.returncode == 0:
+ path = "/usr/local/bin/ctags "
+ result = run_command(["path", "--c-types=f"], verbose=None)
+ if not re.search("unrecognized option", result.stdout, re.IGNORECASE):
+ self.ctags_path = path
+ return
+
+ print("ERROR: Current ctags version don't support \"--c-types=f\" parameter!")
+ exit(1)
+
+ def get_ctags_path(self):
+ return self.ctags_path
+
+ def get_tests_catalouges_list(self):
+ return self.test_catalogues_list
+
+ def get_relative_path(self, original_path, part_to_remove):
+ return original_path.split(part_to_remove, 1)[1]
+
+ def get_dirs_to_include_list(self):
+ return self.dirs_to_include_list
+
+ def set_dirs_to_include(self):
+ self.dirs_to_include_list = [self.get_main_tested_dir() + name
+ for name in
+ tests_config.DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST]
+
+ def set_tests_internal_includes_list(self):
+ self.tests_internal_includes_list = [self.get_main_UT_dir() + name
+ for name in
+ tests_config.DIRECTORIES_TO_INCLUDE_FROM_UT_LIST]
+
+ def set_preprocessing_repo(self):
+ self.preprocessing_repo = self.get_main_UT_dir() \
+ + tests_config.PREPROCESSED_SOURCES_REPOSITORY
+
+ def set_sources_to_test_repo(self):
+ self.sources_to_test_repo = self.get_main_UT_dir() + tests_config.SOURCES_TO_TEST_REPOSITORY
+
+ def get_sources_to_test_repo(self):
+ return self.sources_to_test_repo
+
+ def get_preprocessing_repo(self):
+ return self.preprocessing_repo
+
+ def get_tests_internal_includes_list(self):
+ return self.tests_internal_includes_list
+
+ def get_script_dir_path(self):
+ return os.path.normpath(self.script_dir_abs_path) + os.sep
+
+ def get_main_UT_dir(self):
+ return os.path.normpath(self.main_UT_dir) + os.sep
+
+ def get_main_tested_dir(self):
+ return os.path.normpath(self.main_tested_dir) + os.sep
+
+ def remove_duplicates_from_list(self, l):
+ return list(set(l))
+
+ def set_framework_includes(self):
+ self.framework_includes = tests_config.FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST
+
+ def get_framework_includes(self):
+ return self.framework_includes
+
+ def set_includes_to_copy_dict(self, files_to_copy_dict):
+ self.includes_to_copy_dict = files_to_copy_dict
- return None
+ def get_includes_to_copy_dict(self):
+ return self.includes_to_copy_dict
- def get_test_files_from_dir(self, path):
- ret = os.listdir(path)
- ret = [name for name in ret if os.path.isfile(path + os.sep + name) and (name.endswith(".c") or name.endswith(".h"))]
- ret = [name for name in ret if self.are_markups_valid(path + name)]
+ def set_main_UT_dir(self):
+        main_UT_dir = os.path.normpath(
+            os.path.normpath(self.get_script_dir_path()
+                             + os.sep
+                             + tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS))
+ if not os.path.isdir(main_UT_dir):
+ print("Given path to main UT directory is wrong!")
+ sys.exit(1)
- return ret
+ self.main_UT_dir = main_UT_dir
- def get_list_of_directories(self, path):
- if not os.path.isdir(path):
- return []
+ def set_main_tested_dir(self):
+        main_tested_dir = os.path.normpath(
+            os.path.normpath(self.get_script_dir_path()
+                             + os.sep
+                             + tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT))
+ if not os.path.isdir(main_tested_dir):
+ print("Given path to main tested directory is wrong!")
+ sys.exit(1)
- ret = os.listdir(path)
- ret = [name for name in ret if not os.path.isfile(path + os.sep + name)]
- ret = [os.path.normpath(name) + os.sep for name in ret]
-
- return ret
-
- def remove_hashes(self, path):
- with open(path) as f:
- buf = f.readlines()
-
- buf = [l for l in buf if not re.search(r'.*#.*', l)]
-
- with open(path, "w") as f:
- f.writelines(buf)
-
- return
- for i in range(len(padding)):
- try:
- padding[i] = padding[i].split("#")[0]
- except ValueError:
- continue
-
- f = open(path, "w")
- f.writelines(padding)
- f.close()
-
- def find_function_end(self,code_lines_list, first_line_of_function_index):
- brackets_counter = 0
- current_line_index = first_line_of_function_index
-
- while True:
- if "{" in code_lines_list[current_line_index]:
- brackets_counter += code_lines_list[current_line_index].count("{")
- brackets_counter -= code_lines_list[current_line_index].count("}")
- break
- else:
- current_line_index += 1
-
- while brackets_counter > 0:
- current_line_index += 1
- if "{" in code_lines_list[current_line_index]:
- brackets_counter += code_lines_list[current_line_index].count("{")
- brackets_counter -= code_lines_list[current_line_index].count("}")
- elif "}" in code_lines_list[current_line_index]:
- brackets_counter -= code_lines_list[current_line_index].count("}")
-
- return current_line_index
-
- def get_functions_calls(self, file_to_compile):
- out_dir = "/tmp/ocf_ut"
- out_file = out_dir + "/ocf_obj.o"
- self.create_dir_if_not_exist(out_dir)
- cmd = "/usr/bin/gcc -o " + out_file + " -c " + file_to_compile + " &> /dev/null"
- run_command([cmd], verbose=None)
- result = run_command(["/usr/bin/nm -u " + out_file + " | cut -f2 -d\'U\'"])
- return set(result.stdout.split())
-
-
- def remove_function_body(self, code_lines_list, line_id):
- try:
- while "{" not in code_lines_list[line_id]:
- if ";" in code_lines_list[line_id]:
- return
- line_id += 1
- except IndexError:
- return
-
- last_line_id = self.find_function_end(code_lines_list, line_id)
-
- code_lines_list[line_id] = code_lines_list[line_id].split("{")[0]
- code_lines_list[line_id] += ";"
-
- del code_lines_list[line_id + 1: last_line_id + 1]
-
-
- def get_function_wrap(self, code_lines_list, line_id):
- ret = []
- # Line numbering starts with one, list indexing with zero
- line_id -= 1
-
- # If returned type is not present, it should be in line above
- try:
- code_lines_list[line_id].split("(")[0].rsplit()[1]
- except IndexError:
- line_id -= 1
-
- while True:
- ret.append(code_lines_list[line_id])
- if ")" in code_lines_list[line_id]:
- break
- line_id += 1
-
- # Tags list contains both prototypes and definitions, here we recoginze
- # with which one we deals
- delimiter = ""
- try:
- if "{" in ret[-1] or "{" in ret[-2]:
- delimter = "{"
- else:
- delimiter =";"
- except IndexError:
- delimiter =";"
-
- ret[-1] = ret[-1].split(delimiter)[0]
- ret[-1] += "{}"
-
- function_name = ""
- line_with_name = 0
- try:
- function_name = ret[line_with_name].split("(")[0].rsplit(maxsplit=1)[1]
- except IndexError:
- line_with_name = 1
- function_name = ret[line_with_name].split("(")[0]
-
- function_new_name = "__wrap_" + function_name.replace("*", "")
- ret[0] = ret[0].replace(function_name, function_new_name)
-
- return ''.join(ret)
-
- def set_ctags_path(self):
- result = run_command(["/usr/bin/ctags --version &> /dev/null"])
- if result.returncode == 0:
- path = "/usr/bin/ctags "
- result = run_command([path, "--c-types=f"], verbose=None)
- if not re.search("unrecognized option", result.stdout, re.IGNORECASE):
- self.ctags_path = path
- return
-
- result = run_command(["/usr/local/bin/ctags --version &> /dev/null"])
- if result.returncode == 0:
- path = "/usr/local/bin/ctags "
- result = run_command(["path", "--c-types=f"], verbose=None)
- if not re.search("unrecognized option", result.stdout, re.IGNORECASE):
- self.ctags_path = path
- return
-
- print("ERROR: Current ctags version don't support \"--c-types=f\" parameter!")
- exit(1)
-
- def get_ctags_path(self):
- return self.ctags_path
-
- def get_tests_catalouges_list(self):
- return self.test_catalouges_list
-
- def get_relative_path(self, original_path, part_to_remove):
- return original_path.split(part_to_remove, 1)[1]
-
- def get_dirs_to_include_list(self):
- return self.dirs_to_include_list
-
- def set_dirs_to_include(self):
- self.dirs_to_include_list = [self.get_main_tested_dir() + name\
- for name in tests_config.DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST]
+ self.main_tested_dir = main_tested_dir
- def set_tests_internal_includes_list(self):
- self.tests_internal_includes_list = [self.get_main_UT_dir() + name\
- for name in tests_config.DIRECTORIES_TO_INCLUDE_FROM_UT_LIST]
-
- def set_preprocessing_repo(self):
- self.preprocessing_repo = self.get_main_UT_dir() +\
- tests_config.PREPROCESSED_SOURCES_REPOSITORY
-
- def set_sources_to_test_repo(self):
- self.sources_to_test_repo = self.get_main_UT_dir() +\
- tests_config.SOURCES_TO_TEST_REPOSITORY
-
- def get_sources_to_test_repo(self):
- return self.sources_to_test_repo
-
- def get_preprocessing_repo(self):
- return self.preprocessing_repo
-
- def get_tests_internal_includes_list(self):
- return self.tests_internal_includes_list
-
- def get_script_dir_path(self):
- return os.path.normpath(self.script_dir_abs_path) + os.sep
-
- def get_main_UT_dir(self):
- return os.path.normpath(self.main_UT_dir) + os.sep
-
- def get_main_tested_dir(self):
- return os.path.normpath(self.main_tested_dir) + os.sep
-
- def remove_duplicates_from_list(self, l):
- return list(set(l))
-
- def set_framework_includes(self):
- self.framework_includes = tests_config.FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST
-
- def get_framework_includes(self):
- return self.framework_includes
-
- def set_includes_to_copy_dict(self, files_to_copy_dict):
- self.includes_to_copy_dict = files_to_copy_dict
-
- def get_includes_to_copy_dict(self):
- return self.includes_to_copy_dict
-
- def set_main_UT_dir(self):
- main_UT_dir = os.path.normpath(os.path.normpath(self.get_script_dir_path()\
- + os.sep + tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS))
- if not os.path.isdir(main_UT_dir):
- print("Given path to main UT directory is wrong!")
- sys.exit(1)
-
- self.main_UT_dir = main_UT_dir
-
- def set_main_tested_dir(self):
- main_tested_dir = os.path.normpath(os.path.normpath(self.get_script_dir_path()\
- + os.sep + tests_config.MAIN_DIRECTORY_OF_TESTED_PROJECT))
- if not os.path.isdir(main_tested_dir):
- print("Given path to main tested directory is wrong!")
- sys.exit(1)
-
- self.main_tested_dir = main_tested_dir
def __main__():
+ generator = UnitTestsSourcesGenerator()
+ generator.copy_includes()
+ generator.preprocessing()
+ generator.prepare_sources_for_testing()
+ generator.create_main_cmake_lists()
+ generator.generate_cmakes_for_tests()
- generator = UnitTestsSourcesGenerator()
- generator.copy_includes()
- generator.preprocessing()
- generator.prepare_sources_for_testing()
- generator.create_main_cmake_lists()
- generator.generate_cmakes_for_tests()
+ print("Files for testing generated!")
- print("Files for testing generated!")
if __name__ == "__main__":
- __main__()
+ __main__()
diff --git a/tests/unit/framework/run_unit_tests.py b/tests/unit/framework/run_unit_tests.py
index 8e1aac2..d72269c 100755
--- a/tests/unit/framework/run_unit_tests.py
+++ b/tests/unit/framework/run_unit_tests.py
@@ -10,13 +10,15 @@ import os
import sys
import subprocess
+
def run_command(args):
result = subprocess.run(" ".join(args), shell=True,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result.stdout = result.stdout.decode("ASCII", errors='ignore')
result.stderr = result.stderr.decode("ASCII", errors='ignore')
return result
+
script_path = os.path.dirname(os.path.realpath(__file__))
main_UT_dir = os.path.join(script_path, tests_config.MAIN_DIRECTORY_OF_UNIT_TESTS)
@@ -29,13 +31,13 @@ if not os.path.isdir(os.path.join(main_UT_dir, "ocf_env", "ocf")):
except Exception:
raise Exception("Cannot create ocf_env/ocf directory!")
-result = run_command([ "cp", "-r",
- os.path.join(main_tested_dir, "inc", "*"),
- os.path.join(main_UT_dir, "ocf_env", "ocf") ])
+result = run_command(["cp", "-r",
+ os.path.join(main_tested_dir, "inc", "*"),
+ os.path.join(main_UT_dir, "ocf_env", "ocf")])
if result.returncode != 0:
raise Exception("Preparing sources for testing failed!")
-result = run_command([ os.path.join(script_path, "prepare_sources_for_testing.py") ])
+result = run_command([os.path.join(script_path, "prepare_sources_for_testing.py")])
if result.returncode != 0:
raise Exception("Preparing sources for testing failed!")
@@ -52,7 +54,7 @@ except Exception:
os.chdir(build_dir)
-cmake_result = run_command([ "cmake", ".." ])
+cmake_result = run_command(["cmake", ".."])
print(cmake_result.stdout)
with open(os.path.join(logs_dir, "cmake.output"), "w") as f:
@@ -64,20 +66,20 @@ if cmake_result.returncode != 0:
f.write("Cmake step failed! More details in cmake.output.")
sys.exit(1)
-make_result = run_command([ "make", "-j" ])
+make_result = run_command(["make", "-j"])
print(make_result.stdout)
with open(os.path.join(logs_dir, "make.output"), "w") as f:
- f.write(make_result.stdout)
- f.write(make_result.stderr)
+ f.write(make_result.stdout)
+ f.write(make_result.stderr)
if make_result.returncode != 0:
- with open(os.path.join(logs_dir, "tests.output"), "w") as f:
- f.write("Make step failed! More details in make.output.")
- sys.exit(1)
+ with open(os.path.join(logs_dir, "tests.output"), "w") as f:
+ f.write("Make step failed! More details in make.output.")
+ sys.exit(1)
-test_result = run_command([ "make", "test" ])
+test_result = run_command(["make", "test"])
print(test_result.stdout)
-with open(os.path.join(logs_dir , "tests.output"), "w") as f:
- f.write(test_result.stdout)
+with open(os.path.join(logs_dir, "tests.output"), "w") as f:
+ f.write(test_result.stdout)
diff --git a/tests/unit/framework/tests_config.py b/tests/unit/framework/tests_config.py
index 19a1761..2dd3130 100644
--- a/tests/unit/framework/tests_config.py
+++ b/tests/unit/framework/tests_config.py
@@ -11,25 +11,34 @@ MAIN_DIRECTORY_OF_TESTED_PROJECT = "../../../"
MAIN_DIRECTORY_OF_UNIT_TESTS = "../tests/"
-# Paths to all directories, in which tests are stored. All paths should be relative to MAIN_DIRECTORY_OF_UNIT_TESTS
-DIRECTORIES_WITH_TESTS_LIST = ["cleaning/", "metadata/", "mngt/", "concurrency/", "engine/", "eviction/", "utils/"]
+# Paths to all directories, in which tests are stored. All paths should be relative to
+# MAIN_DIRECTORY_OF_UNIT_TESTS
+DIRECTORIES_WITH_TESTS_LIST = ["cleaning/", "metadata/", "mngt/", "concurrency/", "engine/",
+ "eviction/", "utils/"]
-# Paths to all directories containing files with sources. All paths should be relative to MAIN_DIRECTORY_OF_TESTED_PROJECT
-DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST = ["src/", "src/cleaning/", "src/engine/", "src/metadata/", "src/eviction/", "src/mngt/", "src/concurrency/", "src/utils/", "inc/"]
+# Paths to all directories containing files with sources. All paths should be relative to
+# MAIN_DIRECTORY_OF_TESTED_PROJECT
+DIRECTORIES_TO_INCLUDE_FROM_PROJECT_LIST = ["src/", "src/cleaning/", "src/engine/", "src/metadata/",
+ "src/eviction/", "src/mngt/", "src/concurrency/",
+ "src/utils/", "inc/"]
# Paths to all directories from directory with tests, which should also be included
DIRECTORIES_TO_INCLUDE_FROM_UT_LIST = ["ocf_env/"]
# Paths to include, required by cmake, cmocka, cunit
-FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST = ["${CMOCKA_PUBLIC_INCLUDE_DIRS}" ,"${CMAKE_BINARY_DIR}", "${CMAKE_CURRENT_SOURCE_DIR}"]
+FRAMEWORK_DIRECTORIES_TO_INCLUDE_LIST = ["${CMOCKA_PUBLIC_INCLUDE_DIRS}", "${CMAKE_BINARY_DIR}",
+ "${CMAKE_CURRENT_SOURCE_DIR}"]
-# Path to directory containing all sources after preprocessing. Should be relative to MAIN_DIRECTORY_OF_UNIT_TESTS
+# Path to directory containing all sources after preprocessing. Should be relative to
+# MAIN_DIRECTORY_OF_UNIT_TESTS
PREPROCESSED_SOURCES_REPOSITORY = "preprocessed_sources_repository/"
-# Path to directory containing all sources after removing unneeded functions and cmake files for tests
+# Path to directory containing all sources after removing unneeded functions and cmake files for
+# tests
SOURCES_TO_TEST_REPOSITORY = "sources_to_test_repository/"
-# List of includes. Directories will be recursively copied to given destinations in directory with tests.
+# List of includes.
+# Directories will be recursively copied to given destinations in directory with tests.
# key - destination in dir with tests
# value - path in tested project to dir which should be copied
-INCLUDES_TO_COPY_DICT = { 'ocf_env/ocf/' : "inc/" }
+INCLUDES_TO_COPY_DICT = {'ocf_env/ocf/': "inc/"}