Merge pull request #115 from imjfckm/leaks

Fix leaky tests
This commit is contained in:
Kamil Łepek 2019-04-16 10:20:56 +02:00 committed by GitHub
commit 00c434fb66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 246 additions and 172 deletions

View File

@ -31,7 +31,7 @@
/** /**
* Minimum cache size in bytes * Minimum cache size in bytes
*/ */
#define OCF_CACHE_SIZE_MIN (100 * MiB) #define OCF_CACHE_SIZE_MIN (20 * MiB)
/** /**
* Size of cache name * Size of cache name
*/ */

View File

@ -111,7 +111,8 @@ class Cache:
pt_unaligned_io: bool = DEFAULT_PT_UNALIGNED_IO, pt_unaligned_io: bool = DEFAULT_PT_UNALIGNED_IO,
use_submit_fast: bool = DEFAULT_USE_SUBMIT_FAST, use_submit_fast: bool = DEFAULT_USE_SUBMIT_FAST,
): ):
self.device = None
self.started = False
self.owner = owner self.owner = owner
self.cache_line_size = cache_line_size self.cache_line_size = cache_line_size
@ -134,16 +135,17 @@ class Cache:
self.cache_handle = c_void_p() self.cache_handle = c_void_p()
self._as_parameter_ = self.cache_handle self._as_parameter_ = self.cache_handle
self.io_queues = [] self.io_queues = []
self.device = None
def start_cache( def start_cache(
self, default_io_queue: Queue = None, mngt_queue: Queue = None, self, default_io_queue: Queue = None, mngt_queue: Queue = None
): ):
status = self.owner.lib.ocf_mngt_cache_start( status = self.owner.lib.ocf_mngt_cache_start(
self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg) self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg)
) )
if status: if status:
raise OcfError("Creating cache instance failed", status) raise OcfError("Creating cache instance failed", status)
self.owner.caches += [self] self.owner.caches.append(self)
self.mngt_queue = mngt_queue or Queue( self.mngt_queue = mngt_queue or Queue(
self, "mgmt-{}".format(self.get_name()), mngt_queue=True self, "mgmt-{}".format(self.get_name()), mngt_queue=True
@ -152,7 +154,9 @@ class Cache:
if default_io_queue: if default_io_queue:
self.io_queues += [default_io_queue] self.io_queues += [default_io_queue]
else: else:
self.io_queues += [Queue(self, "default-io-{}".format(self.get_name()))] self.io_queues += [
Queue(self, "default-io-{}".format(self.get_name()))
]
status = self.owner.lib.ocf_mngt_cache_set_mngt_queue( status = self.owner.lib.ocf_mngt_cache_set_mngt_queue(
self, self.mngt_queue self, self.mngt_queue
@ -160,6 +164,8 @@ class Cache:
if status: if status:
raise OcfError("Error setting management queue", status) raise OcfError("Error setting management queue", status)
self.started = True
def change_cache_mode(self, cache_mode: CacheMode): def change_cache_mode(self, cache_mode: CacheMode):
self.get_and_write_lock() self.get_and_write_lock()
self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode) self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode)
@ -168,6 +174,7 @@ class Cache:
def configure_device( def configure_device(
self, device, force=False, perform_test=False, cache_line_size=None self, device, force=False, perform_test=False, cache_line_size=None
): ):
self.device = device
self.device_name = device.uuid self.device_name = device.uuid
self.dev_cfg = CacheDeviceConfig( self.dev_cfg = CacheDeviceConfig(
_uuid=Uuid( _uuid=Uuid(
@ -233,15 +240,13 @@ class Cache:
try: try:
c.attach_device(device, force=True) c.attach_device(device, force=True)
except: except:
c.stop(flush=False) c.stop()
raise raise
return c return c
def _get_and_lock(self, read=True): def _get_and_lock(self, read=True):
status = self.owner.lib.ocf_mngt_cache_get(self.cache_handle) self.get()
if status:
raise OcfError("Couldn't get cache instance", status)
if read: if read:
status = self.owner.lib.ocf_mngt_cache_read_lock(self.cache_handle) status = self.owner.lib.ocf_mngt_cache_read_lock(self.cache_handle)
@ -249,7 +254,7 @@ class Cache:
status = self.owner.lib.ocf_mngt_cache_lock(self.cache_handle) status = self.owner.lib.ocf_mngt_cache_lock(self.cache_handle)
if status: if status:
self.owner.lib.ocf_mngt_cache_put(self.cache_handle) self.put()
raise OcfError("Couldn't lock cache instance", status) raise OcfError("Couldn't lock cache instance", status)
def _put_and_unlock(self, read=True): def _put_and_unlock(self, read=True):
@ -258,8 +263,16 @@ class Cache:
else: else:
self.owner.lib.ocf_mngt_cache_unlock(self.cache_handle) self.owner.lib.ocf_mngt_cache_unlock(self.cache_handle)
self.put()
def put(self):
self.owner.lib.ocf_mngt_cache_put(self.cache_handle) self.owner.lib.ocf_mngt_cache_put(self.cache_handle)
def get(self):
status = self.owner.lib.ocf_mngt_cache_get(self.cache_handle)
if status:
raise OcfError("Couldn't get cache instance", status)
def get_and_read_lock(self): def get_and_read_lock(self):
self._get_and_lock(True) self._get_and_lock(True)
@ -301,16 +314,9 @@ class Cache:
def remove_core(self, core: Core): def remove_core(self, core: Core):
self.get_and_write_lock() self.get_and_write_lock()
c = OcfCompletion( c = OcfCompletion([("priv", c_void_p), ("error", c_int)])
[
("priv", c_void_p),
("error", c_int),
]
)
self.owner.lib.ocf_mngt_cache_remove_core( self.owner.lib.ocf_mngt_cache_remove_core(core.handle, c, None)
core.handle, c, None
)
c.wait() c.wait()
if c.results["error"]: if c.results["error"]:
@ -393,9 +399,9 @@ class Cache:
return self.io_queues[0] return self.io_queues[0]
def stop(self, flush: bool = True): def stop(self):
if flush: if not self.started:
self.flush() raise Exception("Already stopped!")
self.get_and_write_lock() self.get_and_write_lock()
@ -410,7 +416,15 @@ class Cache:
self.put_and_write_unlock() self.put_and_write_unlock()
raise OcfError("Failed stopping cache", c.results["error"]) raise OcfError("Failed stopping cache", c.results["error"])
self.mngt_queue.stop()
self.mngt_queue = None
del self.io_queues[:]
self.device = None
self.started = False
self.put_and_write_unlock() self.put_and_write_unlock()
self.put()
self.owner.caches.remove(self) self.owner.caches.remove(self)
def flush(self): def flush(self):
@ -437,6 +451,7 @@ class Cache:
finally: finally:
self.put_and_read_unlock() self.put_and_read_unlock()
lib = OcfLib.getInstance() lib = OcfLib.getInstance()
lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p] lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p] lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]

View File

@ -4,15 +4,15 @@
# #
from ctypes import c_void_p, Structure, c_char_p, cast, pointer, byref from ctypes import c_void_p, Structure, c_char_p, cast, pointer, byref
from enum import IntEnum
from .logger import LoggerOps, Logger from .logger import LoggerOps, Logger
from .data import DataOps, Data from .data import DataOps, Data
from .queue import Queue
from .cleaner import CleanerOps, Cleaner from .cleaner import CleanerOps, Cleaner
from .metadata_updater import MetadataUpdaterOps, MetadataUpdater from .metadata_updater import MetadataUpdaterOps, MetadataUpdater
from .shared import OcfError from .shared import OcfError
from ..ocf import OcfLib from ..ocf import OcfLib
from .queue import Queue
from .volume import Volume
class OcfCtxOps(Structure): class OcfCtxOps(Structure):
@ -56,22 +56,53 @@ class OcfCtx:
raise OcfError("Context initialization failed", result) raise OcfError("Context initialization failed", result)
def register_volume_type(self, volume_type): def register_volume_type(self, volume_type):
self.volume_types[self.volume_types_count] = volume_type.get_props() self.volume_types[self.volume_types_count] = volume_type
volume_type.type_id = self.volume_types_count volume_type.type_id = self.volume_types_count
volume_type.owner = self volume_type.owner = self
result = self.lib.ocf_ctx_register_volume_type( result = self.lib.ocf_ctx_register_volume_type(
self.ctx_handle, self.ctx_handle,
self.volume_types_count, self.volume_types_count,
byref(self.volume_types[self.volume_types_count]), byref(self.volume_types[self.volume_types_count].get_props()),
) )
if result != 0: if result != 0:
raise OcfError("Data object registration failed", result) raise OcfError("Volume type registration failed", result)
self.volume_types_count += 1 self.volume_types_count += 1
def unregister_volume_type(self, vol_type):
if not vol_type.type_id:
raise Exception("Already unregistered")
result = self.lib.ocf_ctx_unregister_volume_type(
self.ctx_handle, vol_type.type_id
)
if result != 0:
raise OcfError("Volume type unregistration failed", result)
del self.volume_types[vol_type.type_id]
def cleanup_volume_types(self):
for k, vol_type in list(self.volume_types.items()):
if vol_type:
self.unregister_volume_type(vol_type)
def exit(self): def exit(self):
self.lib.ocf_ctx_exit(self.ctx_handle) self.cleanup_volume_types()
result = self.lib.ocf_ctx_exit(self.ctx_handle)
if result != 0:
raise OcfError("Failed quitting OcfCtx", result)
self.cfg = None
self.logger = None
self.data = None
self.mu = None
self.cleaner = None
Queue._instances_ = {}
Volume._instances_ = {}
Data._instances_ = {}
Logger._instances_ = {}
def get_default_ctx(logger): def get_default_ctx(logger):

View File

@ -11,16 +11,16 @@ from ctypes import (
create_string_buffer, create_string_buffer,
cast, cast,
memset, memset,
c_char_p,
string_at, string_at,
Structure, Structure,
c_int, c_int,
memmove, memmove,
byref,
) )
from enum import IntEnum from enum import IntEnum
from hashlib import md5 from hashlib import md5
import weakref
from .shared import SharedOcfObject
from ..utils import print_buffer from ..utils import print_buffer
@ -55,23 +55,24 @@ class DataOps(Structure):
] ]
class Data(SharedOcfObject): class Data:
PAGE_SIZE = 4096 PAGE_SIZE = 4096
_instances_ = {} _instances_ = {}
_ocf_instances_ = []
_fields_ = [("data", c_void_p)]
def __init__(self, byte_count: int): def __init__(self, byte_count: int):
self.size = byte_count self.size = int(byte_count)
self.position = 0 self.position = 0
self.buffer = create_string_buffer(int(self.size)) self.buffer = create_string_buffer(int(self.size))
self.data = cast(self.buffer, c_void_p) self.handle = cast(byref(self.buffer), c_void_p)
memset(self.data, 0, self.size) memset(self.handle, 0, self.size)
type(self)._instances_[self.data] = self type(self)._instances_[self.handle.value] = weakref.ref(self)
self._as_parameter_ = self.data self._as_parameter_ = self.handle
super().__init__() @classmethod
def get_instance(cls, ref):
return cls._instances_[ref]()
@classmethod @classmethod
def get_ops(cls): def get_ops(cls):
@ -96,7 +97,7 @@ class Data(SharedOcfObject):
def from_bytes(cls, source: bytes): def from_bytes(cls, source: bytes):
d = cls(len(source)) d = cls(len(source))
memmove(d.data, cast(source, c_void_p), len(source)) memmove(d.handle, cast(source, c_void_p), len(source))
return d return d
@ -104,31 +105,25 @@ class Data(SharedOcfObject):
def from_string(cls, source: str, encoding: str = "ascii"): def from_string(cls, source: str, encoding: str = "ascii"):
return cls.from_bytes(bytes(source, encoding)) return cls.from_bytes(bytes(source, encoding))
def __str__(self):
char_array = cast(self.data, c_char_p)
return str(char_array.value, "ascii")
def __wstr__(self):
char_array = cast(self.data, c_wchar_p)
return str(char_array.value, "utf-8")
def set_data(self, contents): def set_data(self, contents):
if len(contents) > self.size: if len(contents) > self.size:
raise Exception("Data too big to fit into allocated buffer") raise Exception("Data too big to fit into allocated buffer")
memmove(self.data, cast(contents, c_void_p), len(contents)) memmove(self.handle, cast(contents, c_void_p), len(contents))
self.position = 0 self.position = 0
@staticmethod @staticmethod
@DataOps.ALLOC @DataOps.ALLOC
def _alloc(pages): def _alloc(pages):
data = Data.pages(pages) data = Data.pages(pages)
return data.data Data._ocf_instances_.append(data)
return data.handle.value
@staticmethod @staticmethod
@DataOps.FREE @DataOps.FREE
def _free(data): def _free(ref):
Data.del_object(data) Data._ocf_instances_.remove(Data.get_instance(ref))
@staticmethod @staticmethod
@DataOps.MLOCK @DataOps.MLOCK
@ -162,9 +157,9 @@ class Data(SharedOcfObject):
@staticmethod @staticmethod
@DataOps.COPY @DataOps.COPY
def _copy(dst, src, end, start, size): def _copy(dst, src, skip, seek, size):
return Data.get_instance(dst).copy( return Data.get_instance(dst).copy(
Data.get_instance(src), end, start, size Data.get_instance(src), skip, seek, size
) )
@staticmethod @staticmethod
@ -174,12 +169,12 @@ class Data(SharedOcfObject):
def read(self, dst, size): def read(self, dst, size):
to_read = min(self.size - self.position, size) to_read = min(self.size - self.position, size)
memmove(dst, self.data + self.position, to_read) memmove(dst, self.handle.value + self.position, to_read)
return to_read return to_read
def write(self, src, size): def write(self, src, size):
to_write = min(self.size - self.position, size) to_write = min(self.size - self.position, size)
memmove(self.data + self.position, src, to_write) memmove(self.handle.value + self.position, src, to_write)
return to_write return to_write
def mlock(self): def mlock(self):
@ -190,7 +185,7 @@ class Data(SharedOcfObject):
def zero(self, size): def zero(self, size):
to_zero = min(self.size - self.position, size) to_zero = min(self.size - self.position, size)
memset(self.data + self.position, 0, to_zero) memset(self.handle.value + self.position, 0, to_zero)
return to_zero return to_zero
def seek(self, seek, size): def seek(self, seek, size):
@ -203,8 +198,11 @@ class Data(SharedOcfObject):
return to_move return to_move
def copy(self, src, end, start, size): def copy(self, src, skip, seek, size):
return size to_write = min(self.size - skip, size, src.size - seek)
memmove(self.handle.value + skip, src.handle.value + seek, to_write)
return to_write
def secure_erase(self): def secure_erase(self):
pass pass
@ -214,5 +212,5 @@ class Data(SharedOcfObject):
def md5(self): def md5(self):
m = md5() m = md5()
m.update(string_at(self.data, self.size)) m.update(string_at(self.handle, self.size))
return m.hexdigest() return m.hexdigest()

