Add basic load test with core reattachment and cleanup structures

Signed-off-by: Jan Musial <jan.musial@intel.com>
This commit is contained in:
Jan Musial 2021-08-24 14:46:12 +02:00
parent 30fe2eb783
commit 2038c3aa68
3 changed files with 124 additions and 64 deletions

View File

@ -176,37 +176,53 @@ class Cache:
metadata_volatile: bool = False,
max_queue_size: int = DEFAULT_BACKFILL_QUEUE_SIZE,
queue_unblock_size: int = DEFAULT_BACKFILL_UNBLOCK,
locked: bool = False,
pt_unaligned_io: bool = DEFAULT_PT_UNALIGNED_IO,
use_submit_fast: bool = DEFAULT_USE_SUBMIT_FAST,
):
self.device = None
self.started = False
self.owner = owner
self.cache_line_size = cache_line_size
self.cfg = CacheConfig(
_name=name.encode("ascii"),
_cache_mode=cache_mode,
_promotion_policy=promotion_policy,
_cache_line_size=cache_line_size,
_metadata_layout=metadata_layout,
_metadata_volatile=metadata_volatile,
_backfill=Backfill(
_max_queue_size=max_queue_size, _queue_unblock_size=queue_unblock_size
),
_locked=locked,
_pt_unaligned_io=pt_unaligned_io,
_use_submit_fast=use_submit_fast,
)
self.name = name
self.cache_mode = cache_mode
self.promotion_policy = promotion_policy
self.cache_line_size = cache_line_size
self.metadata_layout = metadata_layout
self.metadata_volatile = metadata_volatile
self.max_queue_size = max_queue_size
self.queue_unblock_size = queue_unblock_size
self.pt_unaligned_io = pt_unaligned_io
self.use_submit_fast = use_submit_fast
self.cache_handle = c_void_p()
self._as_parameter_ = self.cache_handle
self.io_queues = []
self.cores = []
def start_cache(self, default_io_queue: Queue = None, mngt_queue: Queue = None):
def start_cache(
self,
default_io_queue: Queue = None,
mngt_queue: Queue = None,
locked: bool = False,
):
cfg = CacheConfig(
_name=self.name.encode("ascii"),
_cache_mode=self.cache_mode,
_promotion_policy=self.promotion_policy,
_cache_line_size=self.cache_line_size,
_metadata_layout=self.metadata_layout,
_metadata_volatile=self.metadata_volatile,
_backfill=Backfill(
_max_queue_size=self.max_queue_size,
_queue_unblock_size=self.queue_unblock_size,
),
_locked=locked,
_pt_unaligned_io=self.pt_unaligned_io,
_use_submit_fast=self.use_submit_fast,
)
status = self.owner.lib.ocf_mngt_cache_start(
self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg), None
self.owner.ctx_handle, byref(self.cache_handle), byref(cfg), None
)
if status:
raise OcfError("Creating cache instance failed", status)
@ -427,12 +443,13 @@ class Cache:
if status:
raise OcfError("Error adding partition to cache", status)
def configure_device(
def generate_attach_config(
self,
device,
force=False,
perform_test=True,
cache_line_size=None,
discard=False,
open_cores=True,
):
self.device = device
@ -440,17 +457,15 @@ class Cache:
device_config = CacheDeviceConfig(
_uuid=Uuid(
_data=cast(
create_string_buffer(self.device_name.encode("ascii")), c_char_p
),
_size=len(self.device_name) + 1,
_data=cast(create_string_buffer(device.uuid.encode("ascii")), c_char_p),
_size=len(device.uuid) + 1,
),
_volume_type=device.type_id,
_perform_test=perform_test,
_volume_params=None,
)
self.dev_cfg = CacheAttachConfig(
attach_cfg = CacheAttachConfig(
_device=device_config,
_cache_line_size=cache_line_size
if cache_line_size
@ -460,23 +475,28 @@ class Cache:
_discard_on_start=False,
)
return attach_cfg
def attach_device(
self,
device,
force=False,
perform_test=False,
cache_line_size=None,
open_cores=True,
self, device, force=False, perform_test=False, cache_line_size=None, open_cores=True
):
self.configure_device(
device, force, perform_test, cache_line_size, open_cores=open_cores
attach_cfg = self.generate_attach_config(
device,
force,
perform_test,
cache_line_size,
open_cores=open_cores
)
self.device = device
self.write_lock()
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
self.owner.lib.ocf_mngt_cache_attach(
self.cache_handle, byref(self.dev_cfg), c, None
self.cache_handle, byref(attach_cfg), c, None
)
c.wait()
@ -500,10 +520,13 @@ class Cache:
raise OcfError("Attaching cache device failed", c.results["error"])
def load_cache(self, device, open_cores=True):
self.configure_device(device, open_cores=open_cores)
attach_cfg = self.generate_attach_config(device, open_cores=open_cores)
self.device = device
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
self.owner.lib.ocf_mngt_cache_load(
self.cache_handle, byref(self.dev_cfg), c, None
self.cache_handle, byref(attach_cfg), c, None
)
c.wait()
@ -571,6 +594,10 @@ class Cache:
self.owner.lib.ocf_mngt_cache_unlock(self.cache_handle)
def add_core(self, core: Core, try_add=False):
cfg = core.get_config()
cfg._try_add = try_add
self.write_lock()
c = OcfCompletion(
@ -582,12 +609,7 @@ class Cache:
]
)
cfg = core.get_cfg()
cfg._try_add = try_add
self.owner.lib.ocf_mngt_cache_add_core(
self.cache_handle, byref(cfg), c, None
)
self.owner.lib.ocf_mngt_cache_add_core(self.cache_handle, byref(cfg), c, None)
c.wait()
if c.results["error"]:

View File

@ -19,9 +19,6 @@ from ctypes import (
cast,
byref,
create_string_buffer,
sizeof,
memmove,
pointer,
)
from datetime import timedelta
@ -68,23 +65,11 @@ class Core:
):
self.cache = None
self.device = device
self.device_name = device.uuid
self.name = name
self.seq_cutoff_threshold = seq_cutoff_threshold
self.seq_cutoff_promotion_count = seq_cutoff_promotion_count
self.handle = c_void_p()
self.cfg = CoreConfig(
_uuid=Uuid(
_data=cast(
create_string_buffer(self.device_name.encode("ascii")),
c_char_p,
),
_size=len(self.device_name) + 1,
),
_name=name.encode("ascii"),
_volume_type=self.device.type_id,
_try_add=try_add,
_seq_cutoff_threshold=seq_cutoff_threshold,
_seq_cutoff_promotion_count=seq_cutoff_promotion_count,
_user_metadata=UserMetadata(_data=None, _size=0),
)
@classmethod
def using_device(cls, device, **kwargs):
@ -92,11 +77,24 @@ class Core:
return c
def get_cfg(self):
config_copy = CoreConfig()
memmove(pointer(config_copy), pointer(self.cfg), sizeof(config_copy))
def get_config(self):
cfg = CoreConfig(
_uuid=Uuid(
_data=cast(
create_string_buffer(self.device.uuid.encode("ascii")),
c_char_p,
),
_size=len(self.device.uuid) + 1,
),
_name=self.name.encode("ascii"),
_volume_type=self.device.type_id,
_try_add=False,
_seq_cutoff_threshold=self.seq_cutoff_threshold,
_seq_cutoff_promotion_count=self.seq_cutoff_promotion_count,
_user_metadata=UserMetadata(_data=None, _size=0),
)
return config_copy
return cfg
def get_handle(self):
return self.handle

View File

@ -84,3 +84,43 @@ def test_load_cache_recovery(pyocf_ctx):
cache.stop()
cache = Cache.load_from_device(device_copy)
@pytest.mark.parametrize("open_cores", [True, False])
def test_load_cache_with_cores(pyocf_ctx, open_cores):
cache_device = Volume(S.from_MiB(40))
core_device = Volume(S.from_MiB(40))
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
write_data = Data.from_string("This is test data")
io = core.new_io(cache.get_default_queue(), S.from_sector(3).B,
write_data.size, IoDir.WRITE, 0, 0)
io.set_data(write_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
cache.stop()
cache = Cache.load_from_device(cache_device, open_cores=open_cores)
if not open_cores:
cache.add_core(core, try_add=True)
read_data = Data(write_data.size)
io = core.new_io(cache.get_default_queue(), S.from_sector(3).B,
read_data.size, IoDir.READ, 0, 0)
io.set_data(read_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
assert read_data.md5() == write_data.md5()
assert core.exp_obj_md5() == core_device.md5()