Merge pull request #666 from arutk/pyocf_vol_cleaniup

pyocf: volume changes required for failover tests
This commit is contained in:
Robert Baldyga
2022-03-29 10:02:01 +02:00
committed by GitHub
30 changed files with 1027 additions and 708 deletions

View File

@@ -1,5 +1,5 @@
#
# Copyright(c) 2021 Intel Corporation
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#

View File

@@ -35,11 +35,11 @@ from ..utils import Size, struct_to_dict
from .core import Core
from .queue import Queue
from .stats.cache import CacheInfo
from .io import IoDir
from .ioclass import IoClassesInfo, IoClassInfo
from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
from .ctx import OcfCtx
from .volume import Volume
from .volume import RamVolume
class Backfill(Structure):
_fields_ = [("_max_queue_size", c_uint32), ("_queue_unblock_size", c_uint32)]
@@ -697,7 +697,7 @@ class Cache:
raise OcfError("Failed getting core by name", result)
uuid = self.owner.lib.ocf_core_get_uuid_wrapper(core_handle)
device = Volume.get_by_uuid(uuid.contents._data.decode("ascii"))
device = RamVolume.get_by_uuid(uuid.contents._data.decode("ascii"))
core = Core(device)
core.cache = self
core.handle = core_handle
@@ -749,6 +749,12 @@ class Cache:
self.cores.remove(core)
def get_front_volume(self):
return Volume.get_instance(lib.ocf_cache_get_front_volume(self.cache_handle))
def get_volume(self):
return Volume.get_instance(lib.ocf_cache_get_volume(self.cache_handle))
def get_stats(self):
cache_info = CacheInfo()
usage = UsageStats()
@@ -897,6 +903,10 @@ lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]
lib.ocf_cache_get_name.argtypes = [c_void_p]
lib.ocf_cache_get_name.restype = c_char_p
lib.ocf_cache_get_front_volume.argtypes = [c_void_p]
lib.ocf_cache_get_front_volume.restype = c_void_p
lib.ocf_cache_get_volume.argtypes = [c_void_p]
lib.ocf_cache_get_volume.restype = c_void_p
lib.ocf_mngt_cache_cleaning_set_policy.argtypes = [
c_void_p,
c_uint32,

View File

@@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -99,30 +99,14 @@ class Core:
def get_handle(self):
return self.handle
def new_io(
self, queue: Queue, addr: int, length: int, direction: IoDir,
io_class: int, flags: int
):
if not self.cache:
raise Exception("Core isn't attached to any cache")
def get_front_volume(self):
return Volume.get_instance(lib.ocf_core_get_front_volume(self.handle))
io = OcfLib.getInstance().ocf_core_new_io_wrapper(
self.handle, queue.handle, addr, length, direction, io_class, flags)
def get_volume(self):
return Volume.get_instance(lib.ocf_core_get_volume(self.handle))
if io is None:
raise Exception("Failed to create io!")
return Io.from_pointer(io)
def new_core_io(
self, queue: Queue, addr: int, length: int, direction: IoDir,
io_class: int, flags: int
):
lib = OcfLib.getInstance()
volume = lib.ocf_core_get_volume(self.handle)
io = lib.ocf_volume_new_io(
volume, queue.handle, addr, length, direction, io_class, flags)
return Io.from_pointer(io)
def get_default_queue(self):
return self.cache.get_default_queue()
def get_stats(self):
core_info = CoreInfo()
@@ -191,52 +175,13 @@ class Core:
def reset_stats(self):
self.cache.owner.lib.ocf_core_stats_initialize(self.handle)
def exp_obj_md5(self):
logging.getLogger("pyocf").warning(
"Reading whole exported object! This disturbs statistics values"
)
cache_line_size = int(self.cache.get_stats()['conf']['cache_line_size'])
read_buffer_all = Data(self.device.size)
read_buffer = Data(cache_line_size)
position = 0
while position < read_buffer_all.size:
io = self.new_io(self.cache.get_default_queue(), position,
cache_line_size, IoDir.READ, 0, 0)
io.set_data(read_buffer)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
if cmpl.results["err"]:
raise Exception("Error reading whole exported object")
read_buffer_all.copy(read_buffer, position, 0, cache_line_size)
position += cache_line_size
return read_buffer_all.md5()
lib = OcfLib.getInstance()
lib.ocf_core_get_uuid_wrapper.restype = POINTER(Uuid)
lib.ocf_core_get_uuid_wrapper.argtypes = [c_void_p]
lib.ocf_core_get_volume.restype = c_void_p
lib.ocf_volume_new_io.argtypes = [
c_void_p,
c_void_p,
c_uint64,
c_uint32,
c_uint32,
c_uint32,
c_uint64,
]
lib.ocf_volume_new_io.restype = c_void_p
lib.ocf_core_get_volume.argtypes = [c_void_p]
lib.ocf_core_get_volume.restype = c_void_p
lib.ocf_core_get_front_volume.argtypes = [c_void_p]
lib.ocf_core_get_front_volume.restype = c_void_p
lib.ocf_mngt_core_set_seq_cutoff_policy.argtypes = [c_void_p, c_uint32]
lib.ocf_mngt_core_set_seq_cutoff_policy.restype = c_int
lib.ocf_mngt_core_set_seq_cutoff_threshold.argtypes = [c_void_p, c_uint32]
@@ -247,13 +192,3 @@ lib.ocf_stats_collect_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p, c
lib.ocf_stats_collect_core.restype = c_int
lib.ocf_core_get_info.argtypes = [c_void_p, c_void_p]
lib.ocf_core_get_info.restype = c_int
lib.ocf_core_new_io_wrapper.argtypes = [
c_void_p,
c_void_p,
c_uint64,
c_uint32,
c_uint32,
c_uint32,
c_uint64,
]
lib.ocf_core_new_io_wrapper.restype = c_void_p

View File

@@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -12,8 +12,6 @@ from .cleaner import CleanerOps, Cleaner
from .shared import OcfError
from ..ocf import OcfLib
from .queue import Queue
from .volume import Volume
class OcfCtxOps(Structure):
_fields_ = [

View File

@@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -96,7 +96,13 @@ class Io(Structure):
self.del_object()
def submit(self):
return OcfLib.getInstance().ocf_core_submit_io_wrapper(byref(self))
return OcfLib.getInstance().ocf_volume_submit_io(byref(self))
def submit_flush(self):
return OcfLib.getInstance().ocf_volume_submit_flush(byref(self))
def submit_discard(self):
return OcfLib.getInstance().ocf_volume_submit_discard(byref(self))
def set_data(self, data: Data, offset: int = 0):
self.data = data
@@ -111,8 +117,14 @@ IoOps._fields_ = [("_set_data", IoOps.SET_DATA), ("_get_data", IoOps.GET_DATA)]
lib = OcfLib.getInstance()
lib.ocf_io_set_cmpl_wrapper.argtypes = [POINTER(Io), c_void_p, c_void_p, Io.END]
lib.ocf_core_new_io_wrapper.argtypes = [c_void_p]
lib.ocf_core_new_io_wrapper.restype = c_void_p
lib.ocf_io_set_data.argtypes = [POINTER(Io), c_void_p, c_uint32]
lib.ocf_io_set_data.restype = c_int
lib.ocf_volume_submit_io.argtypes = [POINTER(Io)]
lib.ocf_volume_submit_io.restype = None
lib.ocf_volume_submit_flush.argtypes = [POINTER(Io)]
lib.ocf_volume_submit_flush.restype = None
lib.ocf_volume_submit_discard.argtypes = [POINTER(Io)]
lib.ocf_volume_submit_discard.restype = None

View File

@@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@@ -24,10 +24,12 @@ from hashlib import md5
import weakref
from .io import Io, IoOps, IoDir
from .queue import Queue
from .shared import OcfErrorCode, Uuid
from ..ocf import OcfLib
from ..utils import print_buffer, Size as S
from .data import Data
from .queue import Queue
class VolumeCaps(Structure):
@@ -66,75 +68,137 @@ class VolumeProperties(Structure):
("_caps", VolumeCaps),
("_io_ops", IoOps),
("_deinit", c_char_p),
("_ops", VolumeOps),
("_ops_", VolumeOps),
]
class VolumeIoPriv(Structure):
_fields_ = [("_data", c_void_p), ("_offset", c_uint64)]
class Volume(Structure):
VOLUME_POISON = 0x13
VOLUME_POISON = 0x13
_fields_ = [("_storage", c_void_p)]
class Volume:
_instances_ = weakref.WeakValueDictionary()
_uuid_ = weakref.WeakValueDictionary()
_ops_ = {}
_props_ = {}
props = None
@classmethod
def get_ops(cls):
if cls in Volume._ops_:
return Volume._ops_[cls]
def __init__(self, size: S, uuid=None):
super().__init__()
self.size = size
if uuid:
if uuid in type(self)._uuid_:
raise Exception(
"Volume with uuid {} already created".format(uuid)
)
self.uuid = uuid
else:
self.uuid = str(id(self))
@VolumeOps.SUBMIT_IO
def _submit_io(io):
io_structure = cast(io, POINTER(Io))
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
type(self)._uuid_[self.uuid] = self
volume.submit_io(io_structure)
self.data = create_string_buffer(int(self.size))
memset(self.data, self.VOLUME_POISON, self.size)
self._storage = cast(self.data, c_void_p)
@VolumeOps.SUBMIT_FLUSH
def _submit_flush(flush):
io_structure = cast(flush, POINTER(Io))
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
self.reset_stats()
self.opened = False
volume.submit_flush(io_structure)
def get_copy(self):
new_volume = Volume(self.size)
memmove(new_volume.data, self.data, self.size)
return new_volume
@VolumeOps.SUBMIT_METADATA
def _submit_metadata(meta):
raise NotImplementedError
@VolumeOps.SUBMIT_DISCARD
def _submit_discard(discard):
io_structure = cast(discard, POINTER(Io))
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_discard(io_structure)
@VolumeOps.SUBMIT_WRITE_ZEROES
def _submit_write_zeroes(write_zeroes):
raise NotImplementedError
@VolumeOps.OPEN
def _open(ref):
uuid_ptr = cast(
OcfLib.getInstance().ocf_volume_get_uuid(ref), POINTER(Uuid)
)
uuid = str(uuid_ptr.contents._data, encoding="ascii")
try:
volume = Volume.get_by_uuid(uuid)
except: # noqa E722 TODO:Investigate whether this really should be so broad
print("Tried to access unallocated volume {}".format(uuid))
print("{}".format(Volume._uuid_))
return -1
return Volume.open(ref, volume)
@VolumeOps.CLOSE
def _close(ref):
volume = Volume.get_instance(ref)
volume.close()
volume.opened = False
@VolumeOps.GET_MAX_IO_SIZE
def _get_max_io_size(ref):
return Volume.get_instance(ref).get_max_io_size()
@VolumeOps.GET_LENGTH
def _get_length(ref):
return Volume.get_instance(ref).get_length()
Volume._ops_[cls] = VolumeOps(
_submit_io=_submit_io,
_submit_flush=_submit_flush,
_submit_metadata=_submit_metadata,
_submit_discard=_submit_discard,
_submit_write_zeroes=_submit_write_zeroes,
_open=_open,
_close=_close,
_get_max_io_size=_get_max_io_size,
_get_length=_get_length,
)
return Volume._ops_[cls]
@staticmethod
def open(ref, volume):
if volume.opened:
return -OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
Volume._instances_[ref] = volume
volume.handle = ref
return volume.do_open()
@classmethod
def get_io_ops(cls):
return IoOps(_set_data=cls._io_set_data, _get_data=cls._io_get_data)
@classmethod
def get_props(cls):
if not cls.props:
cls.props = VolumeProperties(
_name=str(cls.__name__).encode("ascii"),
_io_priv_size=sizeof(VolumeIoPriv),
_volume_priv_size=0,
_caps=VolumeCaps(_atomic_writes=0),
_ops=VolumeOps(
_submit_io=cls._submit_io,
_submit_flush=cls._submit_flush,
_submit_metadata=cls._submit_metadata,
_submit_discard=cls._submit_discard,
_submit_write_zeroes=cls._submit_write_zeroes,
_open=cls._open,
_close=cls._close,
_get_max_io_size=cls._get_max_io_size,
_get_length=cls._get_length,
),
_io_ops=IoOps(
_set_data=cls._io_set_data, _get_data=cls._io_get_data
),
_deinit=0,
)
if cls in Volume._props_:
return Volume._props_[cls]
return cls.props
Volume._props_[cls] = VolumeProperties(
_name=str(cls.__name__).encode("ascii"),
_io_priv_size=sizeof(VolumeIoPriv),
_volume_priv_size=0,
_caps=VolumeCaps(_atomic_writes=0),
_ops_=cls.get_ops(),
_io_ops=cls.get_io_ops(),
_deinit=0,
)
return Volume._props_[cls]
def get_copy(self):
raise NotImplementedError
@classmethod
def get_instance(cls, ref):
@@ -148,84 +212,6 @@ class Volume(Structure):
def get_by_uuid(cls, uuid):
return cls._uuid_[uuid]
@staticmethod
@VolumeOps.SUBMIT_IO
def _submit_io(io):
io_structure = cast(io, POINTER(Io))
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_io(io_structure)
@staticmethod
@VolumeOps.SUBMIT_FLUSH
def _submit_flush(flush):
io_structure = cast(flush, POINTER(Io))
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_flush(io_structure)
@staticmethod
@VolumeOps.SUBMIT_METADATA
def _submit_metadata(meta):
pass
@staticmethod
@VolumeOps.SUBMIT_DISCARD
def _submit_discard(discard):
io_structure = cast(discard, POINTER(Io))
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_discard(io_structure)
@staticmethod
@VolumeOps.SUBMIT_WRITE_ZEROES
def _submit_write_zeroes(write_zeroes):
pass
@staticmethod
@CFUNCTYPE(c_int, c_void_p)
def _open(ref):
uuid_ptr = cast(
OcfLib.getInstance().ocf_volume_get_uuid(ref), POINTER(Uuid)
)
uuid = str(uuid_ptr.contents._data, encoding="ascii")
try:
volume = Volume.get_by_uuid(uuid)
except: # noqa E722 TODO:Investigate whether this really should be so broad
print("Tried to access unallocated volume {}".format(uuid))
print("{}".format(Volume._uuid_))
return -1
if volume.opened:
return -OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
Volume._instances_[ref] = volume
return volume.open()
@staticmethod
@VolumeOps.CLOSE
def _close(ref):
volume = Volume.get_instance(ref)
volume.close()
volume.opened = False
@staticmethod
@VolumeOps.GET_MAX_IO_SIZE
def _get_max_io_size(ref):
return Volume.get_instance(ref).get_max_io_size()
@staticmethod
@VolumeOps.GET_LENGTH
def _get_length(ref):
return Volume.get_instance(ref).get_length()
@staticmethod
@IoOps.SET_DATA
def _io_set_data(io, data, offset):
@@ -246,36 +232,40 @@ class Volume(Structure):
)
return io_priv.contents._data
def open(self):
def __init__(self, uuid=None):
if uuid:
if uuid in type(self)._uuid_:
raise Exception(
"Volume with uuid {} already created".format(uuid)
)
self.uuid = uuid
else:
self.uuid = str(id(self))
type(self)._uuid_[self.uuid] = self
self.reset_stats()
self.is_online = True
self.opened = False
def do_open(self):
self.opened = True
return 0
def close(self):
pass
self.opened = False
def get_length(self):
return self.size
def resize(self, size):
self.size = size
self.data = create_string_buffer(int(self.size))
memset(self.data, self.VOLUME_POISON, self.size)
self._storage = cast(self.data, c_void_p)
raise NotImplementedError
def get_max_io_size(self):
return S.from_KiB(128)
raise NotImplementedError
def submit_flush(self, flush):
flush.contents._end(flush, 0)
def do_submit_flush(self, flush):
raise NotImplementedError
def submit_discard(self, discard):
try:
dst = self._storage + discard.contents._addr
memset(dst, 0, discard.contents._bytes)
discard.contents._end(discard, 0)
except: # noqa E722
discard.contents._end(discard, -OcfErrorCode.OCF_ERR_NOT_SUPP)
def do_submit_discard(self, discard):
raise NotImplementedError
def get_stats(self):
return self.stats
@@ -283,10 +273,103 @@ class Volume(Structure):
def reset_stats(self):
self.stats = {IoDir.WRITE: 0, IoDir.READ: 0}
def submit_io(self, io):
try:
self.stats[IoDir(io.contents._dir)] += 1
def inc_stats(self, _dir):
self.stats[_dir] += 1
def do_submit_io(self, io):
raise NotImplementedError
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
raise NotImplementedError
def md5(self):
raise NotImplementedError
def offline(self):
self.is_online = False
def online(self):
self.is_online = True
def _reject_io(self, io):
cast(io, POINTER(Io)).contents._end(io, -OcfErrorCode.OCF_ERR_IO)
def submit_flush(self, io):
if self.is_online:
self.do_submit_flush(io)
else:
self._reject_io(io)
def submit_io(self, io):
if self.is_online:
self.inc_stats(IoDir(io.contents._dir))
self.do_submit_io(io)
else:
self._reject_io(io)
def submit_discard(self, io):
if self.is_online:
self.do_submit_discard(io)
else:
self._reject_io(io)
def new_io(
self,
queue: Queue,
addr: int,
length: int,
direction: IoDir,
io_class: int,
flags: int,
):
lib = OcfLib.getInstance()
io = lib.ocf_volume_new_io(
self.handle, queue.handle, addr, length, direction, io_class, flags
)
return Io.from_pointer(io)
class RamVolume(Volume):
props = None
def __init__(self, size: S, uuid=None):
super().__init__(uuid)
self.size = size
self.data = create_string_buffer(int(self.size))
memset(self.data, VOLUME_POISON, self.size)
self.data_ptr = cast(self.data, c_void_p).value
def get_copy(self):
new_volume = RamVolume(self.size)
memmove(new_volume.data, self.data, self.size)
return new_volume
def get_length(self):
return self.size
def resize(self, size):
self.size = size
self.data = create_string_buffer(int(self.size))
memset(self.data, VOLUME_POISON, self.size)
self.data_ptr = cast(self.data, c_void_p).value
def get_max_io_size(self):
return S.from_KiB(128)
def do_submit_flush(self, flush):
flush.contents._end(flush, 0)
def do_submit_discard(self, discard):
try:
dst = self.data_ptr + discard.contents._addr
memset(dst, 0, discard.contents._bytes)
discard.contents._end(discard, 0)
except: # noqa E722
discard.contents._end(discard, -OcfErrorCode.OCF_ERR_NOT_SUPP)
def do_submit_io(self, io):
try:
io_priv = cast(
OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv))
offset = io_priv.contents._offset
@@ -294,11 +377,11 @@ class Volume(Structure):
if io.contents._dir == IoDir.WRITE:
src_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p)
src = Data.get_instance(src_ptr.value).handle.value + offset
dst = self._storage + io.contents._addr
dst = self.data_ptr + io.contents._addr
elif io.contents._dir == IoDir.READ:
dst_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p)
dst = Data.get_instance(dst_ptr.value).handle.value + offset
src = self._storage + io.contents._addr
src = self.data_ptr + io.contents._addr
memmove(dst, src, io.contents._bytes)
io_priv.contents._offset += io.contents._bytes
@@ -311,18 +394,18 @@ class Volume(Structure):
if size == 0:
size = int(self.size) - int(offset)
print_buffer(self._storage, size, ignore=ignore, **kwargs)
print_buffer(self.data_ptr, size, ignore=ignore, **kwargs)
def md5(self):
m = md5()
m.update(string_at(self._storage, self.size))
m.update(string_at(self.data_ptr, self.size))
return m.hexdigest()
def get_bytes(self):
return string_at(self._storage, self.size)
return string_at(self.data_ptr, self.size)
class ErrorDevice(Volume):
class ErrorDevice(RamVolume):
def __init__(
self,
size,
@@ -341,9 +424,9 @@ class ErrorDevice(Volume):
def set_mapping(self, error_sectors: set):
self.error_sectors = error_sectors
def submit_io(self, io):
def do_submit_io(self, io):
if not self.armed:
super().submit_io(io)
super().do_submit_io(io)
return
direction = IoDir(io.contents._dir)
@@ -368,7 +451,7 @@ class ErrorDevice(Volume):
io.contents._end(io, -OcfErrorCode.OCF_ERR_IO)
self.stats["errors"][direction] += 1
else:
super().submit_io(io)
super().do_submit_io(io)
def arm(self):
self.armed = True
@@ -384,24 +467,19 @@ class ErrorDevice(Volume):
self.stats["errors"] = {IoDir.WRITE: 0, IoDir.READ: 0}
class TraceDevice(Volume):
def __init__(self, size, trace_fcn=None, uuid=None):
super().__init__(size, uuid)
self.trace_fcn = trace_fcn
def submit_io(self, io):
submit = True
if self.trace_fcn:
submit = self.trace_fcn(self, io)
if submit:
super().submit_io(io)
lib = OcfLib.getInstance()
lib.ocf_io_get_priv.restype = POINTER(VolumeIoPriv)
lib.ocf_io_get_volume.argtypes = [c_void_p]
lib.ocf_io_get_volume.restype = c_void_p
lib.ocf_io_get_data.argtypes = [c_void_p]
lib.ocf_io_get_data.restype = c_void_p
lib.ocf_volume_new_io.argtypes = [
c_void_p,
c_void_p,
c_uint64,
c_uint32,
c_uint32,
c_uint32,
c_uint64,
]
lib.ocf_volume_new_io.restype = c_void_p

View File

@@ -0,0 +1,31 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from ctypes import cast, POINTER
from .cache import Cache
from .io import Io
from .io import IoDir
from .volume_exp_obj import ExpObjVolume
from .volume import Volume
class CacheVolume(ExpObjVolume):
def __init__(self, cache, open=False, uuid=None):
super().__init__(cache, uuid)
self.cache = cache
self.lib = cache.owner.lib
if open:
self.open()
def open(self):
return Volume.open(
self.lib.ocf_cache_get_front_volume(self.cache.handle),
self
)
def md5(self):
cache_line_size = int(self.cache.get_stats()['conf']['cache_line_size'])
return self._exp_obj_md5(cache_line_size)

View File

@@ -0,0 +1,27 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from .core import Core
from .volume_exp_obj import ExpObjVolume
from .io import IoDir
from .volume import Volume
class CoreVolume(ExpObjVolume):
def __init__(self, core, open=False, uuid=None):
super().__init__(core, uuid)
self.core = core
self.lib = core.cache.owner.lib
if open:
self.open()
def open(self):
return Volume.open(
self.lib.ocf_core_get_front_volume(self.core.handle),
self
)
def md5(self):
return self._exp_obj_md5(4096)

View File

@@ -0,0 +1,126 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import logging
from ctypes import c_int, c_void_p, CFUNCTYPE, byref, c_uint32, c_uint64, cast, POINTER
from ..ocf import OcfLib
from .volume import Volume, VOLUME_POISON
from pyocf.utils import Size
from pyocf.types.data import Data
from pyocf.types.io import IoDir, Io
from pyocf.types.shared import OcfCompletion
class ExpObjVolume(Volume):
def __init__(self, parent, uuid=None):
super().__init__(uuid)
self.parent = parent
def __alloc_io(self, addr, _bytes, _dir, _class, _flags):
vol = self.parent.get_front_volume()
queue = self.parent.get_default_queue() # TODO multiple queues?
return vol.new_io(
queue, addr, _bytes, _dir, _class, _flags
)
def _alloc_io(self, io):
exp_obj_io = self.__alloc_io(
io.contents._addr,
io.contents._bytes,
io.contents._dir,
io.contents._class,
io.contents._flags,
)
lib = OcfLib.getInstance()
cdata = OcfLib.getInstance().ocf_io_get_data(io)
OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, 0)
def cb(error):
nonlocal io
io = cast(io, POINTER(Io))
io.contents._end(io, error)
exp_obj_io.callback = cb
return exp_obj_io
def get_length(self):
return Size.from_B(OcfLib.getInstance().ocf_volume_get_length(self.c_vol))
def get_max_io_size(self):
return Size.from_B(OcfLib.getInstance().ocf_volume_get_max_io_size(self.c_vol))
def do_submit_io(self, io):
io = self._alloc_io(io)
io.submit()
def do_submit_flush(self, flush):
io = self._alloc_io(flush)
io.submit_flush()
def do_submit_discard(self, discard):
io = self._alloc_io(discard)
io.submit_discard()
def _read(self, offset=0, size=0):
if size == 0:
size = self.get_length().B - offset
exp_obj_io = self.__alloc_io(offset, size, IoDir.READ, 0, 0)
completion = OcfCompletion([("err", c_int)])
exp_obj_io.callback = completion
data = Data.from_bytes(bytes(size))
exp_obj_io.set_data(data)
exp_obj_io.submit()
completion.wait()
error = completion.results["err"]
if error:
raise Exception("error reading exported object for dump")
return data
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
data = self._read(offset, size)
data.dump(ignore=ifnore, **kwargs)
def md5(self):
raise NotImplementedError
def _exp_obj_md5(self, read_size):
logging.getLogger("pyocf").warning(
"Reading whole exported object! This disturbs statistics values"
)
read_buffer_all = Data(self.parent.device.size)
read_buffer = Data(read_size)
position = 0
while position < read_buffer_all.size:
io = self.new_io(self.parent.get_default_queue(), position,
read_size, IoDir.READ, 0, 0)
io.set_data(read_buffer)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
if cmpl.results["err"]:
raise Exception("Error reading whole exported object")
read_buffer_all.copy(read_buffer, position, 0, read_size)
position += read_size
return read_buffer_all.md5()
lib = OcfLib.getInstance()
lib.ocf_volume_get_max_io_size.argtypes = [c_void_p]
lib.ocf_volume_get_max_io_size.restype = c_uint32
lib.ocf_volume_get_length.argtypes = [c_void_p]
lib.ocf_volume_get_length.restype = c_uint64
lib.ocf_io_get_data.argtypes = [POINTER(Io)]
lib.ocf_io_get_data.restype = c_void_p