View File

@ -14,7 +14,7 @@ from ctypes import (
byref, byref,
cast, cast,
) )
from enum import IntEnum, auto from enum import IntEnum
from ..ocf import OcfLib from ..ocf import OcfLib
from .data import Data from .data import Data
@ -113,7 +113,6 @@ class Io(Structure):
OcfLib.getInstance().ocf_io_set_data_wrapper(byref(self), data, 0) OcfLib.getInstance().ocf_io_set_data_wrapper(byref(self), data, 0)
def set_queue(self, queue: Queue): def set_queue(self, queue: Queue):
self.queue = queue
OcfLib.getInstance().ocf_io_set_queue_wrapper(byref(self), queue.handle) OcfLib.getInstance().ocf_io_set_queue_wrapper(byref(self), queue.handle)

View File

@ -6,7 +6,6 @@
from ctypes import ( from ctypes import (
c_void_p, c_void_p,
Structure, Structure,
c_void_p,
c_char_p, c_char_p,
c_uint, c_uint,
c_int, c_int,
@ -17,6 +16,7 @@ from ctypes import (
from enum import IntEnum from enum import IntEnum
import logging import logging
from io import StringIO from io import StringIO
import weakref
from ..ocf import OcfLib from ..ocf import OcfLib
@ -81,7 +81,7 @@ class Logger(Structure):
) )
self.priv = LoggerPriv(_log=self._log) self.priv = LoggerPriv(_log=self._log)
self._as_parameter_ = cast(pointer(self.priv), c_void_p).value self._as_parameter_ = cast(pointer(self.priv), c_void_p).value
self._instances_[self._as_parameter_] = self self._instances_[self._as_parameter_] = weakref.ref(self)
def get_ops(self): def get_ops(self):
return self.ops return self.ops
@ -92,7 +92,7 @@ class Logger(Structure):
@classmethod @classmethod
def get_instance(cls, ctx: int): def get_instance(cls, ctx: int):
priv = OcfLib.getInstance().ocf_logger_get_priv(ctx) priv = OcfLib.getInstance().ocf_logger_get_priv(ctx)
return cls._instances_[priv] return cls._instances_[priv]()
@staticmethod @staticmethod
@LoggerOps.LOG @LoggerOps.LOG

View File

@ -4,7 +4,8 @@
# #
from ctypes import c_void_p, CFUNCTYPE, Structure, byref from ctypes import c_void_p, CFUNCTYPE, Structure, byref
from threading import Thread, Condition, Lock from threading import Thread, Condition, Event
import weakref
from ..ocf import OcfLib from ..ocf import OcfLib
from .shared import OcfError from .shared import OcfError
@ -13,7 +14,6 @@ from .shared import OcfError
class QueueOps(Structure): class QueueOps(Structure):
KICK = CFUNCTYPE(None, c_void_p) KICK = CFUNCTYPE(None, c_void_p)
KICK_SYNC = CFUNCTYPE(None, c_void_p) KICK_SYNC = CFUNCTYPE(None, c_void_p)
KICK = CFUNCTYPE(None, c_void_p)
STOP = CFUNCTYPE(None, c_void_p) STOP = CFUNCTYPE(None, c_void_p)
_fields_ = [("kick", KICK), ("kick_sync", KICK_SYNC), ("stop", STOP)] _fields_ = [("kick", KICK), ("kick_sync", KICK_SYNC), ("stop", STOP)]
@ -23,54 +23,55 @@ class Queue:
pass pass
class Queue: def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
_instances_ = {}
@staticmethod
def io_queue_run(*, queue: Queue, kick: Condition):
def wait_predicate(): def wait_predicate():
return queue.stop or OcfLib.getInstance().ocf_queue_pending_io(queue) return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue)
while True: while True:
with kick: with kick:
kick.wait_for(wait_predicate) kick.wait_for(wait_predicate)
queue.owner.lib.ocf_queue_run(queue) OcfLib.getInstance().ocf_queue_run(queue)
if queue.stop and not queue.owner.lib.ocf_queue_pending_io(queue): if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue):
break break
class Queue:
_instances_ = {}
def __init__(self, cache, name, mngt_queue: bool = False): def __init__(self, cache, name, mngt_queue: bool = False):
self.owner = cache.owner
self.ops = QueueOps(kick=type(self)._kick, stop=type(self)._stop) self.ops = QueueOps(kick=type(self)._kick, stop=type(self)._stop)
self.handle = c_void_p() self.handle = c_void_p()
status = self.owner.lib.ocf_queue_create( status = OcfLib.getInstance().ocf_queue_create(
cache.cache_handle, byref(self.handle), byref(self.ops) cache.cache_handle, byref(self.handle), byref(self.ops)
) )
if status: if status:
raise OcfError("Couldn't create queue object", status) raise OcfError("Couldn't create queue object", status)
Queue._instances_[self.handle.value] = self Queue._instances_[self.handle.value] = weakref.ref(self)
self._as_parameter_ = self.handle self._as_parameter_ = self.handle
self.stop_lock = Lock() self.stop_event = Event()
self.stop = False self.kick_condition = Condition()
self.kick_condition = Condition(self.stop_lock)
self.thread = Thread( self.thread = Thread(
group=None, group=None,
target=Queue.io_queue_run, target=io_queue_run,
name=name, name=name,
kwargs={"queue": self, "kick": self.kick_condition}, kwargs={
daemon=True, "queue": self,
"kick": self.kick_condition,
"stop": self.stop_event,
},
) )
self.thread.start() self.thread.start()
self.mngt_queue = mngt_queue self.mngt_queue = mngt_queue
@classmethod @classmethod
def get_instance(cls, ref): def get_instance(cls, ref):
return cls._instances_[ref] return cls._instances_[ref]()
@staticmethod @staticmethod
@QueueOps.KICK_SYNC @QueueOps.KICK_SYNC
@ -88,7 +89,7 @@ class Queue:
Queue.get_instance(ref).stop() Queue.get_instance(ref).stop()
def kick_sync(self): def kick_sync(self):
self.owner.lib.ocf_queue_run(self.handle) OcfLib.getInstance().ocf_queue_run(self.handle)
def kick(self): def kick(self):
with self.kick_condition: with self.kick_condition:
@ -96,9 +97,12 @@ class Queue:
def stop(self): def stop(self):
with self.kick_condition: with self.kick_condition:
self.stop = True self.stop_event.set()
self.kick_condition.notify_all() self.kick_condition.notify_all()
self.thread.join() self.thread.join()
if self.mngt_queue: if self.mngt_queue:
self.owner.lib.ocf_queue_put(self) OcfLib.getInstance().ocf_queue_put(self)
self.thread = None
self.ops = None

