Merge pull request #695 from arutk/failover_test_5

pyocf: failover functional and power failure recovery tests
This commit is contained in:
Adam Rutkowski 2022-05-16 16:37:45 +02:00 committed by GitHub
commit 3a1b6fd718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1395 additions and 264 deletions

6
env/posix/ocf_env.h vendored
View File

@ -1,5 +1,5 @@
/*
* Copyright(c) 2019-2021 Intel Corporation
* Copyright(c) 2019-2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause
*/
@ -67,11 +67,13 @@ typedef uint64_t sector_t;
#define ENV_MEM_ATOMIC 0
/* DEBUGING */
void env_stack_trace(void);
#define ENV_WARN(cond, fmt...) printf(fmt)
#define ENV_WARN_ON(cond) ;
#define ENV_WARN_ONCE(cond, fmt...) ENV_WARN(cond, fmt)
#define ENV_BUG() assert(0)
#define ENV_BUG() do {env_stack_trace(); assert(0);} while(0)
#define ENV_BUG_ON(cond) do { if (cond) ENV_BUG(); } while (0)
#define ENV_BUILD_BUG_ON(cond) _Static_assert(!(cond), "static "\
"assertion failure")

View File

@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@ -8,13 +8,14 @@ OCFDIR=$(PWD)/../../
ADAPTERDIR=$(PWD)/pyocf
SRCDIR=$(ADAPTERDIR)/ocf/src
INCDIR=$(ADAPTERDIR)/ocf/include
WRAPDIR=$(ADAPTERDIR)/wrappers
WRAPDIR=$(ADAPTERDIR)/c/wrappers
HELPDIR=$(ADAPTERDIR)/c/helpers
CC=gcc
CFLAGS=-g -Wall -I$(INCDIR) -I$(SRCDIR)/ocf/env
LDFLAGS=-pthread -lz
SRC=$(shell find $(SRCDIR) $(WRAPDIR) -name \*.c)
SRC=$(shell find $(SRCDIR) $(WRAPDIR) $(HELPDIR) -name \*.c)
OBJS=$(patsubst %.c, %.o, $(SRC))
OCFLIB=$(ADAPTERDIR)/libocf.so

View File

@ -0,0 +1,27 @@
/*
* Copyright(c) 2022-2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "ocf/ocf_io.h"
#include "ocf/ocf_cache.h"
#include "../src/ocf/ocf_cache_priv.h"
#include "../src/ocf/metadata/metadata_raw.h"
#include "../src/ocf/metadata/metadata_internal.h"
// get collision metadata segment start and size (excluding padding)
/*
 * Return the ssd_pages_offset recorded in the RAW descriptor of the
 * collision metadata segment of @cache.
 */
uint64_t ocf_get_collision_start_page_helper(ocf_cache_t cache)
{
	/* cache->metadata.priv is used as the metadata controller, which
	 * carries one RAW descriptor per metadata segment */
	struct ocf_metadata_ctrl *metadata_ctrl = cache->metadata.priv;

	return metadata_ctrl->raw_desc[metadata_segment_collision].ssd_pages_offset;
}
/*
 * Return the ssd_pages value recorded in the RAW descriptor of the
 * collision metadata segment of @cache.
 */
uint64_t ocf_get_collision_page_count_helper(ocf_cache_t cache)
{
	/* cache->metadata.priv is used as the metadata controller, which
	 * carries one RAW descriptor per metadata segment */
	struct ocf_metadata_ctrl *metadata_ctrl = cache->metadata.priv;

	return metadata_ctrl->raw_desc[metadata_segment_collision].ssd_pages;
}

View File

