Add fuzzy tests API

Signed-off-by: jwirkus <jakubx.wirkus@intel.com>
This commit is contained in:
jwirkus
2022-11-03 15:05:12 +01:00
parent 6dbdb94b2d
commit 2f4d2f70c5
25 changed files with 212 additions and 99 deletions

View File

@@ -2,9 +2,11 @@
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import os
import posixpath
from typing import Callable
from collections import namedtuple
from typing import List
import yaml
@@ -22,43 +24,73 @@ def get_fuzz_config(config_name: str):
return fuzz_config
def prepare_cas_instance(cache_disk, core_disk, cache_mode: CacheMode = None,
def get_device_fuzz_config(device_paths: List[str]):
if len(device_paths) == 0:
raise Exception("device_paths parameter cannot be empty list")
device_base_config = get_fuzz_config("device.yml")
device_base_config[0]['attributes']['value'] = device_paths[0]
if len(device_paths) > 1:
other_valid_devices = {
'name': 'Hint',
'attributes': {
'name': 'ValidValues',
'value': ';'.join(device_paths[1:])
}
}
device_base_config[0]['children'].append(other_valid_devices)
return device_base_config
def prepare_cas_instance(cache_device, core_device, cache_mode: CacheMode = None,
cache_line_size: CacheLineSize = None,
kernel_params: KernelParameters = KernelParameters(),
cleaning_policy: CleaningPolicy = None, mount_point: str = None):
cleaning_policy: CleaningPolicy = None, mount_point: str = None,
create_partition=True):
# Change cleaning policy to default for Write Policy different than WB
if cleaning_policy:
cleaning_policy = CleaningPolicy.DEFAULT if cache_mode != CacheMode.WB \
else cleaning_policy
cache_disk.create_partitions([Size(400, Unit.MebiByte)])
cache_device = cache_disk.partitions[0]
if create_partition is True:
cache_device.create_partitions([Size(400, Unit.MebiByte)])
cache_device = cache_device.partitions[0]
cache = casadm.start_cache(cache_device, cache_mode, cache_line_size, 1, True,
kernel_params=kernel_params)
if cleaning_policy:
cache.set_cleaning_policy(cleaning_policy)
if mount_point:
core_disk.create_filesystem(Filesystem.ext4)
core = cache.add_core(core_disk)
core_device.create_filesystem(Filesystem.ext4)
core = cache.add_core(core_device)
core.mount(mount_point)
else:
core = cache.add_core(core_disk)
core = cache.add_core(core_device)
return cache, core
def run_cmd_and_validate(cmd, value_name: str, valid_values: list,
post_process_param_func: Callable = None):
def run_cmd_and_validate(cmd, value_name: str, is_valid: bool):
TestRun.LOGGER.info(f"{value_name}: {cmd.param}")
TestRun.LOGGER.info(f"Encoded command: {cmd.command}")
TestRun.LOGGER.info(f"Command: {cmd.command}")
output = TestRun.executor.run(cmd.command)
param = cmd.param
if post_process_param_func:
param = post_process_param_func(param)
if output.exit_code == 0 and param not in valid_values:
TestRun.LOGGER.error(f" {param} value is not valid")
elif output.exit_code != 0 and param in valid_values:
TestRun.LOGGER.error(f" {param} value is valid but command returned with "
f"{output.exit_code} exit code")
if output.exit_code == 0 and not is_valid:
TestRun.LOGGER.error(f"{cmd.param} value is not valid\n"
f"stdout: {output.stdout}\n"
f"stderr: {output.stderr}")
elif output.exit_code != 0 and is_valid:
TestRun.LOGGER.error(f"{cmd.param} value is valid but command returned with "
f"{output.exit_code} exit code\n"
f"stdout: {output.stdout}\n"
f"stderr: {output.stderr}")
return output
def get_cmd(command, param):
FuzzedCommand = namedtuple('Command', ['param', 'command'])
return FuzzedCommand(param, command)