Merge pull request #703 from jfckm/metadata-corruption-tests

Metadata corruption tests
This commit is contained in:
Robert Baldyga 2022-07-22 16:06:41 +02:00 committed by GitHub
commit 93b6ddb8a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 802 additions and 170 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright(c) 2022-2022 Intel Corporation
* Copyright(c) 2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause
*/
@ -8,6 +8,7 @@
#include "../src/ocf/ocf_cache_priv.h"
#include "../src/ocf/metadata/metadata_raw.h"
#include "../src/ocf/metadata/metadata_internal.h"
#include "../src/ocf/metadata/metadata_superblock.h"
uint64_t ocf_get_metadata_segment_start_page(ocf_cache_t cache, int segment)
{
@ -46,6 +47,9 @@ uint64_t ocf_get_metadata_segment_elem_size(ocf_cache_t cache, int segment)
struct ocf_metadata_ctrl *ctrl = cache->metadata.priv;
struct ocf_metadata_raw *raw = &ctrl->raw_desc[segment];
if (segment == metadata_segment_sb_config)
return offsetof(struct ocf_superblock_config, checksum);
return raw->entry_size;
}

View File

@ -251,6 +251,8 @@ class Rio:
self.errors.update({thread.name: thread.errors})
self.error_count += len(thread.errors)
self.global_jobspec.target.close()
return self
def __del__(self):
@ -275,6 +277,8 @@ class Rio:
queues = cycle(queues)
self.global_jobspec.target.open()
for job in jobs:
spec = job.merge(self.global_jobspec)
thread = Rio.RioThread(spec, next(queues))

View File