@ -0,0 +1,9 @@
/*
* Copyright(c) 2022-2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once
uint64_t ocf_get_collision_start_page_helper(ocf_cache_t cache);
uint64_t ocf_get_collision_page_count_helper(ocf_cache_t cache);

View File

@ -0,0 +1,16 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
#
from .ocf import OcfLib
def get_collision_segment_page_location(cache):
    """Return the start page of the collision metadata segment as an int.

    Thin wrapper over the C helper ``ocf_get_collision_start_page_helper``.

    :param cache: cache object passed straight to the C helper
    :return: value reported by the C helper, converted to ``int``
    """
    from ctypes import c_uint64

    helper = OcfLib.getInstance().ocf_get_collision_start_page_helper
    # The C helper is declared to return uint64_t; without an explicit restype
    # ctypes assumes a C int and would truncate larger values.
    helper.restype = c_uint64
    return int(helper(cache))
def get_collision_segment_size(cache):
    """Return the page count of the collision metadata segment as an int.

    Thin wrapper over the C helper ``ocf_get_collision_page_count_helper``.

    :param cache: cache object passed straight to the C helper
    :return: value reported by the C helper, converted to ``int``
    """
    from ctypes import c_uint64

    helper = OcfLib.getInstance().ocf_get_collision_page_count_helper
    # The C helper is declared to return uint64_t; without an explicit restype
    # ctypes assumes a C int and would truncate larger values.
    helper.restype = c_uint64
    return int(helper(cache))

View File

@ -261,12 +261,12 @@ class Rio:
self._threads = []
self.errors = {}
def run(self, queues=None):
def run(self, queues):
self.run_async(queues)
self.wait_for_completion()
return self
def run_async(self, queues=None):
def run_async(self, queues):
self.clear()
jobs = deepcopy(self.jobs)
@ -274,8 +274,6 @@ class Rio:
if not jobs:
jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)]
if not queues:
queues = [self.global_jobspec.target.cache.get_default_queue()]
queues = cycle(queues)
for job in jobs:

View File

@ -39,7 +39,7 @@ from .io import IoDir
from .ioclass import IoClassesInfo, IoClassInfo
from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
from .ctx import OcfCtx
from .volume import RamVolume
from .volume import RamVolume, Volume
class Backfill(Structure):
_fields_ = [("_max_queue_size", c_uint32), ("_queue_unblock_size", c_uint32)]
@ -205,10 +205,7 @@ class Cache:
self.cores = []
def start_cache(
self,
default_io_queue: Queue = None,
mngt_queue: Queue = None,
locked: bool = False,
self, init_mngmt_queue=True, init_default_io_queue=True, locked: bool = False,
):
cfg = CacheConfig(
_name=self.name.encode("ascii"),
@ -231,20 +228,24 @@ class Cache:
if status:
raise OcfError("Creating cache instance failed", status)
self.mngt_queue = mngt_queue or Queue(self, "mgmt-{}".format(self.get_name()))
if default_io_queue:
self.io_queues += [default_io_queue]
else:
self.io_queues += [Queue(self, "default-io-{}".format(self.get_name()))]
if init_mngmt_queue:
self.mngt_queue = Queue(self, "mgmt-{}".format(self.get_name()))
status = self.owner.lib.ocf_mngt_cache_set_mngt_queue(self, self.mngt_queue)
if status:
raise OcfError("Error setting management queue", status)
if init_default_io_queue:
self.io_queues = [Queue(self, "default-io-{}".format(self.get_name()))]
else:
self.io_queues = []
self.started = True
self.owner.caches.append(self)
def add_io_queue(self, *args, **kwargs):
    """Create one more I/O queue for this cache and append it to ``io_queues``.

    All positional and keyword arguments are forwarded to ``Queue`` after the
    cache itself (e.g. the queue name).
    """
    # Unpack the positional arguments: passing the ``args`` tuple itself would
    # hand the whole tuple to Queue as its name.
    q = Queue(self, *args, **kwargs)
    self.io_queues += [q]
def standby_detach(self):
self.write_lock()
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
@ -560,11 +561,11 @@ class Cache:
c.results["error"],
)
def standby_load(self, device):
def standby_load(self, device, perform_test=True):
self.device = device
self.device_name = device.uuid
device_config = Cache.generate_device_config(device)
device_config = Cache.generate_device_config(device, perform_test=perform_test)
attach_cfg = CacheAttachConfig(
_device=device_config,
@ -755,40 +756,27 @@ class Cache:
def get_volume(self):
return Volume.get_instance(lib.ocf_cache_get_volume(self.cache_handle))
def get_stats(self):
def get_conf(self):
cache_info = CacheInfo()
usage = UsageStats()
req = RequestsStats()
block = BlocksStats()
errors = ErrorsStats()
self.read_lock()
status = self.owner.lib.ocf_cache_get_info(self.cache_handle, byref(cache_info))
if status:
self.read_unlock()
raise OcfError("Failed getting cache info", status)
status = self.owner.lib.ocf_stats_collect_cache(
self.cache_handle, byref(usage), byref(req), byref(block), byref(errors)
)
if status:
self.read_unlock()
raise OcfError("Failed getting stats", status)
if status:
raise OcfError("Failed getting cache info", status)
line_size = CacheLineSize(cache_info.cache_line_size)
cache_name = self.owner.lib.ocf_cache_get_name(self).decode("ascii")
self.read_unlock()
return {
"conf": {
"attached": cache_info.attached,
"volume_type": self.owner.volume_types[cache_info.volume_type],
"size": CacheLines(cache_info.size, line_size),
"inactive": {
"occupancy": CacheLines(
cache_info.inactive.occupancy.value, line_size
),
"occupancy": CacheLines(cache_info.inactive.occupancy.value, line_size),
"dirty": CacheLines(cache_info.inactive.dirty.value, line_size),
"clean": CacheLines(cache_info.inactive.clean.value, line_size),
},
@ -810,7 +798,29 @@ class Cache:
"metadata_footprint": Size(cache_info.metadata_footprint),
"metadata_end_offset": Size(cache_info.metadata_end_offset),
"cache_name": cache_name,
},
}
def get_stats(self):
usage = UsageStats()
req = RequestsStats()
block = BlocksStats()
errors = ErrorsStats()
self.read_lock()
conf = self.get_conf()
status = self.owner.lib.ocf_stats_collect_cache(
self.cache_handle, byref(usage), byref(req), byref(block), byref(errors)
)
self.read_unlock()
if status:
raise OcfError("Failed getting stats", status)
return {
"conf": conf,
"block": struct_to_dict(block),
"req": struct_to_dict(req),
"usage": struct_to_dict(usage),

View File

@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@ -45,6 +45,7 @@ class Queue:
def __init__(self, cache, name):
self.ops = QueueOps(kick=type(self)._kick, stop=type(self)._stop)
self.name = name
self.handle = c_void_p()
status = OcfLib.getInstance().ocf_queue_create(

View File

@ -405,16 +405,17 @@ class RamVolume(Volume):
return string_at(self.data_ptr, self.size)
class ErrorDevice(RamVolume):
class ErrorDevice(Volume):
def __init__(
self,
size,
vol,
error_sectors: set = None,
error_seq_no: dict = None,
armed=True,
uuid=None,
):
super().__init__(size, uuid)
self.vol = vol
super().__init__(uuid)
self.error_sectors = error_sectors
self.error_seq_no = error_seq_no
self.armed = armed
@ -426,7 +427,7 @@ class ErrorDevice(RamVolume):
def do_submit_io(self, io):
if not self.armed:
super().do_submit_io(io)
self.vol.do_submit_io(io)
return
direction = IoDir(io.contents._dir)
@ -451,7 +452,7 @@ class ErrorDevice(RamVolume):
io.contents._end(io, -OcfErrorCode.OCF_ERR_IO)
self.stats["errors"][direction] += 1
else:
super().do_submit_io(io)
self.vol.do_submit_io(io)
def arm(self):
self.armed = True
@ -463,9 +464,30 @@ class ErrorDevice(RamVolume):
return self.error
def reset_stats(self):
self.vol.reset_stats()
super().reset_stats()
self.stats["errors"] = {IoDir.WRITE: 0, IoDir.READ: 0}
def get_length(self):
return self.vol.get_length()
def get_max_io_size(self):
return self.vol.get_max_io_size()
def do_submit_flush(self, flush):
return self.vol.do_submit_flush(flush)
def do_submit_discard(self, discard):
return self.vol.do_submit_discard(discard)
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
return self.vol.dump(offset, size, ignore=ignore, **kwargs)
def md5(self):
return self.vol.md5()
def get_copy(self):
return self.vol.get_copy()
lib = OcfLib.getInstance()
lib.ocf_io_get_priv.restype = POINTER(VolumeIoPriv)

View File

@ -22,10 +22,11 @@ class CacheVolume(ExpObjVolume):
def open(self):
return Volume.open(
self.lib.ocf_cache_get_front_volume(self.cache.handle),
self.lib.ocf_cache_get_front_volume(self.cache.cache_handle),
self
)
def md5(self):
cache_line_size = int(self.cache.get_stats()['conf']['cache_line_size'])
out = self.cache.get_conf()
cache_line_size = int(out['cache_line_size'])
return self._exp_obj_md5(cache_line_size)

View File

@ -49,10 +49,10 @@ class ExpObjVolume(Volume):
return exp_obj_io
def get_length(self):
return Size.from_B(OcfLib.getInstance().ocf_volume_get_length(self.c_vol))
return Size.from_B(OcfLib.getInstance().ocf_volume_get_length(self.handle))
def get_max_io_size(self):
return Size.from_B(OcfLib.getInstance().ocf_volume_get_max_io_size(self.c_vol))
return Size.from_B(OcfLib.getInstance().ocf_volume_get_max_io_size(self.handle))
def do_submit_io(self, io):
io = self._alloc_io(io)

View File

@ -1,5 +1,5 @@
#
# Copyright(c) 2019-2021 Intel Corporation
# Copyright(c) 2019-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
@ -72,6 +72,7 @@ class Size:
_GiB = _MiB * 1024
_TiB = _GiB * 1024
_SECTOR_SIZE = 512
_PAGE_SIZE = 4096
def __init__(self, b: int, sector_aligned: bool = False):
if sector_aligned:
@ -130,6 +131,10 @@ class Size:
def from_sector(cls, value):
return cls(value * cls._SECTOR_SIZE)
@classmethod
def from_page(cls, value):
    """Build an instance from a number of pages of ``_PAGE_SIZE`` bytes each."""
    byte_count = value * cls._PAGE_SIZE
    return cls(byte_count)
@property
def B(self):
return self.bytes

View File

@ -47,7 +47,8 @@ def test_simple_wt_write(pyocf_ctx):
def test_start_corrupted_metadata_lba(pyocf_ctx):
cache_device = ErrorDevice(S.from_MiB(50), error_sectors=set([0]))
ramdisk = RamVolume(S.from_MiB(50))
cache_device = ErrorDevice(ramdisk, error_sectors=set([0]))
with pytest.raises(OcfError, match="OCF_ERR_WRITE_CACHE"):
cache = Cache.start_on_device(cache_device)

View File

@ -16,6 +16,7 @@ from pyocf.types.volume_core import CoreVolume
from pyocf.types.volume_replicated import ReplicatedVolume
from pyocf.types.ctx import OcfCtx
default_registered_volumes = [RamVolume, ErrorDevice, CacheVolume, CoreVolume, ReplicatedVolume]
def pytest_configure(config):
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
@ -24,11 +25,8 @@ def pytest_configure(config):
@pytest.fixture()
def pyocf_ctx():
c = OcfCtx.with_defaults(DefaultLogger(LogLevel.WARN))
c.register_volume_type(RamVolume)
c.register_volume_type(ErrorDevice)
c.register_volume_type(CacheVolume)
c.register_volume_type(CoreVolume)
c.register_volume_type(ReplicatedVolume)
for vol_type in default_registered_volumes:
c.register_volume_type(vol_type)
yield c
c.exit()
gc.collect()
@ -38,11 +36,21 @@ def pyocf_ctx():
def pyocf_ctx_log_buffer():
logger = BufferLogger(LogLevel.DEBUG)
c = OcfCtx.with_defaults(logger)
c.register_volume_type(RamVolume)
c.register_volume_type(ErrorDevice)
c.register_volume_type(CacheVolume)
c.register_volume_type(CoreVolume)
c.register_volume_type(ReplicatedVolume)
for vol_type in default_registered_volumes:
c.register_volume_type(vol_type)
yield logger
c.exit()
gc.collect()
@pytest.fixture()
def pyocf_2_ctx():
    """Yield a list of two OCF contexts ("Ctx1", "Ctx2") with the default volume types.

    Both contexts are exited, in creation order, once the test is done.
    """
    contexts = [
        OcfCtx.with_defaults(DefaultLogger(LogLevel.WARN, label))
        for label in ("Ctx1", "Ctx2")
    ]
    for vol_type in default_registered_volumes:
        for ctx in contexts:
            ctx.register_volume_type(vol_type)

    yield contexts

    for ctx in contexts:
        ctx.exit()
    gc.collect()

View File

@ -0,0 +1,591 @@
#
# Copyright(c) 2022-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
import pytest
import copy
from ctypes import c_int
from pyocf.types.cache import (
Cache,
CacheMode,
MetadataLayout,
CleaningPolicy,
)
from pyocf.types.core import Core
from pyocf.types.data import Data
from pyocf.types.io import Io, IoDir
from pyocf.types.volume import RamVolume, Volume
from pyocf.types.volume_cache import CacheVolume
from pyocf.types.volume_core import CoreVolume
from pyocf.types.volume_replicated import ReplicatedVolume
from pyocf.types.shared import (
OcfError,
OcfErrorCode,
OcfCompletion,
CacheLines,
CacheLineSize,
SeqCutOffPolicy,
)
from pyocf.utils import Size
from pyocf.rio import Rio, ReadWrite
def test_standby_stop_closes_volume(pyocf_2_ctx):
    """Stopping a cache with a standby-attached volume leaves that volume closed."""
    owner_ctx = pyocf_2_ctx[1]
    backend_vol = RamVolume(Size.from_MiB(150))

    cache = Cache(
        owner=owner_ctx,
        cache_mode=CacheMode.WB,
        cache_line_size=CacheLineSize.LINE_4KiB,
    )
    cache.start_cache()
    cache.standby_attach(backend_vol, force=False)
    cache.stop()

    assert not backend_vol.opened
def test_standby_stop_detached(pyocf_2_ctx):
    """A standby detach closes the volume, and the detached cache can still be stopped."""
    owner_ctx = pyocf_2_ctx[1]
    backend_vol = RamVolume(Size.from_MiB(150))

    cache = Cache(
        owner=owner_ctx,
        cache_mode=CacheMode.WB,
        cache_line_size=CacheLineSize.LINE_4KiB,
    )
    cache.start_cache()
    cache.standby_attach(backend_vol, force=False)
    cache.standby_detach()

    assert not backend_vol.opened

    cache.stop()
# verify that force flag is required to attach a standby instance
# on a volume where standby instance had previously been running
def test_standby_attach_force_after_standby(pyocf_2_ctx):
    """Re-attaching in standby after a previous standby instance requires ``force``.

    The non-forced attach is expected to fail with OCF_ERR_METADATA_FOUND; the
    forced attach that follows is expected to succeed.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    # first instance: standby attach, detach and stop
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_attach(vol, force=False)
    cache.standby_detach()
    cache.stop()

    # second instance on the same volume
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()

    with pytest.raises(OcfError) as exc_info:
        cache.standby_attach(vol, force=False)
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_METADATA_FOUND

    cache.standby_attach(vol, force=True)
def test_standby_attach_force_after_active(pyocf_2_ctx):
    """Standby attach on a volume previously used by an active cache requires ``force``.

    The non-forced attach is expected to fail with OCF_ERR_METADATA_FOUND; the
    forced attach that follows is expected to succeed.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    # first instance: regular (active) attach, then stop
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.attach_device(vol)
    cache.stop()
    assert not vol.opened

    # second instance on the same volume
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()

    with pytest.raises(OcfError) as exc_info:
        cache.standby_attach(vol, force=False)
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_METADATA_FOUND

    cache.standby_attach(vol, force=True)
# standby load from standby cache instance after clean shutdown
def test_standby_load_after_standby_clean_shutdown(pyocf_2_ctx):
    """Standby load after a cleanly stopped standby instance issues no volume writes."""
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    # standby instance, stopped cleanly
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_attach(vol, force=False)
    cache.stop()

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()

    # count only the I/O issued by the load itself
    vol.reset_stats()
    cache.standby_load(vol, perform_test=False)
    assert vol.get_stats()[IoDir.WRITE] == 0

    cache.stop()
# standby load from active cache instance after clean shutdown
def test_standby_load_after_active_clean_shutdown(pyocf_2_ctx):
    """Standby load after a cleanly stopped active instance issues no volume writes."""
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    # active instance, stopped cleanly
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.attach_device(vol, force=False)
    cache.stop()

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()

    # count only the I/O issued by the load itself
    vol.reset_stats()
    cache.standby_load(vol, perform_test=False)
    assert vol.get_stats()[IoDir.WRITE] == 0
# standby load from active cache instance after dirty shutdown
def test_standby_load_after_active_dirty_shutdown(pyocf_2_ctx):
    """Standby load after an active instance failed to stop issues no volume writes.

    The volume is taken offline before the stop, which is expected to fail
    with OCF_ERR_WRITE_CACHE.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.attach_device(vol, force=False)

    # stop with the volume offline
    vol.offline()
    with pytest.raises(OcfError) as exc_info:
        cache.stop()
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_WRITE_CACHE
    vol.online()

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()

    # count only the I/O issued by the load itself
    vol.reset_stats()
    cache.standby_load(vol, perform_test=False)
    assert vol.get_stats()[IoDir.WRITE] == 0

    cache.stop()
def test_standby_load_after_standby_dirty_shutdown(pyocf_2_ctx):
    """Standby load after a standby instance was stopped with its volume offline.

    The load (without the volume test) is expected to issue no writes.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_attach(vol, force=False)

    # stop with the volume offline
    vol.offline()
    cache.stop()
    vol.online()

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()

    # count only the I/O issued by the load itself
    vol.reset_stats()
    cache.standby_load(vol, perform_test=False)
    assert vol.get_stats()[IoDir.WRITE] == 0

    cache.stop()
def test_standby_load_after_standby_dirty_shutdown_with_vol_test(pyocf_2_ctx):
    """Standby load with the default ``perform_test`` after an offline-volume stop.

    Same scenario as the dirty standby shutdown test, but ``standby_load`` is
    called with its default arguments and no write count is checked.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_attach(vol, force=False)

    # stop with the volume offline
    vol.offline()
    cache.stop()
    vol.online()

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_load(vol)
    cache.stop()
def test_standby_activate_core_size_mismatch_after_active(pyocf_2_ctx):
    """Activation fails while the core volume size differs, and succeeds once it matches.

    An active cache with one core is stopped, the core volume is re-created
    under the same uuid with twice the size, and the cache volume is loaded in
    standby. Two activation attempts are expected to fail with
    OCF_ERR_CORE_SIZE_MISMATCH; after restoring the initial core size the
    activation is expected to succeed.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB
    vol = RamVolume(Size.from_MiB(150))

    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.attach_device(vol, force=False)

    # prepare and stop cache instance with standard size core volume
    core_vol_uuid = str(id(cache))
    core_vol_size_initial = Size.from_MiB(150)
    core_vol = RamVolume(core_vol_size_initial, uuid=core_vol_uuid)
    core = Core(core_vol)
    cache.add_core(core)
    cache.stop()
    cache = None

    # resize core volume
    # TODO: how to avoid manually removing vol<->uuid mapping?
    del Volume._uuid_[core_vol.uuid]
    core_vol = None
    core_vol = RamVolume(2 * core_vol_size_initial, uuid=core_vol_uuid)

    # standby load on the volume
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_load(vol)
    cache.standby_detach()

    # first attempt to activate with size mismatch
    with pytest.raises(OcfError) as exc_info:
        cache.standby_activate(vol)
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_CORE_SIZE_MISMATCH

    # second attempt to activate with size mismatch
    with pytest.raises(OcfError) as exc_info:
        cache.standby_activate(vol)
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_CORE_SIZE_MISMATCH

    # restore the initial core volume size under the same uuid
    del Volume._uuid_[core_vol.uuid]
    core_vol = RamVolume(core_vol_size_initial, uuid=core_vol_uuid)

    # attempt to activate with fixed size
    cache.standby_activate(vol)

    cache.stop()
def test_standby_activate_core_size_mismatch(pyocf_2_ctx):
    """Activation of a standby cache fails while a core volume size differs.

    The bytes of an active cache volume with two cores are copied into a
    standby cache through its exported object. One core volume is re-created
    under the same uuid with twice the size: two activation attempts are
    expected to fail with OCF_ERR_CORE_SIZE_MISMATCH. After restoring the
    initial size the activation is expected to succeed.
    """
    owner_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB

    vol1 = RamVolume(Size.from_MiB(150), uuid="cv1")
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.attach_device(vol1, force=False)

    core_vol_uuid = str(id(cache))
    core_vol_size_initial = Size.from_MiB(150)
    core_vol = RamVolume(core_vol_size_initial, uuid=core_vol_uuid)
    core2_vol = RamVolume(core_vol_size_initial)
    core = Core(core_vol)
    core2 = Core(core2_vol, name="core2")
    cache.add_core(core)
    cache.add_core(core2)

    # keep a copy of the active cache volume content, then drop the volume
    data = vol1.get_bytes()
    cache.stop()
    vol1 = None

    del Volume._uuid_[core_vol.uuid]
    core_vol = None

    # standby cache on a fresh volume, filled through its exported object
    vol2 = RamVolume(Size.from_MiB(150), uuid="cv2")
    cache = Cache(owner=owner_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache.start_cache()
    cache.standby_attach(vol2)
    cache_vol = CacheVolume(cache, open=True)
    write_vol(cache_vol, cache.get_default_queue(), data)

    # core volume with the same uuid but twice the size
    core_vol = RamVolume(2 * core_vol_size_initial, uuid=core_vol_uuid)

    cache.standby_detach()

    # first attempt to activate with size mismatch
    with pytest.raises(OcfError) as exc_info:
        cache.standby_activate(vol2)
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_CORE_SIZE_MISMATCH

    # second attempt to activate with size mismatch
    with pytest.raises(OcfError) as exc_info:
        cache.standby_activate(vol2)
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_CORE_SIZE_MISMATCH

    # restore the initial core volume size under the same uuid
    del Volume._uuid_[core_vol.uuid]
    core_vol = None
    core_vol = RamVolume(core_vol_size_initial, uuid=core_vol_uuid)

    # attempt to activate with fixed size
    cache.standby_activate(vol2)

    cache.stop()
def test_failover_passive_first(pyocf_2_ctx):
    """Failover where the passive (standby) cache is started before the active one.

    The active cache runs on a ReplicatedVolume built from its own ram disk and
    the standby cache's exported object. After I/O, the active cache volume is
    taken offline and the stop is expected to fail with OCF_ERR_WRITE_CACHE.
    The standby cache is then activated and the core checksum is compared with
    the one captured before the failure.
    """
    active_ctx = pyocf_2_ctx[0]
    standby_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB

    prim_cache_backend_vol = RamVolume(Size.from_MiB(150))
    core_backend_vol = RamVolume(Size.from_MiB(1))
    sec_cache_backend_vol = RamVolume(Size.from_MiB(150))

    # passive cache directly on ram disk
    cache2 = Cache(owner=standby_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache2.start_cache()
    cache2.standby_attach(sec_cache_backend_vol)

    # volume replicating cache1 ramdisk writes to cache2 cache exported object
    cache2_exp_obj_vol = CacheVolume(cache2, open=True)
    cache1_cache_vol = ReplicatedVolume(prim_cache_backend_vol, cache2_exp_obj_vol)

    # active cache
    cache1 = Cache.start_on_device(
        cache1_cache_vol, active_ctx, cache_mode=wb_mode, cache_line_size=line_size
    )
    core = Core(core_backend_vol)
    cache1.add_core(core)
    core_vol = CoreVolume(core, open=True)
    queue = cache1.get_default_queue()

    # some I/O
    rio = Rio().target(core_vol).njobs(1).readwrite(ReadWrite.WRITE)
    rio = rio.size(Size.from_MiB(1)).qd(1).run([queue])

    # capture checksum before simulated active host failure
    md5 = core_vol.md5()

    # offline primary cache volume and stop primary cache to simulate active host
    # failure
    cache1_cache_vol.offline()
    with pytest.raises(OcfError) as exc_info:
        cache1.stop()
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_WRITE_CACHE

    # failover
    cache2.standby_detach()
    cache2.standby_activate(sec_cache_backend_vol, open_cores=False)

    # add core explicitly with "try_add" to workaround pyocf limitations
    core = Core(core_backend_vol)
    cache2.add_core(core, try_add=True)
    core_vol = CoreVolume(core, open=True)

    assert md5 == core_vol.md5()
def write_vol(vol, queue, data):
    """Write ``data`` to ``vol`` from offset 0 in chunks of at most 32 MiB.

    Each chunk is submitted as a separate write I/O on ``queue`` and waited
    for before the next one is issued. The completion's error value is not
    inspected.
    """
    total = len(data)
    chunk_max = int(Size.from_MiB(32))

    offset = 0
    while offset < total:
        chunk_len = min(total - offset, chunk_max)
        chunk = Data.from_bytes(data, offset, chunk_len)
        completion = OcfCompletion([("error", c_int)])

        io = vol.new_io(queue, offset, chunk_len, IoDir.WRITE, 0, 0)
        io.set_data(chunk)
        io.callback = completion.callback
        io.submit()
        completion.wait()

        offset += chunk_max
def test_failover_active_first(pyocf_2_ctx):
    """Failover where the active cache runs first and the standby is set up afterwards.

    After I/O the active cache volume is taken offline and the stop is expected
    to fail with OCF_ERR_WRITE_CACHE. The volume bytes are then written to a
    freshly attached standby cache through its exported object. The standby
    cache is activated and the core checksum is compared with the one captured
    before the failure.
    """
    active_ctx = pyocf_2_ctx[0]
    standby_ctx = pyocf_2_ctx[1]
    wb_mode = CacheMode.WB
    line_size = CacheLineSize.LINE_4KiB

    prim_cache_backend_vol = RamVolume(Size.from_MiB(150))
    core_backend_vol = RamVolume(Size.from_MiB(1))

    # active cache
    cache1 = Cache.start_on_device(
        prim_cache_backend_vol, active_ctx, cache_mode=wb_mode, cache_line_size=line_size
    )
    core = Core(core_backend_vol)
    cache1.add_core(core)
    vol = CoreVolume(core, open=True)
    queue1 = cache1.get_default_queue()

    # some I/O
    rio = Rio().target(vol).njobs(1).readwrite(ReadWrite.WRITE)
    rio = rio.size(Size.from_MiB(1)).qd(1).run([queue1])

    # capture checksum before simulated active host failure
    data_md5 = vol.md5()

    prim_cache_backend_vol.offline()
    with pytest.raises(OcfError) as exc_info:
        cache1.stop()
    assert exc_info.value.error_code == OcfErrorCode.OCF_ERR_WRITE_CACHE

    # capture a copy of active cache instance data
    data = prim_cache_backend_vol.get_bytes()
    cache_md5 = prim_cache_backend_vol.md5()

    # setup standby cache
    sec_cache_backend_vol = RamVolume(Size.from_MiB(150))
    cache2 = Cache(owner=standby_ctx, cache_mode=wb_mode, cache_line_size=line_size)
    cache2.start_cache()
    cache2.standby_attach(sec_cache_backend_vol)
    vol2 = CacheVolume(cache2, open=True)
    queue = cache2.get_default_queue()

    # standby cache exported object volume
    cache2_exp_obj_vol = CacheVolume(cache2, open=True)

    # just to be sure
    assert sec_cache_backend_vol.get_bytes() != prim_cache_backend_vol.get_bytes()

    # write content of active cache volume to passive cache exported obj
    write_vol(vol2, queue, data)

    assert cache_md5 == cache2_exp_obj_vol.md5()

    # volumes should have the same data
    assert sec_cache_backend_vol.get_bytes() == prim_cache_backend_vol.get_bytes()

    # failover
    cache2.standby_detach()
    cache2.standby_activate(sec_cache_backend_vol, open_cores=False)
    core = Core(core_backend_vol)
    cache2.add_core(core, try_add=True)
    vol = CoreVolume(core, open=True)

    # check data consistency
    assert data_md5 == vol.md5()
def test_standby_load_writes_count(pyocf_ctx):
    """Standby load (without the volume test) issues no writes to the device."""
    # Prepare a volume with valid metadata
    device = RamVolume(Size.from_MiB(40))
    active_cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
    active_cache.stop()

    # count only the I/O issued by the load itself
    device.reset_stats()

    cache = active_cache = Cache(owner=pyocf_ctx)
    cache.start_cache()
    cache.standby_load(device, perform_test=False)

    assert device.get_stats()[IoDir.WRITE] == 0
def test_failover_line_size_mismatch(pyocf_2_ctx):
    """Activation fails when the standby cache line size differs from the metadata.

    The bytes of an active cache volume created with a 4KiB cache line are
    written into a standby cache started with a 64KiB cache line. Two
    activation attempts are expected to fail with OCF_ERR_START_CACHE_FAIL.
    A new cache started with the 4KiB line size is then expected to load and
    activate on the same volume.
    """
    ctx = pyocf_2_ctx[1]
    mode = CacheMode.WB
    cls = CacheLineSize.LINE_4KiB
    cls2 = CacheLineSize.LINE_64KiB

    vol1 = RamVolume(Size.from_MiB(150), uuid="cv1")
    cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls)
    cache.start_cache()
    cache.attach_device(vol1, force=False)

    core_vol = RamVolume(Size.from_MiB(150))
    core = Core(core_vol)
    cache.add_core(core)

    # keep a copy of the active cache volume content, then drop the volume
    data = vol1.get_bytes()
    cache.stop()
    vol1 = None

    # standby cache with a different line size, filled through its exported object
    vol2 = RamVolume(Size.from_MiB(150), uuid="cv2")
    cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls2)
    cache.start_cache()
    cache.standby_attach(vol2)
    cache_vol = CacheVolume(cache, open=True)
    write_vol(cache_vol, cache.get_default_queue(), data)

    # The original line was a bare comparison whose result was discarded;
    # assert it so the check is actually enforced.
    assert cache.get_conf()["cache_line_size"] == cls2

    cache.standby_detach()

    # first attempt to activate with size mismatch
    with pytest.raises(OcfError) as ex:
        cache.standby_activate(vol2)
    assert ex.value.error_code == OcfErrorCode.OCF_ERR_START_CACHE_FAIL

    # second attempt to activate with size mismatch
    with pytest.raises(OcfError) as ex:
        cache.standby_activate(vol2)
    assert ex.value.error_code == OcfErrorCode.OCF_ERR_START_CACHE_FAIL

    cache.stop()

    # retry with a cache using the original line size
    cache = Cache(owner=ctx, cache_mode=mode, cache_line_size=cls)
    cache.start_cache()
    cache.standby_load(vol2)
    cache.standby_detach()
    cache.standby_activate(vol2)

    # Same as above: this comparison previously had no effect.
    assert cache.get_conf()["cache_line_size"] == cls

    cache.stop()
# NOTE: test_failover_passive_first is defined once, earlier in this module.
# A second, byte-identical definition of the same name is intentionally not
# kept here: redefining a test function only rebinds the name and adds no
# coverage.

View File

@ -15,10 +15,13 @@ from pyocf.types.cache import (
AcpParams,
NhitParams,
)
from pyocf.types.ctx import OcfCtx
from pyocf.types.data import Data
from pyocf.types.core import Core
from pyocf.types.volume import ErrorDevice, RamVolume, VOLUME_POISON
from pyocf.types.volume_core import CoreVolume
from pyocf.types.volume_cache import CacheVolume
from pyocf.types.io import IoDir
from pyocf.types.ioclass import IoClassesInfo, IoClassInfo
from pyocf.utils import Size as S
@ -56,19 +59,51 @@ def ocf_read(vol, queue, offset):
return data.get_bytes()[0]
def prepare_failover(pyocf_2_ctx, cache_backend_vol, error_io_seq_no):
    """Set up a standby cache plus an active WB cache that writes through it.

    A standby cache (second context) is attached to ``cache_backend_vol``. Its
    exported object is wrapped in a disarmed ``ErrorDevice`` configured with
    ``error_io_seq_no`` for writes, and the active cache (first context) is
    started on that error device.

    :return: tuple ``(active cache, standby cache, error device)``
    """
    active_ctx = pyocf_2_ctx[0]
    standby_ctx = pyocf_2_ctx[1]

    # standby instance directly on the backend volume
    standby_cache = Cache(owner=standby_ctx)
    standby_cache.start_cache()
    standby_cache.standby_attach(cache_backend_vol)
    standby_exp_obj_vol = CacheVolume(standby_cache, open=True)

    # error injection sits on top of the standby exported object
    err_vol = ErrorDevice(
        standby_exp_obj_vol,
        error_seq_no={IoDir.WRITE: error_io_seq_no},
        armed=False,
    )
    active_cache = Cache.start_on_device(
        err_vol, cache_mode=CacheMode.WB, owner=active_ctx
    )

    return active_cache, standby_cache, err_vol
def prepare_normal(pyocf_2_ctx, cache_backend_vol, error_io_seq_no):
    """Start a WB cache (first context) on a disarmed ``ErrorDevice``.

    The error device wraps ``cache_backend_vol`` and is configured with
    ``error_io_seq_no`` for writes.

    :return: tuple ``(cache, error device)``
    """
    owner_ctx = pyocf_2_ctx[0]

    err_vol = ErrorDevice(
        cache_backend_vol,
        error_seq_no={IoDir.WRITE: error_io_seq_no},
        armed=False,
    )
    cache = Cache.start_on_device(err_vol, cache_mode=CacheMode.WB, owner=owner_ctx)

    return cache, err_vol
def mngmt_op_surprise_shutdown_test(
pyocf_ctx, mngt_func, prepare_func, consistency_check_func
pyocf_2_ctx, failover, mngt_func, prepare_func, consistency_check_func
):
error_triggered = True
error_io_seq_no = 0
while error_triggered:
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
device = ErrorDevice(
mngmt_op_surprise_shutdown_test_cache_size, armed=False, error_seq_no=error_io
cache_backend_vol = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
if failover:
cache, cache2, err_vol = prepare_failover(
pyocf_2_ctx, cache_backend_vol, error_io_seq_no
)
else:
cache, err_vol = prepare_normal(
pyocf_2_ctx, cache_backend_vol, error_io_seq_no
)
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
if prepare_func:
prepare_func(cache)
@ -77,17 +112,17 @@ def mngmt_op_surprise_shutdown_test(
cache.save()
# initiate error injection starting at write no @error_io_seq_no
device.arm()
err_vol.arm()
# call tested management function
status = 0
try:
mngt_func(cache)
status = OcfErrorCode.OCF_OK
except OcfError as ex:
status = ex.error_code
# if error was injected we expect mngmt op error
error_triggered = device.error_triggered()
error_triggered = err_vol.error_triggered()
assert error_triggered == (status != 0)
if error_triggered:
assert (
@ -98,12 +133,18 @@ def mngmt_op_surprise_shutdown_test(
# stop cache with error injection still on
with pytest.raises(OcfError) as ex:
cache.stop()
cache = None
assert ex.value.error_code == OcfErrorCode.OCF_ERR_WRITE_CACHE
# disable error injection and load the cache
device.disarm()
# discard error volume
err_vol.disarm()
cache = Cache.load_from_device(device, open_cores=True)
if failover:
cache2.standby_detach()
cache2.standby_activate(cache_backend_vol, open_cores=True)
cache = cache2
else:
cache = Cache.load_from_device(err_vol, open_cores=True)
# run consistency check
if consistency_check_func is not None:
@ -118,7 +159,8 @@ def mngmt_op_surprise_shutdown_test(
# power failure during core insert
@pytest.mark.security
def test_surprise_shutdown_add_core(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_add_core(pyocf_2_ctx, failover):
core_device = RamVolume(S.from_MiB(10))
def check_core(cache, error_triggered):
@ -132,13 +174,16 @@ def test_surprise_shutdown_add_core(pyocf_ctx):
def check_func(cache, error_triggered):
check_core(cache, error_triggered)
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, None, check_func)
mngmt_op_surprise_shutdown_test(
pyocf_2_ctx, failover, tested_func, None, check_func
)
# power failure during core removal
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_remove_core(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_remove_core(pyocf_2_ctx, failover):
core_device = RamVolume(S.from_MiB(10))
core = Core.using_device(core_device)
@ -152,12 +197,15 @@ def test_surprise_shutdown_remove_core(pyocf_ctx):
stats = cache.get_stats()
assert stats["conf"]["core_count"] == (1 if error_triggered else 0)
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, prepare_func, check_func)
mngmt_op_surprise_shutdown_test(
pyocf_2_ctx, failover, tested_func, prepare_func, check_func
)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_remove_core_with_data(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_remove_core_with_data(pyocf_2_ctx, failover):
io_offset = mngmt_op_surprise_shutdown_test_io_offset
core_device = RamVolume(S.from_MiB(10))
core = Core.using_device(core_device, name="core1")
@ -180,13 +228,16 @@ def test_surprise_shutdown_remove_core_with_data(pyocf_ctx):
vol = CoreVolume(core, open=True)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, prepare_func, check_func)
mngmt_op_surprise_shutdown_test(
pyocf_2_ctx, failover, tested_func, prepare_func, check_func
)
# power failure during core add after previous core removed
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_swap_core(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_swap_core(pyocf_2_ctx, failover):
core_device_1 = RamVolume(S.from_MiB(10), uuid="dev1")
core_device_2 = RamVolume(S.from_MiB(10), uuid="dev2")
core1 = Core.using_device(core_device_1, name="core1")
@ -215,13 +266,16 @@ def test_surprise_shutdown_swap_core(pyocf_ctx):
core2 = cache.get_core_by_name("core2")
assert core2.device.uuid == "dev2"
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, prepare, check_func)
mngmt_op_surprise_shutdown_test(
pyocf_2_ctx, failover, tested_func, prepare, check_func
)
# power failure during core add after previous core removed
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_swap_core_with_data(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_swap_core_with_data(pyocf_2_ctx, failover):
core_device_1 = RamVolume(S.from_MiB(10), uuid="dev1")
core_device_2 = RamVolume(S.from_MiB(10), uuid="dev2")
core1 = Core.using_device(core_device_1, name="core1")
@ -231,7 +285,12 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_ctx):
cache.add_core(core1)
vol = CoreVolume(core1, open=True)
cache.save()
ocf_write(vol, cache.get_default_queue(), 0xAA, mngmt_op_surprise_shutdown_test_io_offset)
ocf_write(
vol,
cache.get_default_queue(),
0xAA,
mngmt_op_surprise_shutdown_test_io_offset,
)
cache.remove_core(core1)
cache.save()
@ -256,39 +315,56 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_ctx):
vol2 = CoreVolume(core2, open=True)
assert core2.device.uuid == "dev2"
assert (
ocf_read(vol2, cache.get_default_queue(), mngmt_op_surprise_shutdown_test_io_offset)
ocf_read(
vol2,
cache.get_default_queue(),
mngmt_op_surprise_shutdown_test_io_offset,
)
== VOLUME_POISON
)
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, prepare, check_func)
mngmt_op_surprise_shutdown_test(
pyocf_2_ctx, failover, tested_func, prepare, check_func
)
# make sure there are no crashes when cache start is interrupted
# 1. is this checksum mismatch actually expected and the proper way
# to avoid loading improperly initialized cache?
# 2. uuid checksum mismatch should not allow cache to load
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_start_cache(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_start_cache(pyocf_2_ctx, failover):
ctx1 = pyocf_2_ctx[0]
ctx2 = pyocf_2_ctx[1]
error_triggered = True
error_io_seq_no = 0
while error_triggered:
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
device = ErrorDevice(
mngmt_op_surprise_shutdown_test_cache_size, error_seq_no=error_io, armed=True
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
if failover:
cache2 = Cache(owner=ctx2)
cache2.start_cache()
cache2.standby_attach(ramdisk)
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
err_device = ErrorDevice(
cache2_exp_obj_vol, error_seq_no=error_io, armed=True
)
else:
err_device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=True)
# call tested management function
status = 0
try:
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
cache = Cache.start_on_device(err_device, cache_mode=CacheMode.WB)
status = OcfErrorCode.OCF_OK
except OcfError as ex:
status = ex.error_code
# if error was injected we expect mngmt op error
error_triggered = device.error_triggered()
error_triggered = err_device.error_triggered()
assert error_triggered == (status != 0)
if not error_triggered:
@ -299,16 +375,27 @@ def test_surprise_shutdown_start_cache(pyocf_ctx):
break
# disable error injection and load the cache
device.disarm()
err_device.disarm()
cache = None
if failover:
try:
cache = Cache.load_from_device(device)
cache2.standby_detach()
cache2.standby_activate(ramdisk, open_cores=True)
cache = cache2
except OcfError:
cache2.stop()
cache2 = None
cache = None
else:
try:
cache = Cache.load_from_device(err_device, open_cores=True)
except OcfError:
cache = None
if cache is not None:
cache.stop()
cache = None
# advance error injection point
error_io_seq_no += 1
@ -316,7 +403,8 @@ def test_surprise_shutdown_start_cache(pyocf_ctx):
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_stop_cache(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_stop_cache(pyocf_2_ctx, failover):
core_device = RamVolume(S.from_MiB(10))
error_triggered = True
error_io_seq_no = 0
@ -324,13 +412,15 @@ def test_surprise_shutdown_stop_cache(pyocf_ctx):
while error_triggered:
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
device = ErrorDevice(
mngmt_op_surprise_shutdown_test_cache_size, error_seq_no=error_io, armed=False
)
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
if failover:
cache, cache2, device = prepare_failover(
pyocf_2_ctx, ramdisk, error_io_seq_no
)
else:
cache, device = prepare_normal(pyocf_2_ctx, ramdisk, error_io_seq_no)
# setup cache and insert some data
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
@ -350,7 +440,7 @@ def test_surprise_shutdown_stop_cache(pyocf_ctx):
if error_triggered:
assert status == OcfErrorCode.OCF_ERR_WRITE_CACHE
else:
assert status == 0
assert status == OcfErrorCode.OCF_OK
if not error_triggered:
break
@ -361,7 +451,13 @@ def test_surprise_shutdown_stop_cache(pyocf_ctx):
assert core_device.get_bytes()[io_offset] == VOLUME_POISON
if failover:
cache2.standby_detach()
cache2.standby_activate(ramdisk, open_cores=False)
cache = cache2
else:
cache = Cache.load_from_device(device, open_cores=False)
stats = cache.get_stats()
if stats["conf"]["core_count"] == 1:
assert stats["usage"]["occupancy"]["value"] == 1
@ -377,22 +473,25 @@ def test_surprise_shutdown_stop_cache(pyocf_ctx):
@pytest.mark.security
def test_surprise_shutdown_cache_reinit(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_cache_reinit(pyocf_2_ctx, failover):
core_device = RamVolume(S.from_MiB(10))
error_io = {IoDir.WRITE: 0}
error_io_seq_no = 0
io_offset = mngmt_op_surprise_shutdown_test_io_offset
error_triggered = True
while error_triggered:
# Start cache device without error injection
device = ErrorDevice(
mngmt_op_surprise_shutdown_test_cache_size, error_seq_no=error_io, armed=False
)
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
if failover:
cache, cache2, device = prepare_failover(
pyocf_2_ctx, ramdisk, error_io_seq_no
)
else:
cache, device = prepare_normal(pyocf_2_ctx, ramdisk, error_io_seq_no)
# start WB
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device)
cache.add_core(core)
vol = CoreVolume(core, open=True)
@ -402,6 +501,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_ctx):
ocf_write(vol, queue, 0xAA, io_offset)
cache.stop()
cache = None
assert core_device.get_bytes()[io_offset] == VOLUME_POISON
@ -429,8 +529,19 @@ def test_surprise_shutdown_cache_reinit(pyocf_ctx):
cache = None
status = OcfErrorCode.OCF_OK
if failover:
try:
cache = Cache.load_from_device(device)
cache2.standby_detach()
cache2.standby_activate(ramdisk, open_cores=True)
cache = cache2
except OcfError as ex:
cache2.stop()
cache2 = None
status = ex.error_code
else:
try:
cache = Cache.load_from_device(device, open_cores=True)
except OcfError as ex:
status = ex.error_code
@ -442,14 +553,17 @@ def test_surprise_shutdown_cache_reinit(pyocf_ctx):
assert stats["usage"]["occupancy"]["value"] == 0
cache.add_core(core)
vol = CoreVolume(core, open=True)
assert ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
assert (
ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
)
cache.stop()
cache = None
error_io[IoDir.WRITE] += 1
error_io_seq_no += 1
def _test_surprise_shutdown_mngmt_generic(pyocf_ctx, func):
def _test_surprise_shutdown_mngmt_generic(pyocf_2_ctx, failover, func):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
@ -460,87 +574,93 @@ def _test_surprise_shutdown_mngmt_generic(pyocf_ctx, func):
func(cache, core)
cache.save()
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, None)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, None)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_change_cache_mode(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_change_cache_mode(pyocf_2_ctx, failover):
_test_surprise_shutdown_mngmt_generic(
pyocf_ctx, lambda cache, core: cache.change_cache_mode(CacheMode.WT)
pyocf_2_ctx, failover, lambda cache, core: cache.change_cache_mode(CacheMode.WT)
)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_cleaning_policy(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
@pytest.mark.parametrize("start_clp", CleaningPolicy)
@pytest.mark.parametrize("end_clp", CleaningPolicy)
def test_surprise_shutdown_set_cleaning_policy(
pyocf_2_ctx, failover, start_clp, end_clp
):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
for c1 in CleaningPolicy:
for c2 in CleaningPolicy:
def prepare(cache):
cache.add_core(core)
cache.set_cleaning_policy(c1)
cache.set_cleaning_policy(start_clp)
cache.save()
def test(cache):
cache.set_cleaning_policy(c2)
cache.set_cleaning_policy(end_clp)
cache.save()
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, None)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, None)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_seq_cut_off_policy(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
@pytest.mark.parametrize("start_scp", SeqCutOffPolicy)
@pytest.mark.parametrize("end_scp", SeqCutOffPolicy)
def test_surprise_shutdown_set_seq_cut_off_policy(pyocf_2_ctx, failover, start_scp, end_scp):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
for s1 in SeqCutOffPolicy:
for s2 in SeqCutOffPolicy:
def prepare(cache):
cache.add_core(core)
cache.set_seq_cut_off_policy(s1)
cache.set_seq_cut_off_policy(start_scp)
cache.save()
def test(cache):
cache.set_seq_cut_off_policy(s2)
cache.set_seq_cut_off_policy(end_scp)
cache.save()
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, None)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, None)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_seq_cut_off_promotion(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_set_seq_cut_off_promotion(pyocf_2_ctx, failover):
_test_surprise_shutdown_mngmt_generic(
pyocf_ctx, lambda cache, core: cache.set_seq_cut_off_promotion(256)
pyocf_2_ctx, failover, lambda cache, core: cache.set_seq_cut_off_promotion(256)
)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_seq_cut_off_threshold(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_set_seq_cut_off_threshold(pyocf_2_ctx, failover):
_test_surprise_shutdown_mngmt_generic(
pyocf_ctx, lambda cache, core: cache.set_seq_cut_off_threshold(S.from_MiB(2).B)
pyocf_2_ctx,
failover,
lambda cache, core: cache.set_seq_cut_off_threshold(S.from_MiB(2).B),
)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_cleaning_policy_param(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
@pytest.mark.parametrize("clp", [c for c in CleaningPolicy if c != CleaningPolicy.NOP])
def test_surprise_shutdown_set_cleaning_policy_param(pyocf_2_ctx, failover, clp):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
for pol in CleaningPolicy:
if pol == CleaningPolicy.NOP:
continue
if pol == CleaningPolicy.ALRU:
if clp == CleaningPolicy.ALRU:
params = AlruParams
elif pol == CleaningPolicy.ACP:
elif clp == CleaningPolicy.ACP:
params = AcpParams
else:
# add handler for new policy here
@ -550,12 +670,12 @@ def test_surprise_shutdown_set_cleaning_policy_param(pyocf_ctx):
def prepare(cache):
cache.add_core(core)
cache.set_cleaning_policy(pol)
cache.set_cleaning_policy(clp)
cache.save()
def test(cache):
val = None
if pol == CleaningPolicy.ACP:
if clp == CleaningPolicy.ACP:
if p == AcpParams.WAKE_UP_TIME:
val = 5000
elif p == AcpParams.FLUSH_MAX_BUFFERS:
@ -563,7 +683,7 @@ def test_surprise_shutdown_set_cleaning_policy_param(pyocf_ctx):
else:
# add handler for new param here
assert False
elif pol == CleaningPolicy.ALRU:
elif clp == CleaningPolicy.ALRU:
if p == AlruParams.WAKE_UP_TIME:
val = 2000
elif p == AlruParams.STALE_BUFFER_TIME:
@ -575,42 +695,45 @@ def test_surprise_shutdown_set_cleaning_policy_param(pyocf_ctx):
else:
# add handler for new param here
assert False
cache.set_cleaning_policy_param(pol, p, val)
cache.set_cleaning_policy_param(clp, p, val)
cache.save()
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, None)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, None)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_promotion_policy(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
@pytest.mark.parametrize("start_pp", PromotionPolicy)
@pytest.mark.parametrize("end_pp", PromotionPolicy)
def test_surprise_shutdown_set_promotion_policy(
pyocf_2_ctx, failover, start_pp, end_pp
):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
for pp1 in PromotionPolicy:
for pp2 in PromotionPolicy:
def prepare(cache):
cache.add_core(core)
cache.set_promotion_policy(pp1)
cache.set_promotion_policy(start_pp)
cache.save()
def test(cache):
cache.set_promotion_policy(pp2)
cache.set_promotion_policy(end_pp)
cache.save()
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, None)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, None)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_promotion_policy_param(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
@pytest.mark.parametrize("pp", PromotionPolicy)
def test_surprise_shutdown_set_promotion_policy_param(pyocf_2_ctx, failover, pp):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
for pp in PromotionPolicy:
if pp == PromotionPolicy.ALWAYS:
continue
return
if pp == PromotionPolicy.NHIT:
params = NhitParams
else:
@ -637,12 +760,13 @@ def test_surprise_shutdown_set_promotion_policy_param(pyocf_ctx):
cache.set_promotion_policy_param(pp, p, val)
cache.save()
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, None)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, None)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_set_io_class_config(pyocf_ctx):
@pytest.mark.parametrize("failover", [False, True])
def test_surprise_shutdown_set_io_class_config(pyocf_2_ctx, failover):
core_device = RamVolume(S.from_MiB(10))
core = Core(device=core_device)
@ -697,4 +821,319 @@ def test_surprise_shutdown_set_io_class_config(pyocf_ctx):
]
assert curr_ioclass == old_ioclass or curr_ioclass == new_ioclass
mngmt_op_surprise_shutdown_test(pyocf_ctx, test, prepare, check)
mngmt_op_surprise_shutdown_test(pyocf_2_ctx, failover, test, prepare, check)
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_standby_activate(pyocf_ctx):
    """Interrupt standby activation with an injected cache write error.

    Scenario, repeated with the failing write moved one position later on
    every iteration:
        1. start active cache
        2. add core, insert data
        3. stop
        4. load standby
        5. detach
        6. activate <- with I/O error injection
        7. standby load, detach and activate again without error injection
        8. verify consistency

    The loop ends after the first iteration in which no error was injected.
    """
    io_offset = mngmt_op_surprise_shutdown_test_io_offset
    error_triggered = True
    error_io_seq_no = 0

    while error_triggered:
        # Create the error device disarmed so that the setup below is not
        # affected by error injection
        error_io = {IoDir.WRITE: error_io_seq_no}
        ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
        device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=False)
        core_device = RamVolume(S.from_MiB(10))

        device.disarm()

        # Add a core device and provide a few dirty blocks
        cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
        core = Core(device=core_device)
        cache.add_core(core)
        vol = CoreVolume(core, open=True)
        ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
        original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
        cache.stop()

        # Prepare a passive instance
        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()
        cache.standby_load(device)
        cache.standby_detach()

        # Initiate error injection starting at write no @error_io_seq_no
        device.arm()

        # If the activate fails, the cache is expected to be rolled back to
        # the passive state, so it is stopped right away in the error path
        try:
            cache.standby_activate(device)
            status = OcfErrorCode.OCF_OK
        except OcfError as ex:
            status = ex.error_code
            cache.stop()

        # If error was injected we expect mngmt op error
        error_triggered = device.error_triggered()
        assert error_triggered == (status != OcfErrorCode.OCF_OK)

        # Activate succeeded but error injection is still enabled, so
        # stopping the active cache is expected to fail
        if not error_triggered:
            with pytest.raises(OcfError):
                cache.stop()

        # Disable error injection and activate cache
        device.disarm()
        cache = None

        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()
        cache.standby_load(device)
        cache.standby_detach()
        cache.standby_activate(device, open_cores=False)

        # The core and the dirty data inserted before the interrupted
        # activation must still be there
        stats = cache.get_stats()
        assert stats["conf"]["core_count"] == 1
        assert original_dirty_blocks == stats["usage"]["dirty"]

        core = Core(device=core_device)
        cache.add_core(core, try_add=True)
        vol = CoreVolume(core, open=True)
        assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA

        cache.stop()

        # advance error injection point
        error_io_seq_no += 1
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_standby_init_clean(pyocf_ctx):
    """Interrupted standby init on an empty volume.

    Standby attach is repeated on a fresh volume with the failing write moved
    one position later on every iteration. After each interrupted attach a
    standby load of the same device is expected to fail with
    OCF_ERR_NO_METADATA. The loop ends once attach completes without an
    injected error.
    """
    error_triggered = True
    error_io_seq_no = 0

    while error_triggered:
        # Create the cache device with error injection already armed - the
        # tested operation (standby attach) is the first to touch it
        error_io = {IoDir.WRITE: error_io_seq_no}
        ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
        device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=True)

        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()

        try:
            cache.standby_attach(device)
            status = OcfErrorCode.OCF_OK
        except OcfError as ex:
            status = ex.error_code
            cache.stop()

        # if error was injected we expect mngmt op error
        error_triggered = device.error_triggered()
        assert error_triggered == (status != OcfErrorCode.OCF_OK)

        if not error_triggered:
            # stop cache with error injection still on - expect no error in standby
            # as no writes go to the disk
            cache.stop()
            break

        # disable error injection and load the cache
        device.disarm()
        cache = None

        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()

        # the interrupted init must not leave loadable metadata behind
        with pytest.raises(OcfError) as ex:
            cache.standby_load(device)
        assert ex.value.error_code == OcfErrorCode.OCF_ERR_NO_METADATA

        cache.stop()

        # advance error injection point
        error_io_seq_no += 1
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_standby_init_force_1(pyocf_ctx):
    """Interrupt a forced standby attach, then recover through standby load.

    Scenario, repeated with the failing write moved one position later on
    every iteration:
        1. start active
        2. add core, insert cacheline
        3. stop cache
        4. standby attach force = 1 <- with I/O injection
        5. standby load
        6. activate
        7. verify consistency: either no metadata, empty cache or cacheline still inserted

    The loop ends after the first iteration in which no error was injected.
    """
    core_device = RamVolume(S.from_MiB(10))
    io_offset = mngmt_op_surprise_shutdown_test_io_offset

    error_triggered = True
    error_io_seq_no = 0

    while error_triggered:
        # Create the error device disarmed so that the setup below is not
        # affected by error injection
        error_io = {IoDir.WRITE: error_io_seq_no}
        ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
        device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=False)

        # start and stop cache with cacheline inserted
        cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
        core = Core(device=core_device)
        cache.add_core(core)
        vol = CoreVolume(core, open=True)
        ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
        original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
        cache.stop()

        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()

        # initiate error injection starting at write no @error_io_seq_no
        device.arm()

        # attempt to reinitialize standby cache with error injection
        try:
            cache.standby_attach(device, force=True)
            status = OcfErrorCode.OCF_OK
        except OcfError as ex:
            status = ex.error_code

        # if error was injected we expect mngmt op error
        error_triggered = device.error_triggered()
        assert error_triggered == (status == OcfErrorCode.OCF_ERR_WRITE_CACHE)

        # stop cache with error injection still on
        # expect no error when stopping standby or detached cache
        cache.stop()
        cache = None

        # disable error injection and load the cache
        device.disarm()

        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()

        # standby load
        try:
            cache.standby_load(device)
            cache.standby_detach()
            cache.standby_activate(device, open_cores=False)
            status = OcfErrorCode.OCF_OK
        except OcfError as ex:
            status = ex.error_code

        if status != OcfErrorCode.OCF_OK:
            # the only acceptable failure is missing metadata
            assert status == OcfErrorCode.OCF_ERR_NO_METADATA
        else:
            stats = cache.get_stats()
            if stats["conf"]["core_count"] == 1:
                # original metadata survived: cacheline is still inserted
                assert original_dirty_blocks == stats["usage"]["dirty"]
                core = Core(device=core_device)
                cache.add_core(core, try_add=True)
                vol = CoreVolume(core, open=True)
                assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
            else:
                # metadata was reinitialized: cache must be empty
                assert stats["usage"]["occupancy"]["value"] == 0
                assert stats["usage"]["dirty"]["value"] == 0
                core = Core(device=core_device)
                cache.add_core(core)
                vol = CoreVolume(core, open=True)
                assert (
                    ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
                )

        cache.stop()

        # advance error injection point
        error_io_seq_no += 1
@pytest.mark.security
@pytest.mark.long
def test_surprise_shutdown_standby_init_force_2(pyocf_ctx):
    """Interrupt a forced standby attach, then recover through a standard load.

    Scenario, repeated with the failing write moved one position later on
    every iteration:
        1. start active
        2. add core, insert cacheline
        3. stop cache
        4. standby attach force = 1 <- with I/O injection
        5. load cache (standard load)
        6. verify consistency: either no metadata, empty cache or cacheline still inserted

    The loop ends after the first iteration in which no error was injected.
    """
    core_device = RamVolume(S.from_MiB(10))
    io_offset = mngmt_op_surprise_shutdown_test_io_offset

    error_triggered = True
    error_io_seq_no = 0

    while error_triggered:
        # Create the error device disarmed so that the setup below is not
        # affected by error injection
        error_io = {IoDir.WRITE: error_io_seq_no}
        ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
        device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=False)

        # start and stop cache with cacheline inserted
        cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
        core = Core(device=core_device)
        cache.add_core(core)
        vol = CoreVolume(core, open=True)
        ocf_write(vol, cache.get_default_queue(), 0xAA, io_offset)
        original_dirty_blocks = cache.get_stats()["usage"]["dirty"]
        cache.stop()

        cache = Cache(owner=OcfCtx.get_default())
        cache.start_cache()

        # initiate error injection starting at write no @error_io_seq_no
        device.arm()

        # attempt to reinitialize standby cache with error injection
        try:
            cache.standby_attach(device, force=True)
            status = OcfErrorCode.OCF_OK
        except OcfError as ex:
            status = ex.error_code

        # if error was injected we expect mngmt op error
        error_triggered = device.error_triggered()
        assert error_triggered == (status == OcfErrorCode.OCF_ERR_WRITE_CACHE)

        # stop cache with error injection still on
        # expect no error when stopping standby or detached cache
        cache.stop()
        cache = None

        # disable error injection and load the cache
        device.disarm()

        # standard load; cache stays None if the load fails
        try:
            cache = Cache.load_from_device(device, open_cores=False)
            status = OcfErrorCode.OCF_OK
        except OcfError as ex:
            status = ex.error_code

        if status != OcfErrorCode.OCF_OK:
            # the only acceptable failure is missing metadata
            assert status == OcfErrorCode.OCF_ERR_NO_METADATA
        else:
            stats = cache.get_stats()
            if stats["conf"]["core_count"] == 1:
                # original metadata survived: cacheline is still inserted
                assert original_dirty_blocks == stats["usage"]["dirty"]
                core = Core(device=core_device)
                cache.add_core(core, try_add=True)
                vol = CoreVolume(core, open=True)
                assert ocf_read(vol, cache.get_default_queue(), io_offset) == 0xAA
            else:
                # metadata was reinitialized: cache must be empty
                assert stats["usage"]["occupancy"]["value"] == 0
                assert stats["usage"]["dirty"]["value"] == 0
                core = Core(device=core_device)
                cache.add_core(core)
                vol = CoreVolume(core, open=True)
                assert (
                    ocf_read(vol, cache.get_default_queue(), io_offset) == VOLUME_POISON
                )

        # only a successfully loaded cache needs to be stopped
        if cache is not None:
            cache.stop()
            cache = None

        # advance error injection point
        error_io_seq_no += 1