Merge pull request #583 from jfckm/multiple-pyocf-ctx
Support multiple PyOcf ctx instances plus leak fixes
This commit is contained in:
commit
6a1d5f2ad6
@ -37,6 +37,7 @@ from .queue import Queue
|
||||
from .stats.cache import CacheInfo
|
||||
from .ioclass import IoClassesInfo, IoClassInfo
|
||||
from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
|
||||
from .ctx import OcfCtx
|
||||
|
||||
|
||||
class Backfill(Structure):
|
||||
@ -209,7 +210,6 @@ class Cache:
|
||||
)
|
||||
if status:
|
||||
raise OcfError("Creating cache instance failed", status)
|
||||
self.owner.caches.append(self)
|
||||
|
||||
self.mngt_queue = mngt_queue or Queue(self, "mgmt-{}".format(self.get_name()))
|
||||
|
||||
@ -223,6 +223,7 @@ class Cache:
|
||||
raise OcfError("Error setting management queue", status)
|
||||
|
||||
self.started = True
|
||||
self.owner.caches.append(self)
|
||||
|
||||
def change_cache_mode(self, cache_mode: CacheMode):
|
||||
self.write_lock()
|
||||
@ -474,7 +475,7 @@ class Cache:
|
||||
|
||||
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
|
||||
|
||||
device.owner.lib.ocf_mngt_cache_attach(
|
||||
self.owner.lib.ocf_mngt_cache_attach(
|
||||
self.cache_handle, byref(self.dev_cfg), c, None
|
||||
)
|
||||
|
||||
@ -493,6 +494,7 @@ class Cache:
|
||||
|
||||
c.wait()
|
||||
self.write_unlock()
|
||||
self.device = None
|
||||
|
||||
if c.results["error"]:
|
||||
raise OcfError("Attaching cache device failed", c.results["error"])
|
||||
@ -500,7 +502,7 @@ class Cache:
|
||||
def load_cache(self, device, open_cores=True):
|
||||
self.configure_device(device, open_cores=open_cores)
|
||||
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
|
||||
device.owner.lib.ocf_mngt_cache_load(
|
||||
self.owner.lib.ocf_mngt_cache_load(
|
||||
self.cache_handle, byref(self.dev_cfg), c, None
|
||||
)
|
||||
|
||||
@ -509,8 +511,11 @@ class Cache:
|
||||
raise OcfError("Loading cache device failed", c.results["error"])
|
||||
|
||||
@classmethod
|
||||
def load_from_device(cls, device, name="cache", open_cores=True):
|
||||
c = cls(name=name, owner=device.owner)
|
||||
def load_from_device(cls, device, owner=None, name="cache", open_cores=True):
|
||||
if owner is None:
|
||||
owner = OcfCtx.get_default()
|
||||
|
||||
c = cls(name=name, owner=owner)
|
||||
|
||||
c.start_cache()
|
||||
try:
|
||||
@ -522,8 +527,11 @@ class Cache:
|
||||
return c
|
||||
|
||||
@classmethod
|
||||
def start_on_device(cls, device, **kwargs):
|
||||
c = cls(owner=device.owner, **kwargs)
|
||||
def start_on_device(cls, device, owner=None, **kwargs):
|
||||
if owner is None:
|
||||
owner = OcfCtx.get_default()
|
||||
|
||||
c = cls(owner=owner, **kwargs)
|
||||
|
||||
c.start_cache()
|
||||
try:
|
||||
@ -707,9 +715,10 @@ class Cache:
|
||||
|
||||
self.mngt_queue.put()
|
||||
del self.io_queues[:]
|
||||
self.started = False
|
||||
|
||||
self.write_unlock()
|
||||
self.device = None
|
||||
self.started = False
|
||||
|
||||
self.owner.caches.remove(self)
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#
|
||||
|
||||
from ctypes import c_void_p, Structure, c_char_p, cast, pointer, byref, c_int
|
||||
import weakref
|
||||
|
||||
from .logger import LoggerOps, Logger
|
||||
from .data import DataOps, Data
|
||||
@ -27,6 +28,8 @@ class OcfCtxCfg(Structure):
|
||||
|
||||
|
||||
class OcfCtx:
|
||||
default = None
|
||||
|
||||
def __init__(self, lib, name, logger, data, cleaner):
|
||||
self.logger = logger
|
||||
self.data = data
|
||||
@ -51,10 +54,29 @@ class OcfCtx:
|
||||
if result != 0:
|
||||
raise OcfError("Context initialization failed", result)
|
||||
|
||||
if self.default is None or self.default() is None:
|
||||
type(self).default = weakref.ref(self)
|
||||
|
||||
@classmethod
|
||||
def with_defaults(cls, logger):
|
||||
return cls(
|
||||
OcfLib.getInstance(),
|
||||
b"PyOCF default ctx",
|
||||
logger,
|
||||
Data,
|
||||
Cleaner,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_default(cls):
|
||||
if cls.default is None or cls.default() is None:
|
||||
raise Exception("No context instantiated yet")
|
||||
|
||||
return cls.default()
|
||||
|
||||
def register_volume_type(self, volume_type):
|
||||
self.volume_types[self.volume_types_count] = volume_type
|
||||
volume_type.type_id = self.volume_types_count
|
||||
volume_type.owner = self
|
||||
|
||||
result = self.lib.ocf_ctx_register_volume_type(
|
||||
self.ctx_handle,
|
||||
@ -90,26 +112,8 @@ class OcfCtx:
|
||||
self.cleanup_volume_types()
|
||||
|
||||
self.lib.ocf_ctx_put(self.ctx_handle)
|
||||
|
||||
self.cfg = None
|
||||
self.logger = None
|
||||
self.data = None
|
||||
self.cleaner = None
|
||||
Queue._instances_ = {}
|
||||
Volume._instances_ = {}
|
||||
Volume._uuid_ = {}
|
||||
Data._instances_ = {}
|
||||
Logger._instances_ = {}
|
||||
|
||||
|
||||
def get_default_ctx(logger):
|
||||
return OcfCtx(
|
||||
OcfLib.getInstance(),
|
||||
b"PyOCF default ctx",
|
||||
logger,
|
||||
Data,
|
||||
Cleaner,
|
||||
)
|
||||
if type(self).default and type(self).default() == self:
|
||||
type(self).default = None
|
||||
|
||||
|
||||
lib = OcfLib.getInstance()
|
||||
|
@ -59,7 +59,7 @@ class Data:
|
||||
DATA_POISON = 0xA5
|
||||
PAGE_SIZE = 4096
|
||||
|
||||
_instances_ = {}
|
||||
_instances_ = weakref.WeakValueDictionary()
|
||||
_ocf_instances_ = []
|
||||
|
||||
def __init__(self, byte_count: int):
|
||||
@ -69,12 +69,12 @@ class Data:
|
||||
self.handle = cast(byref(self.buffer), c_void_p)
|
||||
|
||||
memset(self.handle, self.DATA_POISON, self.size)
|
||||
type(self)._instances_[self.handle.value] = weakref.ref(self)
|
||||
type(self)._instances_[self.handle.value] = self
|
||||
self._as_parameter_ = self.handle
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls, ref):
|
||||
return cls._instances_[ref]()
|
||||
return cls._instances_[ref]
|
||||
|
||||
@classmethod
|
||||
def get_ops(cls):
|
||||
|
@ -20,9 +20,7 @@ import weakref
|
||||
|
||||
from ..ocf import OcfLib
|
||||
|
||||
logger = logging.getLogger("pyocf")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG, handlers=[logging.NullHandler()])
|
||||
|
||||
class LogLevel(IntEnum):
|
||||
EMERG = 0
|
||||
@ -69,9 +67,9 @@ class LoggerPriv(Structure):
|
||||
|
||||
|
||||
class Logger(Structure):
|
||||
_instances_ = {}
|
||||
_instances_ = weakref.WeakValueDictionary()
|
||||
|
||||
_fields_ = [("logger", c_void_p)]
|
||||
_fields_ = [("_logger", c_void_p)]
|
||||
|
||||
def __init__(self):
|
||||
self.ops = LoggerOps(
|
||||
@ -81,7 +79,7 @@ class Logger(Structure):
|
||||
)
|
||||
self.priv = LoggerPriv(_log=self._log)
|
||||
self._as_parameter_ = cast(pointer(self.priv), c_void_p).value
|
||||
self._instances_[self._as_parameter_] = weakref.ref(self)
|
||||
self._instances_[self._as_parameter_] = self
|
||||
|
||||
def get_ops(self):
|
||||
return self.ops
|
||||
@ -92,7 +90,7 @@ class Logger(Structure):
|
||||
@classmethod
|
||||
def get_instance(cls, ctx: int):
|
||||
priv = OcfLib.getInstance().ocf_logger_get_priv(ctx)
|
||||
return cls._instances_[priv]()
|
||||
return cls._instances_[priv]
|
||||
|
||||
@staticmethod
|
||||
@LoggerOps.LOG
|
||||
@ -118,23 +116,25 @@ class Logger(Structure):
|
||||
|
||||
|
||||
class DefaultLogger(Logger):
|
||||
def __init__(self, level: LogLevel = LogLevel.WARN):
|
||||
def __init__(self, level: LogLevel = LogLevel.WARN, name: str = ""):
|
||||
super().__init__()
|
||||
self.level = level
|
||||
self.name = name
|
||||
|
||||
self.logger = logging.getLogger(name)
|
||||
ch = logging.StreamHandler()
|
||||
fmt = logging.Formatter(
|
||||
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
ch.setFormatter(fmt)
|
||||
ch.setLevel(LevelMapping[level])
|
||||
logger.addHandler(ch)
|
||||
self.logger.addHandler(ch)
|
||||
|
||||
def log(self, lvl: int, msg: str):
|
||||
logger.log(LevelMapping[lvl], msg)
|
||||
self.logger.log(LevelMapping[lvl], msg)
|
||||
|
||||
def close(self):
|
||||
logger.handlers = []
|
||||
self.logger.handlers = []
|
||||
|
||||
|
||||
class FileLogger(Logger):
|
||||
|
@ -38,7 +38,7 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
|
||||
|
||||
|
||||
class Queue:
|
||||
_instances_ = {}
|
||||
_instances_ = weakref.WeakValueDictionary()
|
||||
|
||||
def __init__(self, cache, name):
|
||||
|
||||
@ -51,7 +51,7 @@ class Queue:
|
||||
if status:
|
||||
raise OcfError("Couldn't create queue object", status)
|
||||
|
||||
Queue._instances_[self.handle.value] = weakref.ref(self)
|
||||
Queue._instances_[self.handle.value] = self
|
||||
self._as_parameter_ = self.handle
|
||||
|
||||
self.stop_event = Event()
|
||||
@ -70,7 +70,7 @@ class Queue:
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls, ref):
|
||||
return cls._instances_[ref]()
|
||||
return cls._instances_[ref]
|
||||
|
||||
@staticmethod
|
||||
@QueueOps.KICK_SYNC
|
||||
|
@ -78,8 +78,8 @@ class Volume(Structure):
|
||||
VOLUME_POISON = 0x13
|
||||
|
||||
_fields_ = [("_storage", c_void_p)]
|
||||
_instances_ = {}
|
||||
_uuid_ = {}
|
||||
_instances_ = weakref.WeakValueDictionary()
|
||||
_uuid_ = weakref.WeakValueDictionary()
|
||||
|
||||
props = None
|
||||
|
||||
@ -95,7 +95,7 @@ class Volume(Structure):
|
||||
else:
|
||||
self.uuid = str(id(self))
|
||||
|
||||
type(self)._uuid_[self.uuid] = weakref.ref(self)
|
||||
type(self)._uuid_[self.uuid] = self
|
||||
|
||||
self.data = create_string_buffer(int(self.size))
|
||||
memset(self.data, self.VOLUME_POISON, self.size)
|
||||
@ -138,7 +138,7 @@ class Volume(Structure):
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls, ref):
|
||||
instance = cls._instances_[ref]()
|
||||
instance = cls._instances_[ref]
|
||||
if instance is None:
|
||||
print("tried to access {} but it's gone".format(ref))
|
||||
|
||||
@ -146,7 +146,7 @@ class Volume(Structure):
|
||||
|
||||
@classmethod
|
||||
def get_by_uuid(cls, uuid):
|
||||
return cls._uuid_[uuid]()
|
||||
return cls._uuid_[uuid]
|
||||
|
||||
@staticmethod
|
||||
@VolumeOps.SUBMIT_IO
|
||||
@ -205,7 +205,7 @@ class Volume(Structure):
|
||||
if volume.opened:
|
||||
return -OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
|
||||
|
||||
Volume._instances_[ref] = weakref.ref(volume)
|
||||
Volume._instances_[ref] = volume
|
||||
|
||||
return volume.open()
|
||||
|
||||
|
@ -11,7 +11,7 @@ import gc
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
|
||||
from pyocf.types.logger import LogLevel, DefaultLogger, BufferLogger
|
||||
from pyocf.types.volume import Volume, ErrorDevice
|
||||
from pyocf.types.ctx import get_default_ctx
|
||||
from pyocf.types.ctx import OcfCtx
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
@ -20,7 +20,7 @@ def pytest_configure(config):
|
||||
|
||||
@pytest.fixture()
|
||||
def pyocf_ctx():
|
||||
c = get_default_ctx(DefaultLogger(LogLevel.WARN))
|
||||
c = OcfCtx.with_defaults(DefaultLogger(LogLevel.WARN))
|
||||
c.register_volume_type(Volume)
|
||||
c.register_volume_type(ErrorDevice)
|
||||
yield c
|
||||
@ -31,7 +31,7 @@ def pyocf_ctx():
|
||||
@pytest.fixture()
|
||||
def pyocf_ctx_log_buffer():
|
||||
logger = BufferLogger(LogLevel.DEBUG)
|
||||
c = get_default_ctx(logger)
|
||||
c = OcfCtx.with_defaults(logger)
|
||||
c.register_volume_type(Volume)
|
||||
c.register_volume_type(ErrorDevice)
|
||||
yield logger
|
||||
|
Loading…
Reference in New Issue
Block a user