diff --git a/tests/functional/tests/security/conftest.py b/tests/functional/tests/security/conftest.py index aa859bb..486f182 100644 --- a/tests/functional/tests/security/conftest.py +++ b/tests/functional/tests/security/conftest.py @@ -11,33 +11,33 @@ from ctypes import ( c_uint16, c_int ) -from tests.utils import get_random_strings, get_random_ints +from tests.utils.random import RandomStringGenerator, RandomGenerator, DefaultRanges import pytest sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir)) -@pytest.fixture(params=get_random_ints(c_uint16)) +@pytest.fixture(params=RandomGenerator(DefaultRanges.UINT16)) def c_uint16_randomize(request): return request.param -@pytest.fixture(params=get_random_ints(c_uint32)) +@pytest.fixture(params=RandomGenerator(DefaultRanges.UINT32)) def c_uint32_randomize(request): return request.param -@pytest.fixture(params=get_random_ints(c_uint64)) +@pytest.fixture(params=RandomGenerator(DefaultRanges.UINT64)) def c_uint64_randomize(request): return request.param -@pytest.fixture(params=get_random_ints(c_int)) +@pytest.fixture(params=RandomGenerator(DefaultRanges.INT)) def c_int_randomize(request): return request.param -@pytest.fixture(params=get_random_strings()) +@pytest.fixture(params=RandomStringGenerator()) def string_randomize(request): return request.param diff --git a/tests/functional/tests/security/test_management_fuzzy.py b/tests/functional/tests/security/test_management_fuzzy.py index 61420c0..263ecf6 100644 --- a/tests/functional/tests/security/test_management_fuzzy.py +++ b/tests/functional/tests/security/test_management_fuzzy.py @@ -10,7 +10,7 @@ from pyocf.types.cache import Cache, CacheMode, CleaningPolicy,\ from pyocf.types.core import Core from pyocf.types.volume import Volume from pyocf.utils import Size as S -from tests.utils import generate_random_numbers +from tests.utils.random import RandomGenerator, DefaultRanges from pyocf.types.shared import OcfError, CacheLineSize, SeqCutOffPolicy from ctypes import ( c_uint64, @@ -36,7 +36,7 @@ def test_neg_change_cache_mode(pyocf_ctx, cm, cls): ) # Change cache mode to invalid one and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in CacheMode]: continue with pytest.raises(OcfError, match="Error changing cache mode"): @@ -61,7 +61,7 @@ def test_neg_set_cleaning_policy(pyocf_ctx, cm, cls): ) # Set cleaning policy to invalid one and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in CleaningPolicy]: continue with pytest.raises(OcfError, match="Error changing cleaning policy"): @@ -86,7 +86,7 @@ def test_neg_attach_cls(pyocf_ctx, cm, cls): cache.start_cache() # Check whether it is possible to attach cache device with invalid cache line size - for i in generate_random_numbers(c_uint64): + for i in RandomGenerator(DefaultRanges.UINT64): if i in [item.value for item in CacheLineSize]: continue with pytest.raises(OcfError, match="Attaching cache device failed"): @@ -121,7 +121,7 @@ def test_neg_cache_set_seq_cut_off_policy(pyocf_ctx, cm, cls): cache.add_core(core2) # Change cache seq cut off policy to invalid one and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in SeqCutOffPolicy]: continue with pytest.raises(OcfError, match="Error setting cache seq cut off policy"): @@ -153,7 +153,7 @@ def test_neg_core_set_seq_cut_off_policy(pyocf_ctx, cm, cls): cache.add_core(core) # Change core seq cut off policy to invalid one and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in SeqCutOffPolicy]: continue with pytest.raises(OcfError, match="Error setting core seq cut off policy"): @@ -178,7 +178,7 @@ def test_neg_set_alru_param(pyocf_ctx, cm, cls): ) # Change invalid alru param and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in AlruParams]: continue with pytest.raises(OcfError, match="Error setting cleaning policy param"): @@ -203,7 +203,7 @@ def test_neg_set_acp_param(pyocf_ctx, cm, cls): ) # Change invalid acp param and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in AcpParams]: continue with pytest.raises(OcfError, match="Error setting cleaning policy param"): @@ -228,7 +228,7 @@ def test_neg_set_promotion_policy(pyocf_ctx, cm, cls): ) # Change to invalid promotion policy and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in [item.value for item in PromotionPolicy]: continue with pytest.raises(OcfError, match="Error setting promotion policy"): @@ -253,7 +253,7 @@ def test_neg_set_nhit_promotion_policy_param(pyocf_ctx, cm, cls): ) # Set invalid promotion policy param id and check if failed - for i in generate_random_numbers(c_uint8): + for i in RandomGenerator(DefaultRanges.UINT8): if i in [item.value for item in NhitParams]: continue with pytest.raises(OcfError, match="Error setting promotion policy parameter"): @@ -279,7 +279,7 @@ def test_neg_set_nhit_promotion_policy_param_trigger(pyocf_ctx, cm, cls): ) # Set to invalid promotion policy trigger threshold and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in ConfValidValues.promotion_nhit_trigger_threshold_range: continue with pytest.raises(OcfError, match="Error setting promotion policy parameter"): @@ -305,7 +305,7 @@ def test_neg_set_nhit_promotion_policy_param_threshold(pyocf_ctx, cm, cls): ) # Set to invalid promotion policy insertion threshold and check if failed - for i in generate_random_numbers(c_uint32): + for i in RandomGenerator(DefaultRanges.UINT32): if i in ConfValidValues.promotion_nhit_insertion_threshold_range: continue with pytest.raises(OcfError, match="Error setting promotion policy parameter"): diff --git a/tests/functional/tests/security/test_management_start_fuzzy.py b/tests/functional/tests/security/test_management_start_fuzzy.py index f0db3ad..63099b0 100644 --- a/tests/functional/tests/security/test_management_start_fuzzy.py +++ b/tests/functional/tests/security/test_management_start_fuzzy.py @@ -5,7 +5,7 @@ import pytest import logging -from tests.utils import generate_random_numbers +from tests.utils.random import RandomGenerator, DefaultRanges from pyocf.types.cache import Cache, CacheMode, EvictionPolicy, MetadataLayout, PromotionPolicy from pyocf.types.volume import Volume from pyocf.utils import Size @@ -133,7 +133,7 @@ def test_fuzzy_start_metadata_layout(pyocf_ctx, c_uint32_randomize, cm, cls): @pytest.mark.security @pytest.mark.parametrize("cls", CacheLineSize) -@pytest.mark.parametrize('max_wb_queue_size', generate_random_numbers(c_uint32, 10)) +@pytest.mark.parametrize('max_wb_queue_size', RandomGenerator(DefaultRanges.UINT32, 10)) def test_fuzzy_start_max_queue_size(pyocf_ctx, max_wb_queue_size, c_uint32_randomize, cls): """ Test whether it is impossible to start cache with invalid dependence between max queue size diff --git a/tests/functional/tests/utils.py b/tests/functional/tests/utils.py deleted file mode 100644 index 342ec2f..0000000 --- a/tests/functional/tests/utils.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Copyright(c) 2019 Intel Corporation -# SPDX-License-Identifier: BSD-3-Clause-Clear -# - -import random -import string -from ctypes import ( - c_uint64, - c_uint32, - c_uint16, - c_uint8, - c_int, - c_uint -) - - -def generate_random_numbers(c_type, count=1000): - type_dict = { - c_uint8: [0, c_uint8(-1).value], - c_uint16: [0, c_uint16(-1).value], - c_uint32: [0, c_uint32(-1).value], - c_uint64: [0, c_uint64(-1).value], - c_int: [int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)] - } - - values = [] - for i in range(0, count): - values.append(random.randint(type_dict[c_type][0], type_dict[c_type][1])) - return values - - -def generate_random_strings(): - values = [] - for t in [string.digits, - string.ascii_letters + string.digits, - string.ascii_lowercase, - string.ascii_uppercase, - string.printable, - string.punctuation, - string.hexdigits]: - for i in range(0, 100): - values.append(''.join(random.choice(t) for _ in range(random.randint(0, 20)))) - return values - - -def get_random_ints(c_type): - for value in generate_random_numbers(c_type): - yield value - - -def get_random_strings(): - for value in generate_random_strings(): - yield value diff --git a/tests/functional/tests/utils/random.py b/tests/functional/tests/utils/random.py new file mode 100644 index 0000000..5c60885 --- /dev/null +++ b/tests/functional/tests/utils/random.py @@ -0,0 +1,90 @@ +# +# Copyright(c) 2019 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause-Clear +# + +import random +import string +import enum +from functools import reduce +from ctypes import ( + c_uint64, + c_uint32, + c_uint16, + c_uint8, + c_int, + c_uint +) + + +class Range: + def __init__(self, min_val, max_val): + self.min = min_val + self.max = max_val + + def is_within(self, val): + return val >= self.min and val <= self.max + + +class DefaultRanges(Range, enum.Enum): + UINT8 = 0, c_uint8(-1).value + UINT16 = 0, c_uint16(-1).value + UINT32 = 0, c_uint32(-1).value + UINT64 = 0, c_uint64(-1).value + INT = int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2) + + +class RandomGenerator: + def __init__(self, base_range=DefaultRanges.INT, count=1000): + self.exclude = [] + self.range = base_range + self.count = count + self.n = 0 + + def exclude_range(self, excl_range): + self.exclude.append(excl_range) + + def __iter__(self): + return self + + def __next__(self): + if self.n >= self.count: + raise StopIteration() + self.n += 1 + while True: + val = random.randint(self.range.min, self.range.max) + if self.exclude: + excl_map = map(lambda e: e.is_within(val), self.exclude) + is_excluded = reduce(lambda a, b: a or b, excl_map) + if is_excluded: + continue + return val + + +class RandomStringGenerator: + def __init__(self, len_range=Range(0, 20), count=700): + self.generator = self.__string_generator(len_range) + self.count = count + self.n = 0 + + def __string_generator(self, len_range): + while True: + for t in [string.digits, + string.ascii_letters + string.digits, + string.ascii_lowercase, + string.ascii_uppercase, + string.printable, + string.punctuation, + string.hexdigits]: + yield ''.join(random.choice(t) for _ in range( + random.randint(len_range.min, len_range.max) + )) + + def __iter__(self): + return self + + def __next__(self): + if self.n >= self.count: + raise StopIteration() + self.n += 1 + return next(self.generator)