View File

@ -98,7 +98,7 @@ class SharedOcfObject(Structure):
try: try:
return cls._instances_[ref] return cls._instances_[ref]
except: except:
logging.get_logger("pyocf").error( logging.getLogger("pyocf").error(
"OcfSharedObject corruption. wanted: {} instances: {}".format( "OcfSharedObject corruption. wanted: {} instances: {}".format(
ref, cls._instances_ ref, cls._instances_
) )
@ -130,8 +130,3 @@ class CacheLines(S):
def __int__(self): def __int__(self):
return int(self.bytes / self.line_size) return int(self.bytes / self.line_size)
def __str__(self):
return "{} ({})".format(int(self), super().__str__())
__repr__ = __str__

View File

@ -10,6 +10,7 @@ from ctypes import (
c_char_p, c_char_p,
create_string_buffer, create_string_buffer,
memmove, memmove,
memset,
Structure, Structure,
CFUNCTYPE, CFUNCTYPE,
c_int, c_int,
@ -20,6 +21,7 @@ from ctypes import (
string_at, string_at,
) )
from hashlib import md5 from hashlib import md5
import weakref
from .io import Io, IoOps, IoDir from .io import Io, IoOps, IoDir
from .shared import OcfErrorCode, Uuid from .shared import OcfErrorCode, Uuid
@ -76,6 +78,8 @@ class Volume(Structure):
_instances_ = {} _instances_ = {}
_uuid_ = {} _uuid_ = {}
props = None
def __init__(self, size: S, uuid=None): def __init__(self, size: S, uuid=None):
super().__init__() super().__init__()
self.size = size self.size = size
@ -88,16 +92,18 @@ class Volume(Structure):
else: else:
self.uuid = str(id(self)) self.uuid = str(id(self))
type(self)._uuid_[self.uuid] = self type(self)._uuid_[self.uuid] = weakref.ref(self)
self.data = create_string_buffer(int(self.size)) self.data = create_string_buffer(int(self.size))
self._storage = cast(self.data, c_void_p) self._storage = cast(self.data, c_void_p)
self.reset_stats() self.reset_stats()
self.opened = False self.opened = False
@classmethod @classmethod
def get_props(cls): def get_props(cls):
return VolumeProperties( if not cls.props:
cls.props = VolumeProperties(
_name=str(cls.__name__).encode("ascii"), _name=str(cls.__name__).encode("ascii"),
_io_priv_size=sizeof(VolumeIoPriv), _io_priv_size=sizeof(VolumeIoPriv),
_volume_priv_size=0, _volume_priv_size=0,
@ -113,16 +119,23 @@ class Volume(Structure):
_get_max_io_size=cls._get_max_io_size, _get_max_io_size=cls._get_max_io_size,
_get_length=cls._get_length, _get_length=cls._get_length,
), ),
_io_ops=IoOps(_set_data=cls._io_set_data, _get_data=cls._io_get_data), _io_ops=IoOps(
_set_data=cls._io_set_data, _get_data=cls._io_get_data
),
) )
return cls.props
@classmethod @classmethod
def get_instance(cls, ref): def get_instance(cls, ref):
return cls._instances_[ref] instance = cls._instances_[ref]()
if instance is None:
print("tried to access {} but it's gone".format(ref))
return instance
@classmethod @classmethod
def get_by_uuid(cls, uuid): def get_by_uuid(cls, uuid):
return cls._uuid_[uuid] return cls._uuid_[uuid]()
@staticmethod @staticmethod
@VolumeOps.SUBMIT_IO @VolumeOps.SUBMIT_IO
@ -172,15 +185,19 @@ class Volume(Structure):
print("{}".format(Volume._uuid_)) print("{}".format(Volume._uuid_))
return -1 return -1
type(volume)._instances_[ref] = volume if volume.opened:
return OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
Volume._instances_[ref] = weakref.ref(volume)
return volume.open() return volume.open()
@staticmethod @staticmethod
@VolumeOps.CLOSE @VolumeOps.CLOSE
def _close(ref): def _close(ref):
Volume.get_instance(ref).close() volume = Volume.get_instance(ref)
del Volume._instances_[ref] volume.close()
volume.opened = False
@staticmethod @staticmethod
@VolumeOps.GET_MAX_IO_SIZE @VolumeOps.GET_MAX_IO_SIZE
@ -200,7 +217,8 @@ class Volume(Structure):
) )
data = Data.get_instance(data) data = Data.get_instance(data)
data.position = offset data.position = offset
io_priv.contents._data = data.data io_priv.contents._data = data.handle
return 0 return 0
@staticmethod @staticmethod
@ -212,14 +230,11 @@ class Volume(Structure):
return io_priv.contents._data return io_priv.contents._data
def open(self): def open(self):
if self.opened:
return OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
self.opened = True self.opened = True
return 0 return 0
def close(self): def close(self):
self.opened = False pass
def get_length(self): def get_length(self):
return self.size return self.size
@ -231,7 +246,13 @@ class Volume(Structure):
flush.contents._end(flush, 0) flush.contents._end(flush, 0)
def submit_discard(self, discard): def submit_discard(self, discard):
try:
dst = self._storage + discard.contents._addr
memset(dst, discard.contents._bytes)
discard.contents._end(discard, 0) discard.contents._end(discard, 0)
except:
discard.contents._end(discard, -5)
def get_stats(self): def get_stats(self):
return self.stats return self.stats
@ -242,6 +263,7 @@ class Volume(Structure):
def submit_io(self, io): def submit_io(self, io):
try: try:
self.stats[IoDir(io.contents._dir)] += 1 self.stats[IoDir(io.contents._dir)] += 1
if io.contents._dir == IoDir.WRITE: if io.contents._dir == IoDir.WRITE:
src_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p) src_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p)
src = Data.get_instance(src_ptr.value) src = Data.get_instance(src_ptr.value)
@ -259,9 +281,12 @@ class Volume(Structure):
def dump_contents(self, stop_after_zeros=0, offset=0, size=0): def dump_contents(self, stop_after_zeros=0, offset=0, size=0):
if size == 0: if size == 0:
size = self.size size = int(self.size) - int(offset)
print_buffer( print_buffer(
self._storage + offset, size, stop_after_zeros=stop_after_zeros self._storage,
int(size),
offset=int(offset),
stop_after_zeros=int(stop_after_zeros),
) )
def md5(self): def md5(self):

