Make pyocf work with async API

Signed-off-by: Jan Musial <jan.musial@intel.com>
This commit is contained in:
Jan Musial 2019-03-14 14:45:48 +01:00
parent cc30794160
commit 545d5b8aac
18 changed files with 380 additions and 111 deletions

View File

@ -2,7 +2,7 @@
# Copyright(c) 2019 Intel Corporation # Copyright(c) 2019 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_void_p, cdll
lib = None lib = None

View File

@ -3,16 +3,29 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import (
c_uint64,
c_uint32,
c_uint16,
c_int,
c_char_p,
c_void_p,
c_bool,
c_uint8,
Structure,
byref,
cast,
create_string_buffer,
)
from enum import IntEnum from enum import IntEnum
import logging
from datetime import timedelta from datetime import timedelta
from .shared import Uuid, OcfError, CacheLineSize, CacheLines from .shared import Uuid, OcfError, CacheLineSize, CacheLines, OcfCompletion
from ..utils import Size, struct_to_dict from ..utils import Size, struct_to_dict
from .core import Core from .core import Core
from .stats.cache import * from .queue import Queue
from .stats.shared import * from .stats.cache import CacheInfo
from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
class Backfill(Structure): class Backfill(Structure):
@ -100,6 +113,7 @@ class Cache:
self.owner = owner self.owner = owner
self.cache_line_size = cache_line_size self.cache_line_size = cache_line_size
self.cfg = CacheConfig( self.cfg = CacheConfig(
_id=cache_id, _id=cache_id,
_name=name.encode("ascii") if name else None, _name=name.encode("ascii") if name else None,
@ -109,22 +123,39 @@ class Cache:
_metadata_layout=metadata_layout, _metadata_layout=metadata_layout,
_metadata_volatile=metadata_volatile, _metadata_volatile=metadata_volatile,
_backfill=Backfill( _backfill=Backfill(
_max_queue_size=max_queue_size, _queue_unblock_size=queue_unblock_size _max_queue_size=max_queue_size,
_queue_unblock_size=queue_unblock_size,
), ),
_locked=locked, _locked=locked,
_pt_unaligned_io=pt_unaligned_io, _pt_unaligned_io=pt_unaligned_io,
_use_submit_fast=use_submit_fast, _use_submit_fast=use_submit_fast,
) )
self.cache_handle = c_void_p() self.cache_handle = c_void_p()
self.queues = [] self._as_parameter_ = self.cache_handle
self.io_queues = []
def start_cache(self): def start_cache(
self, mngt_queue: Queue = None, use_mngt_queue_for_io: bool = True
):
status = self.owner.lib.ocf_mngt_cache_start( status = self.owner.lib.ocf_mngt_cache_start(
self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg) self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg)
) )
if status: if status:
raise OcfError("Creating cache instance failed", status) raise OcfError("Creating cache instance failed", status)
self.owner.caches += [self] self.owner.caches += [self]
self.name = self.owner.lib.ocf_cache_get_name(self)
self.mngt_queue = mngt_queue or Queue(
self, "mgmt-{}".format(self.name), mngt_queue=True
)
if use_mngt_queue_for_io:
self.io_queues += [self.mngt_queue]
status = self.owner.lib.ocf_mngt_cache_set_mngt_queue(
self, self.mngt_queue
)
if status:
raise OcfError("Error setting management queue", status)
def configure_device( def configure_device(
self, device, force=False, perform_test=False, cache_line_size=None self, device, force=False, perform_test=False, cache_line_size=None
@ -133,7 +164,8 @@ class Cache:
self.dev_cfg = CacheDeviceConfig( self.dev_cfg = CacheDeviceConfig(
_uuid=Uuid( _uuid=Uuid(
_data=cast( _data=cast(
create_string_buffer(self.device_name.encode("ascii")), c_char_p create_string_buffer(self.device_name.encode("ascii")),
c_char_p,
), ),
_size=len(self.device_name) + 1, _size=len(self.device_name) + 1,
), ),
@ -148,31 +180,39 @@ class Cache:
) )
def attach_device( def attach_device(
self, device, force=False, perform_test=False, cache_line_size=None, self, device, force=False, perform_test=False, cache_line_size=None
): ):
self.configure_device(device, force, perform_test, cache_line_size) self.configure_device(device, force, perform_test, cache_line_size)
status = device.owner.lib.ocf_mngt_cache_attach( c = OcfCompletion(
self.cache_handle, byref(self.dev_cfg) [("cache", c_void_p), ("priv", c_void_p), ("error", c_int)]
) )
if status:
raise OcfError("Attaching cache device failed", status) device.owner.lib.ocf_mngt_cache_attach(
self.cache_handle, byref(self.dev_cfg), c, None
)
c.wait()
if c.results["error"]:
raise OcfError("Attaching cache device failed", c.results["error"])
def load_cache(self, device): def load_cache(self, device):
self.configure_device(device) self.configure_device(device)
c = OcfCompletion(
status = device.owner.lib.ocf_mngt_cache_load( [("cache", c_void_p), ("priv", c_void_p), ("error", c_int)]
self.owner.ctx_handle,
byref(self.cache_handle),
byref(self.cfg),
byref(self.dev_cfg),
) )
if status: device.owner.lib.ocf_mngt_cache_load(
raise OcfError("Loading cache device failed", status) self.cache_handle, byref(self.dev_cfg), c, None
)
c.wait()
if c.results["error"]:
raise OcfError("Loading cache device failed", c.results["error"])
@classmethod @classmethod
def load_from_device(cls, device, name=""): def load_from_device(cls, device, name=""):
c = cls(name=name, owner=device.owner) c = cls(name=name, owner=device.owner)
c.start_cache()
c.load_cache(device) c.load_cache(device)
return c return c
@ -181,7 +221,12 @@ class Cache:
c = cls(locked=True, owner=device.owner, **kwargs) c = cls(locked=True, owner=device.owner, **kwargs)
c.start_cache() c.start_cache()
c.attach_device(device, force=True) try:
c.attach_device(device, force=True)
except:
c.stop(flush=False)
raise
return c return c
def _get_and_lock(self, read=True): def _get_and_lock(self, read=True):
@ -221,15 +266,26 @@ class Cache:
def add_core(self, core: Core): def add_core(self, core: Core):
self.get_and_write_lock() self.get_and_write_lock()
status = self.owner.lib.ocf_mngt_cache_add_core( c = OcfCompletion(
self.cache_handle, byref(core.get_handle()), byref(core.get_cfg()) [
("cache", c_void_p),
("core", c_void_p),
("priv", c_void_p),
("error", c_int),
]
) )
if status: self.owner.lib.ocf_mngt_cache_add_core(
self.cache_handle, byref(core.get_cfg()), c, None
)
c.wait()
if c.results["error"]:
self.put_and_write_unlock() self.put_and_write_unlock()
raise OcfError("Failed adding core", status) raise OcfError("Failed adding core", c.results["error"])
core.cache = self core.cache = self
core.handle = c.results["core"]
self.put_and_write_unlock() self.put_and_write_unlock()
@ -242,13 +298,19 @@ class Cache:
self.get_and_read_lock() self.get_and_read_lock()
status = self.owner.lib.ocf_cache_get_info(self.cache_handle, byref(cache_info)) status = self.owner.lib.ocf_cache_get_info(
self.cache_handle, byref(cache_info)
)
if status: if status:
self.put_and_read_unlock() self.put_and_read_unlock()
raise OcfError("Failed getting cache info", status) raise OcfError("Failed getting cache info", status)
status = self.owner.lib.ocf_stats_collect_cache( status = self.owner.lib.ocf_stats_collect_cache(
self.cache_handle, byref(usage), byref(req), byref(block), byref(errors) self.cache_handle,
byref(usage),
byref(req),
byref(block),
byref(errors),
) )
if status: if status:
self.put_and_read_unlock() self.put_and_read_unlock()
@ -263,7 +325,9 @@ class Cache:
"volume_type": self.owner.volume_types[cache_info.volume_type], "volume_type": self.owner.volume_types[cache_info.volume_type],
"size": CacheLines(cache_info.size, line_size), "size": CacheLines(cache_info.size, line_size),
"inactive": { "inactive": {
"occupancy": CacheLines(cache_info.inactive.occupancy, line_size), "occupancy": CacheLines(
cache_info.inactive.occupancy, line_size
),
"dirty": CacheLines(cache_info.inactive.dirty, line_size), "dirty": CacheLines(cache_info.inactive.dirty, line_size),
}, },
"occupancy": CacheLines(cache_info.occupancy, line_size), "occupancy": CacheLines(cache_info.occupancy, line_size),
@ -294,19 +358,34 @@ class Cache:
self.owner.lib.ocf_core_stats_initialize_all(self.cache_handle) self.owner.lib.ocf_core_stats_initialize_all(self.cache_handle)
def get_default_queue(self): def get_default_queue(self):
if not self.queues: if not self.io_queues:
raise Exception("No queues added for cache") raise Exception("No queues added for cache")
return self.queues[0] return self.io_queues[0]
def stop(self, flush: bool = True):
def stop(self):
self.get_and_write_lock() self.get_and_write_lock()
status = self.owner.lib.ocf_mngt_cache_stop(self.cache_handle) if flush:
if status: c = OcfCompletion(
[("cache", c_void_p), ("priv", c_void_p), ("error", c_int)]
)
self.owner.lib.ocf_mngt_cache_flush(self.cache_handle, False, c, None)
c.wait()
if c.results["error"]:
self.put_and_write_unlock()
raise OcfError("Couldn't flush cache", c.results["error"])
c = OcfCompletion(
[("cache", c_void_p), ("priv", c_void_p), ("error", c_int)]
)
self.owner.lib.ocf_mngt_cache_stop(self.cache_handle, c, None)
c.wait()
if c.results["error"]:
self.put_and_write_unlock() self.put_and_write_unlock()
raise OcfError("Failed stopping cache", status) raise OcfError("Failed stopping cache", c.results["error"])
self.put_and_write_unlock() self.put_and_write_unlock()
self.owner.caches.remove(self) self.owner.caches.remove(self)

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_void_p, CFUNCTYPE, Structure, c_int
from .shared import SharedOcfObject from .shared import SharedOcfObject

View File

@ -3,19 +3,31 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import (
c_size_t,
c_void_p,
Structure,
c_int,
c_uint8,
c_uint16,
c_char_p,
c_bool,
c_uint32,
cast,
byref,
create_string_buffer,
)
import logging import logging
from datetime import timedelta from datetime import timedelta
from ..ocf import OcfLib from ..ocf import OcfLib
from .shared import Uuid from .shared import Uuid, OcfCompletion, OcfError
from .volume import Volume from .volume import Volume
from .data import Data from .data import Data
from .io import Io, IoDir from .io import Io, IoDir
from .stats.shared import * from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
from .stats.core import * from .stats.core import CoreStats
from ..utils import Size, struct_to_dict from ..utils import Size, struct_to_dict
from .queue import Queue
class UserMetadata(Structure): class UserMetadata(Structure):
@ -55,7 +67,8 @@ class Core:
self.cfg = CoreConfig( self.cfg = CoreConfig(
_uuid=Uuid( _uuid=Uuid(
_data=cast( _data=cast(
create_string_buffer(self.device_name.encode("ascii")), c_char_p create_string_buffer(self.device_name.encode("ascii")),
c_char_p,
), ),
_size=len(self.device_name) + 1, _size=len(self.device_name) + 1,
), ),
@ -108,7 +121,9 @@ class Core:
self.cache.put_and_unlock(True) self.cache.put_and_unlock(True)
raise OcfError("Failed collecting core stats", status) raise OcfError("Failed collecting core stats", status)
status = self.cache.owner.lib.ocf_core_get_stats(self.handle, byref(core_stats)) status = self.cache.owner.lib.ocf_core_get_stats(
self.handle, byref(core_stats)
)
if status: if status:
self.cache.put_and_unlock(True) self.cache.put_and_unlock(True)
raise OcfError("Failed getting core stats", status) raise OcfError("Failed getting core stats", status)
@ -135,7 +150,15 @@ class Core:
io.configure(0, read_buffer.size, IoDir.READ, 0, 0) io.configure(0, read_buffer.size, IoDir.READ, 0, 0)
io.set_data(read_buffer) io.set_data(read_buffer)
io.set_queue(self.cache.get_default_queue()) io.set_queue(self.cache.get_default_queue())
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit() io.submit()
cmpl.wait()
if cmpl.results["err"]:
raise Exception("Error reading whole exported object")
return read_buffer.md5() return read_buffer.md5()

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_void_p, Structure, c_char_p, cast, pointer, byref
from enum import IntEnum from enum import IntEnum
from .logger import LoggerOps, Logger from .logger import LoggerOps, Logger

View File

@ -3,7 +3,20 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import (
c_void_p,
c_uint32,
CFUNCTYPE,
c_uint64,
create_string_buffer,
cast,
memset,
c_char_p,
string_at,
Structure,
c_int,
memmove,
)
from enum import IntEnum from enum import IntEnum
from hashlib import md5 from hashlib import md5
@ -150,7 +163,9 @@ class Data(SharedOcfObject):
@staticmethod @staticmethod
@DataOps.COPY @DataOps.COPY
def _copy(dst, src, end, start, size): def _copy(dst, src, end, start, size):
return Data.get_instance(dst).copy(Data.get_instance(src), end, start, size) return Data.get_instance(dst).copy(
Data.get_instance(src), end, start, size
)
@staticmethod @staticmethod
@DataOps.SECURE_ERASE @DataOps.SECURE_ERASE

View File

@ -3,8 +3,18 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import (
from enum import IntEnum, IntFlag, auto c_void_p,
c_int,
c_uint32,
c_uint64,
CFUNCTYPE,
Structure,
POINTER,
byref,
cast,
)
from enum import IntEnum, auto
from ..ocf import OcfLib from ..ocf import OcfLib
from .data import Data from .data import Data
@ -46,7 +56,9 @@ class Io(Structure):
def from_pointer(cls, ref): def from_pointer(cls, ref):
c = cls.from_address(ref) c = cls.from_address(ref)
cls._instances_[ref] = c cls._instances_[ref] = c
OcfLib.getInstance().ocf_io_set_cmpl_wrapper(byref(c), None, None, c.c_end) OcfLib.getInstance().ocf_io_set_cmpl_wrapper(
byref(c), None, None, c.c_end
)
return c return c
@classmethod @classmethod
@ -78,8 +90,10 @@ class Io(Structure):
Io.get_instance(io).handle(opaque) Io.get_instance(io).handle(opaque)
def end(self, err): def end(self, err):
if err: try:
print("IO err {}".format(err)) self.callback(err)
except:
pass
self.put() self.put()
self.del_object() self.del_object()

View File

@ -3,7 +3,17 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import (
c_void_p,
Structure,
c_void_p,
c_char_p,
c_uint,
c_int,
cast,
CFUNCTYPE,
pointer,
)
from enum import IntEnum from enum import IntEnum
import logging import logging
from io import StringIO from io import StringIO
@ -113,7 +123,9 @@ class DefaultLogger(Logger):
self.level = level self.level = level
ch = logging.StreamHandler() ch = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") fmt = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
ch.setFormatter(fmt) ch.setFormatter(fmt)
ch.setLevel(LevelMapping[level]) ch.setLevel(LevelMapping[level])
logger.addHandler(ch) logger.addHandler(ch)
@ -128,7 +140,9 @@ class DefaultLogger(Logger):
class FileLogger(Logger): class FileLogger(Logger):
def __init__(self, f, console_level=None): def __init__(self, f, console_level=None):
super().__init__() super().__init__()
fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") fmt = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
fh = logging.FileHandler(f) fh = logging.FileHandler(f)
fh.setLevel(logging.DEBUG) fh.setLevel(logging.DEBUG)

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_void_p, c_int, Structure, CFUNCTYPE
from .shared import SharedOcfObject from .shared import SharedOcfObject

View File

@ -3,37 +3,70 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_void_p, CFUNCTYPE, Structure, byref
from threading import Thread, Condition, Lock
from ..ocf import OcfLib from ..ocf import OcfLib
from .shared import SharedOcfObject, OcfError from .shared import OcfError
class QueueOps(Structure): class QueueOps(Structure):
KICK = CFUNCTYPE(None, c_void_p) KICK = CFUNCTYPE(None, c_void_p)
KICK_SYNC = CFUNCTYPE(None, c_void_p) KICK_SYNC = CFUNCTYPE(None, c_void_p)
KICK = CFUNCTYPE(None, c_void_p)
STOP = CFUNCTYPE(None, c_void_p) STOP = CFUNCTYPE(None, c_void_p)
_fields_ = [ _fields_ = [("kick", KICK), ("kick_sync", KICK_SYNC), ("stop", STOP)]
("kick", KICK),
("kick_sync", KICK_SYNC),
("stop", STOP), class Queue:
] pass
class Queue: class Queue:
_instances_ = {} _instances_ = {}
def __init__(self, cache): @staticmethod
self.ops = QueueOps(kick_sync=type(self)._kick_sync, stop=type(self)._stop) def io_queue_run(*, queue: Queue, kick: Condition):
self.handle = c_void_p() def wait_predicate():
return queue.stop or OcfLib.getInstance().ocf_queue_pending_io(queue)
status = OcfLib.getInstance().ocf_queue_create(cache.cache_handle, byref(self.handle), byref(self.ops)) while True:
with kick:
kick.wait_for(wait_predicate)
queue.owner.lib.ocf_queue_run(queue)
if queue.stop and not queue.owner.lib.ocf_queue_pending_io(queue):
break
def __init__(self, cache, name, mngt_queue: bool = False):
self.owner = cache.owner
self.ops = QueueOps(kick=type(self)._kick, stop=type(self)._stop)
self.handle = c_void_p()
status = self.owner.lib.ocf_queue_create(
cache.cache_handle, byref(self.handle), byref(self.ops)
)
if status: if status:
raise OcfError("Couldn't create queue object", status) raise OcfError("Couldn't create queue object", status)
Queue._instances_[self.handle.value] = self Queue._instances_[self.handle.value] = self
cache.queues += [self] self._as_parameter_ = self.handle
self.stop_lock = Lock()
self.stop = False
self.kick_condition = Condition(self.stop_lock)
self.thread = Thread(
group=None,
target=Queue.io_queue_run,
name=name,
kwargs={"queue": self, "kick": self.kick_condition},
daemon=True,
)
self.thread.start()
self.mngt_queue = mngt_queue
@classmethod @classmethod
def get_instance(cls, ref): def get_instance(cls, ref):
@ -44,13 +77,28 @@ class Queue:
def _kick_sync(ref): def _kick_sync(ref):
Queue.get_instance(ref).kick_sync() Queue.get_instance(ref).kick_sync()
@staticmethod
@QueueOps.KICK
def _kick(ref):
Queue.get_instance(ref).kick()
@staticmethod @staticmethod
@QueueOps.STOP @QueueOps.STOP
def _stop(ref): def _stop(ref):
Queue.get_instance(ref).stop() Queue.get_instance(ref).stop()
def kick_sync(self): def kick_sync(self):
OcfLib.getInstance().ocf_queue_run(self.handle) self.owner.lib.ocf_queue_run(self.handle)
def kick(self):
with self.kick_condition:
self.kick_condition.notify_all()
def stop(self): def stop(self):
pass with self.kick_condition:
self.stop = True
self.kick_condition.notify_all()
self.thread.join()
if self.mngt_queue:
self.owner.lib.ocf_queue_put(self)

View File

@ -3,8 +3,10 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import CFUNCTYPE, c_size_t, c_char_p, Structure, c_void_p
from enum import IntEnum, auto from enum import IntEnum, auto
from threading import Event
import logging
from ..utils import Size as S from ..utils import Size as S
@ -39,6 +41,41 @@ class OcfErrorCode(IntEnum):
OCF_ERR_INVALID_CACHE_LINE_SIZE = auto() OCF_ERR_INVALID_CACHE_LINE_SIZE = auto()
class OcfCompletion:
"""
This class provides Completion mechanism for interacting with OCF async
management API.
"""
def __init__(self, completion_args: list):
"""
Provide ctypes arg list, and optionally index of status argument in
completion function which will be extracted (default - last argument).
:param completion_args: list of tuples (parameter name, parameter type)
for OCF completion function
"""
self.e = Event()
self.completion_args = completion_args
self._as_parameter_ = self.callback
@property
def callback(self):
arg_types = list(list(zip(*self.completion_args))[1])
@CFUNCTYPE(c_void_p, *arg_types)
def complete(*args):
self.results = {}
for i, arg in enumerate(args):
self.results[self.completion_args[i][0]] = arg
self.e.set()
return complete
def wait(self):
self.e.wait()
class OcfError(BaseException): class OcfError(BaseException):
def __init__(self, msg, error_code): def __init__(self, msg, error_code):
super().__init__(self, msg) super().__init__(self, msg)
@ -61,7 +98,7 @@ class SharedOcfObject(Structure):
try: try:
return cls._instances_[ref] return cls._instances_[ref]
except: except:
print( logging.get_logger("pyocf").error(
"OcfSharedObject corruption. wanted: {} instances: {}".format( "OcfSharedObject corruption. wanted: {} instances: {}".format(
ref, cls._instances_ ref, cls._instances_
) )

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_uint8, c_uint32, c_uint64, c_bool, c_int, Structure
class _Inactive(Structure): class _Inactive(Structure):

View File

@ -4,9 +4,9 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_uint32, c_uint64, Structure
from .shared import * from .shared import OcfStatsReq, OcfStatsBlock, OcfStatsDebug, OcfStatsError
class CoreStats(Structure): class CoreStats(Structure):

View File

@ -3,7 +3,7 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import c_uint64, c_uint32, Structure
class _Stat(Structure): class _Stat(Structure):

View File

@ -3,12 +3,26 @@
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import * from ctypes import (
POINTER,
c_void_p,
c_uint32,
c_char_p,
create_string_buffer,
memmove,
Structure,
CFUNCTYPE,
c_int,
c_uint,
c_uint64,
sizeof,
cast,
string_at,
)
from hashlib import md5 from hashlib import md5
from collections import defaultdict
from .io import Io, IoOps, IoDir from .io import Io, IoOps, IoDir
from .shared import OcfError from .shared import OcfErrorCode, Uuid
from ..ocf import OcfLib from ..ocf import OcfLib
from ..utils import print_buffer, Size as S from ..utils import print_buffer, Size as S
from .data import Data from .data import Data
@ -63,10 +77,13 @@ class Volume(Structure):
_uuid_ = {} _uuid_ = {}
def __init__(self, size: S, uuid=None): def __init__(self, size: S, uuid=None):
super().__init__()
self.size = size self.size = size
if uuid: if uuid:
if uuid in type(self)._uuid_: if uuid in type(self)._uuid_:
raise Exception("Volume with uuid {} already created".format(uuid)) raise Exception(
"Volume with uuid {} already created".format(uuid)
)
self.uuid = uuid self.uuid = uuid
else: else:
self.uuid = str(id(self)) self.uuid = str(id(self))
@ -76,6 +93,7 @@ class Volume(Structure):
self.data = create_string_buffer(int(self.size)) self.data = create_string_buffer(int(self.size))
self._storage = cast(self.data, c_void_p) self._storage = cast(self.data, c_void_p)
self.reset_stats() self.reset_stats()
self.opened = False
@classmethod @classmethod
def get_props(cls): def get_props(cls):
@ -117,7 +135,7 @@ class Volume(Structure):
@staticmethod @staticmethod
@VolumeOps.SUBMIT_FLUSH @VolumeOps.SUBMIT_FLUSH
def _submit_flush(flush): def _submit_flush(flush):
io_structure = cast(io, POINTER(Io)) io_structure = cast(flush, POINTER(Io))
volume = Volume.get_instance(io_structure.contents._volume) volume = Volume.get_instance(io_structure.contents._volume)
volume.submit_flush(io_structure) volume.submit_flush(io_structure)
@ -130,7 +148,7 @@ class Volume(Structure):
@staticmethod @staticmethod
@VolumeOps.SUBMIT_DISCARD @VolumeOps.SUBMIT_DISCARD
def _submit_discard(discard): def _submit_discard(discard):
io_structure = cast(io, POINTER(Io)) io_structure = cast(discard, POINTER(Io))
volume = Volume.get_instance(io_structure.contents._volume) volume = Volume.get_instance(io_structure.contents._volume)
volume.submit_discard(io_structure) volume.submit_discard(io_structure)
@ -143,11 +161,10 @@ class Volume(Structure):
@staticmethod @staticmethod
@CFUNCTYPE(c_int, c_void_p) @CFUNCTYPE(c_int, c_void_p)
def _open(ref): def _open(ref):
uuid_ptr = cast(OcfLib.getInstance().ocf_volume_get_uuid(ref), c_void_p) uuid_ptr = cast(
uuid_str = cast( OcfLib.getInstance().ocf_volume_get_uuid(ref), POINTER(Uuid)
OcfLib.getInstance().ocf_uuid_to_str_wrapper(uuid_ptr), c_char_p
) )
uuid = str(uuid_str.value, encoding="ascii") uuid = str(uuid_ptr.contents._data, encoding="ascii")
try: try:
volume = Volume.get_by_uuid(uuid) volume = Volume.get_by_uuid(uuid)
except: except:
@ -168,7 +185,7 @@ class Volume(Structure):
@staticmethod @staticmethod
@VolumeOps.GET_MAX_IO_SIZE @VolumeOps.GET_MAX_IO_SIZE
def _get_max_io_size(ref): def _get_max_io_size(ref):
return S.from_KiB(128) return Volume.get_instance(ref).get_max_io_size()
@staticmethod @staticmethod
@VolumeOps.GET_LENGTH @VolumeOps.GET_LENGTH
@ -178,32 +195,43 @@ class Volume(Structure):
@staticmethod @staticmethod
@IoOps.SET_DATA @IoOps.SET_DATA
def _io_set_data(io, data, offset): def _io_set_data(io, data, offset):
io_priv = cast(OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)) io_priv = cast(
OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)
)
data = Data.get_instance(data) data = Data.get_instance(data)
data.position = offset data.position = offset
io_priv.contents._data = cast(data, c_void_p) io_priv.contents._data = data.data
return 0 return 0
@staticmethod @staticmethod
@IoOps.GET_DATA @IoOps.GET_DATA
def _io_get_data(io): def _io_get_data(io):
io_priv = cast(OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)) io_priv = cast(
OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)
)
return io_priv.contents._data return io_priv.contents._data
def open(self): def open(self):
if self.opened:
return OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
self.opened = True
return 0 return 0
def close(self): def close(self):
pass self.opened = False
def get_length(self): def get_length(self):
return self.size return self.size
def get_max_io_size(self):
return S.from_KiB(128)
def submit_flush(self, flush): def submit_flush(self, flush):
flush.contents._end(io, 0) flush.contents._end(flush, 0)
def submit_discard(self, discard): def submit_discard(self, discard):
discard.contents._end(io, 0) discard.contents._end(discard, 0)
def get_stats(self): def get_stats(self):
return self.stats return self.stats
@ -213,7 +241,7 @@ class Volume(Structure):
def submit_io(self, io): def submit_io(self, io):
try: try:
self.stats[io.contents._dir] += 1 self.stats[IoDir(io.contents._dir)] += 1
if io.contents._dir == IoDir.WRITE: if io.contents._dir == IoDir.WRITE:
src_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p) src_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p)
src = Data.get_instance(src_ptr.value) src = Data.get_instance(src_ptr.value)
@ -232,7 +260,9 @@ class Volume(Structure):
def dump_contents(self, stop_after_zeros=0, offset=0, size=0): def dump_contents(self, stop_after_zeros=0, offset=0, size=0):
if size == 0: if size == 0:
size = self.size size = self.size
print_buffer(self._storage + offset, size, stop_after_zeros=stop_after_zeros) print_buffer(
self._storage + offset, size, stop_after_zeros=stop_after_zeros
)
def md5(self): def md5(self):
m = md5() m = md5()
@ -258,3 +288,7 @@ class ErrorDevice(Volume):
def reset_stats(self): def reset_stats(self):
super().reset_stats() super().reset_stats()
self.stats["errors"] = {IoDir.WRITE: 0, IoDir.READ: 0} self.stats["errors"] = {IoDir.WRITE: 0, IoDir.READ: 0}
lib = OcfLib.getInstance()
lib.ocf_io_get_priv.restype = POINTER(VolumeIoPriv)

