Fix leaky tests

Fix some instance tracking to prevent Python-side objects from leaking
buffers.
Also, reduce min size of Cache instance (real minimum should be around ~~19MiB,
but we need to make it more deterministic and 20 MiB seems to be reasonable).

Still some stuff left to do, but it needs more investigation and, for
now, this should suffice just to enable some form of CI.

Signed-off-by: Jan Musial <jan.musial@intel.com>
This commit is contained in:
Jan Musial
2019-04-08 16:30:50 +02:00
parent f0c3e049c6
commit 003d6505aa
15 changed files with 246 additions and 172 deletions

View File

@@ -111,7 +111,8 @@ class Cache:
pt_unaligned_io: bool = DEFAULT_PT_UNALIGNED_IO,
use_submit_fast: bool = DEFAULT_USE_SUBMIT_FAST,
):
self.device = None
self.started = False
self.owner = owner
self.cache_line_size = cache_line_size
@@ -134,16 +135,17 @@ class Cache:
self.cache_handle = c_void_p()
self._as_parameter_ = self.cache_handle
self.io_queues = []
self.device = None
def start_cache(
self, default_io_queue: Queue = None, mngt_queue: Queue = None,
self, default_io_queue: Queue = None, mngt_queue: Queue = None
):
status = self.owner.lib.ocf_mngt_cache_start(
self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg)
)
if status:
raise OcfError("Creating cache instance failed", status)
self.owner.caches += [self]
self.owner.caches.append(self)
self.mngt_queue = mngt_queue or Queue(
self, "mgmt-{}".format(self.get_name()), mngt_queue=True
@@ -152,7 +154,9 @@ class Cache:
if default_io_queue:
self.io_queues += [default_io_queue]
else:
self.io_queues += [Queue(self, "default-io-{}".format(self.get_name()))]
self.io_queues += [
Queue(self, "default-io-{}".format(self.get_name()))
]
status = self.owner.lib.ocf_mngt_cache_set_mngt_queue(
self, self.mngt_queue
@@ -160,6 +164,8 @@ class Cache:
if status:
raise OcfError("Error setting management queue", status)
self.started = True
def change_cache_mode(self, cache_mode: CacheMode):
self.get_and_write_lock()
self.owner.lib.ocf_mngt_cache_set_mode(self.cache_handle, cache_mode)
@@ -168,6 +174,7 @@ class Cache:
def configure_device(
self, device, force=False, perform_test=False, cache_line_size=None
):
self.device = device
self.device_name = device.uuid
self.dev_cfg = CacheDeviceConfig(
_uuid=Uuid(
@@ -233,15 +240,13 @@ class Cache:
try:
c.attach_device(device, force=True)
except:
c.stop(flush=False)
c.stop()
raise
return c
def _get_and_lock(self, read=True):
status = self.owner.lib.ocf_mngt_cache_get(self.cache_handle)
if status:
raise OcfError("Couldn't get cache instance", status)
self.get()
if read:
status = self.owner.lib.ocf_mngt_cache_read_lock(self.cache_handle)
@@ -249,7 +254,7 @@ class Cache:
status = self.owner.lib.ocf_mngt_cache_lock(self.cache_handle)
if status:
self.owner.lib.ocf_mngt_cache_put(self.cache_handle)
self.put()
raise OcfError("Couldn't lock cache instance", status)
def _put_and_unlock(self, read=True):
@@ -258,8 +263,16 @@ class Cache:
else:
self.owner.lib.ocf_mngt_cache_unlock(self.cache_handle)
self.put()
def put(self):
self.owner.lib.ocf_mngt_cache_put(self.cache_handle)
def get(self):
status = self.owner.lib.ocf_mngt_cache_get(self.cache_handle)
if status:
raise OcfError("Couldn't get cache instance", status)
def get_and_read_lock(self):
self._get_and_lock(True)
@@ -301,16 +314,9 @@ class Cache:
def remove_core(self, core: Core):
self.get_and_write_lock()
c = OcfCompletion(
[
("priv", c_void_p),
("error", c_int),
]
)
c = OcfCompletion([("priv", c_void_p), ("error", c_int)])
self.owner.lib.ocf_mngt_cache_remove_core(
core.handle, c, None
)
self.owner.lib.ocf_mngt_cache_remove_core(core.handle, c, None)
c.wait()
if c.results["error"]:
@@ -393,9 +399,9 @@ class Cache:
return self.io_queues[0]
def stop(self, flush: bool = True):
if flush:
self.flush()
def stop(self):
if not self.started:
raise Exception("Already stopped!")
self.get_and_write_lock()
@@ -410,7 +416,15 @@ class Cache:
self.put_and_write_unlock()
raise OcfError("Failed stopping cache", c.results["error"])
self.mngt_queue.stop()
self.mngt_queue = None
del self.io_queues[:]
self.device = None
self.started = False
self.put_and_write_unlock()
self.put()
self.owner.caches.remove(self)
def flush(self):
@@ -437,6 +451,7 @@ class Cache:
finally:
self.put_and_read_unlock()
lib = OcfLib.getInstance()
lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
lib.ocf_mngt_cache_add_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p]

View File

@@ -4,15 +4,15 @@
#
from ctypes import c_void_p, Structure, c_char_p, cast, pointer, byref
from enum import IntEnum
from .logger import LoggerOps, Logger
from .data import DataOps, Data
from .queue import Queue
from .cleaner import CleanerOps, Cleaner
from .metadata_updater import MetadataUpdaterOps, MetadataUpdater
from .shared import OcfError
from ..ocf import OcfLib
from .queue import Queue
from .volume import Volume
class OcfCtxOps(Structure):
@@ -56,22 +56,53 @@ class OcfCtx:
raise OcfError("Context initialization failed", result)
def register_volume_type(self, volume_type):
self.volume_types[self.volume_types_count] = volume_type.get_props()
self.volume_types[self.volume_types_count] = volume_type
volume_type.type_id = self.volume_types_count
volume_type.owner = self
result = self.lib.ocf_ctx_register_volume_type(
self.ctx_handle,
self.volume_types_count,
byref(self.volume_types[self.volume_types_count]),
byref(self.volume_types[self.volume_types_count].get_props()),
)
if result != 0:
raise OcfError("Data object registration failed", result)
raise OcfError("Volume type registration failed", result)
self.volume_types_count += 1
def unregister_volume_type(self, vol_type):
if not vol_type.type_id:
raise Exception("Already unregistered")
result = self.lib.ocf_ctx_unregister_volume_type(
self.ctx_handle, vol_type.type_id
)
if result != 0:
raise OcfError("Volume type unregistration failed", result)
del self.volume_types[vol_type.type_id]
def cleanup_volume_types(self):
for k, vol_type in list(self.volume_types.items()):
if vol_type:
self.unregister_volume_type(vol_type)
def exit(self):
self.lib.ocf_ctx_exit(self.ctx_handle)
self.cleanup_volume_types()
result = self.lib.ocf_ctx_exit(self.ctx_handle)
if result != 0:
raise OcfError("Failed quitting OcfCtx", result)
self.cfg = None
self.logger = None
self.data = None
self.mu = None
self.cleaner = None
Queue._instances_ = {}
Volume._instances_ = {}
Data._instances_ = {}
Logger._instances_ = {}
def get_default_ctx(logger):

View File

@@ -11,16 +11,16 @@ from ctypes import (
create_string_buffer,
cast,
memset,
c_char_p,
string_at,
Structure,
c_int,
memmove,
byref,
)
from enum import IntEnum
from hashlib import md5
import weakref
from .shared import SharedOcfObject
from ..utils import print_buffer
@@ -55,23 +55,24 @@ class DataOps(Structure):
]
class Data(SharedOcfObject):
class Data:
PAGE_SIZE = 4096
_instances_ = {}
_fields_ = [("data", c_void_p)]
_ocf_instances_ = []
def __init__(self, byte_count: int):
self.size = byte_count
self.size = int(byte_count)
self.position = 0
self.buffer = create_string_buffer(int(self.size))
self.data = cast(self.buffer, c_void_p)
memset(self.data, 0, self.size)
type(self)._instances_[self.data] = self
self._as_parameter_ = self.data
self.handle = cast(byref(self.buffer), c_void_p)
memset(self.handle, 0, self.size)
type(self)._instances_[self.handle.value] = weakref.ref(self)
self._as_parameter_ = self.handle
super().__init__()
@classmethod
def get_instance(cls, ref):
return cls._instances_[ref]()
@classmethod
def get_ops(cls):
@@ -96,7 +97,7 @@ class Data(SharedOcfObject):
def from_bytes(cls, source: bytes):
d = cls(len(source))
memmove(d.data, cast(source, c_void_p), len(source))
memmove(d.handle, cast(source, c_void_p), len(source))
return d
@@ -104,31 +105,25 @@ class Data(SharedOcfObject):
def from_string(cls, source: str, encoding: str = "ascii"):
return cls.from_bytes(bytes(source, encoding))
def __str__(self):
char_array = cast(self.data, c_char_p)
return str(char_array.value, "ascii")
def __wstr__(self):
char_array = cast(self.data, c_wchar_p)
return str(char_array.value, "utf-8")
def set_data(self, contents):
if len(contents) > self.size:
raise Exception("Data too big to fit into allocated buffer")
memmove(self.data, cast(contents, c_void_p), len(contents))
memmove(self.handle, cast(contents, c_void_p), len(contents))
self.position = 0
@staticmethod
@DataOps.ALLOC
def _alloc(pages):
data = Data.pages(pages)
return data.data
Data._ocf_instances_.append(data)
return data.handle.value
@staticmethod
@DataOps.FREE
def _free(data):
Data.del_object(data)
def _free(ref):
Data._ocf_instances_.remove(Data.get_instance(ref))
@staticmethod
@DataOps.MLOCK
@@ -162,9 +157,9 @@ class Data(SharedOcfObject):
@staticmethod
@DataOps.COPY
def _copy(dst, src, end, start, size):
def _copy(dst, src, skip, seek, size):
return Data.get_instance(dst).copy(
Data.get_instance(src), end, start, size
Data.get_instance(src), skip, seek, size
)
@staticmethod
@@ -174,12 +169,12 @@ class Data(SharedOcfObject):
def read(self, dst, size):
to_read = min(self.size - self.position, size)
memmove(dst, self.data + self.position, to_read)
memmove(dst, self.handle.value + self.position, to_read)
return to_read
def write(self, src, size):
to_write = min(self.size - self.position, size)
memmove(self.data + self.position, src, to_write)
memmove(self.handle.value + self.position, src, to_write)
return to_write
def mlock(self):
@@ -190,7 +185,7 @@ class Data(SharedOcfObject):
def zero(self, size):
to_zero = min(self.size - self.position, size)
memset(self.data + self.position, 0, to_zero)
memset(self.handle.value + self.position, 0, to_zero)
return to_zero
def seek(self, seek, size):
@@ -203,8 +198,11 @@ class Data(SharedOcfObject):
return to_move
def copy(self, src, end, start, size):
return size
def copy(self, src, skip, seek, size):
to_write = min(self.size - skip, size, src.size - seek)
memmove(self.handle.value + skip, src.handle.value + seek, to_write)
return to_write
def secure_erase(self):
pass
@@ -214,5 +212,5 @@ class Data(SharedOcfObject):
def md5(self):
m = md5()
m.update(string_at(self.data, self.size))
m.update(string_at(self.handle, self.size))
return m.hexdigest()

View File

@@ -14,7 +14,7 @@ from ctypes import (
byref,
cast,
)
from enum import IntEnum, auto
from enum import IntEnum
from ..ocf import OcfLib
from .data import Data
@@ -113,7 +113,6 @@ class Io(Structure):
OcfLib.getInstance().ocf_io_set_data_wrapper(byref(self), data, 0)
def set_queue(self, queue: Queue):
self.queue = queue
OcfLib.getInstance().ocf_io_set_queue_wrapper(byref(self), queue.handle)

View File

@@ -6,7 +6,6 @@
from ctypes import (
c_void_p,
Structure,
c_void_p,
c_char_p,
c_uint,
c_int,
@@ -17,6 +16,7 @@ from ctypes import (
from enum import IntEnum
import logging
from io import StringIO
import weakref
from ..ocf import OcfLib
@@ -81,7 +81,7 @@ class Logger(Structure):
)
self.priv = LoggerPriv(_log=self._log)
self._as_parameter_ = cast(pointer(self.priv), c_void_p).value
self._instances_[self._as_parameter_] = self
self._instances_[self._as_parameter_] = weakref.ref(self)
def get_ops(self):
return self.ops
@@ -92,7 +92,7 @@ class Logger(Structure):
@classmethod
def get_instance(cls, ctx: int):
priv = OcfLib.getInstance().ocf_logger_get_priv(ctx)
return cls._instances_[priv]
return cls._instances_[priv]()
@staticmethod
@LoggerOps.LOG

View File

@@ -4,7 +4,8 @@
#
from ctypes import c_void_p, CFUNCTYPE, Structure, byref
from threading import Thread, Condition, Lock
from threading import Thread, Condition, Event
import weakref
from ..ocf import OcfLib
from .shared import OcfError
@@ -13,7 +14,6 @@ from .shared import OcfError
class QueueOps(Structure):
KICK = CFUNCTYPE(None, c_void_p)
KICK_SYNC = CFUNCTYPE(None, c_void_p)
KICK = CFUNCTYPE(None, c_void_p)
STOP = CFUNCTYPE(None, c_void_p)
_fields_ = [("kick", KICK), ("kick_sync", KICK_SYNC), ("stop", STOP)]
@@ -23,54 +23,55 @@ class Queue:
pass
def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
def wait_predicate():
return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue)
while True:
with kick:
kick.wait_for(wait_predicate)
OcfLib.getInstance().ocf_queue_run(queue)
if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue):
break
class Queue:
_instances_ = {}
@staticmethod
def io_queue_run(*, queue: Queue, kick: Condition):
def wait_predicate():
return queue.stop or OcfLib.getInstance().ocf_queue_pending_io(queue)
while True:
with kick:
kick.wait_for(wait_predicate)
queue.owner.lib.ocf_queue_run(queue)
if queue.stop and not queue.owner.lib.ocf_queue_pending_io(queue):
break
def __init__(self, cache, name, mngt_queue: bool = False):
self.owner = cache.owner
self.ops = QueueOps(kick=type(self)._kick, stop=type(self)._stop)
self.handle = c_void_p()
status = self.owner.lib.ocf_queue_create(
status = OcfLib.getInstance().ocf_queue_create(
cache.cache_handle, byref(self.handle), byref(self.ops)
)
if status:
raise OcfError("Couldn't create queue object", status)
Queue._instances_[self.handle.value] = self
Queue._instances_[self.handle.value] = weakref.ref(self)
self._as_parameter_ = self.handle
self.stop_lock = Lock()
self.stop = False
self.kick_condition = Condition(self.stop_lock)
self.stop_event = Event()
self.kick_condition = Condition()
self.thread = Thread(
group=None,
target=Queue.io_queue_run,
target=io_queue_run,
name=name,
kwargs={"queue": self, "kick": self.kick_condition},
daemon=True,
kwargs={
"queue": self,
"kick": self.kick_condition,
"stop": self.stop_event,
},
)
self.thread.start()
self.mngt_queue = mngt_queue
@classmethod
def get_instance(cls, ref):
return cls._instances_[ref]
return cls._instances_[ref]()
@staticmethod
@QueueOps.KICK_SYNC
@@ -88,7 +89,7 @@ class Queue:
Queue.get_instance(ref).stop()
def kick_sync(self):
self.owner.lib.ocf_queue_run(self.handle)
OcfLib.getInstance().ocf_queue_run(self.handle)
def kick(self):
with self.kick_condition:
@@ -96,9 +97,12 @@ class Queue:
def stop(self):
with self.kick_condition:
self.stop = True
self.stop_event.set()
self.kick_condition.notify_all()
self.thread.join()
if self.mngt_queue:
self.owner.lib.ocf_queue_put(self)
OcfLib.getInstance().ocf_queue_put(self)
self.thread = None
self.ops = None

View File

@@ -98,7 +98,7 @@ class SharedOcfObject(Structure):
try:
return cls._instances_[ref]
except:
logging.get_logger("pyocf").error(
logging.getLogger("pyocf").error(
"OcfSharedObject corruption. wanted: {} instances: {}".format(
ref, cls._instances_
)
@@ -130,8 +130,3 @@ class CacheLines(S):
def __int__(self):
return int(self.bytes / self.line_size)
def __str__(self):
return "{} ({})".format(int(self), super().__str__())
__repr__ = __str__

View File

@@ -10,6 +10,7 @@ from ctypes import (
c_char_p,
create_string_buffer,
memmove,
memset,
Structure,
CFUNCTYPE,
c_int,
@@ -20,6 +21,7 @@ from ctypes import (
string_at,
)
from hashlib import md5
import weakref
from .io import Io, IoOps, IoDir
from .shared import OcfErrorCode, Uuid
@@ -76,6 +78,8 @@ class Volume(Structure):
_instances_ = {}
_uuid_ = {}
props = None
def __init__(self, size: S, uuid=None):
super().__init__()
self.size = size
@@ -88,41 +92,50 @@ class Volume(Structure):
else:
self.uuid = str(id(self))
type(self)._uuid_[self.uuid] = self
type(self)._uuid_[self.uuid] = weakref.ref(self)
self.data = create_string_buffer(int(self.size))
self._storage = cast(self.data, c_void_p)
self.reset_stats()
self.opened = False
@classmethod
def get_props(cls):
return VolumeProperties(
_name=str(cls.__name__).encode("ascii"),
_io_priv_size=sizeof(VolumeIoPriv),
_volume_priv_size=0,
_caps=VolumeCaps(_atomic_writes=0),
_ops=VolumeOps(
_submit_io=cls._submit_io,
_submit_flush=cls._submit_flush,
_submit_metadata=cls._submit_metadata,
_submit_discard=cls._submit_discard,
_submit_write_zeroes=cls._submit_write_zeroes,
_open=cls._open,
_close=cls._close,
_get_max_io_size=cls._get_max_io_size,
_get_length=cls._get_length,
),
_io_ops=IoOps(_set_data=cls._io_set_data, _get_data=cls._io_get_data),
)
if not cls.props:
cls.props = VolumeProperties(
_name=str(cls.__name__).encode("ascii"),
_io_priv_size=sizeof(VolumeIoPriv),
_volume_priv_size=0,
_caps=VolumeCaps(_atomic_writes=0),
_ops=VolumeOps(
_submit_io=cls._submit_io,
_submit_flush=cls._submit_flush,
_submit_metadata=cls._submit_metadata,
_submit_discard=cls._submit_discard,
_submit_write_zeroes=cls._submit_write_zeroes,
_open=cls._open,
_close=cls._close,
_get_max_io_size=cls._get_max_io_size,
_get_length=cls._get_length,
),
_io_ops=IoOps(
_set_data=cls._io_set_data, _get_data=cls._io_get_data
),
)
return cls.props
@classmethod
def get_instance(cls, ref):
return cls._instances_[ref]
instance = cls._instances_[ref]()
if instance is None:
print("tried to access {} but it's gone".format(ref))
return instance
@classmethod
def get_by_uuid(cls, uuid):
return cls._uuid_[uuid]
return cls._uuid_[uuid]()
@staticmethod
@VolumeOps.SUBMIT_IO
@@ -172,15 +185,19 @@ class Volume(Structure):
print("{}".format(Volume._uuid_))
return -1
type(volume)._instances_[ref] = volume
if volume.opened:
return OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
Volume._instances_[ref] = weakref.ref(volume)
return volume.open()
@staticmethod
@VolumeOps.CLOSE
def _close(ref):
Volume.get_instance(ref).close()
del Volume._instances_[ref]
volume = Volume.get_instance(ref)
volume.close()
volume.opened = False
@staticmethod
@VolumeOps.GET_MAX_IO_SIZE
@@ -200,7 +217,8 @@ class Volume(Structure):
)
data = Data.get_instance(data)
data.position = offset
io_priv.contents._data = data.data
io_priv.contents._data = data.handle
return 0
@staticmethod
@@ -212,14 +230,11 @@ class Volume(Structure):
return io_priv.contents._data
def open(self):
if self.opened:
return OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
self.opened = True
return 0
def close(self):
self.opened = False
pass
def get_length(self):
return self.size
@@ -231,7 +246,13 @@ class Volume(Structure):
flush.contents._end(flush, 0)
def submit_discard(self, discard):
discard.contents._end(discard, 0)
try:
dst = self._storage + discard.contents._addr
memset(dst, discard.contents._bytes)
discard.contents._end(discard, 0)
except:
discard.contents._end(discard, -5)
def get_stats(self):
return self.stats
@@ -242,6 +263,7 @@ class Volume(Structure):
def submit_io(self, io):
try:
self.stats[IoDir(io.contents._dir)] += 1
if io.contents._dir == IoDir.WRITE:
src_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p)
src = Data.get_instance(src_ptr.value)
@@ -259,9 +281,12 @@ class Volume(Structure):
def dump_contents(self, stop_after_zeros=0, offset=0, size=0):
if size == 0:
size = self.size
size = int(self.size) - int(offset)
print_buffer(
self._storage + offset, size, stop_after_zeros=stop_after_zeros
self._storage,
int(size),
offset=int(offset),
stop_after_zeros=int(stop_after_zeros),
)
def md5(self):

View File

@@ -42,7 +42,7 @@ def print_buffer(buf, length, offset=0, width=16, stop_after_zeros=0):
char = "."
asciiline += char
print("{:#08X}\t{}\t{}".format(addr, byteline, asciiline))
print("0x{:08X}\t{}\t{}".format(addr, byteline, asciiline))
whole_buffer_empty = False
if whole_buffer_empty: