Merge pull request #122 from KamilLepek/security_tests
Add security fuzzy tests and improve pyocf api
This commit is contained in:
commit
c80ca4a9a8
@ -651,7 +651,7 @@ int ocf_mngt_cache_set_mode(ocf_cache_t cache, ocf_cache_mode_t mode);
|
|||||||
* use function ocf_mngt_cache_save().
|
* use function ocf_mngt_cache_save().
|
||||||
*
|
*
|
||||||
* @param[in] cache Cache handle
|
* @param[in] cache Cache handle
|
||||||
* @param[in] type Cleainig policy type
|
* @param[in] type Cleaning policy type
|
||||||
*
|
*
|
||||||
* @retval 0 Policy has been set successfully
|
* @retval 0 Policy has been set successfully
|
||||||
* @retval Non-zero Error occurred and policy has not been set
|
* @retval Non-zero Error occurred and policy has not been set
|
||||||
@ -676,6 +676,7 @@ int ocf_mngt_cache_cleaning_get_policy(ocf_cache_t cache, ocf_cleaning_t *type);
|
|||||||
* use function ocf_mngt_cache_save().
|
* use function ocf_mngt_cache_save().
|
||||||
*
|
*
|
||||||
* @param[in] cache Cache handle
|
* @param[in] cache Cache handle
|
||||||
|
* @param[in] type Cleaning policy type
|
||||||
* @param[in] param_id Cleaning policy parameter id
|
* @param[in] param_id Cleaning policy parameter id
|
||||||
* @param[in] param_value Cleaning policy parameter value
|
* @param[in] param_value Cleaning policy parameter value
|
||||||
*
|
*
|
||||||
@ -689,6 +690,7 @@ int ocf_mngt_cache_cleaning_set_param(ocf_cache_t cache, ocf_cleaning_t type,
|
|||||||
* @brief Get cleaning parameter from given cache
|
* @brief Get cleaning parameter from given cache
|
||||||
*
|
*
|
||||||
* @param[in] cache Cache handle
|
* @param[in] cache Cache handle
|
||||||
|
* @param[in] type Cleaning policy type
|
||||||
* @param[in] param_id Cleaning policy parameter id
|
* @param[in] param_id Cleaning policy parameter id
|
||||||
* @param[in] param_value Variable to store parameter value
|
* @param[in] param_value Variable to store parameter value
|
||||||
*
|
*
|
||||||
|
@ -21,7 +21,7 @@ from enum import IntEnum
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from ..ocf import OcfLib
|
from ..ocf import OcfLib
|
||||||
from .shared import Uuid, OcfError, CacheLineSize, CacheLines, OcfCompletion
|
from .shared import Uuid, OcfError, CacheLineSize, CacheLines, OcfCompletion, SeqCutOffPolicy
|
||||||
from ..utils import Size, struct_to_dict
|
from ..utils import Size, struct_to_dict
|
||||||
from .core import Core
|
from .core import Core
|
||||||
from .queue import Queue
|
from .queue import Queue
|
||||||
@ -82,6 +82,18 @@ class CleaningPolicy(IntEnum):
|
|||||||
DEFAULT = ALRU
|
DEFAULT = ALRU
|
||||||
|
|
||||||
|
|
||||||
|
class AlruParams(IntEnum):
|
||||||
|
WAKE_UP_TIME = 0,
|
||||||
|
STALE_BUFFER_TIME = 1,
|
||||||
|
FLUSH_MAX_BUFFERS = 2,
|
||||||
|
ACTIVITY_THRESHOLD = 3
|
||||||
|
|
||||||
|
|
||||||
|
class AcpParams(IntEnum):
|
||||||
|
WAKE_UP_TIME = 0,
|
||||||
|
FLUSH_MAX_BUFFERS = 1
|
||||||
|
|
||||||
|
|
||||||
class MetadataLayout(IntEnum):
|
class MetadataLayout(IntEnum):
|
||||||
STRIPING = 0
|
STRIPING = 0
|
||||||
SEQUENTIAL = 1
|
SEQUENTIAL = 1
|
||||||
@ -168,7 +180,35 @@ class Cache:
|
|||||||
|
|
||||||
def change_cache_mode(self, cache_mode: CacheMode):
|
def change_cache_mode(self, cache_mode: CacheMode):
|
||||||
self.get_and_write_lock()
|
self.get_and_write_lock()
|
||||||
self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode)
|
status = self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error changing cache mode", status)
|
||||||
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
|
def set_cleaning_policy(self, cleaning_policy: CleaningPolicy):
|
||||||
|
self.get_and_write_lock()
|
||||||
|
status = self.owner.lib.ocf_mngt_cache_cleaning_set_policy(self.cache_handle, cleaning_policy)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error changing cleaning policy", status)
|
||||||
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
|
def set_cleaning_policy_param(self, cleaning_policy: CleaningPolicy, param_id, param_value):
|
||||||
|
self.get_and_write_lock()
|
||||||
|
status = self.owner.lib.ocf_mngt_cache_cleaning_set_param(
|
||||||
|
self.cache_handle,
|
||||||
|
cleaning_policy,
|
||||||
|
param_id,
|
||||||
|
param_value
|
||||||
|
)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error setting cleaning policy param", status)
|
||||||
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
|
def set_seq_cut_off_policy(self, policy: SeqCutOffPolicy):
|
||||||
|
self.get_and_write_lock()
|
||||||
|
status = self.owner.lib.ocf_mngt_core_set_seq_cutoff_policy_all(self.cache_handle, policy)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error setting cache seq cut off policy", status)
|
||||||
self.put_and_write_unlock()
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
def configure_device(
|
def configure_device(
|
||||||
@ -457,3 +497,10 @@ lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
|
|||||||
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]
|
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]
|
||||||
lib.ocf_cache_get_name.argtypes = [c_void_p]
|
lib.ocf_cache_get_name.argtypes = [c_void_p]
|
||||||
lib.ocf_cache_get_name.restype = c_char_p
|
lib.ocf_cache_get_name.restype = c_char_p
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_policy.argtypes = [c_void_p, c_uint32]
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_policy.restype = c_int
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy_all.argtypes = [c_void_p, c_uint32]
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy_all.restype = c_int
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_param.argtypes = [c_void_p, c_uint32, c_uint32, c_uint32]
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_param.restype = c_int
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ import logging
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from ..ocf import OcfLib
|
from ..ocf import OcfLib
|
||||||
from .shared import Uuid, OcfCompletion, OcfError
|
from .shared import Uuid, OcfCompletion, OcfError, SeqCutOffPolicy
|
||||||
from .volume import Volume
|
from .volume import Volume
|
||||||
from .data import Data
|
from .data import Data
|
||||||
from .io import Io, IoDir
|
from .io import Io, IoDir
|
||||||
@ -138,6 +138,13 @@ class Core:
|
|||||||
"errors": struct_to_dict(errors),
|
"errors": struct_to_dict(errors),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def set_seq_cut_off_policy(self, policy: SeqCutOffPolicy):
|
||||||
|
self.cache.get_and_write_lock()
|
||||||
|
status = self.cache.owner.lib.ocf_mngt_core_set_seq_cutoff_policy(self.handle, policy)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error setting core seq cut off policy", status)
|
||||||
|
self.cache.put_and_write_unlock()
|
||||||
|
|
||||||
def reset_stats(self):
|
def reset_stats(self):
|
||||||
self.cache.owner.lib.ocf_core_stats_initialize(self.handle)
|
self.cache.owner.lib.ocf_core_stats_initialize(self.handle)
|
||||||
|
|
||||||
@ -168,3 +175,5 @@ lib.ocf_volume_new_io.argtypes = [c_void_p]
|
|||||||
lib.ocf_volume_new_io.restype = c_void_p
|
lib.ocf_volume_new_io.restype = c_void_p
|
||||||
lib.ocf_core_get_volume.argtypes = [c_void_p]
|
lib.ocf_core_get_volume.argtypes = [c_void_p]
|
||||||
lib.ocf_core_get_volume.restype = c_void_p
|
lib.ocf_core_get_volume.restype = c_void_p
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy.argtypes = [c_void_p, c_uint32]
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy.restype = c_int
|
||||||
|
@ -123,6 +123,13 @@ class CacheLineSize(IntEnum):
|
|||||||
DEFAULT = LINE_4KiB
|
DEFAULT = LINE_4KiB
|
||||||
|
|
||||||
|
|
||||||
|
class SeqCutOffPolicy(IntEnum):
|
||||||
|
ALWAYS = 0
|
||||||
|
FULL = 1
|
||||||
|
NEVER = 2
|
||||||
|
DEFAULT = FULL
|
||||||
|
|
||||||
|
|
||||||
class CacheLines(S):
|
class CacheLines(S):
|
||||||
def __init__(self, count: int, line_size: CacheLineSize):
|
def __init__(self, count: int, line_size: CacheLineSize):
|
||||||
self.bytes = count * line_size
|
self.bytes = count * line_size
|
||||||
|
@ -15,7 +15,6 @@ def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0):
|
|||||||
|
|
||||||
for addr in range(offset, end, width):
|
for addr in range(offset, end, width):
|
||||||
cur_line = buf[addr : min(end, addr + width)]
|
cur_line = buf[addr : min(end, addr + width)]
|
||||||
all_zeros = True
|
|
||||||
byteline = ""
|
byteline = ""
|
||||||
asciiline = ""
|
asciiline = ""
|
||||||
if not any(cur_line):
|
if not any(cur_line):
|
||||||
|
@ -5,59 +5,19 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import random
|
|
||||||
import string
|
|
||||||
from ctypes import (
|
from ctypes import (
|
||||||
c_uint64,
|
c_uint64,
|
||||||
c_uint32,
|
c_uint32,
|
||||||
c_uint16,
|
c_uint16,
|
||||||
c_int,
|
c_int
|
||||||
c_uint
|
|
||||||
)
|
)
|
||||||
|
from tests.utils import get_random_strings, get_random_ints
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
|
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
|
||||||
|
|
||||||
|
|
||||||
def generate_random_numbers(c_type):
|
|
||||||
type_dict = {
|
|
||||||
c_uint16: [0, c_uint16(-1).value],
|
|
||||||
c_uint32: [0, c_uint32(-1).value],
|
|
||||||
c_uint64: [0, c_uint64(-1).value],
|
|
||||||
c_int: [int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)]
|
|
||||||
}
|
|
||||||
|
|
||||||
values = []
|
|
||||||
for i in range(0, 1000):
|
|
||||||
values.append(random.randint(type_dict[c_type][0], type_dict[c_type][1]))
|
|
||||||
return values
|
|
||||||
|
|
||||||
|
|
||||||
def generate_random_strings():
|
|
||||||
values = []
|
|
||||||
for t in [string.digits,
|
|
||||||
string.ascii_letters + string.digits,
|
|
||||||
string.ascii_lowercase,
|
|
||||||
string.ascii_uppercase,
|
|
||||||
string.printable,
|
|
||||||
string.punctuation,
|
|
||||||
string.hexdigits]:
|
|
||||||
for i in range(0, 100):
|
|
||||||
values.append(''.join(random.choice(t) for _ in range(random.randint(0, 20))))
|
|
||||||
return values
|
|
||||||
|
|
||||||
|
|
||||||
def get_random_ints(c_type):
|
|
||||||
for value in generate_random_numbers(c_type):
|
|
||||||
yield value
|
|
||||||
|
|
||||||
|
|
||||||
def get_random_strings():
|
|
||||||
for value in generate_random_strings():
|
|
||||||
yield value
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=get_random_ints(c_uint16))
|
@pytest.fixture(params=get_random_ints(c_uint16))
|
||||||
def c_uint16_randomize(request):
|
def c_uint16_randomize(request):
|
||||||
return request.param
|
return request.param
|
||||||
|
208
tests/functional/tests/security/test_management_fuzzy.py
Normal file
208
tests/functional/tests/security/test_management_fuzzy.py
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
#
|
||||||
|
# Copyright(c) 2019 Intel Corporation
|
||||||
|
# SPDX-License-Identifier: BSD-3-Clause-Clear
|
||||||
|
#
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from pyocf.types.cache import Cache, CacheMode, CleaningPolicy, AlruParams, AcpParams
|
||||||
|
from pyocf.types.core import Core
|
||||||
|
from pyocf.types.volume import Volume
|
||||||
|
from pyocf.utils import Size as S
|
||||||
|
from tests.utils import generate_random_numbers
|
||||||
|
from pyocf.types.shared import OcfError, CacheLineSize, SeqCutOffPolicy
|
||||||
|
from ctypes import (
|
||||||
|
c_uint64,
|
||||||
|
c_uint32
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_change_cache_mode(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cache mode to invalid value.
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change cache mode to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in CacheMode]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error changing cache mode"):
|
||||||
|
cache.change_cache_mode(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_set_cleaning_policy(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cleaning policy to invalid value
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set cleaning policy to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in CleaningPolicy]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error changing cleaning policy"):
|
||||||
|
cache.set_cleaning_policy(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_attach_cls(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cache line size to
|
||||||
|
invalid value while attaching cache device
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache(owner=cache_device.owner, cache_mode=cm, cache_line_size=cls)
|
||||||
|
cache.start_cache()
|
||||||
|
|
||||||
|
# Check whether it is possible to attach cache device with invalid cache line size
|
||||||
|
for i in generate_random_numbers(c_uint64):
|
||||||
|
if i in [item.value for item in CacheLineSize]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Attaching cache device failed"):
|
||||||
|
cache.attach_device(cache_device, cache_line_size=i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_cache_set_seq_cut_off_policy(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cache seq cut-off policy to invalid value
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create 2 core devices
|
||||||
|
core_device1 = Volume(S.from_MiB(10))
|
||||||
|
core1 = Core.using_device(core_device1)
|
||||||
|
core_device2 = Volume(S.from_MiB(10))
|
||||||
|
core2 = Core.using_device(core_device2)
|
||||||
|
|
||||||
|
# Add cores
|
||||||
|
cache.add_core(core1)
|
||||||
|
cache.add_core(core2)
|
||||||
|
|
||||||
|
# Change cache seq cut off policy to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in SeqCutOffPolicy]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting cache seq cut off policy"):
|
||||||
|
cache.set_seq_cut_off_policy(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_core_set_seq_cut_off_policy(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change core seq cut-off policy to invalid value
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create core device
|
||||||
|
core_device = Volume(S.from_MiB(10))
|
||||||
|
core = Core.using_device(core_device)
|
||||||
|
|
||||||
|
# Add core
|
||||||
|
cache.add_core(core)
|
||||||
|
|
||||||
|
# Change core seq cut off policy to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in SeqCutOffPolicy]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting core seq cut off policy"):
|
||||||
|
core.set_seq_cut_off_policy(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_set_alru_param(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to set invalid param for alru cleaning policy
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change invalid alru param and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in AlruParams]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting cleaning policy param"):
|
||||||
|
cache.set_cleaning_policy_param(CleaningPolicy.ALRU, i, 1)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_set_acp_param(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to set invalid param for acp cleaning policy
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change invalid acp param and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in AcpParams]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting cleaning policy param"):
|
||||||
|
cache.set_cleaning_policy_param(CleaningPolicy.ALRU, i, 1)
|
52
tests/functional/tests/utils.py
Normal file
52
tests/functional/tests/utils.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
#
|
||||||
|
# Copyright(c) 2019 Intel Corporation
|
||||||
|
# SPDX-License-Identifier: BSD-3-Clause-Clear
|
||||||
|
#
|
||||||
|
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from ctypes import (
|
||||||
|
c_uint64,
|
||||||
|
c_uint32,
|
||||||
|
c_uint16,
|
||||||
|
c_int,
|
||||||
|
c_uint
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_random_numbers(c_type):
|
||||||
|
type_dict = {
|
||||||
|
c_uint16: [0, c_uint16(-1).value],
|
||||||
|
c_uint32: [0, c_uint32(-1).value],
|
||||||
|
c_uint64: [0, c_uint64(-1).value],
|
||||||
|
c_int: [int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)]
|
||||||
|
}
|
||||||
|
|
||||||
|
values = []
|
||||||
|
for i in range(0, 1000):
|
||||||
|
values.append(random.randint(type_dict[c_type][0], type_dict[c_type][1]))
|
||||||
|
return values
|
||||||
|
|
||||||
|
|
||||||
|
def generate_random_strings():
|
||||||
|
values = []
|
||||||
|
for t in [string.digits,
|
||||||
|
string.ascii_letters + string.digits,
|
||||||
|
string.ascii_lowercase,
|
||||||
|
string.ascii_uppercase,
|
||||||
|
string.printable,
|
||||||
|
string.punctuation,
|
||||||
|
string.hexdigits]:
|
||||||
|
for i in range(0, 100):
|
||||||
|
values.append(''.join(random.choice(t) for _ in range(random.randint(0, 20))))
|
||||||
|
return values
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_ints(c_type):
|
||||||
|
for value in generate_random_numbers(c_type):
|
||||||
|
yield value
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_strings():
|
||||||
|
for value in generate_random_strings():
|
||||||
|
yield value
|
Loading…
Reference in New Issue
Block a user