diff --git a/tests/functional/pyocf/types/io.py b/tests/functional/pyocf/types/io.py index 46f6e93..c01c1f2 100644 --- a/tests/functional/pyocf/types/io.py +++ b/tests/functional/pyocf/types/io.py @@ -67,6 +67,18 @@ class Io(Structure): def get_instance(cls, ref): return cls._instances_[cast(ref, c_void_p).value] + @staticmethod + def get_by_forward_token(token): + return OcfLib.getInstance().ocf_forward_get_io(token) + + @staticmethod + def forward_get(token): + OcfLib.getInstance().ocf_forward_get(token) + + @staticmethod + def forward_end(token, error): + OcfLib.getInstance().ocf_forward_end(token, error) + def del_object(self): del type(self)._instances_[cast(byref(self), c_void_p).value] @@ -149,6 +161,14 @@ IoOps.GET_DATA = CFUNCTYPE(c_void_p, POINTER(Io)) IoOps._fields_ = [("_set_data", IoOps.SET_DATA), ("_get_data", IoOps.GET_DATA)] lib = OcfLib.getInstance() + +lib.ocf_forward_get.argtypes = [c_uint64] + +lib.ocf_forward_get_io.argtypes = [c_uint64] +lib.ocf_forward_get_io.restype = POINTER(Io) + +lib.ocf_forward_end.argtypes = [c_uint64, c_int] + lib.ocf_io_set_cmpl_wrapper.argtypes = [POINTER(Io), c_void_p, c_void_p, Io.END] lib.ocf_io_set_data.argtypes = [POINTER(Io), c_void_p, c_uint32] diff --git a/tests/functional/pyocf/types/volume.py b/tests/functional/pyocf/types/volume.py index fccbd4e..858e289 100644 --- a/tests/functional/pyocf/types/volume.py +++ b/tests/functional/pyocf/types/volume.py @@ -51,6 +51,9 @@ class VolumeOps(Structure): SUBMIT_METADATA = CFUNCTYPE(None, c_void_p) SUBMIT_DISCARD = CFUNCTYPE(None, c_void_p) SUBMIT_WRITE_ZEROES = CFUNCTYPE(None, c_void_p) + FORWARD_IO = CFUNCTYPE(None, c_void_p, c_uint64, c_int, c_uint64, c_uint64, c_uint64) + FORWARD_FLUSH = CFUNCTYPE(None, c_void_p, c_uint64) + FORWARD_DISCARD = CFUNCTYPE(None, c_void_p, c_uint64, c_uint64, c_uint64) ON_INIT = CFUNCTYPE(c_int, c_void_p) ON_DEINIT = CFUNCTYPE(None, c_void_p) OPEN = CFUNCTYPE(c_int, c_void_p, c_void_p) @@ -64,6 +67,9 @@ class VolumeOps(Structure): ("_submit_metadata", SUBMIT_METADATA), ("_submit_discard", SUBMIT_DISCARD), ("_submit_write_zeroes", SUBMIT_WRITE_ZEROES), + ("_forward_io", FORWARD_IO), + ("_forward_flush", FORWARD_FLUSH), + ("_forward_discard", FORWARD_DISCARD), ("_on_init", ON_INIT), ("_on_deinit", ON_DEINIT), ("_open", OPEN), @@ -132,6 +138,18 @@ class Volume: def _submit_write_zeroes(write_zeroes): raise NotImplementedError + @VolumeOps.FORWARD_IO + def _forward_io(volume, token, rw, addr, nbytes, offset): + Volume.get_instance(volume).forward_io(token, rw, addr, nbytes, offset) + + @VolumeOps.FORWARD_FLUSH + def _forward_flush(volume, token): + Volume.get_instance(volume).forward_flush(token) + + @VolumeOps.FORWARD_DISCARD + def _forward_discard(volume, token, addr, nbytes): + Volume.get_instance(volume).forward_discard(token, addr, nbytes) + @VolumeOps.ON_INIT def _on_init(ref): return 0 @@ -181,6 +199,9 @@ class Volume: _submit_metadata=_submit_metadata, _submit_discard=_submit_discard, _submit_write_zeroes=_submit_write_zeroes, + _forward_io=_forward_io, + _forward_flush=_forward_flush, + _forward_discard=_forward_discard, _open=_open, _close=_close, _get_max_io_size=_get_max_io_size, @@ -329,6 +350,28 @@ class Volume: else: self._reject_io(io) + def _reject_forward(self, token): + Io.forward_end(token, -OcfErrorCode.OCF_ERR_IO) + + def forward_io(self, token, rw, addr, nbytes, offset): + if self.is_online: + self.inc_stats(IoDir(rw)) + self.do_forward_io(token, rw, addr, nbytes, offset) + else: + self._reject_forward(token) + + def forward_flush(self, token): + if self.is_online: + self.do_forward_flush(token) + else: + self._reject_forward(token) + + def forward_discard(self, token, addr, nbytes): + if self.is_online: + self.do_forward_discard(token, addr, nbytes) + else: + self._reject_forward(token) + def new_io( self, queue: Queue, addr: int, length: int, direction: IoDir, io_class: int, flags: int, ): @@ -470,6 +513,37 @@ class RamVolume(Volume): except: # noqa E722 io.contents._end(io, -OcfErrorCode.OCF_ERR_IO) + def do_forward_io(self, token, rw, addr, nbytes, offset): + try: + io = Io.get_by_forward_token(token) + + if rw == IoDir.WRITE: + src_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p) + src = Data.get_instance(src_ptr.value).handle.value + offset + dst = self.data_ptr + addr + elif rw == IoDir.READ: + dst_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p) + dst = Data.get_instance(dst_ptr.value).handle.value + offset + src = self.data_ptr + addr + + memmove(dst, src, nbytes) + + Io.forward_end(token, 0) + except Exception as e: # noqa E722 + Io.forward_end(token, -OcfErrorCode.OCF_ERR_IO) + + def do_forward_flush(self, token): + Io.forward_end(token, 0) + + def do_forward_discard(self, token, addr, nbytes): + try: + dst = self.data_ptr + addr + memset(dst, 0, nbytes) + + Io.forward_end(token, 0) + except: # noqa E722 + Io.forward_end(token, -OcfErrorCode.OCF_ERR_NOT_SUPP) + def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs): if size == 0: size = int(self.size) - int(offset) @@ -517,44 +591,68 @@ class ErrorDevice(Volume): super().close() self.vol.close() - def should_forward_io(self, io): + def should_forward_io(self, rw, addr): if not self.armed: return True - direction = IoDir(io.contents._dir) + direction = IoDir(rw) seq_no_match = ( self.error_seq_no[direction] >= 0 and self.error_seq_no[direction] <= self.io_seq_no[direction] ) - sector_match = io.contents._addr in self.error_sectors + sector_match = addr in self.error_sectors self.io_seq_no[direction] += 1 return not seq_no_match and not sector_match - def complete_with_error(self, io): + def complete_submit_with_error(self, io): self.error = True direction = IoDir(io.contents._dir) self.stats["errors"][direction] += 1 io.contents._end(io, -OcfErrorCode.OCF_ERR_IO) def do_submit_io(self, io): - if self.should_forward_io(io): + if self.should_forward_io(io.contents._dir, io.contents._addr): self.vol.do_submit_io(io) else: - self.complete_with_error(io) + self.complete_submit_with_error(io) - def do_submit_flush(self, flush): - if self.data_only or self.should_forward_io(flush): - self.vol.do_submit_flush(flush) + def do_submit_flush(self, io): + if self.data_only or self.should_forward_io(io.contents._dir, io.contents._addr): + self.vol.do_submit_flush(io) else: - self.complete_with_error(flush) + self.complete_submit_with_error(io) - def do_submit_discard(self, discard): - if self.data_only or self.should_forward_io(discard): - self.vol.do_submit_discard(discard) + def do_submit_discard(self, io): + if self.data_only or self.should_forward_io(io.contents._dir, io.contents._addr): + self.vol.do_submit_discard(io) else: - self.complete_with_error(discard) + self.complete_submit_with_error(io) + + def complete_forward_with_error(self, token, rw): + self.error = True + direction = IoDir(rw) + self.stats["errors"][direction] += 1 + Io.forward_end(token, -OcfErrorCode.OCF_ERR_IO) + + def do_forward_io(self, token, rw, addr, nbytes, offset): + if self.should_forward_io(rw, addr): + self.vol.do_forward_io(token, rw, addr, nbytes, offset) + else: + self.complete_forward_with_error(token, rw) + + def do_forward_flush(self, token): + if self.data_only or self.should_forward_io(0, 0): + self.vol.do_forward_flush(token) + else: + self.complete_forward_with_error(token, rw) + + def do_forward_discard(self, token, addr, nbytes): + if self.data_only or self.should_forward_io(0, addr): + self.vol.do_forward_discard(token, addr, nbytes) + else: + self.complete_forward_with_error(token, rw) def arm(self): self.armed = True @@ -611,32 +709,89 @@ class TraceDevice(Volume): super().close() self.vol.close() - def _trace(self, io, io_type): + def _trace(self, io_type, rw, addr, nbytes, flags): submit = True if self.trace_fcn: - submit = self.trace_fcn(self, io, io_type) + submit = self.trace_fcn(self, io_type, rw, addr, nbytes, flags) return submit def do_submit_io(self, io): - submit = self._trace(io, TraceDevice.IoType.Data) + submit = self._trace( + TraceDevice.IoType.Data, + io.contents._dir, + io.contents._addr, + io.contents._bytes, + io.contents._flags + ) if submit: self.vol.do_submit_io(io) def do_submit_flush(self, io): - submit = self._trace(io, TraceDevice.IoType.Flush) + submit = self._trace( + TraceDevice.IoType.Flush, + io.contents._dir, + io.contents._addr, + io.contents._bytes, + io.contents._flags + ) if submit: self.vol.do_submit_flush(io) def do_submit_discard(self, io): - submit = self._trace(io, TraceDevice.IoType.Discard) + submit = self._trace( + TraceDevice.IoType.Discard, + io.contents._dir, + io.contents._addr, + io.contents._bytes, + io.contents._flags + ) if submit: self.vol.do_submit_discard(io) + def do_forward_io(self, token, rw, addr, nbytes, offset): + io = Io.get_by_forward_token(token) + submit = self._trace( + TraceDevice.IoType.Data, + rw, + addr, + nbytes, + io.contents._flags + ) + + if submit: + self.vol.do_forward_io(token, rw, addr, nbytes, offset) + + def do_forward_flush(self, token): + io = Io.get_by_forward_token(token) + submit = self._trace( + TraceDevice.IoType.Flush, + IoDir.WRITE, + 0, + 0, + io.contents._flags + ) + + if submit: + self.vol.do_forward_flush(token) + + def do_forward_discard(self, token, addr, nbytes): + io = Io.get_by_forward_token(token) + submit = self._trace( + TraceDevice.IoType.Discard, + IoDir.WRITE, + addr, + nbytes, + io.contents._flags + ) + + if submit: + self.vol.do_forward_discard(token, addr, nbytes) + def get_length(self): return self.vol.get_length() diff --git a/tests/functional/pyocf/types/volume_exp_obj.py b/tests/functional/pyocf/types/volume_exp_obj.py index 4827589..e56160b 100644 --- a/tests/functional/pyocf/types/volume_exp_obj.py +++ b/tests/functional/pyocf/types/volume_exp_obj.py @@ -1,5 +1,6 @@ # # Copyright(c) 2022 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # @@ -23,17 +24,17 @@ class OcfInternalVolume(Volume): queue = self.parent.get_default_queue() # TODO multiple queues? return self.new_io(queue, addr, _bytes, _dir, _class, _flags) - def _alloc_io(self, io): + def _alloc_io(self, io, rw=None, addr=None, nbytes=None, offset=0): exp_obj_io = self.__alloc_io( - io.contents._addr, - io.contents._bytes, - io.contents._dir, + addr or io.contents._addr, + nbytes or io.contents._bytes, + rw or io.contents._dir, io.contents._class, io.contents._flags, ) cdata = OcfLib.getInstance().ocf_io_get_data(io) - OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, 0) + OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, offset) def cb(error): nonlocal io @@ -62,6 +63,29 @@ class OcfInternalVolume(Volume): io = self._alloc_io(discard) io.submit_discard() + def do_forward_io(self, token, rw, addr, nbytes, offset): + orig_io = Io.get_by_forward_token(token) + io = self._alloc_io(orig_io, rw, addr, nbytes, offset) + + def cb(error): + nonlocal io + Io.forward_end(io.token, error) + + io.token = token + io.callback = cb + + io.submit() + + def do_forward_flush(self, token): + orig_io = Io.get_by_forward_token(token) + io = self._alloc_io(orig_io) + io.submit_flush() + + def do_forward_discard(self, token, addr, nbytes): + orig_io = Io.get_by_forward_token(token) + io = self._alloc_io(orig_io, addr=addr, nbytes=nbytes) + io.submit_discard() + def _read(self, offset=0, size=0): if size == 0: size = self.get_length().B - offset diff --git a/tests/functional/pyocf/types/volume_replicated.py b/tests/functional/pyocf/types/volume_replicated.py index 13be9f5..c10c893 100644 --- a/tests/functional/pyocf/types/volume_replicated.py +++ b/tests/functional/pyocf/types/volume_replicated.py @@ -1,5 +1,6 @@ # # Copyright(c) 2022 Intel Corporation +# Copyright(c) 2024 Huawei Technologies # SPDX-License-Identifier: BSD-3-Clause # @@ -92,6 +93,22 @@ class ReplicatedVolume(Volume): self.primary.submit_discard(discard) self.secondary.submit_discard(discard) + def do_forward_io(self, token, rw, addr, nbytes, offset): + if rw == IoDir.WRITE: + Io.forward_get(token) + self.secondary.do_forward_io(token, rw, addr, nbytes, offset) + self.primary.do_forward_io(token, rw, addr, nbytes, offset) + + def do_forward_flush(self, token): + Io.forward_get(token) + self.secondary.do_forward_flush(token) + self.primary.do_forward_flush(token) + + def do_forward_discard(self, token, addr, nbytes): + Io.forward_get(token) + self.secondary.do_forward_discard(token, addr, nbytes) + self.primary.do_forward_discard(token, addr, nbytes) + def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs): self.primary.dump() diff --git a/tests/functional/tests/engine/test_discard.py b/tests/functional/tests/engine/test_discard.py index 608e6c0..9e8459c 100644 --- a/tests/functional/tests/engine/test_discard.py +++ b/tests/functional/tests/engine/test_discard.py @@ -20,13 +20,13 @@ def test_discard_propagation(pyocf_ctx): pyocf_ctx.register_volume_type(TraceDevice) - def trace_discard(vol, io, io_type): + def trace_discard(vol, io_type, rw, addr, nbytes, flags): nonlocal discards if io_type == TraceDevice.IoType.Discard: if vol.uuid not in discards: discards[vol.uuid] = [] - discards[vol.uuid].append((io.contents._addr, io.contents._bytes)) + discards[vol.uuid].append((addr, nbytes)) return True diff --git a/tests/functional/tests/engine/test_flush.py b/tests/functional/tests/engine/test_flush.py index 569779a..afcef01 100644 --- a/tests/functional/tests/engine/test_flush.py +++ b/tests/functional/tests/engine/test_flush.py @@ -7,7 +7,7 @@ from pyocf.types.cache import Cache from pyocf.types.data import Data from pyocf.types.core import Core -from pyocf.types.io import IoDir, Sync +from pyocf.types.io import Io, IoDir, Sync from pyocf.types.volume import RamVolume, IoFlags, TraceDevice from pyocf.types.volume_core import CoreVolume from pyocf.utils import Size @@ -18,22 +18,19 @@ def test_flush_propagation(pyocf_ctx): pyocf_ctx.register_volume_type(TraceDevice) - def trace_flush(vol, io, io_type): + def trace_flush(vol, io_type, rw, addr, nbytes, flags): nonlocal flushes - if io_type == TraceDevice.IoType.Flush or int(io.contents._flags) & IoFlags.FLUSH: + if io_type == TraceDevice.IoType.Flush: if vol.uuid not in flushes: flushes[vol.uuid] = [] - flushes[vol.uuid].append((io.contents._addr, io.contents._bytes)) + flushes[vol.uuid].append((addr, nbytes)) return True cache_device = TraceDevice(RamVolume(Size.from_MiB(50)), trace_fcn=trace_flush) core_device = TraceDevice(RamVolume(Size.from_MiB(100)), trace_fcn=trace_flush) - addr = Size.from_MiB(2).B - size = Size.from_MiB(1).B - cache = Cache.start_on_device(cache_device) core = Core.using_device(core_device) cache.add_core(core) @@ -44,9 +41,7 @@ def test_flush_propagation(pyocf_ctx): flushes = {} vol.open() - io = vol.new_io(queue, addr, size, IoDir.WRITE, 0, IoFlags.FLUSH) - data = Data(byte_count=0) - io.set_data(data, 0) + io = vol.new_io(queue, 0, 0, IoDir.WRITE, 0, 0) completion = Sync(io).submit_flush() vol.close() @@ -62,9 +57,4 @@ def test_flush_propagation(pyocf_ctx): assert len(cache_flushes) == 1 assert len(core_flushes) == 1 - assert core_flushes[0] == (addr, size) - - # empty flush expected to be sent to cache device - assert cache_flushes[0] == (0, 0) - cache.stop() diff --git a/tests/functional/tests/flush/test_flush_after_mngmt.py b/tests/functional/tests/flush/test_flush_after_mngmt.py index 04634b0..e12c3a0 100644 --- a/tests/functional/tests/flush/test_flush_after_mngmt.py +++ b/tests/functional/tests/flush/test_flush_after_mngmt.py @@ -48,6 +48,14 @@ class FlushValVolume(RamVolume): self.flush_last = True super().submit_flush(flush) + def forward_io(self, token, rw, addr, nbytes, offset): + self.flush_last = False + super().forward_io(token, rw, addr, nbytes, offset) + + def forward_flush(self, token): + self.flush_last = True + super().forward_flush(token) + def test_flush_after_mngmt(pyocf_ctx): """ diff --git a/tests/functional/tests/management/test_composite_volume.py b/tests/functional/tests/management/test_composite_volume.py index 9023616..f7b0607 100644 --- a/tests/functional/tests/management/test_composite_volume.py +++ b/tests/functional/tests/management/test_composite_volume.py @@ -166,12 +166,9 @@ def setup_tracing(backends): TraceDevice.IoType.Data: [], } - def trace(vol, io, io_type): - if int(io.contents._flags) & IoFlags.FLUSH: - io_type = TraceDevice.IoType.Flush - + def trace(vol, io_type, rw, addr, nbytes, flags): io_trace[vol][io_type].append( - IoEvent(io.contents._dir, io.contents._addr, io.contents._bytes) + IoEvent(rw, addr, nbytes) ) return True