tests: functional: Add pretty random generators
Signed-off-by: Robert Baldyga <robert.baldyga@intel.com>
This commit is contained in:
parent
79a2d866ae
commit
29f0f33502
@ -11,33 +11,33 @@ from ctypes import (
|
||||
c_uint16,
|
||||
c_int
|
||||
)
|
||||
from tests.utils import get_random_strings, get_random_ints
|
||||
from tests.utils.random import RandomStringGenerator, RandomGenerator, DefaultRanges
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
|
||||
|
||||
|
||||
@pytest.fixture(params=get_random_ints(c_uint16))
|
||||
@pytest.fixture(params=RandomGenerator(DefaultRanges.UINT16))
|
||||
def c_uint16_randomize(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=get_random_ints(c_uint32))
|
||||
@pytest.fixture(params=RandomGenerator(DefaultRanges.UINT32))
|
||||
def c_uint32_randomize(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=get_random_ints(c_uint64))
|
||||
@pytest.fixture(params=RandomGenerator(DefaultRanges.UINT64))
|
||||
def c_uint64_randomize(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=get_random_ints(c_int))
|
||||
@pytest.fixture(params=RandomGenerator(DefaultRanges.INT))
|
||||
def c_int_randomize(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=get_random_strings())
|
||||
@pytest.fixture(params=RandomStringGenerator())
|
||||
def string_randomize(request):
|
||||
return request.param
|
||||
|
@ -10,7 +10,7 @@ from pyocf.types.cache import Cache, CacheMode, CleaningPolicy,\
|
||||
from pyocf.types.core import Core
|
||||
from pyocf.types.volume import Volume
|
||||
from pyocf.utils import Size as S
|
||||
from tests.utils import generate_random_numbers
|
||||
from tests.utils.random import RandomGenerator, DefaultRanges
|
||||
from pyocf.types.shared import OcfError, CacheLineSize, SeqCutOffPolicy
|
||||
from ctypes import (
|
||||
c_uint64,
|
||||
@ -36,7 +36,7 @@ def test_neg_change_cache_mode(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Change cache mode to invalid one and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in CacheMode]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error changing cache mode"):
|
||||
@ -61,7 +61,7 @@ def test_neg_set_cleaning_policy(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Set cleaning policy to invalid one and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in CleaningPolicy]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error changing cleaning policy"):
|
||||
@ -86,7 +86,7 @@ def test_neg_attach_cls(pyocf_ctx, cm, cls):
|
||||
cache.start_cache()
|
||||
|
||||
# Check whether it is possible to attach cache device with invalid cache line size
|
||||
for i in generate_random_numbers(c_uint64):
|
||||
for i in RandomGenerator(DefaultRanges.UINT64):
|
||||
if i in [item.value for item in CacheLineSize]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Attaching cache device failed"):
|
||||
@ -121,7 +121,7 @@ def test_neg_cache_set_seq_cut_off_policy(pyocf_ctx, cm, cls):
|
||||
cache.add_core(core2)
|
||||
|
||||
# Change cache seq cut off policy to invalid one and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in SeqCutOffPolicy]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting cache seq cut off policy"):
|
||||
@ -153,7 +153,7 @@ def test_neg_core_set_seq_cut_off_policy(pyocf_ctx, cm, cls):
|
||||
cache.add_core(core)
|
||||
|
||||
# Change core seq cut off policy to invalid one and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in SeqCutOffPolicy]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting core seq cut off policy"):
|
||||
@ -178,7 +178,7 @@ def test_neg_set_alru_param(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Change invalid alru param and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in AlruParams]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting cleaning policy param"):
|
||||
@ -203,7 +203,7 @@ def test_neg_set_acp_param(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Change invalid acp param and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in AcpParams]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting cleaning policy param"):
|
||||
@ -228,7 +228,7 @@ def test_neg_set_promotion_policy(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Change to invalid promotion policy and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in [item.value for item in PromotionPolicy]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting promotion policy"):
|
||||
@ -253,7 +253,7 @@ def test_neg_set_nhit_promotion_policy_param(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Set invalid promotion policy param id and check if failed
|
||||
for i in generate_random_numbers(c_uint8):
|
||||
for i in RandomGenerator(DefaultRanges.UINT8):
|
||||
if i in [item.value for item in NhitParams]:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting promotion policy parameter"):
|
||||
@ -279,7 +279,7 @@ def test_neg_set_nhit_promotion_policy_param_trigger(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Set to invalid promotion policy trigger threshold and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in ConfValidValues.promotion_nhit_trigger_threshold_range:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting promotion policy parameter"):
|
||||
@ -305,7 +305,7 @@ def test_neg_set_nhit_promotion_policy_param_threshold(pyocf_ctx, cm, cls):
|
||||
)
|
||||
|
||||
# Set to invalid promotion policy insertion threshold and check if failed
|
||||
for i in generate_random_numbers(c_uint32):
|
||||
for i in RandomGenerator(DefaultRanges.UINT32):
|
||||
if i in ConfValidValues.promotion_nhit_insertion_threshold_range:
|
||||
continue
|
||||
with pytest.raises(OcfError, match="Error setting promotion policy parameter"):
|
||||
|
@ -5,7 +5,7 @@
|
||||
|
||||
import pytest
|
||||
import logging
|
||||
from tests.utils import generate_random_numbers
|
||||
from tests.utils.random import RandomGenerator, DefaultRanges
|
||||
from pyocf.types.cache import Cache, CacheMode, EvictionPolicy, MetadataLayout, PromotionPolicy
|
||||
from pyocf.types.volume import Volume
|
||||
from pyocf.utils import Size
|
||||
@ -133,7 +133,7 @@ def test_fuzzy_start_metadata_layout(pyocf_ctx, c_uint32_randomize, cm, cls):
|
||||
|
||||
@pytest.mark.security
|
||||
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||
@pytest.mark.parametrize('max_wb_queue_size', generate_random_numbers(c_uint32, 10))
|
||||
@pytest.mark.parametrize('max_wb_queue_size', RandomGenerator(DefaultRanges.UINT32, 10))
|
||||
def test_fuzzy_start_max_queue_size(pyocf_ctx, max_wb_queue_size, c_uint32_randomize, cls):
|
||||
"""
|
||||
Test whether it is impossible to start cache with invalid dependence between max queue size
|
||||
|
@ -1,54 +0,0 @@
|
||||
#
|
||||
# Copyright(c) 2019 Intel Corporation
|
||||
# SPDX-License-Identifier: BSD-3-Clause-Clear
|
||||
#
|
||||
|
||||
import random
|
||||
import string
|
||||
from ctypes import (
|
||||
c_uint64,
|
||||
c_uint32,
|
||||
c_uint16,
|
||||
c_uint8,
|
||||
c_int,
|
||||
c_uint
|
||||
)
|
||||
|
||||
|
||||
def generate_random_numbers(c_type, count=1000):
|
||||
type_dict = {
|
||||
c_uint8: [0, c_uint8(-1).value],
|
||||
c_uint16: [0, c_uint16(-1).value],
|
||||
c_uint32: [0, c_uint32(-1).value],
|
||||
c_uint64: [0, c_uint64(-1).value],
|
||||
c_int: [int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)]
|
||||
}
|
||||
|
||||
values = []
|
||||
for i in range(0, count):
|
||||
values.append(random.randint(type_dict[c_type][0], type_dict[c_type][1]))
|
||||
return values
|
||||
|
||||
|
||||
def generate_random_strings():
|
||||
values = []
|
||||
for t in [string.digits,
|
||||
string.ascii_letters + string.digits,
|
||||
string.ascii_lowercase,
|
||||
string.ascii_uppercase,
|
||||
string.printable,
|
||||
string.punctuation,
|
||||
string.hexdigits]:
|
||||
for i in range(0, 100):
|
||||
values.append(''.join(random.choice(t) for _ in range(random.randint(0, 20))))
|
||||
return values
|
||||
|
||||
|
||||
def get_random_ints(c_type):
|
||||
for value in generate_random_numbers(c_type):
|
||||
yield value
|
||||
|
||||
|
||||
def get_random_strings():
|
||||
for value in generate_random_strings():
|
||||
yield value
|
90
tests/functional/tests/utils/random.py
Normal file
90
tests/functional/tests/utils/random.py
Normal file
@ -0,0 +1,90 @@
|
||||
#
|
||||
# Copyright(c) 2019 Intel Corporation
|
||||
# SPDX-License-Identifier: BSD-3-Clause-Clear
|
||||
#
|
||||
|
||||
import random
|
||||
import string
|
||||
import enum
|
||||
from functools import reduce
|
||||
from ctypes import (
|
||||
c_uint64,
|
||||
c_uint32,
|
||||
c_uint16,
|
||||
c_uint8,
|
||||
c_int,
|
||||
c_uint
|
||||
)
|
||||
|
||||
|
||||
class Range:
|
||||
def __init__(self, min_val, max_val):
|
||||
self.min = min_val
|
||||
self.max = max_val
|
||||
|
||||
def is_within(self, val):
|
||||
return val >= self.min and val <= self.max
|
||||
|
||||
|
||||
class DefaultRanges(Range, enum.Enum):
|
||||
UINT8 = 0, c_uint8(-1).value
|
||||
UINT16 = 0, c_uint16(-1).value
|
||||
UINT32 = 0, c_uint32(-1).value
|
||||
UINT64 = 0, c_uint64(-1).value
|
||||
INT = int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)
|
||||
|
||||
|
||||
class RandomGenerator:
|
||||
def __init__(self, base_range=DefaultRanges.INT, count=1000):
|
||||
self.exclude = []
|
||||
self.range = base_range
|
||||
self.count = count
|
||||
self.n = 0
|
||||
|
||||
def exclude_range(self, excl_range):
|
||||
self.exclude.append(excl_range)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if self.n >= self.count:
|
||||
raise StopIteration()
|
||||
self.n += 1
|
||||
while True:
|
||||
val = random.randint(self.range.min, self.range.max)
|
||||
if self.exclude:
|
||||
excl_map = map(lambda e: e.is_within(val), self.exclude)
|
||||
is_excluded = reduce(lambda a, b: a or b, excl_map)
|
||||
if is_excluded:
|
||||
continue
|
||||
return val
|
||||
|
||||
|
||||
class RandomStringGenerator:
|
||||
def __init__(self, len_range=Range(0, 20), count=700):
|
||||
self.generator = self.__string_generator(len_range)
|
||||
self.count = count
|
||||
self.n = 0
|
||||
|
||||
def __string_generator(self, len_range):
|
||||
while True:
|
||||
for t in [string.digits,
|
||||
string.ascii_letters + string.digits,
|
||||
string.ascii_lowercase,
|
||||
string.ascii_uppercase,
|
||||
string.printable,
|
||||
string.punctuation,
|
||||
string.hexdigits]:
|
||||
yield ''.join(random.choice(t) for _ in range(
|
||||
random.randint(len_range.min, len_range.max)
|
||||
))
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if self.n >= self.count:
|
||||
raise StopIteration()
|
||||
self.n += 1
|
||||
return next(self.generator)
|
Loading…
Reference in New Issue
Block a user