View File

@ -42,7 +42,7 @@ def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0):
char = "." char = "."
asciiline += char asciiline += char
print("{:#08X}\t{}\t{}".format(addr, byteline, asciiline)) print("0x{:08X}\t{}\t{}".format(addr, byteline, asciiline))
whole_buffer_empty = False whole_buffer_empty = False
if whole_buffer_empty: if whole_buffer_empty:

View File

@ -1,5 +0,0 @@
[pytest]
log_cli = 1
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)
log_cli_date_format=%Y-%m-%d %H:%M:%S

View File

@ -20,8 +20,8 @@ def test_ctx_fixture(pyocf_ctx):
def test_simple_wt_write(pyocf_ctx): def test_simple_wt_write(pyocf_ctx):
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
core_device = Volume(S.from_MiB(200)) core_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device) core = Core.using_device(core_device)
@ -49,17 +49,18 @@ def test_simple_wt_write(pyocf_ctx):
assert stats["usage"]["occupancy"]["value"] == 1 assert stats["usage"]["occupancy"]["value"] == 1
assert core.exp_obj_md5() == core_device.md5() assert core.exp_obj_md5() == core_device.md5()
cache.stop()
def test_start_corrupted_metadata_lba(pyocf_ctx): def test_start_corrupted_metadata_lba(pyocf_ctx):
cache_device = ErrorDevice(S.from_MiB(100), error_sectors=set([0])) cache_device = ErrorDevice(S.from_MiB(30), error_sectors=set([0]))
with pytest.raises(OcfError, match="OCF_ERR_WRITE_CACHE"): with pytest.raises(OcfError, match="OCF_ERR_WRITE_CACHE"):
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
def test_load_cache_no_preexisting_data(pyocf_ctx): def test_load_cache_no_preexisting_data(pyocf_ctx):
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
with pytest.raises(OcfError, match="OCF_ERR_START_CACHE_FAIL"): with pytest.raises(OcfError, match="OCF_ERR_START_CACHE_FAIL"):
cache = Cache.load_from_device(cache_device) cache = Cache.load_from_device(cache_device)
@ -68,7 +69,7 @@ def test_load_cache_no_preexisting_data(pyocf_ctx):
# TODO: Find out why this fails and fix # TODO: Find out why this fails and fix
@pytest.mark.xfail @pytest.mark.xfail
def test_load_cache(pyocf_ctx): def test_load_cache(pyocf_ctx):
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
cache.stop() cache.stop()

