pyocf: make open/close explicit

Signed-off-by: Jan Musial <jan.musial@intel.com>
Jan Musial 2022-06-20 14:59:40 +02:00
parent abc726d7f8
commit a0c6995189
23 changed files with 168 additions and 109 deletions

View File

@ -251,6 +251,8 @@ class Rio:
self.errors.update({thread.name: thread.errors})
self.error_count += len(thread.errors)
self.global_jobspec.target.close()
return self
def __del__(self):
@ -275,6 +277,8 @@ class Rio:
queues = cycle(queues)
self.global_jobspec.target.open()
for job in jobs:
spec = job.merge(self.global_jobspec)
thread = Rio.RioThread(spec, next(queues))
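With this change Rio opens the job target itself just before spawning worker threads and closes it again when it collects the threads' errors, so a test can hand it an unopened front volume. A rough usage sketch; the builder methods other than target() and the run() signature are assumptions, not shown in this hunk:

vol = CoreVolume(core)                                   # created unopened, see the CoreVolume hunk below
r = Rio().target(vol).run([cache.get_default_queue()])   # Rio calls vol.open() before the workers start
                                                         # and vol.close() once their errors are collected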

View File

@ -444,6 +444,7 @@ class ErrorDevice(Volume):
uuid=None,
):
self.vol = vol
self.vol.open()
super().__init__(uuid)
self.error_sectors = error_sectors or set()
self.error_seq_no = error_seq_no or {IoDir.WRITE: -1, IoDir.READ: -1}
@ -523,6 +524,10 @@ class ErrorDevice(Volume):
def get_copy(self):
return self.vol.get_copy()
def close(self):
super().close()
self.vol.close()
class TraceDevice(Volume):
class IoType(IntEnum):
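ErrorDevice now takes ownership of its backing volume's lifetime: __init__ opens self.vol and close() closes it after the base-class close. A minimal sketch of what that means for a caller (the size and error spec are placeholders; names are taken from the hunks above and from the surprise-shutdown tests below):

backend = RamVolume(Size.from_MiB(50))
err_dev = ErrorDevice(backend, error_seq_no={IoDir.WRITE: 3}, armed=True)
# ... use err_dev as a cache backend and run I/O ...
err_dev.close()   # also releases `backend` via self.vol.close()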

View File

@ -13,12 +13,10 @@ from .volume import Volume
class CacheVolume(OcfInternalVolume):
def __init__(self, cache, open=False, uuid=None):
def __init__(self, cache, uuid=None):
super().__init__(cache, uuid)
self.cache = cache
self.lib = cache.owner.lib
if open:
self.open()
def get_c_handle(self):
return self.cache.get_c_front_volume()
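CacheVolume loses the open keyword argument; tests that previously wrote CacheVolume(cache, open=True) now construct it unopened and call open()/close() explicitly (or let a helper such as write_vol do it). A condensed sketch based on the standby tests later in this diff (the backing-volume name is assumed):

cache.standby_attach(backend_vol)      # backend_vol: the cache's backing volume
cache_vol = CacheVolume(cache)         # exported-object volume, not opened yet
cache_vol.open()
# ... submit I/O through cache_vol.new_io(...) ...
cache_vol.close()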

View File

@ -10,12 +10,10 @@ from .volume import Volume
class CoreVolume(OcfInternalVolume):
def __init__(self, core, open=False, uuid=None):
def __init__(self, core, uuid=None):
super().__init__(core, uuid)
self.core = core
self.lib = core.cache.owner.lib
if open:
self.open()
def get_c_handle(self):
return self.core.get_c_front_volume()
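CoreVolume gets the same treatment, and this is the pattern repeated across the test changes below: construction no longer opens the exported object, so every I/O sequence is bracketed explicitly. Before/after sketch (queue and data come from the usual test fixtures):

# before this commit
vol = CoreVolume(core, open=True)

# after this commit
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, data.size, IoDir.WRITE, 0, 0)
io.set_data(data)
# ... set callback, submit, wait for completion ...
vol.close()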

View File

@ -11,7 +11,7 @@ from .volume import Volume, VOLUME_POISON
from pyocf.utils import Size
from pyocf.types.data import Data
from pyocf.types.io import IoDir, Io
from pyocf.types.shared import OcfCompletion
from pyocf.types.shared import OcfCompletion, OcfError
class OcfInternalVolume(Volume):
@ -20,9 +20,8 @@ class OcfInternalVolume(Volume):
self.parent = parent
def __alloc_io(self, addr, _bytes, _dir, _class, _flags):
vol = self.parent.get_front_volume()
queue = self.parent.get_default_queue() # TODO multiple queues?
return vol.new_io(queue, addr, _bytes, _dir, _class, _flags)
return self.new_io(queue, addr, _bytes, _dir, _class, _flags)
def _alloc_io(self, io):
exp_obj_io = self.__alloc_io(
@ -33,7 +32,6 @@ class OcfInternalVolume(Volume):
io.contents._flags,
)
lib = OcfLib.getInstance()
cdata = OcfLib.getInstance().ocf_io_get_data(io)
OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, 0)
@ -87,6 +85,7 @@ class OcfInternalVolume(Volume):
raise NotImplementedError
def _exp_obj_md5(self, read_size):
self.open()
logging.getLogger("pyocf").warning(
"Reading whole exported object! This disturbs statistics values"
)
@ -111,14 +110,23 @@ class OcfInternalVolume(Volume):
read_buffer_all.copy(read_buffer, position, 0, read_size)
position += read_size
self.close()
return read_buffer_all.md5()
def open(self):
ret = super().open()
if ret:
return ret
handle = self.get_c_handle()
return Volume.s_open(handle, self)
self.handle = handle
return ret
def close(self):
return Volume.s_close(self)
super().close()
self.handle = None
lib = OcfLib.getInstance()
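As far as the interleaved hunk shows, OcfInternalVolume.open() now defers to the base-class open() and only records the result of get_c_handle() in self.handle, close() clears it again, and _exp_obj_md5() wraps its whole-object read in its own open()/close() pair. Tests that want exception-safe bracketing could layer a small helper on top; this is purely a hypothetical convenience, not part of the commit:

from contextlib import contextmanager

@contextmanager
def opened(vol):
    # pair the now-explicit open()/close() so the handle is released even if the body raises
    ret = vol.open()
    if ret:
        raise Exception(f"Couldn't open volume. ({ret})")
    try:
        yield vol
    finally:
        vol.close()

# usage: with opened(CoreVolume(core)) as vol: ... submit I/O ...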

View File

@ -14,22 +14,23 @@ class ReplicatedVolume(Volume):
super().__init__(uuid)
self.primary = primary
self.secondary = secondary
ret = self.primary.open()
if ret:
raise Exception(f"Couldn't open primary volume. ({ret})")
ret = self.secondary.open()
if ret:
raise Exception(f"Couldn't open secondary volume. ({ret})")
if secondary.get_max_io_size() < primary.get_max_io_size():
raise Exception("secondary volume max io size too small")
if secondary.get_length() < primary.get_length():
raise Exception("secondary volume size too small")
def do_open(self):
ret = self.primary.do_open()
if ret:
return ret
ret = self.secondary.do_open()
if ret:
self.primary.close()
return ret
def open(self):
return super().open()
def do_close(self):
def close(self):
super().close()
self.primary.close()
self.secondary.close()
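Reading the interleaved hunk: ReplicatedVolume no longer opens its members in __init__; do_open() opens primary then secondary and closes the primary again if the secondary fails, and the close path releases both members. The failover tests below therefore assemble the whole stack from unopened volumes (sketch condensed from that test; the members get opened when the composite volume itself is opened):

cache2_exp_obj_vol = CacheVolume(cache2)    # unopened standby exported object
cache1_cache_vol = ReplicatedVolume(prim_cache_backend_vol, cache2_exp_obj_vol)
cache1 = Cache.start_on_device(cache1_cache_vol, ctx1, cache_mode=mode, cache_line_size=cls)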

View File

@ -30,7 +30,7 @@ def test_simple_wt_write(pyocf_ctx):
queue = cache.get_default_queue()
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
cache_device.reset_stats()
core_device.reset_stats()
@ -91,9 +91,10 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
core = Core.using_device(core_device, name="test_core")
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
write_data = Data.from_string("This is test data")
vol.open()
io = vol.new_io(
cache.get_default_queue(), S.from_sector(3).B, write_data.size, IoDir.WRITE, 0, 0
)
@ -103,6 +104,7 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
io.callback = cmpl.callback
io.submit()
cmpl.wait()
vol.close()
cache.stop()
@ -112,9 +114,10 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
else:
core = cache.get_core_by_name("test_core")
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
read_data = Data(write_data.size)
vol.open()
io = vol.new_io(cache.get_default_queue(), S.from_sector(3).B, read_data.size, IoDir.READ, 0, 0)
io.set_data(read_data)
@ -122,6 +125,7 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
io.callback = cmpl.callback
io.submit()
cmpl.wait()
vol.close()
assert read_data.md5() == write_data.md5()
assert vol.md5() == core_device.md5()

View File

@ -40,10 +40,11 @@ def test_flush_propagation(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
flushes = {}
vol.open()
io = vol.new_io(queue, addr, size, IoDir.WRITE, 0, IoFlags.FLUSH)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
@ -52,6 +53,7 @@ def test_flush_propagation(pyocf_ctx):
io.submit_flush()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0

View File

@ -29,9 +29,9 @@ def __io(io, queue, address, size, data, direction):
return int(completion.results["err"])
def io_to_exp_obj(core, address, size, data, offset, direction, flags):
vol = core.get_front_volume()
queue = core.cache.get_default_queue()
def io_to_exp_obj(vol, address, size, data, offset, direction, flags):
queue = vol.parent.get_default_queue()
vol.open()
io = vol.new_io(queue, address, size, direction, 0, flags)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
@ -40,6 +40,7 @@ def io_to_exp_obj(core, address, size, data, offset, direction, flags):
ret = __io(io, queue, address, size, _data, direction)
if not ret and direction == IoDir.READ:
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
vol.close()
return ret
@ -83,37 +84,37 @@ def test_io_flags(pyocf_ctx, cache_mode):
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
cache_device.set_check(True)
core_device.set_check(True)
# write miss
io_to_exp_obj(core, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
io_to_exp_obj(vol, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
assert not cache_device.fail
assert not core_device.fail
# read miss
io_to_exp_obj(core, block_size * 1, block_size, data, 0, IoDir.READ, flags)
io_to_exp_obj(vol, block_size * 1, block_size, data, 0, IoDir.READ, flags)
assert not cache_device.fail
assert not core_device.fail
# "dirty" read hit
io_to_exp_obj(core, block_size * 0, block_size, data, 0, IoDir.READ, flags)
io_to_exp_obj(vol, block_size * 0, block_size, data, 0, IoDir.READ, flags)
assert not cache_device.fail
assert not core_device.fail
# "clean" read hit
io_to_exp_obj(core, block_size * 1, block_size, data, 0, IoDir.READ, flags)
io_to_exp_obj(vol, block_size * 1, block_size, data, 0, IoDir.READ, flags)
assert not cache_device.fail
assert not core_device.fail
# "dirty" write hit
io_to_exp_obj(core, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
io_to_exp_obj(vol, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
assert not cache_device.fail
assert not core_device.fail
# "clean" write hit
io_to_exp_obj(core, block_size * 1, block_size, data, 0, IoDir.WRITE, flags)
io_to_exp_obj(vol, block_size * 1, block_size, data, 0, IoDir.WRITE, flags)
assert not cache_device.fail
assert not core_device.fail

View File

@ -24,7 +24,8 @@ def test_large_flush(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, IoFlags.FLUSH)
completion = OcfCompletion([("err", c_int)])
@ -33,6 +34,7 @@ def test_large_flush(pyocf_ctx):
io.set_data(data, 0)
io.submit_flush()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0
@ -48,7 +50,8 @@ def test_large_discard(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, 0)
completion = OcfCompletion([("err", c_int)])
@ -57,6 +60,7 @@ def test_large_discard(pyocf_ctx):
io.set_data(data, 0)
io.submit_discard()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0
@ -72,7 +76,8 @@ def test_large_io(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, 0)
completion = OcfCompletion([("err", c_int)])
@ -82,6 +87,8 @@ def test_large_io(pyocf_ctx):
io.submit()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0
cache.stop()

View File

@ -63,7 +63,7 @@ def test_change_to_nhit_and_back_io_in_flight(pyocf_ctx):
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# Step 2
@ -111,7 +111,7 @@ def fill_cache(cache, fill_ratio):
bytes_to_fill = Size(round(cache_lines.bytes * fill_ratio))
core = cache.cores[0]
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
r = (
@ -147,7 +147,7 @@ def test_promoted_after_hits_various_thresholds(pyocf_ctx, insertion_threshold,
cache = Cache.start_on_device(cache_device, promotion_policy=PromotionPolicy.NHIT)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# Step 2
@ -218,7 +218,7 @@ def test_partial_hit_promotion(pyocf_ctx):
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# Step 2

View File

@ -243,7 +243,7 @@ def test_read_data_consistency(pyocf_ctx, cacheline_size, cache_mode, rand_seed)
core = Core.using_device(core_device)
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
insert_order = list(range(CACHELINE_COUNT))
@ -279,6 +279,8 @@ def test_read_data_consistency(pyocf_ctx, cacheline_size, cache_mode, rand_seed)
for _ in range(ITRATION_COUNT - len(region_statuses)):
region_statuses.append([random.choice(list(SectorStatus)) for _ in range(num_regions)])
vol.open()
# iterate over generated status combinations and perform the test
for region_state in region_statuses:
# write data to core and invalidate all CL and write data pattern to core
@ -383,3 +385,4 @@ def test_read_data_consistency(pyocf_ctx, cacheline_size, cache_mode, rand_seed)
), "unexpected write to core device, region_state={}, start={}, end={}, insert_order = {}\n".format(
region_state, start, end, insert_order
)
vol.close()

View File

@ -95,13 +95,14 @@ def test_seq_cutoff_max_streams(pyocf_ctx):
core = Core.using_device(RamVolume(core_size), seq_cutoff_promotion_count=1)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
cache.set_seq_cut_off_policy(SeqCutOffPolicy.ALWAYS)
cache.set_seq_cut_off_threshold(threshold)
# STEP 1
vol.open()
shuffle(streams)
io_size = threshold - Size.from_sector(1)
io_to_streams(vol, queue, streams, io_size)
@ -139,7 +140,9 @@ def test_seq_cutoff_max_streams(pyocf_ctx):
# STEP 4
io_to_streams(vol, queue, [lru_stream], Size.from_sector(1))
vol.close()
stats = cache.get_stats()
assert (
stats["req"]["serviced"]["value"] == old_serviced + 2
), "This request should be serviced by cache - lru_stream should be no longer tracked"

View File

@ -35,14 +35,14 @@ def test_eviction_two_cores(pyocf_ctx, mode: CacheMode, cls: CacheLineSize):
core1 = Core.using_device(core_device1, name="core1")
core2 = Core.using_device(core_device2, name="core2")
cache.add_core(core1)
vol1 = CoreVolume(core1, open=True)
vol1 = CoreVolume(core1)
cache.add_core(core2)
vol2 = CoreVolume(core2, open=True)
vol2 = CoreVolume(core2)
valid_io_size = Size.from_B(cache_size.B)
test_data = Data(valid_io_size)
send_io(core1, test_data)
send_io(core2, test_data)
send_io(vol1, test_data)
send_io(vol2, test_data)
stats1 = core1.get_stats()
stats2 = core2.get_stats()
@ -62,12 +62,12 @@ def test_write_size_greater_than_cache(pyocf_ctx, mode: CacheMode, cls: CacheLin
cache_size = cache.get_stats()["conf"]["size"]
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
cache.set_seq_cut_off_policy(SeqCutOffPolicy.NEVER)
valid_io_size = Size.from_B(cache_size.B // 2)
test_data = Data(valid_io_size)
send_io(core, test_data)
send_io(vol, test_data)
stats = core.cache.get_stats()
first_block_sts = stats["block"]
@ -84,7 +84,7 @@ def test_write_size_greater_than_cache(pyocf_ctx, mode: CacheMode, cls: CacheLin
io_size_bigger_than_cache = Size.from_MiB(100)
io_offset = valid_io_size
test_data = Data(io_size_bigger_than_cache)
send_io(core, test_data, io_offset)
send_io(vol, test_data, io_offset)
if mode is not CacheMode.WT:
# Flush first write
@ -115,7 +115,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WT, cache_line_size=cls)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
test_ioclass_id = 1
pinned_ioclass_id = 2
@ -139,7 +139,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
# Populate cache with data
for i in range(cache_size.blocks_4k):
send_io(core, data, i * 4096, test_ioclass_id)
send_io(vol, data, i * 4096, test_ioclass_id)
part_current_size = CacheLines(
cache.get_partition_info(part_id=test_ioclass_id)["_curr_size"], cls
@ -151,7 +151,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
# Repart - force overflow of second partition occupancy limit
pinned_double_size = ceil((cache_size.blocks_4k * pinned_ioclass_max_occupancy * 2) / 100)
for i in range(pinned_double_size):
send_io(core, data, i * 4096, pinned_ioclass_id)
send_io(vol, data, i * 4096, pinned_ioclass_id)
part_current_size = CacheLines(
cache.get_partition_info(part_id=pinned_ioclass_id)["_curr_size"], cls
@ -162,7 +162,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
# Trigger IO to the default ioclass - force eviction from overlown ioclass
for i in range(cache_size.blocks_4k):
send_io(core, data, (cache_size.blocks_4k + i) * 4096, test_ioclass_id)
send_io(vol, data, (cache_size.blocks_4k + i) * 4096, test_ioclass_id)
part_current_size = CacheLines(
cache.get_partition_info(part_id=pinned_ioclass_id)["_curr_size"], cls
@ -172,10 +172,10 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
), "Overflown part has not been evicted"
def send_io(core: Core, data: Data, addr: int = 0, target_ioclass: int = 0):
vol = core.get_front_volume()
def send_io(vol: CoreVolume, data: Data, addr: int = 0, target_ioclass: int = 0):
vol.open()
io = vol.new_io(
core.cache.get_default_queue(), addr, data.size, IoDir.WRITE, target_ioclass, 0,
vol.parent.get_default_queue(), addr, data.size, IoDir.WRITE, target_ioclass, 0,
)
io.set_data(data)
@ -184,5 +184,6 @@ def send_io(core: Core, data: Data, addr: int = 0, target_ioclass: int = 0):
io.callback = completion.callback
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO to exported object completion"
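The send_io() helper shows the other common shape: helpers now receive the CoreVolume instead of the Core, resolve the queue through vol.parent, and do the open()/close() bracketing per call. Call-site sketch (the 4 KiB buffer is a placeholder):

vol = CoreVolume(core)
data = Data(4096)
send_io(vol, data, addr=0, target_ioclass=test_ioclass_id)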

View File

@ -34,7 +34,7 @@ def test_test_standby_io(pyocf_ctx, cacheline_size):
cache.add_io_queue(f"io-queue-{i}")
cache.standby_attach(cache_vol)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
r = (
Rio()
@ -74,7 +74,7 @@ def test_test_standby_io_metadata(pyocf_ctx, cacheline_size):
io_offset = Size.from_page(start)
io_size = Size.from_page(count)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
r = (
Rio()

View File

@ -30,6 +30,7 @@ def __io(io, queue, address, size, data, direction):
def io_to_exp_obj(vol, queue, address, size, data, offset, direction, flags):
vol.open()
io = vol.new_io(queue, address, size, direction, 0, flags)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
@ -38,6 +39,7 @@ def io_to_exp_obj(vol, queue, address, size, data, offset, direction, flags):
ret = __io(io, queue, address, size, _data, direction)
if not ret and direction == IoDir.READ:
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
vol.close()
return ret
@ -77,7 +79,7 @@ def test_flush_after_mngmt(pyocf_ctx):
cache.add_core(core)
assert cache_device.flush_last
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# WT I/O to write data to core and cache VC

View File

@ -75,7 +75,7 @@ def test_remove_dirty_no_flush(pyocf_ctx, cache_mode, cls):
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = core.cache.get_default_queue()
# Prepare data
@ -121,10 +121,11 @@ def test_10add_remove_with_io(pyocf_ctx):
# Add and remove core 10 times in a loop with io in between
for i in range(0, 10):
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
stats = cache.get_stats()
assert stats["conf"]["core_count"] == 1
vol.open()
write_data = Data.from_string("Test data")
io = vol.new_io(
cache.get_default_queue(), S.from_sector(1).B, write_data.size, IoDir.WRITE, 0, 0
@ -135,6 +136,7 @@ def test_10add_remove_with_io(pyocf_ctx):
io.callback = cmpl.callback
io.submit()
cmpl.wait()
vol.close()
cache.remove_core(core)
stats = cache.get_stats()
@ -299,6 +301,7 @@ def test_add_remove_incrementally(pyocf_ctx, cache_mode, cls):
def _io_to_core(vol: Volume, queue: Queue, data: Data):
vol.open()
io = vol.new_io(queue, 0, data.size, IoDir.WRITE, 0, 0)
io.set_data(data)
@ -307,6 +310,7 @@ def _io_to_core(vol: Volume, queue: Queue, data: Data):
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO to exported object completion"

View File

@ -48,7 +48,7 @@ def test_attach_different_size(pyocf_ctx, new_cache_size, mode: CacheMode, cls:
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
cache.configure_partition(part_id=1, name="test_part", max_size=50, priority=1)
@ -82,6 +82,7 @@ def test_attach_different_size(pyocf_ctx, new_cache_size, mode: CacheMode, cls:
def io_to_exp_obj(vol, queue, address, size, data, offset, direction, target_ioclass, flags):
vol.open()
io = vol.new_io(queue, address, size, direction, target_ioclass, flags)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
@ -90,6 +91,7 @@ def io_to_exp_obj(vol, queue, address, size, data, offset, direction, target_ioc
ret = __io(io, queue, address, size, _data, direction)
if not ret and direction == IoDir.READ:
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
vol.close()
return ret

View File

@ -283,7 +283,7 @@ def test_standby_activate_core_size_mismatch(pyocf_2_ctx):
cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls)
cache.start_cache()
cache.standby_attach(vol2)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
write_vol(cache_vol, cache.get_default_queue(), data)
@ -327,14 +327,14 @@ def test_failover_passive_first(pyocf_2_ctx):
cache2.standby_attach(sec_cache_backend_vol)
# volume replicating cache1 ramdisk writes to cache2 cache exported object
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
cache1_cache_vol = ReplicatedVolume(prim_cache_backend_vol, cache2_exp_obj_vol)
# active cache
cache1 = Cache.start_on_device(cache1_cache_vol, ctx1, cache_mode=mode, cache_line_size=cls)
core = Core(core_backend_vol)
cache1.add_core(core)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
queue = cache1.get_default_queue()
# some I/O
@ -365,7 +365,7 @@ def test_failover_passive_first(pyocf_2_ctx):
# add core explicitly with "try_add" to workaround pyocf limitations
core = Core(core_backend_vol)
cache2.add_core(core, try_add=True)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
assert md5 == core_vol.md5()
@ -373,6 +373,7 @@ def test_failover_passive_first(pyocf_2_ctx):
def write_vol(vol, queue, data):
data_size = len(data)
subdata_size_max = int(Size.from_MiB(32))
vol.open()
for offset in range(0, data_size, subdata_size_max):
subdata_size = min(data_size - offset, subdata_size_max)
subdata = Data.from_bytes(data, offset, subdata_size)
@ -382,6 +383,7 @@ def write_vol(vol, queue, data):
io.callback = comp.callback
io.submit()
comp.wait()
vol.close()
def test_failover_active_first(pyocf_2_ctx):
@ -399,7 +401,7 @@ def test_failover_active_first(pyocf_2_ctx):
)
core = Core(core_backend_vol)
cache1.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue1 = cache1.get_default_queue()
# some I/O
@ -431,11 +433,11 @@ def test_failover_active_first(pyocf_2_ctx):
cache2 = Cache(owner=ctx2, cache_mode=mode, cache_line_size=cls)
cache2.start_cache()
cache2.standby_attach(sec_cache_backend_vol)
vol2 = CacheVolume(cache2, open=True)
vol2 = CacheVolume(cache2)
queue = cache2.get_default_queue()
# standby cache exported object volume
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
# just to be sure
assert sec_cache_backend_vol.get_bytes() != prim_cache_backend_vol.get_bytes()
@ -453,7 +455,7 @@ def test_failover_active_first(pyocf_2_ctx):
cache2.standby_activate(sec_cache_backend_vol, open_cores=False)
core = Core(core_backend_vol)
cache2.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
# check data consistency
assert data_md5 == vol.md5()
@ -498,7 +500,7 @@ def test_failover_line_size_mismatch(pyocf_2_ctx):
cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls2)
cache.start_cache()
cache.standby_attach(vol2)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
write_vol(cache_vol, cache.get_default_queue(), data)
@ -544,14 +546,14 @@ def test_failover_passive_first(pyocf_2_ctx):
cache2.standby_attach(sec_cache_backend_vol)
# volume replicating cache1 ramdisk writes to cache2 cache exported object
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
cache1_cache_vol = ReplicatedVolume(prim_cache_backend_vol, cache2_exp_obj_vol)
# active cache
cache1 = Cache.start_on_device(cache1_cache_vol, ctx1, cache_mode=mode, cache_line_size=cls)
core = Core(core_backend_vol)
cache1.add_core(core)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
queue = cache1.get_default_queue()
# some I/O
@ -582,6 +584,6 @@ def test_failover_passive_first(pyocf_2_ctx):
# add core explicitly with "try_add" to workaround pyocf limitations
core = Core(core_backend_vol)
cache2.add_core(core, try_add=True)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
assert md5 == core_vol.md5()

View File

@ -84,7 +84,7 @@ def test_start_write_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: Cache
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
logger.info("[STAGE] Initial write to exported object")
@ -108,7 +108,7 @@ def test_start_write_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: Cache
io_to_core(vol, queue, test_data, Size.from_sector(1).B)
check_stats_write_after_read(core, mode, cls)
check_md5_sums(core, mode)
check_md5_sums(vol, mode)
@pytest.mark.parametrize("cls", CacheLineSize)
@ -124,7 +124,7 @@ def test_start_read_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: CacheL
core = Core.using_device(core_device)
cache.add_core(core)
front_vol = CoreVolume(core, open=True)
front_vol = CoreVolume(core)
bottom_vol = core.get_volume()
queue = cache.get_default_queue()
@ -153,7 +153,7 @@ def test_start_read_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: CacheL
io_from_exported_object(front_vol, queue, test_data.size, Size.from_sector(1).B)
check_stats_read_after_write(core, mode, cls)
check_md5_sums(core, mode)
check_md5_sums(vol, mode)
@pytest.mark.parametrize("cls", CacheLineSize)
@ -208,7 +208,7 @@ def test_stop(pyocf_ctx, mode: CacheMode, cls: CacheLineSize, with_flush: bool):
core = Core.using_device(core_device)
cache.add_core(core)
front_vol = CoreVolume(core, open=True)
front_vol = CoreVolume(core)
queue = cache.get_default_queue()
cls_no = 10
@ -518,6 +518,7 @@ def run_io_and_cache_data_if_possible(core, mode, cls, cls_no):
def io_to_core(vol: Volume, queue: Queue, data: Data, offset: int):
vol.open()
io = vol.new_io(queue, offset, data.size, IoDir.WRITE, 0, 0)
io.set_data(data)
@ -526,11 +527,13 @@ def io_to_core(vol: Volume, queue: Queue, data: Data, offset: int):
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO to exported object completion"
def io_from_exported_object(vol: Volume, queue: Queue, buffer_size: int, offset: int):
read_buffer = Data(buffer_size)
vol.open()
io = vol.new_io(queue, offset, read_buffer.size, IoDir.READ, 0, 0)
io.set_data(read_buffer)
@ -538,6 +541,7 @@ def io_from_exported_object(vol: Volume, queue: Queue, buffer_size: int, offset:
io.callback = completion.callback
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO from exported object completion"
return read_buffer
@ -634,17 +638,17 @@ def check_stats_read_after_write(core, mode, cls, write_to_empty=False):
), "Occupancy"
def check_md5_sums(core: Core, mode: CacheMode):
def check_md5_sums(vol: CoreVolume, mode: CacheMode):
if mode.lazy_write():
assert (
core.device.md5() != core.get_front_volume().md5()
vol.parent.device.md5() != vol.md5()
), "MD5 check: core device vs exported object without flush"
core.cache.flush()
assert (
core.device.md5() == core.get_front_volume().md5()
vol.parent.device.md5() == vol.md5()
), "MD5 check: core device vs exported object after flush"
else:
assert (
core.device.md5() == core.get_front_volume().md5()
vol.parent.device.md5() == vol.md5()
), "MD5 check: core device vs exported object"

View File

@ -191,7 +191,7 @@ def prepare_cache_and_core(core_size: Size, cache_size: Size = Size.from_MiB(50)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
return vol, queue
@ -200,6 +200,7 @@ def prepare_cache_and_core(core_size: Size, cache_size: Size = Size.from_MiB(50)
def io_operation(
vol: Volume, queue: Queue, data: Data, io_direction: int, offset: int = 0, io_class: int = 0,
):
vol.open()
io = vol.new_io(queue, offset, data.size, io_direction, io_class, 0)
io.set_data(data)
@ -207,4 +208,5 @@ def io_operation(
io.callback = completion.callback
io.submit()
completion.wait()
vol.close()
return completion

View File

@ -82,10 +82,11 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
core_device = RamVolume(S.from_MiB(50))
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
write_data = DataCopyTracer(S.from_sector(1))
vol.open()
io = vol.new_io(queue, S.from_sector(1).B, write_data.size, IoDir.WRITE, 0, 0,)
io.set_data(write_data)
@ -117,6 +118,7 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
io.submit()
cmpl.wait()
vol.close()
stats = cache.get_stats()
ctx.exit()
@ -156,10 +158,12 @@ def test_secure_erase_simple_io_cleaning():
core_device = RamVolume(S.from_MiB(100))
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
read_data = Data(S.from_sector(1).B)
vol.open()
io = vol.new_io(queue, S.from_sector(1).B, read_data.size, IoDir.WRITE, 0, 0)
io.set_data(read_data)
@ -177,6 +181,7 @@ def test_secure_erase_simple_io_cleaning():
io.submit()
cmpl.wait()
vol.close()
stats = cache.get_stats()
ctx.exit()

View File

@ -39,6 +39,7 @@ mngmt_op_surprise_shutdown_test_io_offset = S.from_MiB(4).B
def ocf_write(vol, queue, val, offset):
vol.open()
data = Data.from_bytes(bytes([val] * 512))
comp = OcfCompletion([("error", c_int)])
io = vol.new_io(queue, offset, 512, IoDir.WRITE, 0, 0)
@ -46,9 +47,11 @@ def ocf_write(vol, queue, val, offset):
io.callback = comp.callback
io.submit()
comp.wait()
vol.close()
def ocf_read(vol, queue, offset):
vol.open()
data = Data(byte_count=512)
comp = OcfCompletion([("error", c_int)])
io = vol.new_io(queue, offset, 512, IoDir.READ, 0, 0)
@ -56,6 +59,7 @@ def ocf_read(vol, queue, offset):
io.callback = comp.callback
io.submit()
comp.wait()
vol.close()
return data.get_bytes()[0]
@ -66,7 +70,7 @@ def prepare_failover(pyocf_2_ctx, cache_backend_vol, error_io_seq_no):
cache2 = Cache(owner=ctx2)
cache2.start_cache()
cache2.standby_attach(cache_backend_vol)
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
error_io = {IoDir.WRITE: error_io_seq_no}
@ -206,7 +210,7 @@ def test_surprise_shutdown_remove_core_with_data(pyocf_2_ctx, failover):
def prepare_func(cache):
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
def tested_func(cache):
@ -219,7 +223,7 @@ def test_surprise_shutdown_remove_core_with_data(pyocf_2_ctx, failover):
assert core_device.get_bytes()[io_offset] == 0xAA
else:
core = cache.get_core_by_name("core1")
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, tested_func, prepare_func, check_func)
@ -273,7 +277,7 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_2_ctx, failover):
def prepare(cache):
cache.add_core(core1)
vol = CoreVolume(core1, open=True)
vol = CoreVolume(core1)
cache.save()
ocf_write(
vol, cache.get_default_queue(), 0xAA, mngmt_op_surprise_shutdown_test_io_offset,
@ -299,7 +303,7 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_2_ctx, failover):
core2 = cache.get_core_by_name("core2")
if core2 is not None:
vol2 = CoreVolume(core2, open=True)
vol2 = CoreVolume(core2)
assert core2.device.uuid == "dev2"
assert (
ocf_read(
@ -332,10 +336,9 @@ def test_surprise_shutdown_start_cache(pyocf_2_ctx, failover):
cache2 = Cache(owner=ctx2)
cache2.start_cache()
cache2.standby_attach(ramdisk)
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
err_device = ErrorDevice(
cache2_exp_obj_vol, error_seq_no=error_io, data_only=True, armed=True
)
cache2_exp_obj_vol = CacheVolume(cache2)
err_device = ErrorDevice(cache2_exp_obj_vol, error_seq_no=error_io, armed=True)
else:
err_device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, armed=True)
@ -404,7 +407,7 @@ def test_surprise_shutdown_stop_cache(pyocf_2_ctx, failover):
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
# start error injection
@ -444,7 +447,7 @@ def test_surprise_shutdown_stop_cache(pyocf_2_ctx, failover):
assert stats["usage"]["occupancy"]["value"] == 1
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
cache.stop()
@ -473,7 +476,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_2_ctx, failover):
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# insert dirty cacheline
@ -531,7 +534,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_2_ctx, failover):
if stats["conf"]["core_count"] == 0:
assert stats["usage"]["occupancy"]["value"] == 0
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
cache.stop()
@ -822,7 +825,7 @@ def test_surprise_shutdown_standby_activate(pyocf_ctx):
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
cache.stop()
@ -867,7 +870,7 @@ def test_surprise_shutdown_standby_activate(pyocf_ctx):
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
cache.stop()
@ -953,7 +956,7 @@ def test_surprise_shutdown_standby_init_force_1(pyocf_ctx):
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
cache.stop()
@ -1002,14 +1005,14 @@ def test_surprise_shutdown_standby_init_force_1(pyocf_ctx):
assert original_dirty_blocks == stats["usage"]["dirty"]
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
else:
assert stats["usage"]["occupancy"]["value"] == 0
assert stats["usage"]["dirty"]["value"] == 0
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
cache.stop()
@ -1043,7 +1046,7 @@ def test_surprise_shutdown_standby_init_force_2(pyocf_ctx):
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
cache.stop()
@ -1087,14 +1090,14 @@ def test_surprise_shutdown_standby_init_force_2(pyocf_ctx):
assert original_dirty_blocks == stats["usage"]["dirty"]
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
else:
assert stats["usage"]["occupancy"]["value"] == 0
assert stats["usage"]["dirty"]["value"] == 0
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
if cache: