pyocf: Update to use forward API
Signed-off-by: Robert Baldyga <robert.baldyga@huawei.com> Signed-off-by: Michal Mielewczyk <michal.mielewczyk@huawei.com>
This commit is contained in:
parent
0cd2393aaf
commit
c9cd217a08
@ -67,6 +67,18 @@ class Io(Structure):
|
||||
def get_instance(cls, ref):
|
||||
return cls._instances_[cast(ref, c_void_p).value]
|
||||
|
||||
@staticmethod
|
||||
def get_by_forward_token(token):
|
||||
return OcfLib.getInstance().ocf_forward_get_io(token)
|
||||
|
||||
@staticmethod
|
||||
def forward_get(token):
|
||||
OcfLib.getInstance().ocf_forward_get(token)
|
||||
|
||||
@staticmethod
|
||||
def forward_end(token, error):
|
||||
OcfLib.getInstance().ocf_forward_end(token, error)
|
||||
|
||||
def del_object(self):
|
||||
del type(self)._instances_[cast(byref(self), c_void_p).value]
|
||||
|
||||
@ -149,6 +161,14 @@ IoOps.GET_DATA = CFUNCTYPE(c_void_p, POINTER(Io))
|
||||
IoOps._fields_ = [("_set_data", IoOps.SET_DATA), ("_get_data", IoOps.GET_DATA)]
|
||||
|
||||
lib = OcfLib.getInstance()
|
||||
|
||||
lib.ocf_forward_get.argtypes = [c_uint64]
|
||||
|
||||
lib.ocf_forward_get_io.argtypes = [c_uint64]
|
||||
lib.ocf_forward_get_io.restype = POINTER(Io)
|
||||
|
||||
lib.ocf_forward_end.argtypes = [c_uint64, c_int]
|
||||
|
||||
lib.ocf_io_set_cmpl_wrapper.argtypes = [POINTER(Io), c_void_p, c_void_p, Io.END]
|
||||
|
||||
lib.ocf_io_set_data.argtypes = [POINTER(Io), c_void_p, c_uint32]
|
||||
|
@ -51,6 +51,9 @@ class VolumeOps(Structure):
|
||||
SUBMIT_METADATA = CFUNCTYPE(None, c_void_p)
|
||||
SUBMIT_DISCARD = CFUNCTYPE(None, c_void_p)
|
||||
SUBMIT_WRITE_ZEROES = CFUNCTYPE(None, c_void_p)
|
||||
FORWARD_IO = CFUNCTYPE(None, c_void_p, c_uint64, c_int, c_uint64, c_uint64, c_uint64)
|
||||
FORWARD_FLUSH = CFUNCTYPE(None, c_void_p, c_uint64)
|
||||
FORWARD_DISCARD = CFUNCTYPE(None, c_void_p, c_uint64, c_uint64, c_uint64)
|
||||
ON_INIT = CFUNCTYPE(c_int, c_void_p)
|
||||
ON_DEINIT = CFUNCTYPE(None, c_void_p)
|
||||
OPEN = CFUNCTYPE(c_int, c_void_p, c_void_p)
|
||||
@ -64,6 +67,9 @@ class VolumeOps(Structure):
|
||||
("_submit_metadata", SUBMIT_METADATA),
|
||||
("_submit_discard", SUBMIT_DISCARD),
|
||||
("_submit_write_zeroes", SUBMIT_WRITE_ZEROES),
|
||||
("_forward_io", FORWARD_IO),
|
||||
("_forward_flush", FORWARD_FLUSH),
|
||||
("_forward_discard", FORWARD_DISCARD),
|
||||
("_on_init", ON_INIT),
|
||||
("_on_deinit", ON_DEINIT),
|
||||
("_open", OPEN),
|
||||
@ -132,6 +138,18 @@ class Volume:
|
||||
def _submit_write_zeroes(write_zeroes):
|
||||
raise NotImplementedError
|
||||
|
||||
@VolumeOps.FORWARD_IO
|
||||
def _forward_io(volume, token, rw, addr, nbytes, offset):
|
||||
Volume.get_instance(volume).forward_io(token, rw, addr, nbytes, offset)
|
||||
|
||||
@VolumeOps.FORWARD_FLUSH
|
||||
def _forward_flush(volume, token):
|
||||
Volume.get_instance(volume).forward_flush(token)
|
||||
|
||||
@VolumeOps.FORWARD_DISCARD
|
||||
def _forward_discard(volume, token, addr, nbytes):
|
||||
Volume.get_instance(volume).forward_discard(token, addr, nbytes)
|
||||
|
||||
@VolumeOps.ON_INIT
|
||||
def _on_init(ref):
|
||||
return 0
|
||||
@ -181,6 +199,9 @@ class Volume:
|
||||
_submit_metadata=_submit_metadata,
|
||||
_submit_discard=_submit_discard,
|
||||
_submit_write_zeroes=_submit_write_zeroes,
|
||||
_forward_io=_forward_io,
|
||||
_forward_flush=_forward_flush,
|
||||
_forward_discard=_forward_discard,
|
||||
_open=_open,
|
||||
_close=_close,
|
||||
_get_max_io_size=_get_max_io_size,
|
||||
@ -329,6 +350,28 @@ class Volume:
|
||||
else:
|
||||
self._reject_io(io)
|
||||
|
||||
def _reject_forward(self, token):
|
||||
Io.forward_end(token, -OcfErrorCode.OCF_ERR_IO)
|
||||
|
||||
def forward_io(self, token, rw, addr, nbytes, offset):
|
||||
if self.is_online:
|
||||
self.inc_stats(IoDir(rw))
|
||||
self.do_forward_io(token, rw, addr, nbytes, offset)
|
||||
else:
|
||||
self._reject_forward(token)
|
||||
|
||||
def forward_flush(self, token):
|
||||
if self.is_online:
|
||||
self.do_forward_flush(token)
|
||||
else:
|
||||
self._reject_forward(token)
|
||||
|
||||
def forward_discard(self, token, addr, nbytes):
|
||||
if self.is_online:
|
||||
self.do_forward_discard(token, addr, nbytes)
|
||||
else:
|
||||
self._reject_forward(token)
|
||||
|
||||
def new_io(
|
||||
self, queue: Queue, addr: int, length: int, direction: IoDir, io_class: int, flags: int,
|
||||
):
|
||||
@ -470,6 +513,37 @@ class RamVolume(Volume):
|
||||
except: # noqa E722
|
||||
io.contents._end(io, -OcfErrorCode.OCF_ERR_IO)
|
||||
|
||||
def do_forward_io(self, token, rw, addr, nbytes, offset):
|
||||
try:
|
||||
io = Io.get_by_forward_token(token)
|
||||
|
||||
if rw == IoDir.WRITE:
|
||||
src_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p)
|
||||
src = Data.get_instance(src_ptr.value).handle.value + offset
|
||||
dst = self.data_ptr + addr
|
||||
elif rw == IoDir.READ:
|
||||
dst_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p)
|
||||
dst = Data.get_instance(dst_ptr.value).handle.value + offset
|
||||
src = self.data_ptr + addr
|
||||
|
||||
memmove(dst, src, nbytes)
|
||||
|
||||
Io.forward_end(token, 0)
|
||||
except Exception as e: # noqa E722
|
||||
Io.forward_end(token, -OcfErrorCode.OCF_ERR_IO)
|
||||
|
||||
def do_forward_flush(self, token):
|
||||
Io.forward_end(token, 0)
|
||||
|
||||
def do_forward_discard(self, token, addr, nbytes):
|
||||
try:
|
||||
dst = self.data_ptr + addr
|
||||
memset(dst, 0, nbytes)
|
||||
|
||||
Io.forward_end(token, 0)
|
||||
except: # noqa E722
|
||||
Io.forward_end(token, -OcfErrorCode.OCF_ERR_NOT_SUPP)
|
||||
|
||||
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
|
||||
if size == 0:
|
||||
size = int(self.size) - int(offset)
|
||||
@ -517,44 +591,68 @@ class ErrorDevice(Volume):
|
||||
super().close()
|
||||
self.vol.close()
|
||||
|
||||
def should_forward_io(self, io):
|
||||
def should_forward_io(self, rw, addr):
|
||||
if not self.armed:
|
||||
return True
|
||||
|
||||
direction = IoDir(io.contents._dir)
|
||||
direction = IoDir(rw)
|
||||
seq_no_match = (
|
||||
self.error_seq_no[direction] >= 0
|
||||
and self.error_seq_no[direction] <= self.io_seq_no[direction]
|
||||
)
|
||||
sector_match = io.contents._addr in self.error_sectors
|
||||
sector_match = addr in self.error_sectors
|
||||
|
||||
self.io_seq_no[direction] += 1
|
||||
|
||||
return not seq_no_match and not sector_match
|
||||
|
||||
def complete_with_error(self, io):
|
||||
def complete_submit_with_error(self, io):
|
||||
self.error = True
|
||||
direction = IoDir(io.contents._dir)
|
||||
self.stats["errors"][direction] += 1
|
||||
io.contents._end(io, -OcfErrorCode.OCF_ERR_IO)
|
||||
|
||||
def do_submit_io(self, io):
|
||||
if self.should_forward_io(io):
|
||||
if self.should_forward_io(io.contents._dir, io.contents._addr):
|
||||
self.vol.do_submit_io(io)
|
||||
else:
|
||||
self.complete_with_error(io)
|
||||
self.complete_submit_with_error(io)
|
||||
|
||||
def do_submit_flush(self, flush):
|
||||
if self.data_only or self.should_forward_io(flush):
|
||||
self.vol.do_submit_flush(flush)
|
||||
def do_submit_flush(self, io):
|
||||
if self.data_only or self.should_forward_io(io.contents._dir, io.contents._addr):
|
||||
self.vol.do_submit_flush(io)
|
||||
else:
|
||||
self.complete_with_error(flush)
|
||||
self.complete_submit_with_error(io)
|
||||
|
||||
def do_submit_discard(self, discard):
|
||||
if self.data_only or self.should_forward_io(discard):
|
||||
self.vol.do_submit_discard(discard)
|
||||
def do_submit_discard(self, io):
|
||||
if self.data_only or self.should_forward_io(io.contents._dir, io.contents._addr):
|
||||
self.vol.do_submit_discard(io)
|
||||
else:
|
||||
self.complete_with_error(discard)
|
||||
self.complete_submit_with_error(io)
|
||||
|
||||
def complete_forward_with_error(self, token, rw):
|
||||
self.error = True
|
||||
direction = IoDir(rw)
|
||||
self.stats["errors"][direction] += 1
|
||||
Io.forward_end(token, -OcfErrorCode.OCF_ERR_IO)
|
||||
|
||||
def do_forward_io(self, token, rw, addr, nbytes, offset):
|
||||
if self.should_forward_io(rw, addr):
|
||||
self.vol.do_forward_io(token, rw, addr, nbytes, offset)
|
||||
else:
|
||||
self.complete_forward_with_error(token, rw)
|
||||
|
||||
def do_forward_flush(self, token):
|
||||
if self.data_only or self.should_forward_io(0, 0):
|
||||
self.vol.do_forward_flush(token)
|
||||
else:
|
||||
self.complete_forward_with_error(token, rw)
|
||||
|
||||
def do_forward_discard(self, token, addr, nbytes):
|
||||
if self.data_only or self.should_forward_io(0, addr):
|
||||
self.vol.do_forward_discard(token, addr, nbytes)
|
||||
else:
|
||||
self.complete_forward_with_error(token, rw)
|
||||
|
||||
def arm(self):
|
||||
self.armed = True
|
||||
@ -611,32 +709,89 @@ class TraceDevice(Volume):
|
||||
super().close()
|
||||
self.vol.close()
|
||||
|
||||
def _trace(self, io, io_type):
|
||||
def _trace(self, io_type, rw, addr, nbytes, flags):
|
||||
submit = True
|
||||
|
||||
if self.trace_fcn:
|
||||
submit = self.trace_fcn(self, io, io_type)
|
||||
submit = self.trace_fcn(self, io_type, rw, addr, nbytes, flags)
|
||||
|
||||
return submit
|
||||
|
||||
def do_submit_io(self, io):
|
||||
submit = self._trace(io, TraceDevice.IoType.Data)
|
||||
submit = self._trace(
|
||||
TraceDevice.IoType.Data,
|
||||
io.contents._dir,
|
||||
io.contents._addr,
|
||||
io.contents._bytes,
|
||||
io.contents._flags
|
||||
)
|
||||
|
||||
if submit:
|
||||
self.vol.do_submit_io(io)
|
||||
|
||||
def do_submit_flush(self, io):
|
||||
submit = self._trace(io, TraceDevice.IoType.Flush)
|
||||
submit = self._trace(
|
||||
TraceDevice.IoType.Flush,
|
||||
io.contents._dir,
|
||||
io.contents._addr,
|
||||
io.contents._bytes,
|
||||
io.contents._flags
|
||||
)
|
||||
|
||||
if submit:
|
||||
self.vol.do_submit_flush(io)
|
||||
|
||||
def do_submit_discard(self, io):
|
||||
submit = self._trace(io, TraceDevice.IoType.Discard)
|
||||
submit = self._trace(
|
||||
TraceDevice.IoType.Discard,
|
||||
io.contents._dir,
|
||||
io.contents._addr,
|
||||
io.contents._bytes,
|
||||
io.contents._flags
|
||||
)
|
||||
|
||||
if submit:
|
||||
self.vol.do_submit_discard(io)
|
||||
|
||||
def do_forward_io(self, token, rw, addr, nbytes, offset):
|
||||
io = Io.get_by_forward_token(token)
|
||||
submit = self._trace(
|
||||
TraceDevice.IoType.Data,
|
||||
rw,
|
||||
addr,
|
||||
nbytes,
|
||||
io.contents._flags
|
||||
)
|
||||
|
||||
if submit:
|
||||
self.vol.do_forward_io(token, rw, addr, nbytes, offset)
|
||||
|
||||
def do_forward_flush(self, token):
|
||||
io = Io.get_by_forward_token(token)
|
||||
submit = self._trace(
|
||||
TraceDevice.IoType.Flush,
|
||||
IoDir.WRITE,
|
||||
0,
|
||||
0,
|
||||
io.contents._flags
|
||||
)
|
||||
|
||||
if submit:
|
||||
self.vol.do_forward_flush(token)
|
||||
|
||||
def do_forward_discard(self, token, addr, nbytes):
|
||||
io = Io.get_by_forward_token(token)
|
||||
submit = self._trace(
|
||||
TraceDevice.IoType.Discard,
|
||||
IoDir.WRITE,
|
||||
addr,
|
||||
nbytes,
|
||||
io.contents._flags
|
||||
)
|
||||
|
||||
if submit:
|
||||
self.vol.do_forward_discard(token, addr, nbytes)
|
||||
|
||||
def get_length(self):
|
||||
return self.vol.get_length()
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#
|
||||
# Copyright(c) 2022 Intel Corporation
|
||||
# Copyright(c) 2024 Huawei Technologies
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
@ -23,17 +24,17 @@ class OcfInternalVolume(Volume):
|
||||
queue = self.parent.get_default_queue() # TODO multiple queues?
|
||||
return self.new_io(queue, addr, _bytes, _dir, _class, _flags)
|
||||
|
||||
def _alloc_io(self, io):
|
||||
def _alloc_io(self, io, rw=None, addr=None, nbytes=None, offset=0):
|
||||
exp_obj_io = self.__alloc_io(
|
||||
io.contents._addr,
|
||||
io.contents._bytes,
|
||||
io.contents._dir,
|
||||
addr or io.contents._addr,
|
||||
nbytes or io.contents._bytes,
|
||||
rw or io.contents._dir,
|
||||
io.contents._class,
|
||||
io.contents._flags,
|
||||
)
|
||||
|
||||
cdata = OcfLib.getInstance().ocf_io_get_data(io)
|
||||
OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, 0)
|
||||
OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, offset)
|
||||
|
||||
def cb(error):
|
||||
nonlocal io
|
||||
@ -62,6 +63,29 @@ class OcfInternalVolume(Volume):
|
||||
io = self._alloc_io(discard)
|
||||
io.submit_discard()
|
||||
|
||||
def do_forward_io(self, token, rw, addr, nbytes, offset):
|
||||
orig_io = Io.get_by_forward_token(token)
|
||||
io = self._alloc_io(orig_io, rw, addr, nbytes, offset)
|
||||
|
||||
def cb(error):
|
||||
nonlocal io
|
||||
Io.forward_end(io.token, error)
|
||||
|
||||
io.token = token
|
||||
io.callback = cb
|
||||
|
||||
io.submit()
|
||||
|
||||
def do_forward_flush(self, token):
|
||||
orig_io = Io.get_by_forward_token(token)
|
||||
io = self._alloc_io(orig_io)
|
||||
io.submit_flush()
|
||||
|
||||
def do_forward_discard(self, token, addr, nbytes):
|
||||
orig_io = Io.get_by_forward_token(token)
|
||||
io = self._alloc_io(orig_io, addr=addr, nbytes=nbytes)
|
||||
io.submit_discard()
|
||||
|
||||
def _read(self, offset=0, size=0):
|
||||
if size == 0:
|
||||
size = self.get_length().B - offset
|
||||
|
@ -1,5 +1,6 @@
|
||||
#
|
||||
# Copyright(c) 2022 Intel Corporation
|
||||
# Copyright(c) 2024 Huawei Technologies
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
|
||||
@ -92,6 +93,22 @@ class ReplicatedVolume(Volume):
|
||||
self.primary.submit_discard(discard)
|
||||
self.secondary.submit_discard(discard)
|
||||
|
||||
def do_forward_io(self, token, rw, addr, nbytes, offset):
|
||||
if rw == IoDir.WRITE:
|
||||
Io.forward_get(token)
|
||||
self.secondary.do_forward_io(token, rw, addr, nbytes, offset)
|
||||
self.primary.do_forward_io(token, rw, addr, nbytes, offset)
|
||||
|
||||
def do_forward_flush(self, token):
|
||||
Io.forward_get(token)
|
||||
self.secondary.do_forward_flush(token)
|
||||
self.primary.do_forward_flush(token)
|
||||
|
||||
def do_forward_discard(self, token, addr, nbytes):
|
||||
Io.forward_get(token)
|
||||
self.secondary.do_forward_discard(token, addr, nbytes)
|
||||
self.primary.do_forward_discard(token, addr, nbytes)
|
||||
|
||||
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
|
||||
self.primary.dump()
|
||||
|
||||
|
@ -20,13 +20,13 @@ def test_discard_propagation(pyocf_ctx):
|
||||
|
||||
pyocf_ctx.register_volume_type(TraceDevice)
|
||||
|
||||
def trace_discard(vol, io, io_type):
|
||||
def trace_discard(vol, io_type, rw, addr, nbytes, flags):
|
||||
nonlocal discards
|
||||
|
||||
if io_type == TraceDevice.IoType.Discard:
|
||||
if vol.uuid not in discards:
|
||||
discards[vol.uuid] = []
|
||||
discards[vol.uuid].append((io.contents._addr, io.contents._bytes))
|
||||
discards[vol.uuid].append((addr, nbytes))
|
||||
|
||||
return True
|
||||
|
||||
|
@ -7,7 +7,7 @@
|
||||
from pyocf.types.cache import Cache
|
||||
from pyocf.types.data import Data
|
||||
from pyocf.types.core import Core
|
||||
from pyocf.types.io import IoDir, Sync
|
||||
from pyocf.types.io import Io, IoDir, Sync
|
||||
from pyocf.types.volume import RamVolume, IoFlags, TraceDevice
|
||||
from pyocf.types.volume_core import CoreVolume
|
||||
from pyocf.utils import Size
|
||||
@ -18,22 +18,19 @@ def test_flush_propagation(pyocf_ctx):
|
||||
|
||||
pyocf_ctx.register_volume_type(TraceDevice)
|
||||
|
||||
def trace_flush(vol, io, io_type):
|
||||
def trace_flush(vol, io_type, rw, addr, nbytes, flags):
|
||||
nonlocal flushes
|
||||
|
||||
if io_type == TraceDevice.IoType.Flush or int(io.contents._flags) & IoFlags.FLUSH:
|
||||
if io_type == TraceDevice.IoType.Flush:
|
||||
if vol.uuid not in flushes:
|
||||
flushes[vol.uuid] = []
|
||||
flushes[vol.uuid].append((io.contents._addr, io.contents._bytes))
|
||||
flushes[vol.uuid].append((addr, nbytes))
|
||||
|
||||
return True
|
||||
|
||||
cache_device = TraceDevice(RamVolume(Size.from_MiB(50)), trace_fcn=trace_flush)
|
||||
core_device = TraceDevice(RamVolume(Size.from_MiB(100)), trace_fcn=trace_flush)
|
||||
|
||||
addr = Size.from_MiB(2).B
|
||||
size = Size.from_MiB(1).B
|
||||
|
||||
cache = Cache.start_on_device(cache_device)
|
||||
core = Core.using_device(core_device)
|
||||
cache.add_core(core)
|
||||
@ -44,9 +41,7 @@ def test_flush_propagation(pyocf_ctx):
|
||||
flushes = {}
|
||||
|
||||
vol.open()
|
||||
io = vol.new_io(queue, addr, size, IoDir.WRITE, 0, IoFlags.FLUSH)
|
||||
data = Data(byte_count=0)
|
||||
io.set_data(data, 0)
|
||||
io = vol.new_io(queue, 0, 0, IoDir.WRITE, 0, 0)
|
||||
|
||||
completion = Sync(io).submit_flush()
|
||||
vol.close()
|
||||
@ -62,9 +57,4 @@ def test_flush_propagation(pyocf_ctx):
|
||||
assert len(cache_flushes) == 1
|
||||
assert len(core_flushes) == 1
|
||||
|
||||
assert core_flushes[0] == (addr, size)
|
||||
|
||||
# empty flush expected to be sent to cache device
|
||||
assert cache_flushes[0] == (0, 0)
|
||||
|
||||
cache.stop()
|
||||
|
@ -48,6 +48,14 @@ class FlushValVolume(RamVolume):
|
||||
self.flush_last = True
|
||||
super().submit_flush(flush)
|
||||
|
||||
def forward_io(self, token, rw, addr, nbytes, offset):
|
||||
self.flush_last = False
|
||||
super().forward_io(token, rw, addr, nbytes, offset)
|
||||
|
||||
def forward_flush(self, token):
|
||||
self.flush_last = True
|
||||
super().forward_flush(token)
|
||||
|
||||
|
||||
def test_flush_after_mngmt(pyocf_ctx):
|
||||
"""
|
||||
|
@ -166,12 +166,9 @@ def setup_tracing(backends):
|
||||
TraceDevice.IoType.Data: [],
|
||||
}
|
||||
|
||||
def trace(vol, io, io_type):
|
||||
if int(io.contents._flags) & IoFlags.FLUSH:
|
||||
io_type = TraceDevice.IoType.Flush
|
||||
|
||||
def trace(vol, io_type, rw, addr, nbytes, flags):
|
||||
io_trace[vol][io_type].append(
|
||||
IoEvent(io.contents._dir, io.contents._addr, io.contents._bytes)
|
||||
IoEvent(rw, addr, nbytes)
|
||||
)
|
||||
|
||||
return True
|
||||
|
Loading…
Reference in New Issue
Block a user