Add security fuzzy tests and improve pyocf api
Fuzzy tests added for: changing cache mode attaching cache device with fuzzed cache line size setting cleaning policy setting alru params setting acp params setting seq cut off policy per cache setting seq cut off policy per core
This commit is contained in:
parent
057777e91d
commit
d55195ecf5
@ -666,7 +666,7 @@ int ocf_mngt_cache_set_mode(ocf_cache_t cache, ocf_cache_mode_t mode);
|
|||||||
* use function ocf_mngt_cache_save().
|
* use function ocf_mngt_cache_save().
|
||||||
*
|
*
|
||||||
* @param[in] cache Cache handle
|
* @param[in] cache Cache handle
|
||||||
* @param[in] type Cleainig policy type
|
* @param[in] type Cleaning policy type
|
||||||
*
|
*
|
||||||
* @retval 0 Policy has been set successfully
|
* @retval 0 Policy has been set successfully
|
||||||
* @retval Non-zero Error occurred and policy has not been set
|
* @retval Non-zero Error occurred and policy has not been set
|
||||||
@ -691,6 +691,7 @@ int ocf_mngt_cache_cleaning_get_policy(ocf_cache_t cache, ocf_cleaning_t *type);
|
|||||||
* use function ocf_mngt_cache_save().
|
* use function ocf_mngt_cache_save().
|
||||||
*
|
*
|
||||||
* @param[in] cache Cache handle
|
* @param[in] cache Cache handle
|
||||||
|
* @param[in] type Cleaning policy type
|
||||||
* @param[in] param_id Cleaning policy parameter id
|
* @param[in] param_id Cleaning policy parameter id
|
||||||
* @param[in] param_value Cleaning policy parameter value
|
* @param[in] param_value Cleaning policy parameter value
|
||||||
*
|
*
|
||||||
@ -704,6 +705,7 @@ int ocf_mngt_cache_cleaning_set_param(ocf_cache_t cache, ocf_cleaning_t type,
|
|||||||
* @brief Get cleaning parameter from given cache
|
* @brief Get cleaning parameter from given cache
|
||||||
*
|
*
|
||||||
* @param[in] cache Cache handle
|
* @param[in] cache Cache handle
|
||||||
|
* @param[in] type Cleaning policy type
|
||||||
* @param[in] param_id Cleaning policy parameter id
|
* @param[in] param_id Cleaning policy parameter id
|
||||||
* @param[in] param_value Variable to store parameter value
|
* @param[in] param_value Variable to store parameter value
|
||||||
*
|
*
|
||||||
|
@ -21,7 +21,7 @@ from enum import IntEnum
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from ..ocf import OcfLib
|
from ..ocf import OcfLib
|
||||||
from .shared import Uuid, OcfError, CacheLineSize, CacheLines, OcfCompletion
|
from .shared import Uuid, OcfError, CacheLineSize, CacheLines, OcfCompletion, SeqCutOffPolicy
|
||||||
from ..utils import Size, struct_to_dict
|
from ..utils import Size, struct_to_dict
|
||||||
from .core import Core
|
from .core import Core
|
||||||
from .queue import Queue
|
from .queue import Queue
|
||||||
@ -82,6 +82,18 @@ class CleaningPolicy(IntEnum):
|
|||||||
DEFAULT = ALRU
|
DEFAULT = ALRU
|
||||||
|
|
||||||
|
|
||||||
|
class AlruParams(IntEnum):
|
||||||
|
WAKE_UP_TIME = 0,
|
||||||
|
STALE_BUFFER_TIME = 1,
|
||||||
|
FLUSH_MAX_BUFFERS = 2,
|
||||||
|
ACTIVITY_THRESHOLD = 3
|
||||||
|
|
||||||
|
|
||||||
|
class AcpParams(IntEnum):
|
||||||
|
WAKE_UP_TIME = 0,
|
||||||
|
FLUSH_MAX_BUFFERS = 1
|
||||||
|
|
||||||
|
|
||||||
class MetadataLayout(IntEnum):
|
class MetadataLayout(IntEnum):
|
||||||
STRIPING = 0
|
STRIPING = 0
|
||||||
SEQUENTIAL = 1
|
SEQUENTIAL = 1
|
||||||
@ -168,7 +180,35 @@ class Cache:
|
|||||||
|
|
||||||
def change_cache_mode(self, cache_mode: CacheMode):
|
def change_cache_mode(self, cache_mode: CacheMode):
|
||||||
self.get_and_write_lock()
|
self.get_and_write_lock()
|
||||||
self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode)
|
status = self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error changing cache mode", status)
|
||||||
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
|
def set_cleaning_policy(self, cleaning_policy: CleaningPolicy):
|
||||||
|
self.get_and_write_lock()
|
||||||
|
status = self.owner.lib.ocf_mngt_cache_cleaning_set_policy(self.cache_handle, cleaning_policy)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error changing cleaning policy", status)
|
||||||
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
|
def set_cleaning_policy_param(self, cleaning_policy: CleaningPolicy, param_id, param_value):
|
||||||
|
self.get_and_write_lock()
|
||||||
|
status = self.owner.lib.ocf_mngt_cache_cleaning_set_param(
|
||||||
|
self.cache_handle,
|
||||||
|
cleaning_policy,
|
||||||
|
param_id,
|
||||||
|
param_value
|
||||||
|
)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error setting cleaning policy param", status)
|
||||||
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
|
def set_seq_cut_off_policy(self, policy: SeqCutOffPolicy):
|
||||||
|
self.get_and_write_lock()
|
||||||
|
status = self.owner.lib.ocf_mngt_core_set_seq_cutoff_policy_all(self.cache_handle, policy)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error setting cache seq cut off policy", status)
|
||||||
self.put_and_write_unlock()
|
self.put_and_write_unlock()
|
||||||
|
|
||||||
def configure_device(
|
def configure_device(
|
||||||
@ -457,3 +497,10 @@ lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
|
|||||||
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]
|
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]
|
||||||
lib.ocf_cache_get_name.argtypes = [c_void_p]
|
lib.ocf_cache_get_name.argtypes = [c_void_p]
|
||||||
lib.ocf_cache_get_name.restype = c_char_p
|
lib.ocf_cache_get_name.restype = c_char_p
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_policy.argtypes = [c_void_p, c_uint32]
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_policy.restype = c_int
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy_all.argtypes = [c_void_p, c_uint32]
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy_all.restype = c_int
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_param.argtypes = [c_void_p, c_uint32, c_uint32, c_uint32]
|
||||||
|
lib.ocf_mngt_cache_cleaning_set_param.restype = c_int
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ import logging
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from ..ocf import OcfLib
|
from ..ocf import OcfLib
|
||||||
from .shared import Uuid, OcfCompletion, OcfError
|
from .shared import Uuid, OcfCompletion, OcfError, SeqCutOffPolicy
|
||||||
from .volume import Volume
|
from .volume import Volume
|
||||||
from .data import Data
|
from .data import Data
|
||||||
from .io import Io, IoDir
|
from .io import Io, IoDir
|
||||||
@ -138,6 +138,13 @@ class Core:
|
|||||||
"errors": struct_to_dict(errors),
|
"errors": struct_to_dict(errors),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def set_seq_cut_off_policy(self, policy: SeqCutOffPolicy):
|
||||||
|
self.cache.get_and_write_lock()
|
||||||
|
status = self.cache.owner.lib.ocf_mngt_core_set_seq_cutoff_policy(self.handle, policy)
|
||||||
|
if status:
|
||||||
|
raise OcfError("Error setting core seq cut off policy", status)
|
||||||
|
self.cache.put_and_write_unlock()
|
||||||
|
|
||||||
def reset_stats(self):
|
def reset_stats(self):
|
||||||
self.cache.owner.lib.ocf_core_stats_initialize(self.handle)
|
self.cache.owner.lib.ocf_core_stats_initialize(self.handle)
|
||||||
|
|
||||||
@ -168,3 +175,5 @@ lib.ocf_volume_new_io.argtypes = [c_void_p]
|
|||||||
lib.ocf_volume_new_io.restype = c_void_p
|
lib.ocf_volume_new_io.restype = c_void_p
|
||||||
lib.ocf_core_get_volume.argtypes = [c_void_p]
|
lib.ocf_core_get_volume.argtypes = [c_void_p]
|
||||||
lib.ocf_core_get_volume.restype = c_void_p
|
lib.ocf_core_get_volume.restype = c_void_p
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy.argtypes = [c_void_p, c_uint32]
|
||||||
|
lib.ocf_mngt_core_set_seq_cutoff_policy.restype = c_int
|
||||||
|
@ -123,6 +123,13 @@ class CacheLineSize(IntEnum):
|
|||||||
DEFAULT = LINE_4KiB
|
DEFAULT = LINE_4KiB
|
||||||
|
|
||||||
|
|
||||||
|
class SeqCutOffPolicy(IntEnum):
|
||||||
|
ALWAYS = 0
|
||||||
|
FULL = 1
|
||||||
|
NEVER = 2
|
||||||
|
DEFAULT = FULL
|
||||||
|
|
||||||
|
|
||||||
class CacheLines(S):
|
class CacheLines(S):
|
||||||
def __init__(self, count: int, line_size: CacheLineSize):
|
def __init__(self, count: int, line_size: CacheLineSize):
|
||||||
self.bytes = count * line_size
|
self.bytes = count * line_size
|
||||||
|
@ -15,7 +15,6 @@ def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0):
|
|||||||
|
|
||||||
for addr in range(offset, end, width):
|
for addr in range(offset, end, width):
|
||||||
cur_line = buf[addr : min(end, addr + width)]
|
cur_line = buf[addr : min(end, addr + width)]
|
||||||
all_zeros = True
|
|
||||||
byteline = ""
|
byteline = ""
|
||||||
asciiline = ""
|
asciiline = ""
|
||||||
if not any(cur_line):
|
if not any(cur_line):
|
||||||
|
@ -5,59 +5,19 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import random
|
|
||||||
import string
|
|
||||||
from ctypes import (
|
from ctypes import (
|
||||||
c_uint64,
|
c_uint64,
|
||||||
c_uint32,
|
c_uint32,
|
||||||
c_uint16,
|
c_uint16,
|
||||||
c_int,
|
c_int
|
||||||
c_uint
|
|
||||||
)
|
)
|
||||||
|
from tests.utils import get_random_strings, get_random_ints
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
|
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
|
||||||
|
|
||||||
|
|
||||||
def generate_random_numbers(c_type):
|
|
||||||
type_dict = {
|
|
||||||
c_uint16: [0, c_uint16(-1).value],
|
|
||||||
c_uint32: [0, c_uint32(-1).value],
|
|
||||||
c_uint64: [0, c_uint64(-1).value],
|
|
||||||
c_int: [int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)]
|
|
||||||
}
|
|
||||||
|
|
||||||
values = []
|
|
||||||
for i in range(0, 1000):
|
|
||||||
values.append(random.randint(type_dict[c_type][0], type_dict[c_type][1]))
|
|
||||||
return values
|
|
||||||
|
|
||||||
|
|
||||||
def generate_random_strings():
|
|
||||||
values = []
|
|
||||||
for t in [string.digits,
|
|
||||||
string.ascii_letters + string.digits,
|
|
||||||
string.ascii_lowercase,
|
|
||||||
string.ascii_uppercase,
|
|
||||||
string.printable,
|
|
||||||
string.punctuation,
|
|
||||||
string.hexdigits]:
|
|
||||||
for i in range(0, 100):
|
|
||||||
values.append(''.join(random.choice(t) for _ in range(random.randint(0, 20))))
|
|
||||||
return values
|
|
||||||
|
|
||||||
|
|
||||||
def get_random_ints(c_type):
|
|
||||||
for value in generate_random_numbers(c_type):
|
|
||||||
yield value
|
|
||||||
|
|
||||||
|
|
||||||
def get_random_strings():
|
|
||||||
for value in generate_random_strings():
|
|
||||||
yield value
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=get_random_ints(c_uint16))
|
@pytest.fixture(params=get_random_ints(c_uint16))
|
||||||
def c_uint16_randomize(request):
|
def c_uint16_randomize(request):
|
||||||
return request.param
|
return request.param
|
||||||
|
208
tests/functional/tests/security/test_management_fuzzy.py
Normal file
208
tests/functional/tests/security/test_management_fuzzy.py
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
#
|
||||||
|
# Copyright(c) 2019 Intel Corporation
|
||||||
|
# SPDX-License-Identifier: BSD-3-Clause-Clear
|
||||||
|
#
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from pyocf.types.cache import Cache, CacheMode, CleaningPolicy, AlruParams, AcpParams
|
||||||
|
from pyocf.types.core import Core
|
||||||
|
from pyocf.types.volume import Volume
|
||||||
|
from pyocf.utils import Size as S
|
||||||
|
from tests.utils import generate_random_numbers
|
||||||
|
from pyocf.types.shared import OcfError, CacheLineSize, SeqCutOffPolicy
|
||||||
|
from ctypes import (
|
||||||
|
c_uint64,
|
||||||
|
c_uint32
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_change_cache_mode(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cache mode to invalid value.
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change cache mode to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in CacheMode]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error changing cache mode"):
|
||||||
|
cache.change_cache_mode(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_set_cleaning_policy(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cleaning policy to invalid value
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set cleaning policy to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in CleaningPolicy]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error changing cleaning policy"):
|
||||||
|
cache.set_cleaning_policy(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_attach_cls(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cache line size to
|
||||||
|
invalid value while attaching cache device
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache(owner=cache_device.owner, cache_mode=cm, cache_line_size=cls)
|
||||||
|
cache.start_cache()
|
||||||
|
|
||||||
|
# Check whether it is possible to attach cache device with invalid cache line size
|
||||||
|
for i in generate_random_numbers(c_uint64):
|
||||||
|
if i in [item.value for item in CacheLineSize]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Attaching cache device failed"):
|
||||||
|
cache.attach_device(cache_device, cache_line_size=i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_cache_set_seq_cut_off_policy(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change cache seq cut-off policy to invalid value
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create 2 core devices
|
||||||
|
core_device1 = Volume(S.from_MiB(10))
|
||||||
|
core1 = Core.using_device(core_device1)
|
||||||
|
core_device2 = Volume(S.from_MiB(10))
|
||||||
|
core2 = Core.using_device(core_device2)
|
||||||
|
|
||||||
|
# Add cores
|
||||||
|
cache.add_core(core1)
|
||||||
|
cache.add_core(core2)
|
||||||
|
|
||||||
|
# Change cache seq cut off policy to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in SeqCutOffPolicy]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting cache seq cut off policy"):
|
||||||
|
cache.set_seq_cut_off_policy(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_core_set_seq_cut_off_policy(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to change core seq cut-off policy to invalid value
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create core device
|
||||||
|
core_device = Volume(S.from_MiB(10))
|
||||||
|
core = Core.using_device(core_device)
|
||||||
|
|
||||||
|
# Add core
|
||||||
|
cache.add_core(core)
|
||||||
|
|
||||||
|
# Change core seq cut off policy to invalid one and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in SeqCutOffPolicy]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting core seq cut off policy"):
|
||||||
|
core.set_seq_cut_off_policy(i)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_set_alru_param(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to set invalid param for alru cleaning policy
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change invalid alru param and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in AlruParams]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting cleaning policy param"):
|
||||||
|
cache.set_cleaning_policy_param(CleaningPolicy.ALRU, i, 1)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("cm", CacheMode)
|
||||||
|
@pytest.mark.parametrize("cls", CacheLineSize)
|
||||||
|
@pytest.mark.security
|
||||||
|
def test_neg_set_acp_param(pyocf_ctx, cm, cls):
|
||||||
|
"""
|
||||||
|
Test whether it is possible to set invalid param for acp cleaning policy
|
||||||
|
:param pyocf_ctx: basic pyocf context fixture
|
||||||
|
:param cm: cache mode we start with
|
||||||
|
:param cls: cache line size we start with
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# Start cache device
|
||||||
|
cache_device = Volume(S.from_MiB(30))
|
||||||
|
cache = Cache.start_on_device(
|
||||||
|
cache_device, cache_mode=cm, cache_line_size=cls
|
||||||
|
)
|
||||||
|
|
||||||
|
# Change invalid acp param and check if failed
|
||||||
|
for i in generate_random_numbers(c_uint32):
|
||||||
|
if i in [item.value for item in AcpParams]:
|
||||||
|
continue
|
||||||
|
with pytest.raises(OcfError, match="Error setting cleaning policy param"):
|
||||||
|
cache.set_cleaning_policy_param(CleaningPolicy.ALRU, i, 1)
|
52
tests/functional/tests/utils.py
Normal file
52
tests/functional/tests/utils.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
#
|
||||||
|
# Copyright(c) 2019 Intel Corporation
|
||||||
|
# SPDX-License-Identifier: BSD-3-Clause-Clear
|
||||||
|
#
|
||||||
|
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from ctypes import (
|
||||||
|
c_uint64,
|
||||||
|
c_uint32,
|
||||||
|
c_uint16,
|
||||||
|
c_int,
|
||||||
|
c_uint
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_random_numbers(c_type):
|
||||||
|
type_dict = {
|
||||||
|
c_uint16: [0, c_uint16(-1).value],
|
||||||
|
c_uint32: [0, c_uint32(-1).value],
|
||||||
|
c_uint64: [0, c_uint64(-1).value],
|
||||||
|
c_int: [int(-c_uint(-1).value / 2) - 1, int(c_uint(-1).value / 2)]
|
||||||
|
}
|
||||||
|
|
||||||
|
values = []
|
||||||
|
for i in range(0, 1000):
|
||||||
|
values.append(random.randint(type_dict[c_type][0], type_dict[c_type][1]))
|
||||||
|
return values
|
||||||
|
|
||||||
|
|
||||||
|
def generate_random_strings():
|
||||||
|
values = []
|
||||||
|
for t in [string.digits,
|
||||||
|
string.ascii_letters + string.digits,
|
||||||
|
string.ascii_lowercase,
|
||||||
|
string.ascii_uppercase,
|
||||||
|
string.printable,
|
||||||
|
string.punctuation,
|
||||||
|
string.hexdigits]:
|
||||||
|
for i in range(0, 100):
|
||||||
|
values.append(''.join(random.choice(t) for _ in range(random.randint(0, 20))))
|
||||||
|
return values
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_ints(c_type):
|
||||||
|
for value in generate_random_numbers(c_type):
|
||||||
|
yield value
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_strings():
|
||||||
|
for value in generate_random_strings():
|
||||||
|
yield value
|
Loading…
Reference in New Issue
Block a user