Merge pull request #1560 from katlapinka/kasiat/test-security-fixes

Small fixes for security tests
This commit is contained in:
Katarzyna Treder 2024-10-15 09:37:55 +02:00 committed by GitHub
commit bffe87d071
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 78 additions and 13 deletions

View File

@ -20,6 +20,7 @@ from api.cas.cli import add_core_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
get_fuzz_config, get_fuzz_config,
run_cmd_and_validate, run_cmd_and_validate,
@ -68,6 +69,9 @@ def test_fuzzy_add_core_cache_id(
) )
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = [str(cache.cache_id).encode("ascii")] valid_values = [str(cache.cache_id).encode("ascii")]
fuzz_config = get_fuzz_config("cache_id.yml") fuzz_config = get_fuzz_config("cache_id.yml")

View File

@ -24,6 +24,7 @@ from tests.security.fuzzy.kernel.common.common import (
get_fuzz_config, get_fuzz_config,
run_cmd_and_validate, run_cmd_and_validate,
) )
from test_utils.os_utils import Udev
core_id_min = 0 core_id_min = 0
core_id_max = 4095 core_id_max = 4095
@ -68,6 +69,9 @@ def test_fuzzy_add_core_core_id(
) )
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
fuzz_config = get_fuzz_config("core_id.yml") fuzz_config = get_fuzz_config("core_id.yml")
PeachFuzzer.generate_config(fuzz_config) PeachFuzzer.generate_config(fuzz_config)

View File

@ -20,6 +20,7 @@ from api.cas.cli import add_core_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
run_cmd_and_validate, run_cmd_and_validate,
get_device_fuzz_config, get_device_fuzz_config,
@ -67,6 +68,9 @@ def test_fuzzy_add_core_device(
) )
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = [ valid_values = [
disk.path disk.path

View File

@ -9,6 +9,7 @@ from api.cas.cli import script_try_add_cmd, remove_detached_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskTypeSet, DiskType from storage_devices.disk import DiskTypeSet, DiskType
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
run_cmd_and_validate, run_cmd_and_validate,
get_fuzz_config, get_fuzz_config,
@ -41,6 +42,9 @@ def test_fuzzy_script_add_core_try_add_cache_id():
command_template=base_cmd, count=TestRun.usr.fuzzy_iter_count command_template=base_cmd, count=TestRun.usr.fuzzy_iter_count
) )
with TestRun.step("Disable udev"):
Udev.disable()
for index, cmd in TestRun.iteration( for index, cmd in TestRun.iteration(
enumerate(commands), f"Run command {TestRun.usr.fuzzy_iter_count} times" enumerate(commands), f"Run command {TestRun.usr.fuzzy_iter_count} times"
): ):

View File

@ -9,6 +9,7 @@ from api.cas.cli import script_try_add_cmd, remove_detached_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskTypeSet, DiskType from storage_devices.disk import DiskTypeSet, DiskType
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
run_cmd_and_validate, run_cmd_and_validate,
get_fuzz_config, get_fuzz_config,
@ -43,6 +44,9 @@ def test_fuzzy_script_add_core_try_add_core_id():
command_template=base_cmd, count=TestRun.usr.fuzzy_iter_count command_template=base_cmd, count=TestRun.usr.fuzzy_iter_count
) )
with TestRun.step("Disable udev"):
Udev.disable()
for index, cmd in TestRun.iteration( for index, cmd in TestRun.iteration(
enumerate(commands), f"Run command {TestRun.usr.fuzzy_iter_count} times" enumerate(commands), f"Run command {TestRun.usr.fuzzy_iter_count} times"
): ):

View File