View File

@ -5,14 +5,12 @@
import os import os
import sys import sys
import pytest import pytest
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir)) sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
from pyocf.types.logger import LogLevel, DefaultLogger, BufferLogger from pyocf.types.logger import LogLevel, DefaultLogger, BufferLogger
from pyocf.types.volume import Volume, ErrorDevice from pyocf.types.volume import Volume, ErrorDevice
from pyocf.types.ctx import get_default_ctx from pyocf.types.ctx import get_default_ctx
from pyocf.ocf import OcfLib
def pytest_configure(config): def pytest_configure(config):
@ -24,10 +22,9 @@ def pyocf_ctx():
c = get_default_ctx(DefaultLogger(LogLevel.WARN)) c = get_default_ctx(DefaultLogger(LogLevel.WARN))
c.register_volume_type(Volume) c.register_volume_type(Volume)
c.register_volume_type(ErrorDevice) c.register_volume_type(ErrorDevice)
yield c yield c
for cache in c.caches: for cache in c.caches[:]:
cache.stop(flush=False) cache.stop()
c.exit() c.exit()
@ -39,4 +36,4 @@ def pyocf_ctx_log_buffer():
c.register_volume_type(ErrorDevice) c.register_volume_type(ErrorDevice)
yield logger yield logger
for cache in c.caches: for cache in c.caches:
cache.stop(flush=False) cache.stop()