View File

@ -2,7 +2,8 @@
# Copyright(c) 2019 Intel Corporation # Copyright(c) 2019 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear # SPDX-License-Identifier: BSD-3-Clause-Clear
# #
from ctypes import *
from ctypes import string_at
def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0): def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0):
@ -32,11 +33,11 @@ def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0):
print("<{} zero bytes omitted>".format(zero_lines * width)) print("<{} zero bytes omitted>".format(zero_lines * width))
zero_lines = 0 zero_lines = 0
for x in cur_line: for byte in cur_line:
x = int(x) byte = int(byte)
byteline += "{:02X} ".format(x) byteline += "{:02X} ".format(byte)
if 31 < x < 126: if 31 < byte < 126:
char = chr(x) char = chr(byte)
else: else:
char = "." char = "."
asciiline += char asciiline += char

View File

@ -4,15 +4,15 @@
# #
import pytest import pytest
from ctypes import c_int
from pyocf.types.cache import Cache from pyocf.types.cache import Cache
from pyocf.types.core import Core from pyocf.types.core import Core
from pyocf.types.volume import Volume, ErrorDevice from pyocf.types.volume import Volume, ErrorDevice
from pyocf.types.data import Data from pyocf.types.data import Data
from pyocf.types.io import IoDir from pyocf.types.io import IoDir
from pyocf.types.queue import Queue
from pyocf.utils import Size as S from pyocf.utils import Size as S
from pyocf.types.shared import OcfError from pyocf.types.shared import OcfError, OcfCompletion
def test_ctx_fixture(pyocf_ctx): def test_ctx_fixture(pyocf_ctx):
@ -51,7 +51,6 @@ def test_simple_wt_write(pyocf_ctx):
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device) core = Core.using_device(core_device)
queue = Queue(cache)
cache.add_core(core) cache.add_core(core)
cache_device.reset_stats() cache_device.reset_stats()
@ -61,9 +60,14 @@ def test_simple_wt_write(pyocf_ctx):
io = core.new_io() io = core.new_io()
io.set_data(write_data) io.set_data(write_data)
io.configure(20, write_data.size, IoDir.WRITE, 0, 0) io.configure(20, write_data.size, IoDir.WRITE, 0, 0)
io.set_queue(queue) io.set_queue(cache.get_default_queue())
io.submit()
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
assert cmpl.results["err"] == 0
assert cache_device.get_stats()[IoDir.WRITE] == 1 assert cache_device.get_stats()[IoDir.WRITE] == 1
stats = cache.get_stats() stats = cache.get_stats()
assert stats["req"]["wr_full_misses"]["value"] == 1 assert stats["req"]["wr_full_misses"]["value"] == 1
@ -82,7 +86,7 @@ def test_start_corrupted_metadata_lba(pyocf_ctx):
def test_load_cache_no_preexisting_data(pyocf_ctx): def test_load_cache_no_preexisting_data(pyocf_ctx):
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(100))
with pytest.raises(OcfError, match="OCF_ERR_INVAL"): with pytest.raises(OcfError, match="OCF_ERR_START_CACHE_FAIL"):
cache = Cache.load_from_device(cache_device) cache = Cache.load_from_device(cache_device)

View File

@ -27,7 +27,7 @@ def pyocf_ctx():
yield c yield c
for cache in c.caches: for cache in c.caches:
cache.stop() cache.stop(flush=False)
c.exit() c.exit()
@ -39,4 +39,4 @@ def pyocf_ctx_log_buffer():
c.register_volume_type(ErrorDevice) c.register_volume_type(ErrorDevice)
yield logger yield logger
for cache in c.caches: for cache in c.caches:
cache.stop() cache.stop(flush=False)