@ -7,6 +7,7 @@
from api.cas.cli import script_try_add_cmd, remove_detached_cmd from api.cas.cli import script_try_add_cmd, remove_detached_cmd
from core.test_run import TestRun from core.test_run import TestRun
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
run_cmd_and_validate, run_cmd_and_validate,
get_device_fuzz_config, get_device_fuzz_config,
@ -36,6 +37,9 @@ def test_fuzzy_script_add_core_try_add_core_device():
command_template=base_cmd, count=TestRun.usr.fuzzy_iter_count command_template=base_cmd, count=TestRun.usr.fuzzy_iter_count
) )
with TestRun.step("Disable udev"):
Udev.disable()
for index, cmd in TestRun.iteration( for index, cmd in TestRun.iteration(
enumerate(commands), f"Run command {TestRun.usr.fuzzy_iter_count} times" enumerate(commands), f"Run command {TestRun.usr.fuzzy_iter_count} times"
): ):

View File

@ -19,6 +19,7 @@ from api.cas.cli import start_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet from storage_devices.disk import DiskType, DiskTypeSet
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from test_utils.size import Unit, Size from test_utils.size import Unit, Size
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
get_fuzz_config, get_fuzz_config,
@ -49,6 +50,9 @@ def test_fuzzy_start_cache_flags(cache_mode, cache_line_size, unaligned_io, use_
cache_disk = TestRun.disks["cache"] cache_disk = TestRun.disks["cache"]
cache_disk.create_partitions([Size(400, Unit.MebiByte)]) cache_disk.create_partitions([Size(400, Unit.MebiByte)])
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Start and stop cache"): with TestRun.step("Start and stop cache"):
# Reload kernel modules # Reload kernel modules
cache = casadm.start_cache( cache = casadm.start_cache(
@ -93,7 +97,7 @@ def test_fuzzy_start_cache_flags(cache_mode, cache_line_size, unaligned_io, use_
any_alphanumeric_pattern = r"\w+" any_alphanumeric_pattern = r"\w+"
base_cmd = re.sub( base_cmd = re.sub(
pattern=f"{incompatible_param} {any_alphanumeric_pattern}", pattern=f"{incompatible_param} {any_alphanumeric_pattern}",
sub="", repl="",
string=base_cmd, string=base_cmd,
) )
base_cmd = f"{base_cmd.strip()} {param}" base_cmd = f"{base_cmd.strip()} {param}"

View File

@ -18,6 +18,7 @@ from api.cas.cli import remove_core_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -56,6 +57,9 @@ def test_fuzzy_remove_core_cache_id(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = [str(core.cache_id).encode("ascii")] valid_values = [str(core.cache_id).encode("ascii")]
PeachFuzzer.generate_config(get_fuzz_config("cache_id.yml")) PeachFuzzer.generate_config(get_fuzz_config("cache_id.yml"))

View File

@ -18,6 +18,7 @@ from api.cas.cli import remove_core_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -56,6 +57,9 @@ def test_fuzzy_remove_core_core_id(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = [str(core.core_id).encode("ascii")] valid_values = [str(core.core_id).encode("ascii")]
PeachFuzzer.generate_config(get_fuzz_config("core_id.yml")) PeachFuzzer.generate_config(get_fuzz_config("core_id.yml"))

View File

@ -17,6 +17,7 @@ from api.cas.cli import remove_core_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -55,6 +56,9 @@ def test_fuzzy_remove_core_flag(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = ["", "-f", "--force"] valid_values = ["", "-f", "--force"]
valid_values = [v.encode("ascii") for v in valid_values] valid_values = [v.encode("ascii") for v in valid_values]

View File

@ -20,6 +20,7 @@ from api.cas.init_config import InitConfig
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -58,6 +59,9 @@ def test_fuzzy_remove_inactive_cache_id(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Create init config from running configuration."): with TestRun.step("Create init config from running configuration."):
InitConfig.create_init_config_from_running_configuration() InitConfig.create_init_config_from_running_configuration()
@ -93,3 +97,6 @@ def test_fuzzy_remove_inactive_cache_id(
cache.stop(no_data_flush=True) cache.stop(no_data_flush=True)
core_disk.unplug() core_disk.unplug()
casadm.load_cache(device=cache_disk.partitions[0]) casadm.load_cache(device=cache_disk.partitions[0])
with TestRun.step("Plug core device"):
core_disk.plug_all()

View File

@ -19,6 +19,7 @@ from api.cas.init_config import InitConfig
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -57,6 +58,9 @@ def test_fuzzy_remove_inactive_core_id(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Create init config from running configuration."): with TestRun.step("Create init config from running configuration."):
InitConfig.create_init_config_from_running_configuration() InitConfig.create_init_config_from_running_configuration()
@ -92,3 +96,6 @@ def test_fuzzy_remove_inactive_core_id(
cache.stop(no_data_flush=True) cache.stop(no_data_flush=True)
core_disk.unplug() core_disk.unplug()
casadm.load_cache(device=cache_disk.partitions[0]) casadm.load_cache(device=cache_disk.partitions[0])
with TestRun.step("Plug core device"):
core_disk.plug_all()

View File

@ -18,6 +18,7 @@ from api.cas.cli import stop_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -56,6 +57,9 @@ def test_fuzzy_stop_cache_cache_id(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = [str(core.cache_id).encode("ascii")] valid_values = [str(core.cache_id).encode("ascii")]
PeachFuzzer.generate_config(get_fuzz_config("cache_id.yml")) PeachFuzzer.generate_config(get_fuzz_config("cache_id.yml"))

View File

@ -18,6 +18,7 @@ from api.cas.cli import stop_cmd
from core.test_run import TestRun from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer from test_tools.peach_fuzzer.peach_fuzzer import PeachFuzzer
from test_utils.os_utils import Udev
from tests.security.fuzzy.kernel.common.common import ( from tests.security.fuzzy.kernel.common.common import (
prepare_cas_instance, prepare_cas_instance,
get_fuzz_config, get_fuzz_config,
@ -56,6 +57,9 @@ def test_fuzzy_stop_cache_flag(
cleaning_policy=cleaning_policy, cleaning_policy=cleaning_policy,
) )
with TestRun.step("Disable udev"):
Udev.disable()
with TestRun.step("Prepare PeachFuzzer"): with TestRun.step("Prepare PeachFuzzer"):
valid_values = ["", "-n", "--no-data-flush"] valid_values = ["", "-n", "--no-data-flush"]
valid_values = [v.encode("ascii") for v in valid_values] valid_values = [v.encode("ascii") for v in valid_values]

View File

@ -87,7 +87,7 @@ def test_fuzzy_get_param_name(cache_mode, cache_line_size, unaligned_io, use_io_
if param == str(ParamName.seq_cutoff): if param == str(ParamName.seq_cutoff):
cmd += f" --core-id {core.core_id}" cmd += f" --core-id {core.core_id}"
cmd = base_cmd.replace("{param}", param) cmd = cmd.replace("{param}", param)
run_cmd_and_validate( run_cmd_and_validate(
cmd=get_cmd(cmd, param.encode("ascii")), cmd=get_cmd(cmd, param.encode("ascii")),

View File

@ -1,5 +1,6 @@
# #
# Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -13,6 +14,7 @@ from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools import fs_utils from test_tools import fs_utils
from test_tools.disk_utils import Filesystem from test_tools.disk_utils import Filesystem
from test_utils.os_utils import create_user, check_if_user_exists
from test_utils.output import CmdException from test_utils.output import CmdException
from test_utils.size import Size, Unit from test_utils.size import Size, Unit
@ -70,9 +72,7 @@ def test_user_cli():
casadm.stop_all_caches() casadm.stop_all_caches()
with TestRun.step("Add non-root user account."): with TestRun.step("Add non-root user account."):
TestRun.executor.run(f"useradd -N -r -l {user_name}") add_user()
user_home_dir = fs_utils.parse_ls_output(fs_utils.ls_item(f"/home/{user_name}"))[0]
user_home_dir.chmod_numerical(777, True)
with TestRun.step("Try to start cache."): with TestRun.step("Try to start cache."):
try: try:
@ -95,7 +95,7 @@ def test_user_cli():
with TestRun.step("Try to set cache mode."): with TestRun.step("Try to set cache mode."):
try: try:
output = run_as_other_user(cli.set_cache_mode_cmd(CacheMode.WB, output = run_as_other_user(cli.set_cache_mode_cmd(CacheMode.WB.name.lower(),
str(cache.cache_id)), user_name) str(cache.cache_id)), user_name)
if output.exit_code == 0: if output.exit_code == 0:
TestRun.LOGGER.error("Setting cache mode should fail!") TestRun.LOGGER.error("Setting cache mode should fail!")
@ -130,7 +130,7 @@ def test_user_cli():
with TestRun.step("Try to list caches."): with TestRun.step("Try to list caches."):
try: try:
output = run_as_other_user(cli.list_cmd(), user_name) output = run_as_other_user(cli.list_caches_cmd(), user_name)
if output.exit_code == 0: if output.exit_code == 0:
TestRun.LOGGER.error("Listing caches should fail!") TestRun.LOGGER.error("Listing caches should fail!")
except CmdException: except CmdException:
@ -222,7 +222,7 @@ def test_user_cli():
with TestRun.step("Try to load IO class configuration."): with TestRun.step("Try to load IO class configuration."):
try: try:
output = run_as_other_user(cli.load_io_classes_cmd( output = run_as_other_user(cli.load_io_classes_cmd(
str(cache.cache_id), io_conf_copy), user_name) str(cache.cache_id), io_conf_copy.full_path), user_name)
if output.exit_code == 0: if output.exit_code == 0:
TestRun.LOGGER.error("Loading IO class configuration should fail!") TestRun.LOGGER.error("Loading IO class configuration should fail!")
except CmdException: except CmdException:
@ -273,7 +273,7 @@ def test_user_cli():
with TestRun.step("Try to list caches with 'sudo'."): with TestRun.step("Try to list caches with 'sudo'."):
try: try:
run_as_other_user(cli.list_cmd(), user_name, True) run_as_other_user(cli.list_caches_cmd(), user_name, True)
except CmdException: except CmdException:
TestRun.LOGGER.error("Non-root sudoer user should be able to list caches.") TestRun.LOGGER.error("Non-root sudoer user should be able to list caches.")
@ -339,7 +339,7 @@ def test_user_cli():
with TestRun.step("Try to load IO class configuration with 'sudo'."): with TestRun.step("Try to load IO class configuration with 'sudo'."):
try: try:
run_as_other_user(cli.load_io_classes_cmd(str(cache.cache_id), io_conf_copy), run_as_other_user(cli.load_io_classes_cmd(str(cache.cache_id), io_conf_copy.full_path),
user_name, True) user_name, True)
except CmdException: except CmdException:
TestRun.LOGGER.error("Non-root sudoer user should be able to " TestRun.LOGGER.error("Non-root sudoer user should be able to "
@ -420,9 +420,7 @@ def test_user_service():
core.unmount() core.unmount()
with TestRun.step("Add non-root user account."): with TestRun.step("Add non-root user account."):
TestRun.executor.run(f"useradd -N -r -l {user_name}") add_user()
user_home_dir = fs_utils.parse_ls_output(fs_utils.ls_item(f"/home/{user_name}"))[0]
user_home_dir.chmod_numerical(777, True)
with TestRun.step("Try to stop OpenCAS service."): with TestRun.step("Try to stop OpenCAS service."):
try: try:
@ -492,3 +490,8 @@ def run_as_other_user(command, user: str, sudo: bool = False):
if output.exit_code != 0 or output.stderr is not "": if output.exit_code != 0 or output.stderr is not "":
raise CmdException("Must be run as root.", output) raise CmdException("Must be run as root.", output)
return output return output
def add_user():
if not check_if_user_exists(user_name):
create_user(user_name, additional_params=["N", "r", "l"])