View File

@ -19,8 +19,10 @@ from pyocf.types.shared import OcfError, OcfCompletion, CacheLineSize
@pytest.mark.parametrize("cls", CacheLineSize) @pytest.mark.parametrize("cls", CacheLineSize)
def test_adding_core(pyocf_ctx, cache_mode, cls): def test_adding_core(pyocf_ctx, cache_mode, cls):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device, cache_mode=cache_mode, cache_line_size=cls) cache = Cache.start_on_device(
cache_device, cache_mode=cache_mode, cache_line_size=cls
)
# Create core device # Create core device
core_device = Volume(S.from_MiB(10)) core_device = Volume(S.from_MiB(10))
@ -42,8 +44,10 @@ def test_adding_core(pyocf_ctx, cache_mode, cls):
@pytest.mark.parametrize("cls", CacheLineSize) @pytest.mark.parametrize("cls", CacheLineSize)
def test_removing_core(pyocf_ctx, cache_mode, cls): def test_removing_core(pyocf_ctx, cache_mode, cls):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device, cache_mode=cache_mode, cache_line_size=cls) cache = Cache.start_on_device(
cache_device, cache_mode=cache_mode, cache_line_size=cls
)
# Create core device # Create core device
core_device = Volume(S.from_MiB(10)) core_device = Volume(S.from_MiB(10))
@ -60,9 +64,9 @@ def test_removing_core(pyocf_ctx, cache_mode, cls):
assert stats["conf"]["core_count"] == 0 assert stats["conf"]["core_count"] == 0
def test_100add_remove(pyocf_ctx): def test_30add_remove(pyocf_ctx):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
# Create core device # Create core device
@ -71,7 +75,7 @@ def test_100add_remove(pyocf_ctx):
# Add and remove core device in a loop 100 times # Add and remove core device in a loop 100 times
# Check statistics after every operation # Check statistics after every operation
for i in range(0, 100): for i in range(0, 30):
cache.add_core(core) cache.add_core(core)
stats = cache.get_stats() stats = cache.get_stats()
assert stats["conf"]["core_count"] == 1 assert stats["conf"]["core_count"] == 1
@ -83,7 +87,7 @@ def test_100add_remove(pyocf_ctx):
def test_10add_remove_with_io(pyocf_ctx): def test_10add_remove_with_io(pyocf_ctx):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
# Create core device # Create core device
@ -112,12 +116,12 @@ def test_10add_remove_with_io(pyocf_ctx):
assert stats["conf"]["core_count"] == 0 assert stats["conf"]["core_count"] == 0
def test_add_remove_50core(pyocf_ctx): def test_add_remove_30core(pyocf_ctx):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
core_devices = [] core_devices = []
core_amount = 50 core_amount = 30
# Add 50 cores and check stats after each addition # Add 50 cores and check stats after each addition
for i in range(0, core_amount): for i in range(0, core_amount):
@ -143,11 +147,11 @@ def test_adding_to_random_cache(pyocf_ctx):
cache_devices = [] cache_devices = []
core_devices = {} core_devices = {}
cache_amount = 5 cache_amount = 5
core_amount = 50 core_amount = 30
# Create 5 cache devices # Create 5 cache devices
for i in range(0, cache_amount): for i in range(0, cache_amount):
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device) cache = Cache.start_on_device(cache_device)
cache_devices.append(cache) cache_devices.append(cache)
@ -173,8 +177,10 @@ def test_adding_to_random_cache(pyocf_ctx):
@pytest.mark.parametrize("cls", CacheLineSize) @pytest.mark.parametrize("cls", CacheLineSize)
def test_adding_core_twice(pyocf_ctx, cache_mode, cls): def test_adding_core_twice(pyocf_ctx, cache_mode, cls):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device, cache_mode=cache_mode, cache_line_size=cls) cache = Cache.start_on_device(
cache_device, cache_mode=cache_mode, cache_line_size=cls
)
# Create core device # Create core device
core_device = Volume(S.from_MiB(10)) core_device = Volume(S.from_MiB(10))
@ -196,12 +202,16 @@ def test_adding_core_twice(pyocf_ctx, cache_mode, cls):
@pytest.mark.parametrize("cls", CacheLineSize) @pytest.mark.parametrize("cls", CacheLineSize)
def test_adding_core_already_used(pyocf_ctx, cache_mode, cls): def test_adding_core_already_used(pyocf_ctx, cache_mode, cls):
# Start first cache device # Start first cache device
cache_device1 = Volume(S.from_MiB(100)) cache_device1 = Volume(S.from_MiB(30))
cache1 = Cache.start_on_device(cache_device1, cache_mode=cache_mode, cache_line_size=cls) cache1 = Cache.start_on_device(
cache_device1, cache_mode=cache_mode, cache_line_size=cls
)
# Start second cache device # Start second cache device
cache_device2 = Volume(S.from_MiB(100)) cache_device2 = Volume(S.from_MiB(30))
cache2 = Cache.start_on_device(cache_device2, cache_mode=cache_mode, cache_line_size=cls) cache2 = Cache.start_on_device(
cache_device2, cache_mode=cache_mode, cache_line_size=cls
)
# Create core device # Create core device
core_device = Volume(S.from_MiB(10)) core_device = Volume(S.from_MiB(10))
@ -226,8 +236,10 @@ def test_adding_core_already_used(pyocf_ctx, cache_mode, cls):
@pytest.mark.parametrize("cls", CacheLineSize) @pytest.mark.parametrize("cls", CacheLineSize)
def test_add_remove_incrementally(pyocf_ctx, cache_mode, cls): def test_add_remove_incrementally(pyocf_ctx, cache_mode, cls):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device, cache_mode=cache_mode, cache_line_size=cls) cache = Cache.start_on_device(
cache_device, cache_mode=cache_mode, cache_line_size=cls
)
core_devices = [] core_devices = []
core_amount = 5 core_amount = 5

View File

@ -16,8 +16,10 @@ from pyocf.types.shared import CacheLineSize
@pytest.mark.parametrize("cls", CacheLineSize) @pytest.mark.parametrize("cls", CacheLineSize)
def test_change_cache_mode(pyocf_ctx, from_cm, to_cm, cls): def test_change_cache_mode(pyocf_ctx, from_cm, to_cm, cls):
# Start cache device # Start cache device
cache_device = Volume(S.from_MiB(100)) cache_device = Volume(S.from_MiB(30))
cache = Cache.start_on_device(cache_device, cache_mode=from_cm, cache_line_size=cls) cache = Cache.start_on_device(
cache_device, cache_mode=from_cm, cache_line_size=cls
)
# Check if started with correct cache mode # Check if started with correct cache mode
stats = cache.get_stats() stats = cache.get_stats()