View File

@@ -0,0 +1,92 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from threading import Lock
from .volume import Volume, VOLUME_POISON
from .io import Io, IoDir
from ctypes import cast, c_void_p, CFUNCTYPE, c_int, POINTER, memmove, sizeof, pointer
class ReplicatedVolume(Volume):
def __init__(self, primary: Volume, secondary: Volume, uuid=None):
super().__init__(uuid)
self.primary = primary
self.secondary = secondary
if secondary.get_max_io_size() < primary.get_max_io_size():
raise Exception("secondary volume max io size too small")
if secondary.get_length() < primary.get_length():
raise Exception("secondary volume size too small")
def do_open(self):
ret = self.primary.do_open()
if ret:
return ret
ret = self.secondary.do_open()
if ret:
self.primary.close()
return ret
def close(self):
self.primary.close()
self.secondary.close()
def get_length(self):
return self.primary.get_length()
def get_max_io_size(self):
return self.primary.get_max_io_size()
def _prepare_io(self, io):
original_cb = Io.END()
pointer(original_cb)[0] = io.contents._end
lock = Lock()
error = 0
io_remaining = 2
@CFUNCTYPE(None, c_void_p, c_int)
def cb(io, err):
nonlocal io_remaining
nonlocal error
nonlocal original_cb
nonlocal lock
io = cast(io, POINTER(Io))
with lock:
if err:
error = err
io_remaining -= 1
finished = True if io_remaining == 0 else False
if finished:
io.contents._end = original_cb
original_cb(io, error)
io.contents._end = cb
def do_submit_io(self, io):
if io.contents._dir == IoDir.WRITE:
self._prepare_io(io)
self.primary.submit_io(io)
self.secondary.submit_io(io)
else:
# for read just pass through down to primary
# with original completion
self.primary.submit_io(io)
def do_submit_flush(self, flush):
self._prepare_io(flush)
self.primary.submit_flush(flush)
self.secondary.submit_flush(flush)
def do_submit_discard(self, discard):
self._prepare_io(discard)
self.primary.submit_discard(discard)
self.secondary.submit_discard(discard)
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
self.primary.dump()
def md5(self):
return self.primary.md5()

View File

@@ -1,18 +1,11 @@
/*
* Copyright(c) 2012-2021 Intel Corporation
* Copyright(c) 2012-2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "ocf/ocf_io.h"
#include "ocf/ocf_core.h"
struct ocf_io *ocf_core_new_io_wrapper(ocf_core_t core, ocf_queue_t queue,
uint64_t addr, uint32_t bytes, uint32_t dir,
uint32_t io_class, uint64_t flags)
{
return ocf_core_new_io(core, queue, addr, bytes, dir, io_class, flags);
}
void ocf_io_set_cmpl_wrapper(struct ocf_io *io, void *context,
void *context2, ocf_end_io_t fn)
{
@@ -34,3 +27,13 @@ void ocf_core_submit_io_wrapper(struct ocf_io *io)
ocf_core_submit_io(io);
}
void ocf_core_submit_flush_wrapper(struct ocf_io *io)
{
ocf_core_submit_flush(io);
}
void ocf_core_submit_discard_wrapper(struct ocf_io *io)
{
ocf_core_submit_discard(io);
}