From 876294f603d04cff307b813a1a3045083fd91800 Mon Sep 17 00:00:00 2001 From: Daniel Madej Date: Tue, 16 Apr 2019 11:10:47 +0200 Subject: [PATCH] Negative tests for IO path Signed-off-by: Daniel Madej --- tests/functional/pyocf/types/core.py | 12 +- tests/functional/pyocf/types/shared.py | 3 +- .../tests/security/test_negative_io.py | 176 ++++++++++++++++++ 3 files changed, 184 insertions(+), 7 deletions(-) create mode 100644 tests/functional/tests/security/test_negative_io.py diff --git a/tests/functional/pyocf/types/core.py b/tests/functional/pyocf/types/core.py index 4562632..71b9a84 100644 --- a/tests/functional/pyocf/types/core.py +++ b/tests/functional/pyocf/types/core.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: BSD-3-Clause-Clear # +import logging from ctypes import ( c_size_t, c_void_p, @@ -17,16 +18,15 @@ from ctypes import ( byref, create_string_buffer, ) -import logging from datetime import timedelta -from ..ocf import OcfLib -from .shared import Uuid, OcfCompletion, OcfError, SeqCutOffPolicy -from .volume import Volume from .data import Data from .io import Io, IoDir -from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats +from .shared import Uuid, OcfCompletion, OcfError, SeqCutOffPolicy from .stats.core import CoreStats +from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats +from .volume import Volume +from ..ocf import OcfLib from ..utils import Size, struct_to_dict @@ -59,7 +59,7 @@ class Core: core_id: int = DEFAULT_ID, seq_cutoff_threshold: int = DEFAULT_SEQ_CUTOFF_THRESHOLD, ): - + self.cache = None self.device = device self.device_name = device.uuid self.core_id = core_id diff --git a/tests/functional/pyocf/types/shared.py b/tests/functional/pyocf/types/shared.py index 1f550cc..b23a2fa 100644 --- a/tests/functional/pyocf/types/shared.py +++ b/tests/functional/pyocf/types/shared.py @@ -3,10 +3,10 @@ # SPDX-License-Identifier: BSD-3-Clause-Clear # +import logging from ctypes import CFUNCTYPE, c_size_t, c_char_p, Structure, c_void_p from enum import IntEnum, auto from threading import Event -import logging from ..utils import Size as S @@ -58,6 +58,7 @@ class OcfCompletion: self.e = Event() self.completion_args = completion_args self._as_parameter_ = self.callback + self.results = None @property def callback(self): diff --git a/tests/functional/tests/security/test_negative_io.py b/tests/functional/tests/security/test_negative_io.py new file mode 100644 index 0000000..ffd4001 --- /dev/null +++ b/tests/functional/tests/security/test_negative_io.py @@ -0,0 +1,176 @@ +# +# Copyright(c) 2019 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause-Clear +# + +from ctypes import c_int +from random import randrange + +import pytest + +from pyocf.types.cache import Cache, Core +from pyocf.types.data import Data +from pyocf.types.io import IoDir +from pyocf.types.shared import OcfCompletion +from pyocf.types.volume import Volume +from pyocf.utils import Size + + +@pytest.mark.security +def test_neg_write_too_long_data(pyocf_ctx, c_uint16_randomize): + """ + Check if writing data larger than exported object size is properly blocked + """ + + core = prepare_cache_and_core(Size.from_MiB(1)) + data = Data(int(Size.from_KiB(c_uint16_randomize))) + completion = io_operation(core, data, IoDir.WRITE) + + if c_uint16_randomize > 1024: + assert completion.results["err"] != 0 + else: + assert completion.results["err"] == 0 + + +@pytest.mark.security +def test_neg_read_too_long_data(pyocf_ctx, c_uint16_randomize): + """ + Check if reading data larger than exported object size is properly blocked + """ + + core = prepare_cache_and_core(Size.from_MiB(1)) + data = Data(int(Size.from_KiB(c_uint16_randomize))) + completion = io_operation(core, data, IoDir.READ) + + if c_uint16_randomize > 1024: + assert completion.results["err"] != 0 + else: + assert completion.results["err"] == 0 + + +@pytest.mark.security +def test_neg_write_too_far(pyocf_ctx, c_uint16_randomize): + """ + Check if writing data which would normally fit on exported object is + blocked when offset is set so that data goes over exported device end + """ + + limited_size = c_uint16_randomize % (int(Size.from_KiB(4)) + 1) + core = prepare_cache_and_core(Size.from_MiB(4)) + data = Data(int(Size.from_KiB(limited_size))) + completion = io_operation(core, data, IoDir.WRITE, int(Size.from_MiB(3))) + + if limited_size > 1024: + assert completion.results["err"] != 0 + else: + assert completion.results["err"] == 0 + + +@pytest.mark.security +def test_neg_read_too_far(pyocf_ctx, c_uint16_randomize): + """ + Check if reading data which would normally fit on exported object is + blocked when offset is set so that data is read beyond exported device end + """ + + limited_size = c_uint16_randomize % (int(Size.from_KiB(4)) + 1) + core = prepare_cache_and_core(Size.from_MiB(4)) + data = Data(int(Size.from_KiB(limited_size))) + completion = io_operation(core, data, IoDir.READ, offset=(Size.from_MiB(3))) + + if limited_size > 1024: + assert completion.results["err"] != 0 + else: + assert completion.results["err"] == 0 + + +@pytest.mark.security +def test_neg_write_offset_outside_of_device(pyocf_ctx, c_int_randomize): + """ + Check that write operations are blocked when + IO offset is located outside of device range + """ + + core = prepare_cache_and_core(Size.from_MiB(2)) + data = Data(int(Size.from_KiB(1))) + completion = io_operation(core, data, IoDir.WRITE, offset=c_int_randomize) + + if 0 <= c_int_randomize <= int(Size.from_MiB(2)) - int(Size.from_KiB(1)): + assert completion.results["err"] == 0 + else: + assert completion.results["err"] != 0 + + +@pytest.mark.security +def test_neg_read_offset_outside_of_device(pyocf_ctx, c_int_randomize): + """ + Check that read operations are blocked when + IO offset is located outside of device range + """ + + core = prepare_cache_and_core(Size.from_MiB(2)) + data = Data(int(Size.from_KiB(1))) + completion = io_operation(core, data, IoDir.READ, offset=c_int_randomize) + + if 0 <= c_int_randomize <= int(Size.from_MiB(2)) - int(Size.from_KiB(1)): + assert completion.results["err"] == 0 + else: + assert completion.results["err"] != 0 + + +@pytest.mark.security +def test_neg_io_class(pyocf_ctx, c_int_randomize): + """ + Check that IO operations are blocked when IO class + number is not in allowed values {0, ..., 32} + """ + + core = prepare_cache_and_core(Size.from_MiB(2)) + data = Data(int(Size.from_MiB(1))) + completion = io_operation(core, data, randrange(0, 2), io_class=c_int_randomize) + + if 0 <= c_int_randomize <= 32: + assert completion.results["err"] == 0 + else: + assert completion.results["err"] != 0 + + +@pytest.mark.security +def test_neg_io_direction(pyocf_ctx, c_int_randomize): + """ + Check that IO operations are not executed for unknown IO direction, + that is when IO direction value is not in allowed values {0, 1} + """ + + core = prepare_cache_and_core(Size.from_MiB(2)) + data = Data(int(Size.from_MiB(1))) + completion = io_operation(core, data, c_int_randomize) + + if c_int_randomize in [0, 1]: + assert completion.results["err"] == 0 + else: + assert completion.results["err"] != 0 + + +def prepare_cache_and_core(core_size: Size, cache_size: Size = Size.from_MiB(20)): + cache_device = Volume(cache_size) + core_device = Volume(core_size) + + cache = Cache.start_on_device(cache_device) + core = Core.using_device(core_device) + + cache.add_core(core) + return core + + +def io_operation(core: Core, data: Data, io_direction: int, offset: int = 0, io_class: int = 0): + io = core.new_io() + io.set_data(data) + io.configure(offset, data.size, io_direction, io_class, 0) + io.set_queue(core.cache.get_default_queue()) + + completion = OcfCompletion([("err", c_int)]) + io.callback = completion.callback + io.submit() + completion.wait() + return completion