@ -300,10 +300,13 @@ class Cache:
c.wait()
self.write_unlock()
self.device = None
if c.results["error"]:
raise OcfError("Failed to detach failover cache device", c.results["error"])
def standby_activate(self, device, open_cores=True):
self.device = device
device_cfg = self.alloc_device_config(device)
activate_cfg = CacheStandbyActivateConfig(
@ -322,6 +325,7 @@ class Cache:
self.free_device_config(device_cfg)
if c.results["error"]:
self.device = None
raise OcfError("Failed to activate standby cache", c.results["error"])
def change_cache_mode(self, cache_mode: CacheMode):
@ -575,7 +579,6 @@ class Cache:
disable_cleaner=False,
):
self.device = device
self.device_name = device.uuid
device_config = self.alloc_device_config(device, perform_test=perform_test)
@ -607,7 +610,6 @@ class Cache:
def standby_attach(self, device, force=False, disable_cleaner=False):
self.device = device
self.device_name = device.uuid
device_config = self.alloc_device_config(device, perform_test=False)
@ -639,7 +641,6 @@ class Cache:
def standby_load(self, device, perform_test=True, disable_cleaner=False):
self.device = device
self.device_name = device.uuid
device_config = self.alloc_device_config(device, perform_test=perform_test)
@ -661,6 +662,7 @@ class Cache:
self.free_device_config(device_config)
if c.results["error"]:
self.device = None
raise OcfError("Loading standby cache device failed", c.results["error"])
def detach_device(self):
@ -679,7 +681,6 @@ class Cache:
def load_cache(self, device, open_cores=True, disable_cleaner=False):
self.device = device
self.device_name = device.uuid
device_config = self.alloc_device_config(device)
@ -701,8 +702,25 @@ class Cache:
self.free_device_config(device_config)
if c.results["error"]:
self.device = None
raise OcfError("Loading cache device failed", c.results["error"])
@classmethod
def load_standby_from_device(cls, device, owner=None, name="cache", cache_line_size=None):
if owner is None:
owner = OcfCtx.get_default()
c = cls(name=name, owner=owner, cache_line_size=cache_line_size)
c.start_cache()
try:
c.standby_load(device)
except: # noqa E722
c.stop()
raise
return c
@classmethod
def load_from_device(
cls, device, owner=None, name="cache", open_cores=True, disable_cleaner=False

View File

@ -58,13 +58,20 @@ class CVolume(OcfInternalVolume):
def get_c_handle(self):
return self.cvol.value
def do_open(self):
ret = self.lib.ocf_volume_open(self.cvol, c_void_p())
if ret != 0:
raise OcfError("openning composite volume failed", ret)
def open(self):
ret = super().open()
if ret == 0:
ret = self.lib.ocf_volume_open(self.handle, c_void_p())
if ret:
raise OcfError("opening composite volume failed", ret)
return ret
def close(self):
self.lib.ocf_volume_close(self.handle)
super().close()
def do_close(self):
self.lib.ocf_volume_close(self.cvol)
lib = OcfLib.getInstance()

View File

@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@ -160,13 +160,19 @@ class FileLogger(Logger):
logger.handlers = []
class BufferLogger(Logger):
def __init__(self, level: LogLevel):
super().__init__()
self.level = level
class BufferLogger(DefaultLogger):
def __init__(
self,
console_level: LogLevel = LogLevel.WARN,
buffer_level: LogLevel = LogLevel.DEBUG,
name: str = ""
):
super().__init__(console_level, name)
self.level = buffer_level
self.buffer = StringIO()
def log(self, lvl, msg):
super().log(lvl, msg)
if lvl < self.level:
self.buffer.write(msg + "\n")

View File

@ -23,6 +23,7 @@ from ctypes import (
from hashlib import md5
import weakref
from enum import IntEnum
import warnings
from .io import Io, IoOps, IoDir
from .queue import Queue
@ -89,7 +90,7 @@ VOLUME_POISON = 0x13
class Volume:
_instances_ = weakref.WeakValueDictionary()
_instances_ = {}
_uuid_ = weakref.WeakValueDictionary()
_ops_ = {}
_props_ = {}
@ -143,16 +144,25 @@ class Volume:
try:
volume = Volume.get_by_uuid(uuid)
except: # noqa E722 TODO:Investigate whether this really should be so broad
print("Tried to access unallocated volume {}".format(uuid))
print("{}".format(Volume._uuid_))
warnings.warn("Tried to access unallocated volume {}".format(uuid))
return -1
return Volume.s_open(ref, volume)
ret = volume.open()
if not ret:
Volume._instances_[ref] = volume
volume.handle = ref
return ret
@VolumeOps.CLOSE
def _close(ref):
volume = Volume.get_instance(ref)
Volume.s_close(volume)
del Volume._instances_[volume.handle]
volume.handle = None
volume.close()
@VolumeOps.GET_MAX_IO_SIZE
def _get_max_io_size(ref):
@ -178,30 +188,19 @@ class Volume:
return Volume._ops_[cls]
@staticmethod
def s_open(ref, volume):
if volume.opened:
def open(self):
if self.opened:
return -OcfErrorCode.OCF_ERR_NOT_OPEN_EXC
Volume._instances_[ref] = volume
volume.handle = ref
self.opened = True
ret = volume.do_open()
if ret == 0:
volume.opened = True
return 0
return ret
@staticmethod
def s_close(volume):
if not volume.opened:
def close(self):
if not self.opened:
return
volume.do_close()
volume.opened = False
del Volume._instances_[volume.handle]
volume.handle = None
self.opened = False
@classmethod
def get_io_ops(cls):
@ -228,11 +227,11 @@ class Volume:
@classmethod
def get_instance(cls, ref):
instance = cls._instances_[ref]
if instance is None:
print("tried to access {} but it's gone".format(ref))
if ref not in cls._instances_:
warnings.warn(f"tried to access volume ref {ref} but it's gone")
return None
return instance
return cls._instances_[ref]
@classmethod
def get_by_uuid(cls, uuid):
@ -269,12 +268,6 @@ class Volume:
self.opened = False
self.handle = None
def do_open(self):
return 0
def do_close(self):
pass
def get_length(self):
raise NotImplementedError
@ -451,6 +444,16 @@ class ErrorDevice(Volume):
def set_mapping(self, error_sectors: set):
self.error_sectors = error_sectors
def open(self):
ret = self.vol.open()
if ret:
return ret
return super().open()
def close(self):
super().close()
self.vol.close()
def should_forward_io(self, io):
if not self.armed:
return True
@ -519,6 +522,10 @@ class ErrorDevice(Volume):
def get_copy(self):
return self.vol.get_copy()
def close(self):
super().close()
self.vol.close()
class TraceDevice(Volume):
class IoType(IntEnum):
@ -531,6 +538,16 @@ class TraceDevice(Volume):
super().__init__(uuid)
self.trace_fcn = trace_fcn
def open(self):
ret = self.vol.open()
if ret:
return ret
return super().open()
def close(self):
super().close()
self.vol.close()
def _trace(self, io, io_type):
submit = True

View File

@ -13,12 +13,10 @@ from .volume import Volume
class CacheVolume(OcfInternalVolume):
def __init__(self, cache, open=False, uuid=None):
def __init__(self, cache, uuid=None):
super().__init__(cache, uuid)
self.cache = cache
self.lib = cache.owner.lib
if open:
self.open()
def get_c_handle(self):
return self.cache.get_c_front_volume()

View File

@ -10,12 +10,10 @@ from .volume import Volume
class CoreVolume(OcfInternalVolume):
def __init__(self, core, open=False, uuid=None):
def __init__(self, core, uuid=None):
super().__init__(core, uuid)
self.core = core
self.lib = core.cache.owner.lib
if open:
self.open()
def get_c_handle(self):
return self.core.get_c_front_volume()

View File

@ -11,7 +11,7 @@ from .volume import Volume, VOLUME_POISON
from pyocf.utils import Size
from pyocf.types.data import Data
from pyocf.types.io import IoDir, Io
from pyocf.types.shared import OcfCompletion
from pyocf.types.shared import OcfCompletion, OcfError
class OcfInternalVolume(Volume):
@ -20,9 +20,8 @@ class OcfInternalVolume(Volume):
self.parent = parent
def __alloc_io(self, addr, _bytes, _dir, _class, _flags):
vol = self.parent.get_front_volume()
queue = self.parent.get_default_queue() # TODO multiple queues?
return vol.new_io(queue, addr, _bytes, _dir, _class, _flags)
return self.new_io(queue, addr, _bytes, _dir, _class, _flags)
def _alloc_io(self, io):
exp_obj_io = self.__alloc_io(
@ -33,7 +32,6 @@ class OcfInternalVolume(Volume):
io.contents._flags,
)
lib = OcfLib.getInstance()
cdata = OcfLib.getInstance().ocf_io_get_data(io)
OcfLib.getInstance().ocf_io_set_data(byref(exp_obj_io), cdata, 0)
@ -87,6 +85,7 @@ class OcfInternalVolume(Volume):
raise NotImplementedError
def _exp_obj_md5(self, read_size):
self.open()
logging.getLogger("pyocf").warning(
"Reading whole exported object! This disturbs statistics values"
)
@ -111,14 +110,23 @@ class OcfInternalVolume(Volume):
read_buffer_all.copy(read_buffer, position, 0, read_size)
position += read_size
self.close()
return read_buffer_all.md5()
def open(self):
ret = super().open()
if ret:
return ret
handle = self.get_c_handle()
return Volume.s_open(handle, self)
self.handle = handle
return ret
def close(self):
return Volume.s_close(self)
super().close()
self.handle = None
lib = OcfLib.getInstance()

View File

@ -5,6 +5,7 @@
from threading import Lock
from .volume import Volume, VOLUME_POISON
from .shared import OcfErrorCode
from .io import Io, IoDir
from ctypes import cast, c_void_p, CFUNCTYPE, c_int, POINTER, memmove, sizeof, pointer
@ -15,21 +16,27 @@ class ReplicatedVolume(Volume):
self.primary = primary
self.secondary = secondary
if secondary.get_max_io_size() < primary.get_max_io_size():
raise Exception("secondary volume max io size too small")
if secondary.get_length() < primary.get_length():
raise Exception("secondary volume size too small")
def open(self):
ret = self.primary.open()
if ret:
raise Exception(f"Couldn't open primary volume. ({ret})")
return ret
ret = self.secondary.open()
if ret:
raise Exception(f"Couldn't open secondary volume. ({ret})")
return ret
def do_open(self):
ret = self.primary.do_open()
if ret:
return ret
ret = self.secondary.do_open()
if ret:
self.primary.close()
return ret
if self.secondary.get_max_io_size() < self.primary.get_max_io_size():
raise Exception("secondary volume max io size too small")
return -OcfErrorCode.OCF_ERR_INVAL
if self.secondary.get_length() < self.primary.get_length():
raise Exception("secondary volume size too small")
return -OcfErrorCode.OCF_ERR_INVAL
return super().open()
def close(self):
super().close()
self.primary.close()
self.secondary.close()

View File

@ -61,12 +61,29 @@ class Size:
_SECTOR_SIZE = 512
_PAGE_SIZE = 4096
_unit_mapping = {
"B": 1,
"kiB": _KiB,
"MiB": _MiB,
"GiB": _GiB,
"TiB": _TiB,
}
def __init__(self, b: int, sector_aligned: bool = False):
if sector_aligned:
self.bytes = int(((b + self._SECTOR_SIZE - 1) // self._SECTOR_SIZE) * self._SECTOR_SIZE)
else:
self.bytes = int(b)
@classmethod
def from_string(cls, string):
string = string.strip()
number, unit = string.split(" ")
number = float(number)
unit = cls._unit_mapping[unit]
return cls(int(number * unit))
def __lt__(self, other):
return int(self) < int(other)

View File

@ -30,7 +30,7 @@ def test_simple_wt_write(pyocf_ctx):
queue = cache.get_default_queue()
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
cache_device.reset_stats()
core_device.reset_stats()
@ -91,9 +91,10 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
core = Core.using_device(core_device, name="test_core")
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
write_data = Data.from_string("This is test data")
vol.open()
io = vol.new_io(
cache.get_default_queue(), S.from_sector(3).B, write_data.size, IoDir.WRITE, 0, 0
)
@ -103,6 +104,7 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
io.callback = cmpl.callback
io.submit()
cmpl.wait()
vol.close()
cache.stop()
@ -112,9 +114,10 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
else:
core = cache.get_core_by_name("test_core")
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
read_data = Data(write_data.size)
vol.open()
io = vol.new_io(cache.get_default_queue(), S.from_sector(3).B, read_data.size, IoDir.READ, 0, 0)
io.set_data(read_data)
@ -122,6 +125,7 @@ def test_load_cache_with_cores(pyocf_ctx, open_cores):
io.callback = cmpl.callback
io.submit()
cmpl.wait()
vol.close()
assert read_data.md5() == write_data.md5()
assert vol.md5() == core_device.md5()

View File

@ -17,6 +17,8 @@ from pyocf.types.volume_replicated import ReplicatedVolume
from pyocf.types.cvolume import CVolume
from pyocf.types.ctx import OcfCtx
from pyocf.helpers import get_composite_volume_type_id
from pyocf.types.volume import Volume
import warnings
default_registered_volumes = [RamVolume, ErrorDevice, CacheVolume, CoreVolume, ReplicatedVolume]
@ -34,6 +36,8 @@ def pyocf_ctx():
yield c
c.exit()
gc.collect()
if len(Volume._instances_) > 0:
warnings.warn("Not all Volumes have been closed!!!")
@pytest.fixture()
@ -46,6 +50,8 @@ def pyocf_ctx_log_buffer():
yield logger
c.exit()
gc.collect()
if len(Volume._instances_) > 0:
warnings.warn("Not all Volumes have been closed!!!")
@pytest.fixture()
@ -61,3 +67,22 @@ def pyocf_2_ctx():
c1.exit()
c2.exit()
gc.collect()
if len(Volume._instances_) > 0:
warnings.warn("Not all Volumes have been closed!!!")
@pytest.fixture()
def pyocf_2_ctx_log_buffer():
logger1 = BufferLogger(LogLevel.WARN, LogLevel.DEBUG, "Ctx1")
logger2 = BufferLogger(LogLevel.WARN, LogLevel.DEBUG, "Ctx2")
c1 = OcfCtx.with_defaults(logger1)
c2 = OcfCtx.with_defaults(logger2)
for vol_type in default_registered_volumes:
c1.register_volume_type(vol_type)
c2.register_volume_type(vol_type)
yield (c1, logger1, c2, logger2)
c1.exit()
c2.exit()
gc.collect()
if len(Volume._instances_) > 0:
warnings.warn("Not all Volumes have been closed!!!")

View File

@ -40,10 +40,11 @@ def test_flush_propagation(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
flushes = {}
vol.open()
io = vol.new_io(queue, addr, size, IoDir.WRITE, 0, IoFlags.FLUSH)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
@ -52,6 +53,7 @@ def test_flush_propagation(pyocf_ctx):
io.submit_flush()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0

View File

@ -29,9 +29,9 @@ def __io(io, queue, address, size, data, direction):
return int(completion.results["err"])
def io_to_exp_obj(core, address, size, data, offset, direction, flags):
vol = core.get_front_volume()
queue = core.cache.get_default_queue()
def io_to_exp_obj(vol, address, size, data, offset, direction, flags):
queue = vol.parent.get_default_queue()
vol.open()
io = vol.new_io(queue, address, size, direction, 0, flags)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
@ -40,6 +40,7 @@ def io_to_exp_obj(core, address, size, data, offset, direction, flags):
ret = __io(io, queue, address, size, _data, direction)
if not ret and direction == IoDir.READ:
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
vol.close()
return ret
@ -83,37 +84,37 @@ def test_io_flags(pyocf_ctx, cache_mode):
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
cache_device.set_check(True)
core_device.set_check(True)
# write miss
io_to_exp_obj(core, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
io_to_exp_obj(vol, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
assert not cache_device.fail
assert not core_device.fail
# read miss
io_to_exp_obj(core, block_size * 1, block_size, data, 0, IoDir.READ, flags)
io_to_exp_obj(vol, block_size * 1, block_size, data, 0, IoDir.READ, flags)
assert not cache_device.fail
assert not core_device.fail
# "dirty" read hit
io_to_exp_obj(core, block_size * 0, block_size, data, 0, IoDir.READ, flags)
io_to_exp_obj(vol, block_size * 0, block_size, data, 0, IoDir.READ, flags)
assert not cache_device.fail
assert not core_device.fail
# "clean" read hit
io_to_exp_obj(core, block_size * 1, block_size, data, 0, IoDir.READ, flags)
io_to_exp_obj(vol, block_size * 1, block_size, data, 0, IoDir.READ, flags)
assert not cache_device.fail
assert not core_device.fail
# "dirty" write hit
io_to_exp_obj(core, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
io_to_exp_obj(vol, block_size * 0, block_size, data, 0, IoDir.WRITE, flags)
assert not cache_device.fail
assert not core_device.fail
# "clean" write hit
io_to_exp_obj(core, block_size * 1, block_size, data, 0, IoDir.WRITE, flags)
io_to_exp_obj(vol, block_size * 1, block_size, data, 0, IoDir.WRITE, flags)
assert not cache_device.fail
assert not core_device.fail

View File

@ -24,7 +24,8 @@ def test_large_flush(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, IoFlags.FLUSH)
completion = OcfCompletion([("err", c_int)])
@ -33,6 +34,7 @@ def test_large_flush(pyocf_ctx):
io.set_data(data, 0)
io.submit_flush()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0
@ -48,7 +50,8 @@ def test_large_discard(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, 0)
completion = OcfCompletion([("err", c_int)])
@ -57,6 +60,7 @@ def test_large_discard(pyocf_ctx):
io.set_data(data, 0)
io.submit_discard()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0
@ -72,7 +76,8 @@ def test_large_io(pyocf_ctx):
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
vol.open()
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, 0)
completion = OcfCompletion([("err", c_int)])
@ -82,6 +87,8 @@ def test_large_io(pyocf_ctx):
io.submit()
completion.wait()
vol.close()
assert int(completion.results["err"]) == 0
cache.stop()

View File

@ -63,7 +63,7 @@ def test_change_to_nhit_and_back_io_in_flight(pyocf_ctx):
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# Step 2
@ -111,7 +111,7 @@ def fill_cache(cache, fill_ratio):
bytes_to_fill = Size(round(cache_lines.bytes * fill_ratio))
core = cache.cores[0]
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
r = (
@ -147,7 +147,7 @@ def test_promoted_after_hits_various_thresholds(pyocf_ctx, insertion_threshold,
cache = Cache.start_on_device(cache_device, promotion_policy=PromotionPolicy.NHIT)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# Step 2
@ -218,7 +218,7 @@ def test_partial_hit_promotion(pyocf_ctx):
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# Step 2

View File

@ -243,7 +243,7 @@ def test_read_data_consistency(pyocf_ctx, cacheline_size, cache_mode, rand_seed)
core = Core.using_device(core_device)
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
insert_order = list(range(CACHELINE_COUNT))
@ -279,6 +279,8 @@ def test_read_data_consistency(pyocf_ctx, cacheline_size, cache_mode, rand_seed)
for _ in range(ITRATION_COUNT - len(region_statuses)):
region_statuses.append([random.choice(list(SectorStatus)) for _ in range(num_regions)])
vol.open()
# iterate over generated status combinations and perform the test
for region_state in region_statuses:
# write data to core and invalidate all CL and write data pattern to core
@ -383,3 +385,4 @@ def test_read_data_consistency(pyocf_ctx, cacheline_size, cache_mode, rand_seed)
), "unexpected write to core device, region_state={}, start={}, end={}, insert_order = {}\n".format(
region_state, start, end, insert_order
)
vol.close()

View File

@ -95,13 +95,14 @@ def test_seq_cutoff_max_streams(pyocf_ctx):
core = Core.using_device(RamVolume(core_size), seq_cutoff_promotion_count=1)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
cache.set_seq_cut_off_policy(SeqCutOffPolicy.ALWAYS)
cache.set_seq_cut_off_threshold(threshold)
# STEP 1
vol.open()
shuffle(streams)
io_size = threshold - Size.from_sector(1)
io_to_streams(vol, queue, streams, io_size)
@ -139,7 +140,9 @@ def test_seq_cutoff_max_streams(pyocf_ctx):
# STEP 4
io_to_streams(vol, queue, [lru_stream], Size.from_sector(1))
vol.close()
stats = cache.get_stats()
assert (
stats["req"]["serviced"]["value"] == old_serviced + 2
), "This request should be serviced by cache - lru_stream should be no longer tracked"

View File

@ -35,14 +35,14 @@ def test_eviction_two_cores(pyocf_ctx, mode: CacheMode, cls: CacheLineSize):
core1 = Core.using_device(core_device1, name="core1")
core2 = Core.using_device(core_device2, name="core2")
cache.add_core(core1)
vol1 = CoreVolume(core1, open=True)
vol1 = CoreVolume(core1)
cache.add_core(core2)
vol2 = CoreVolume(core2, open=True)
vol2 = CoreVolume(core2)
valid_io_size = Size.from_B(cache_size.B)
test_data = Data(valid_io_size)
send_io(core1, test_data)
send_io(core2, test_data)
send_io(vol1, test_data)
send_io(vol2, test_data)
stats1 = core1.get_stats()
stats2 = core2.get_stats()
@ -62,12 +62,12 @@ def test_write_size_greater_than_cache(pyocf_ctx, mode: CacheMode, cls: CacheLin
cache_size = cache.get_stats()["conf"]["size"]
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
cache.set_seq_cut_off_policy(SeqCutOffPolicy.NEVER)
valid_io_size = Size.from_B(cache_size.B // 2)
test_data = Data(valid_io_size)
send_io(core, test_data)
send_io(vol, test_data)
stats = core.cache.get_stats()
first_block_sts = stats["block"]
@ -84,7 +84,7 @@ def test_write_size_greater_than_cache(pyocf_ctx, mode: CacheMode, cls: CacheLin
io_size_bigger_than_cache = Size.from_MiB(100)
io_offset = valid_io_size
test_data = Data(io_size_bigger_than_cache)
send_io(core, test_data, io_offset)
send_io(vol, test_data, io_offset)
if mode is not CacheMode.WT:
# Flush first write
@ -115,7 +115,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WT, cache_line_size=cls)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
test_ioclass_id = 1
pinned_ioclass_id = 2
@ -139,7 +139,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
# Populate cache with data
for i in range(cache_size.blocks_4k):
send_io(core, data, i * 4096, test_ioclass_id)
send_io(vol, data, i * 4096, test_ioclass_id)
part_current_size = CacheLines(
cache.get_partition_info(part_id=test_ioclass_id)["_curr_size"], cls
@ -151,7 +151,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
# Repart - force overflow of second partition occupancy limit
pinned_double_size = ceil((cache_size.blocks_4k * pinned_ioclass_max_occupancy * 2) / 100)
for i in range(pinned_double_size):
send_io(core, data, i * 4096, pinned_ioclass_id)
send_io(vol, data, i * 4096, pinned_ioclass_id)
part_current_size = CacheLines(
cache.get_partition_info(part_id=pinned_ioclass_id)["_curr_size"], cls
@ -162,7 +162,7 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
# Trigger IO to the default ioclass - force eviction from overlown ioclass
for i in range(cache_size.blocks_4k):
send_io(core, data, (cache_size.blocks_4k + i) * 4096, test_ioclass_id)
send_io(vol, data, (cache_size.blocks_4k + i) * 4096, test_ioclass_id)
part_current_size = CacheLines(
cache.get_partition_info(part_id=pinned_ioclass_id)["_curr_size"], cls
@ -172,10 +172,10 @@ def test_evict_overflown_pinned(pyocf_ctx, cls: CacheLineSize):
), "Overflown part has not been evicted"
def send_io(core: Core, data: Data, addr: int = 0, target_ioclass: int = 0):
vol = core.get_front_volume()
def send_io(vol: CoreVolume, data: Data, addr: int = 0, target_ioclass: int = 0):
vol.open()
io = vol.new_io(
core.cache.get_default_queue(), addr, data.size, IoDir.WRITE, target_ioclass, 0,
vol.parent.get_default_queue(), addr, data.size, IoDir.WRITE, target_ioclass, 0,
)
io.set_data(data)
@ -184,5 +184,6 @@ def send_io(core: Core, data: Data, addr: int = 0, target_ioclass: int = 0):
io.callback = completion.callback
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO to exported object completion"

View File

@ -34,7 +34,7 @@ def test_test_standby_io(pyocf_ctx, cacheline_size):
cache.add_io_queue(f"io-queue-{i}")
cache.standby_attach(cache_vol)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
r = (
Rio()
@ -74,7 +74,7 @@ def test_test_standby_io_metadata(pyocf_ctx, cacheline_size):
io_offset = Size.from_page(start)
io_size = Size.from_page(count)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
r = (
Rio()

View File

@ -30,6 +30,7 @@ def __io(io, queue, address, size, data, direction):
def io_to_exp_obj(vol, queue, address, size, data, offset, direction, flags):
vol.open()
io = vol.new_io(queue, address, size, direction, 0, flags)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
@ -38,6 +39,7 @@ def io_to_exp_obj(vol, queue, address, size, data, offset, direction, flags):
ret = __io(io, queue, address, size, _data, direction)
if not ret and direction == IoDir.READ:
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
vol.close()
return ret
@ -77,7 +79,7 @@ def test_flush_after_mngmt(pyocf_ctx):
cache.add_core(core)
assert cache_device.flush_last
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# WT I/O to write data to core and cache VC

View File

@ -75,7 +75,7 @@ def test_remove_dirty_no_flush(pyocf_ctx, cache_mode, cls):
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = core.cache.get_default_queue()
# Prepare data
@ -121,10 +121,11 @@ def test_10add_remove_with_io(pyocf_ctx):
# Add and remove core 10 times in a loop with io in between
for i in range(0, 10):
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
stats = cache.get_stats()
assert stats["conf"]["core_count"] == 1
vol.open()
write_data = Data.from_string("Test data")
io = vol.new_io(
cache.get_default_queue(), S.from_sector(1).B, write_data.size, IoDir.WRITE, 0, 0
@ -135,6 +136,7 @@ def test_10add_remove_with_io(pyocf_ctx):
io.callback = cmpl.callback
io.submit()
cmpl.wait()
vol.close()
cache.remove_core(core)
stats = cache.get_stats()
@ -299,6 +301,7 @@ def test_add_remove_incrementally(pyocf_ctx, cache_mode, cls):
def _io_to_core(vol: Volume, queue: Queue, data: Data):
vol.open()
io = vol.new_io(queue, 0, data.size, IoDir.WRITE, 0, 0)
io.set_data(data)
@ -307,6 +310,7 @@ def _io_to_core(vol: Volume, queue: Queue, data: Data):
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO to exported object completion"

View File

@ -48,7 +48,7 @@ def test_attach_different_size(pyocf_ctx, new_cache_size, mode: CacheMode, cls:
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
cache.configure_partition(part_id=1, name="test_part", max_size=50, priority=1)
@ -82,6 +82,7 @@ def test_attach_different_size(pyocf_ctx, new_cache_size, mode: CacheMode, cls:
def io_to_exp_obj(vol, queue, address, size, data, offset, direction, target_ioclass, flags):
vol.open()
io = vol.new_io(queue, address, size, direction, target_ioclass, flags)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
@ -90,6 +91,7 @@ def io_to_exp_obj(vol, queue, address, size, data, offset, direction, target_ioc
ret = __io(io, queue, address, size, _data, direction)
if not ret and direction == IoDir.READ:
memmove(cast(data, c_void_p).value + offset, _data.handle, size)
vol.close()
return ret

View File

@ -283,7 +283,7 @@ def test_standby_activate_core_size_mismatch(pyocf_2_ctx):
cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls)
cache.start_cache()
cache.standby_attach(vol2)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
write_vol(cache_vol, cache.get_default_queue(), data)
@ -327,14 +327,14 @@ def test_failover_passive_first(pyocf_2_ctx):
cache2.standby_attach(sec_cache_backend_vol)
# volume replicating cache1 ramdisk writes to cache2 cache exported object
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
cache1_cache_vol = ReplicatedVolume(prim_cache_backend_vol, cache2_exp_obj_vol)
# active cache
cache1 = Cache.start_on_device(cache1_cache_vol, ctx1, cache_mode=mode, cache_line_size=cls)
core = Core(core_backend_vol)
cache1.add_core(core)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
queue = cache1.get_default_queue()
# some I/O
@ -365,7 +365,7 @@ def test_failover_passive_first(pyocf_2_ctx):
# add core explicitly with "try_add" to workaround pyocf limitations
core = Core(core_backend_vol)
cache2.add_core(core, try_add=True)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
assert md5 == core_vol.md5()
@ -373,6 +373,7 @@ def test_failover_passive_first(pyocf_2_ctx):
def write_vol(vol, queue, data):
data_size = len(data)
subdata_size_max = int(Size.from_MiB(32))
vol.open()
for offset in range(0, data_size, subdata_size_max):
subdata_size = min(data_size - offset, subdata_size_max)
subdata = Data.from_bytes(data, offset, subdata_size)
@ -382,6 +383,7 @@ def write_vol(vol, queue, data):
io.callback = comp.callback
io.submit()
comp.wait()
vol.close()
def test_failover_active_first(pyocf_2_ctx):
@ -399,7 +401,7 @@ def test_failover_active_first(pyocf_2_ctx):
)
core = Core(core_backend_vol)
cache1.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue1 = cache1.get_default_queue()
# some I/O
@ -431,11 +433,11 @@ def test_failover_active_first(pyocf_2_ctx):
cache2 = Cache(owner=ctx2, cache_mode=mode, cache_line_size=cls)
cache2.start_cache()
cache2.standby_attach(sec_cache_backend_vol)
vol2 = CacheVolume(cache2, open=True)
vol2 = CacheVolume(cache2)
queue = cache2.get_default_queue()
# standby cache exported object volume
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
# just to be sure
assert sec_cache_backend_vol.get_bytes() != prim_cache_backend_vol.get_bytes()
@ -453,7 +455,7 @@ def test_failover_active_first(pyocf_2_ctx):
cache2.standby_activate(sec_cache_backend_vol, open_cores=False)
core = Core(core_backend_vol)
cache2.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
# check data consistency
assert data_md5 == vol.md5()
@ -498,7 +500,7 @@ def test_failover_line_size_mismatch(pyocf_2_ctx):
cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls2)
cache.start_cache()
cache.standby_attach(vol2)
cache_vol = CacheVolume(cache, open=True)
cache_vol = CacheVolume(cache)
write_vol(cache_vol, cache.get_default_queue(), data)
@ -544,14 +546,14 @@ def test_failover_passive_first(pyocf_2_ctx):
cache2.standby_attach(sec_cache_backend_vol)
# volume replicating cache1 ramdisk writes to cache2 cache exported object
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
cache1_cache_vol = ReplicatedVolume(prim_cache_backend_vol, cache2_exp_obj_vol)
# active cache
cache1 = Cache.start_on_device(cache1_cache_vol, ctx1, cache_mode=mode, cache_line_size=cls)
core = Core(core_backend_vol)
cache1.add_core(core)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
queue = cache1.get_default_queue()
# some I/O
@ -582,6 +584,6 @@ def test_failover_passive_first(pyocf_2_ctx):
# add core explicitly with "try_add" to workaround pyocf limitations
core = Core(core_backend_vol)
cache2.add_core(core, try_add=True)
core_vol = CoreVolume(core, open=True)
core_vol = CoreVolume(core)
assert md5 == core_vol.md5()

View File

@ -92,7 +92,7 @@ def test_metadata_volatile_io(pyocf_ctx):
cache.change_cache_mode(CacheMode.WB)
core = Core.using_device(core_device, name="test_core")
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
r = (
Rio()

View File

@ -84,7 +84,7 @@ def test_start_write_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: Cache
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
logger.info("[STAGE] Initial write to exported object")
@ -108,7 +108,7 @@ def test_start_write_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: Cache
io_to_core(vol, queue, test_data, Size.from_sector(1).B)
check_stats_write_after_read(core, mode, cls)
check_md5_sums(core, mode)
check_md5_sums(vol, mode)
@pytest.mark.parametrize("cls", CacheLineSize)
@ -124,7 +124,7 @@ def test_start_read_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: CacheL
core = Core.using_device(core_device)
cache.add_core(core)
front_vol = CoreVolume(core, open=True)
front_vol = CoreVolume(core)
bottom_vol = core.get_volume()
queue = cache.get_default_queue()
@ -153,7 +153,7 @@ def test_start_read_first_and_check_mode(pyocf_ctx, mode: CacheMode, cls: CacheL
io_from_exported_object(front_vol, queue, test_data.size, Size.from_sector(1).B)
check_stats_read_after_write(core, mode, cls)
check_md5_sums(core, mode)
check_md5_sums(front_vol, mode)
@pytest.mark.parametrize("cls", CacheLineSize)
@ -208,12 +208,12 @@ def test_stop(pyocf_ctx, mode: CacheMode, cls: CacheLineSize, with_flush: bool):
core = Core.using_device(core_device)
cache.add_core(core)
front_vol = CoreVolume(core, open=True)
front_vol = CoreVolume(core)
queue = cache.get_default_queue()
cls_no = 10
run_io_and_cache_data_if_possible(core, mode, cls, cls_no)
run_io_and_cache_data_if_possible(front_vol, mode, cls, cls_no)
stats = cache.get_stats()
assert int(stats["conf"]["dirty"]) == (
@ -495,29 +495,28 @@ def test_start_stop_noqueue(pyocf_ctx):
assert not c.results["error"], "Failed to stop cache: {}".format(c.results["error"])
def run_io_and_cache_data_if_possible(core, mode, cls, cls_no):
front_vol = core.get_front_volume()
bottom_vol = core.get_volume()
queue = core.cache.get_default_queue()
def run_io_and_cache_data_if_possible(vol, mode, cls, cls_no):
queue = vol.parent.get_default_queue()
test_data = Data(cls_no * cls)
if mode in {CacheMode.WI, CacheMode.WA}:
logger.info("[STAGE] Write to core device")
io_to_core(bottom_vol, queue, test_data, 0)
io_to_core(vol.parent.device, queue, test_data, 0)
logger.info("[STAGE] Read from exported object")
io_from_exported_object(front_vol, queue, test_data.size, 0)
io_from_exported_object(vol, queue, test_data.size, 0)
else:
logger.info("[STAGE] Write to exported object")
io_to_core(front_vol, queue, test_data, 0)
io_to_core(vol, queue, test_data, 0)
stats = core.cache.get_stats()
stats = vol.parent.cache.get_stats()
assert stats["usage"]["occupancy"]["value"] == (
(cls_no * cls / CacheLineSize.LINE_4KiB) if mode != CacheMode.PT else 0
), "Occupancy"
def io_to_core(vol: Volume, queue: Queue, data: Data, offset: int):
vol.open()
io = vol.new_io(queue, offset, data.size, IoDir.WRITE, 0, 0)
io.set_data(data)
@ -526,11 +525,13 @@ def io_to_core(vol: Volume, queue: Queue, data: Data, offset: int):
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO to exported object completion"
def io_from_exported_object(vol: Volume, queue: Queue, buffer_size: int, offset: int):
read_buffer = Data(buffer_size)
vol.open()
io = vol.new_io(queue, offset, read_buffer.size, IoDir.READ, 0, 0)
io.set_data(read_buffer)
@ -538,6 +539,7 @@ def io_from_exported_object(vol: Volume, queue: Queue, buffer_size: int, offset:
io.callback = completion.callback
io.submit()
completion.wait()
vol.close()
assert completion.results["err"] == 0, "IO from exported object completion"
return read_buffer
@ -634,17 +636,17 @@ def check_stats_read_after_write(core, mode, cls, write_to_empty=False):
), "Occupancy"
def check_md5_sums(core: Core, mode: CacheMode):
def check_md5_sums(vol: CoreVolume, mode: CacheMode):
if mode.lazy_write():
assert (
core.device.md5() != core.get_front_volume().md5()
vol.parent.device.md5() != vol.md5()
), "MD5 check: core device vs exported object without flush"
core.cache.flush()
vol.parent.cache.flush()
assert (
core.device.md5() == core.get_front_volume().md5()
vol.parent.device.md5() == vol.md5()
), "MD5 check: core device vs exported object after flush"
else:
assert (
core.device.md5() == core.get_front_volume().md5()
vol.parent.device.md5() == vol.md5()
), "MD5 check: core device vs exported object"

View File

@ -0,0 +1,80 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import pytest
import logging
from datetime import datetime
from pyocf.types.volume_cache import CacheVolume
from pyocf.types.volume import RamVolume
from pyocf.types.cache import Cache, CacheMetadataSegment, CacheMode
from pyocf.utils import Size
from pyocf.types.shared import CacheLineSize, OcfError, OcfErrorCode
from pyocf.types.ctx import OcfCtx
from pyocf.rio import Rio, ReadWrite
from pyocf.helpers import get_metadata_segment_size, get_metadata_segment_page_location
from tests.utils.random import RandomGenerator
logger = logging.getLogger(__name__)
@pytest.mark.security
@pytest.mark.parametrize("cache_line_size", CacheLineSize)
@pytest.mark.parametrize(
"bs",
[
Size.from_B(512),
Size.from_KiB(1),
Size.from_KiB(18),
Size.from_KiB(128),
],
)
@pytest.mark.parametrize(
"io_size",
[
Size.from_B(512),
Size.from_KiB(10),
Size.from_MiB(1),
Size.from_MiB(10),
Size.from_GiB(1),
],
)
@pytest.mark.parametrize("section", CacheMetadataSegment)
def test_garbage_on_cache_exported_object(pyocf_ctx, cache_line_size, bs, io_size, section):
    """
    Write random garbage over a standby cache exported object and verify
    that a subsequent standby activation is rejected with OcfError.

    NOTE(review): the ``section`` parameter only multiplies the
    parametrization; the garbage I/O is random over the whole volume and does
    not specifically target the given metadata segment — confirm intended.
    """
    num_jobs = 1
    qd = 64
    vol_size = Size.from_MiB(100)
    cache_vol = RamVolume(vol_size)
    secondary_cache_volume = RamVolume(vol_size)
    cache = Cache(owner=OcfCtx.get_default(), cache_line_size=cache_line_size)
    # Queues are added manually so Rio can be pointed at the explicit list
    cache.start_cache(init_default_io_queue=False)
    for i in range(num_jobs):
        cache.add_io_queue(f"io-queue-{i}")
    cache.standby_attach(cache_vol)
    cache_exp_vol = CacheVolume(cache)
    seed = next(RandomGenerator())
    # Random writes straight onto the cache exported object, i.e. directly
    # over the (replicated) metadata area — the content becomes garbage
    r = (
        Rio()
        .target(cache_exp_vol)
        .njobs(num_jobs)
        .readwrite(ReadWrite.RANDWRITE)
        .io_size(io_size)
        .randseed(seed)
        .bs(bs)
        .qd(qd)
        .norandommap()
        .run(cache.io_queues)
    )
    cache.standby_detach()
    # Activation must fail — there is no valid metadata to activate from
    with pytest.raises(OcfError):
        cache.standby_activate(secondary_cache_volume, open_cores=False)

View File

@ -0,0 +1,400 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import pytest
from datetime import timedelta
import logging
from random import randrange
from contextlib import nullcontext, suppress
from enum import Enum
from pyocf.types.volume import RamVolume
from pyocf.types.volume_replicated import ReplicatedVolume
from pyocf.types.cache import Cache, CacheMetadataSegment, CacheMode
from pyocf.types.volume_cache import CacheVolume
from pyocf.types.core import Core
from pyocf.types.volume_core import CoreVolume
from pyocf.utils import Size
from pyocf.types.shared import CacheLineSize, OcfError, OcfErrorCode
from pyocf.types.ctx import OcfCtx
from pyocf.rio import Rio, ReadWrite
from pyocf.helpers import (
get_metadata_segment_size,
get_metadata_segment_elems_count,
get_metadata_segment_elems_per_page,
get_metadata_segment_elem_size,
get_metadata_segment_is_flapped,
get_metadata_segment_page_location,
)
logger = logging.getLogger(__name__)
def raises(exception):
    """pytest.raises context whose __name__ yields readable parametrize IDs."""
    ctx = pytest.raises(exception)
    ctx.__name__ = f"Raises({exception.__name__})"
    return ctx
def does_not_raise():
    """No-op context manager whose __name__ yields readable parametrize IDs."""
    ctx = nullcontext()
    ctx.__name__ = "DoesNotRaise"
    return ctx
def may_raise(exception):
    """suppress() context (exception tolerated, not required), with a readable __name__."""
    ctx = suppress(exception)
    ctx.__name__ = f"MayRaise({exception.__name__})"
    return ctx
class Shutdown(Enum):
    """Cache shutdown mode used by the corruption tests."""
    # DIRTY: cache device is offlined before stop(), so metadata cannot be
    # finalized; CLEAN: cache is stopped gracefully
    DIRTY = True
    CLEAN = False
@pytest.mark.security
@pytest.mark.parametrize(
"shutdown_type,target_segment,expectation",
[
(Shutdown.DIRTY, CacheMetadataSegment.SB_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.SB_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.RESERVED, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.PART_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.PART_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_UUID, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.CLEANING, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.LRU, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.COLLISION, may_raise(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.LIST_INFO, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.HASH, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.SB_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.SB_RUNTIME, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.RESERVED, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.PART_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.PART_RUNTIME, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_RUNTIME, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_UUID, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CLEANING, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.LRU, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.COLLISION, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.LIST_INFO, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.HASH, raises(OcfError)),
],
)
@pytest.mark.parametrize("cache_line_size", CacheLineSize)
@pytest.mark.parametrize("cache_mode", CacheMode)
def test_metadata_corruption(
    pyocf_ctx, cache_line_size, cache_mode, shutdown_type, target_segment, expectation
):
    """
    Corrupt a random byte of a chosen metadata segment after cache shutdown
    and verify that a subsequent load fails (or succeeds) as expected.
    """
    cache_volume = RamVolume(Size.from_MiB(60))
    core_volume = RamVolume(Size.from_MiB(1))
    cache = Cache.start_on_device(
        cache_volume,
        cache_mode=cache_mode,
        cache_line_size=cache_line_size,
    )
    # Pick the corruption target(s) while the metadata layout is queryable
    corrupted_bytes = get_random_target_in_segment(cache, target_segment)
    core = Core(core_volume)
    cache.add_core(core)
    core_exp_volume = CoreVolume(core)
    queue = cache.get_default_queue()
    # Populate the cache through the core exported object
    r = (
        Rio()
        .target(core_exp_volume)
        .njobs(1)
        .readwrite(ReadWrite.WRITE)
        .size(Size.from_MiB(1))
        .qd(1)
        .run([queue])
    )
    if shutdown_type == Shutdown.DIRTY:
        # Persist metadata, then yank the device so stop() cannot finalize it
        cache.save()
        cache.device.offline()
    exc = False
    try:
        cache.stop()
    except OcfError:
        exc = True
    cache_volume.online()
    if shutdown_type == Shutdown.DIRTY:
        assert exc, "Stopping with device offlined should raise an exception"
    # Flip bits at the selected offset(s) — two entries for flapped segments
    for byte in corrupted_bytes:
        corrupt_byte(cache_volume.data, byte)
    # Expectation is parametrized: raises(OcfError) / does_not_raise() / may_raise(...)
    with expectation:
        cache = Cache.load_from_device(cache_volume)
@pytest.mark.security
@pytest.mark.parametrize(
"shutdown_type,target_segment,expectation",
[
(Shutdown.DIRTY, CacheMetadataSegment.SB_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.SB_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.RESERVED, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.PART_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.PART_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_UUID, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.CLEANING, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.LRU, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.COLLISION, may_raise(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.LIST_INFO, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.HASH, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.SB_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.SB_RUNTIME, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.RESERVED, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.PART_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.PART_RUNTIME, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_RUNTIME, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_UUID, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CLEANING, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.LRU, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.COLLISION, may_raise(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.LIST_INFO, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.HASH, does_not_raise()),
],
)
@pytest.mark.parametrize("cache_line_size", CacheLineSize)
@pytest.mark.parametrize("cache_mode", CacheMode)
def test_metadata_corruption_standby_activate(
    pyocf_2_ctx, cache_line_size, cache_mode, shutdown_type, target_segment, expectation
):
    """
    Corrupt a random byte of a chosen metadata segment on the failover
    secondary's cache volume and verify that standby activation fails
    (or succeeds) as expected.
    """
    primary_ctx, secondary_ctx = pyocf_2_ctx
    primary_cache_volume = RamVolume(Size.from_MiB(60))
    secondary_cache_volume = RamVolume(Size.from_MiB(60))
    core_volume = RamVolume(Size.from_MiB(1))
    # Secondary (standby) instance that will replicate the primary's metadata
    secondary_cache = Cache(
        owner=secondary_ctx,
        cache_mode=cache_mode,
        cache_line_size=cache_line_size,
    )
    secondary_cache.start_cache()
    secondary_cache.standby_attach(secondary_cache_volume)
    # Pick the corruption target(s) while the metadata layout is queryable
    corrupted_bytes = get_random_target_in_segment(secondary_cache, target_segment)
    secondary_cache_exp_obj = CacheVolume(secondary_cache)
    # Primary's writes are mirrored to its own volume and to the standby
    # exported object
    primary_cache_replicated_volume = ReplicatedVolume(
        primary_cache_volume, secondary_cache_exp_obj
    )
    primary_cache = Cache.start_on_device(
        primary_cache_replicated_volume,
        owner=primary_ctx,
        cache_mode=cache_mode,
        cache_line_size=cache_line_size,
    )
    core = Core(core_volume)
    primary_cache.add_core(core)
    core_exp_volume = CoreVolume(core)
    queue = primary_cache.get_default_queue()
    # Populate the cache through the core exported object
    r = (
        Rio()
        .target(core_exp_volume)
        .njobs(1)
        .readwrite(ReadWrite.WRITE)
        .size(Size.from_MiB(1))
        .qd(1)
        .run([queue])
    )
    if shutdown_type == Shutdown.DIRTY:
        # Persist metadata, then yank the device so stop() cannot finalize it
        primary_cache.save()
        primary_cache.device.offline()
    exc = False
    try:
        primary_cache.stop()
    except OcfError:
        exc = True
    primary_cache_replicated_volume.online()
    secondary_cache.standby_detach()
    if shutdown_type == Shutdown.DIRTY:
        assert exc, "Stopping with device offlined should raise an exception"
    # Flip bits at the selected offset(s) — two entries for flapped segments
    for byte in corrupted_bytes:
        corrupt_byte(secondary_cache_volume.data, byte)
    with expectation:
        secondary_cache.standby_activate(secondary_cache_volume, open_cores=False)
@pytest.mark.security
@pytest.mark.parametrize(
"shutdown_type,target_segment,expectation",
[
(Shutdown.DIRTY, CacheMetadataSegment.SB_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.SB_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.RESERVED, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.PART_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.PART_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_CONFIG, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_RUNTIME, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.CORE_UUID, raises(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.CLEANING, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.LRU, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.COLLISION, may_raise(OcfError)),
(Shutdown.DIRTY, CacheMetadataSegment.LIST_INFO, does_not_raise()),
(Shutdown.DIRTY, CacheMetadataSegment.HASH, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.SB_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.SB_RUNTIME, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.RESERVED, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.PART_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.PART_RUNTIME, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_CONFIG, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_RUNTIME, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.CORE_UUID, raises(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.CLEANING, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.LRU, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.COLLISION, may_raise(OcfError)),
(Shutdown.CLEAN, CacheMetadataSegment.LIST_INFO, does_not_raise()),
(Shutdown.CLEAN, CacheMetadataSegment.HASH, does_not_raise()),
],
)
@pytest.mark.parametrize("cache_line_size", CacheLineSize)
@pytest.mark.parametrize("cache_mode", CacheMode)
def test_metadata_corruption_standby_load(
    pyocf_2_ctx, cache_line_size, cache_mode, shutdown_type, target_segment, expectation
):
    """
    Corrupt a random byte of a chosen metadata segment on the failover
    secondary's cache volume, then verify that a standby load (and, when the
    load succeeds, the follow-up activation) behaves as expected.
    """
    primary_ctx, secondary_ctx = pyocf_2_ctx
    primary_cache_volume = RamVolume(Size.from_MiB(60))
    secondary_cache_volume = RamVolume(Size.from_MiB(60))
    core_volume = RamVolume(Size.from_MiB(1))
    # Secondary (standby) instance that will replicate the primary's metadata
    secondary_cache = Cache(
        owner=secondary_ctx,
        cache_mode=cache_mode,
        cache_line_size=cache_line_size,
    )
    secondary_cache.start_cache()
    secondary_cache.standby_attach(secondary_cache_volume)
    # Pick the corruption target(s) while the metadata layout is queryable
    corrupted_bytes = get_random_target_in_segment(secondary_cache, target_segment)
    secondary_cache_exp_obj = CacheVolume(secondary_cache)
    # Primary's writes are mirrored to its own volume and to the standby
    # exported object
    primary_cache_replicated_volume = ReplicatedVolume(
        primary_cache_volume, secondary_cache_exp_obj
    )
    primary_cache = Cache.start_on_device(
        primary_cache_replicated_volume,
        owner=primary_ctx,
        cache_mode=cache_mode,
        cache_line_size=cache_line_size,
    )
    core = Core(core_volume)
    primary_cache.add_core(core)
    core_exp_volume = CoreVolume(core)
    queue = primary_cache.get_default_queue()
    # Populate the cache through the core exported object
    r = (
        Rio()
        .target(core_exp_volume)
        .njobs(1)
        .readwrite(ReadWrite.WRITE)
        .size(Size.from_MiB(1))
        .qd(1)
        .run([queue])
    )
    if shutdown_type == Shutdown.DIRTY:
        # Persist metadata, then yank the device so stop() cannot finalize it
        primary_cache.save()
        primary_cache.device.offline()
    exc = False
    try:
        primary_cache.stop()
    except OcfError:
        exc = True
    primary_cache_replicated_volume.online()
    secondary_cache.stop()
    if shutdown_type == Shutdown.DIRTY:
        assert exc, "Stopping with device offlined should raise an exception"
    # Flip bits at the selected offset(s) — two entries for flapped segments.
    # BUGFIX: pass the loop variable `byte`, not the whole `corrupted_bytes`
    # tuple (matches the sibling corruption tests); with the tuple the
    # intended byte was never corrupted.
    for byte in corrupted_bytes:
        corrupt_byte(secondary_cache_volume.data, byte)
    loaded = False
    with expectation:
        secondary_cache = Cache.load_standby_from_device(
            secondary_cache_volume, secondary_ctx, cache_line_size=cache_line_size
        )
        loaded = True
    if loaded:
        # A successfully loaded standby must also detach and activate cleanly
        secondary_cache.standby_detach()
        secondary_cache.standby_activate(secondary_cache_volume, open_cores=False)
def corrupt_byte(buffer, offset):
    """Flip alternating bits (XOR 0xAA) of the single byte at *offset* in *buffer*."""
    logger.info(f"Corrupting byte {offset}")
    original = int.from_bytes(buffer[offset], "big")
    flipped = original ^ 0xAA
    buffer[offset] = flipped.to_bytes(1, "big")
    logger.debug(f"0x{original:02X} -> 0x{flipped:02X}")
def get_random_target_in_segment(cache: Cache, target_segment: CacheMetadataSegment):
    """
    Pick a random byte offset (from the start of the cache volume) inside the
    on-disk area of *target_segment*.

    Returns a tuple of offsets: one entry for a plain segment, two for a
    "flapped" segment (the same relative offset in each of its two copies).
    """
    offset = Size.from_page(get_metadata_segment_page_location(cache, target_segment))
    page_count = get_metadata_segment_size(cache, target_segment)
    if get_metadata_segment_is_flapped(cache, target_segment):
        # Flapped segments keep two copies; each copy spans half of the pages
        page_count = page_count // 2
    elem_size = get_metadata_segment_elem_size(cache, target_segment)
    elems_per_page = get_metadata_segment_elems_per_page(cache, target_segment)
    elems_count = get_metadata_segment_elems_count(cache, target_segment)
    target_page = randrange(0, page_count) if page_count > 1 else 0
    if target_page != page_count - 1:
        # Non-final page: every element slot is in use
        page_filled = elem_size * elems_per_page
    else:
        # Last page may be only partially occupied.
        # NOTE(review): when elems_count divides evenly by elems_per_page this
        # yields 0, so byte 0 of the (actually full) last page is targeted —
        # confirm that is acceptable.
        page_filled = (elems_count % elems_per_page) * elem_size
    offset_in_page = randrange(0, page_filled) if page_filled else 0
    corrupted_byte = target_page * Size.from_page(1).B + offset_in_page + offset.B
    if get_metadata_segment_is_flapped(cache, target_segment):
        # Corrupt the same relative offset in both copies of the segment
        ret = (corrupted_byte, corrupted_byte + (page_count * Size.from_page(1).B))
    else:
        ret = (corrupted_byte,)
    return ret

View File

@ -191,7 +191,7 @@ def prepare_cache_and_core(core_size: Size, cache_size: Size = Size.from_MiB(50)
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
return vol, queue
@ -200,6 +200,7 @@ def prepare_cache_and_core(core_size: Size, cache_size: Size = Size.from_MiB(50)
def io_operation(
vol: Volume, queue: Queue, data: Data, io_direction: int, offset: int = 0, io_class: int = 0,
):
vol.open()
io = vol.new_io(queue, offset, data.size, io_direction, io_class, 0)
io.set_data(data)
@ -207,4 +208,5 @@ def io_operation(
io.callback = completion.callback
io.submit()
completion.wait()
vol.close()
return completion

View File

@ -82,10 +82,11 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
core_device = RamVolume(S.from_MiB(50))
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
write_data = DataCopyTracer(S.from_sector(1))
vol.open()
io = vol.new_io(queue, S.from_sector(1).B, write_data.size, IoDir.WRITE, 0, 0,)
io.set_data(write_data)
@ -117,6 +118,7 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
io.submit()
cmpl.wait()
vol.close()
stats = cache.get_stats()
ctx.exit()
@ -156,10 +158,12 @@ def test_secure_erase_simple_io_cleaning():
core_device = RamVolume(S.from_MiB(100))
core = Core.using_device(core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
read_data = Data(S.from_sector(1).B)
vol.open()
io = vol.new_io(queue, S.from_sector(1).B, read_data.size, IoDir.WRITE, 0, 0)
io.set_data(read_data)
@ -177,6 +181,7 @@ def test_secure_erase_simple_io_cleaning():
io.submit()
cmpl.wait()
vol.close()
stats = cache.get_stats()
ctx.exit()

View File

@ -39,6 +39,7 @@ mngmt_op_surprise_shutdown_test_io_offset = S.from_MiB(4).B
def ocf_write(vol, queue, val, offset):
vol.open()
data = Data.from_bytes(bytes([val] * 512))
comp = OcfCompletion([("error", c_int)])
io = vol.new_io(queue, offset, 512, IoDir.WRITE, 0, 0)
@ -46,9 +47,11 @@ def ocf_write(vol, queue, val, offset):
io.callback = comp.callback
io.submit()
comp.wait()
vol.close()
def ocf_read(vol, queue, offset):
vol.open()
data = Data(byte_count=512)
comp = OcfCompletion([("error", c_int)])
io = vol.new_io(queue, offset, 512, IoDir.READ, 0, 0)
@ -56,6 +59,7 @@ def ocf_read(vol, queue, offset):
io.callback = comp.callback
io.submit()
comp.wait()
vol.close()
return data.get_bytes()[0]
@ -66,7 +70,7 @@ def prepare_failover(pyocf_2_ctx, cache_backend_vol, error_io_seq_no):
cache2 = Cache(owner=ctx2)
cache2.start_cache()
cache2.standby_attach(cache_backend_vol)
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
cache2_exp_obj_vol = CacheVolume(cache2)
error_io = {IoDir.WRITE: error_io_seq_no}
@ -206,7 +210,7 @@ def test_surprise_shutdown_remove_core_with_data(pyocf_2_ctx, failover):
def prepare_func(cache):
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
def tested_func(cache):
@ -219,7 +223,7 @@ def test_surprise_shutdown_remove_core_with_data(pyocf_2_ctx, failover):
assert core_device.get_bytes()[io_offset] == 0xAA
else:
core = cache.get_core_by_name("core1")
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, tested_func, prepare_func, check_func)
@ -273,7 +277,7 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_2_ctx, failover):
def prepare(cache):
cache.add_core(core1)
vol = CoreVolume(core1, open=True)
vol = CoreVolume(core1)
cache.save()
ocf_write(
vol, cache.get_default_queue(), 0xAA, mngmt_op_surprise_shutdown_test_io_offset,
@ -299,7 +303,7 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_2_ctx, failover):
core2 = cache.get_core_by_name("core2")
if core2 is not None:
vol2 = CoreVolume(core2, open=True)
vol2 = CoreVolume(core2)
assert core2.device.uuid == "dev2"
assert (
ocf_read(
@ -332,10 +336,9 @@ def test_surprise_shutdown_start_cache(pyocf_2_ctx, failover):
cache2 = Cache(owner=ctx2)
cache2.start_cache()
cache2.standby_attach(ramdisk)
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
err_device = ErrorDevice(
cache2_exp_obj_vol, error_seq_no=error_io, data_only=True, armed=True
)
cache2_exp_obj_vol = CacheVolume(cache2)
err_device = ErrorDevice(cache2_exp_obj_vol, error_seq_no=error_io, armed=True)
else:
err_device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, armed=True)
@ -404,7 +407,7 @@ def test_surprise_shutdown_stop_cache(pyocf_2_ctx, failover):
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
# start error injection
@ -444,7 +447,7 @@ def test_surprise_shutdown_stop_cache(pyocf_2_ctx, failover):
assert stats["usage"]["occupancy"]["value"] == 1
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
cache.stop()
@ -473,7 +476,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_2_ctx, failover):
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
queue = cache.get_default_queue()
# insert dirty cacheline
@ -531,7 +534,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_2_ctx, failover):
if stats["conf"]["core_count"] == 0:
assert stats["usage"]["occupancy"]["value"] == 0
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
cache.stop()
@ -822,7 +825,7 @@ def test_surprise_shutdown_standby_activate(pyocf_ctx):
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
cache.stop()
@ -867,7 +870,7 @@ def test_surprise_shutdown_standby_activate(pyocf_ctx):
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
cache.stop()
@ -953,7 +956,7 @@ def test_surprise_shutdown_standby_init_force_1(pyocf_ctx):
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
cache.stop()
@ -1002,14 +1005,14 @@ def test_surprise_shutdown_standby_init_force_1(pyocf_ctx):
assert original_dirty_blocks == stats["usage"]["dirty"]
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
else:
assert stats["usage"]["occupancy"]["value"] == 0
assert stats["usage"]["dirty"]["value"] == 0
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
cache.stop()
@ -1043,7 +1046,7 @@ def test_surprise_shutdown_standby_init_force_2(pyocf_ctx):
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
cache.stop()
@ -1087,14 +1090,14 @@ def test_surprise_shutdown_standby_init_force_2(pyocf_ctx):
assert original_dirty_blocks == stats["usage"]["dirty"]
core = Core(device=core_device)
cache.add_core(core, try_add=True)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
else:
assert stats["usage"]["occupancy"]["value"] == 0
assert stats["usage"]["dirty"]["value"] == 0
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
vol = CoreVolume(core)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
if cache: