Merge pull request #734 from arutk/cvoltests3

pyocf: composite volume I/O address range tests
This commit is contained in:
Robert Baldyga 2022-06-29 14:16:52 +02:00 committed by GitHub
commit 2defff1da0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 470 additions and 84 deletions

View File

@ -103,8 +103,11 @@ class OcfCompletion:
return complete
def wait(self):
self.e.wait()
def wait(self, timeout=None):
return self.e.wait(timeout=timeout)
def completed(self):
return self.e.is_set()
class OcfError(BaseException):

View File

@ -431,12 +431,19 @@ class RamVolume(Volume):
class ErrorDevice(Volume):
def __init__(
self, vol, error_sectors: set = None, error_seq_no: dict = None, armed=True, uuid=None,
self,
vol,
error_sectors: set = None,
error_seq_no: dict = None,
data_only=False,
armed=True,
uuid=None,
):
self.vol = vol
super().__init__(uuid)
self.error_sectors = error_sectors
self.error_seq_no = error_seq_no
self.error_sectors = error_sectors or set()
self.error_seq_no = error_seq_no or {IoDir.WRITE: -1, IoDir.READ: -1}
self.data_only = data_only
self.armed = armed
self.io_seq_no = {IoDir.WRITE: 0, IoDir.READ: 0}
self.error = False
@ -444,32 +451,44 @@ class ErrorDevice(Volume):
def set_mapping(self, error_sectors: set):
self.error_sectors = error_sectors
def do_submit_io(self, io):
def should_forward_io(self, io):
if not self.armed:
self.vol.do_submit_io(io)
return
return True
direction = IoDir(io.contents._dir)
seq_no_match = (
self.error_seq_no is not None
and direction in self.error_seq_no
self.error_seq_no[direction] >= 0
and self.error_seq_no[direction] <= self.io_seq_no[direction]
)
sector_match = self.error_sectors is not None and io.contents._addr in self.error_sectors
sector_match = io.contents._addr in self.error_sectors
self.io_seq_no[direction] += 1
error = True
if self.error_seq_no is not None and not seq_no_match:
error = False
if self.error_sectors is not None and not sector_match:
error = False
if error:
self.error = True
io.contents._end(io, -OcfErrorCode.OCF_ERR_IO)
self.stats["errors"][direction] += 1
else:
return not seq_no_match and not sector_match
def complete_with_error(self, io):
self.error = True
direction = IoDir(io.contents._dir)
self.stats["errors"][direction] += 1
io.contents._end(io, -OcfErrorCode.OCF_ERR_IO)
def do_submit_io(self, io):
if self.should_forward_io(io):
self.vol.do_submit_io(io)
else:
self.complete_with_error(io)
def do_submit_flush(self, flush):
if self.data_only or self.should_forward_io(flush):
self.vol.do_submit_flush(flush)
else:
self.complete_with_error(flush)
def do_submit_discard(self, discard):
if self.data_only or self.should_forward_io(discard):
self.vol.do_submit_discard(discard)
else:
self.complete_with_error(discard)
def arm(self):
self.armed = True
@ -491,12 +510,6 @@ class ErrorDevice(Volume):
def get_max_io_size(self):
return self.vol.get_max_io_size()
def do_submit_flush(self, flush):
return self.vol.do_submit_flush(flush)
def do_submit_discard(self, discard):
return self.vol.do_submit_discard(discard)
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
return self.vol.dump(offset, size, ignore=ignore, **kwargs)

View File

@ -4,14 +4,19 @@
#
import pytest
from ctypes import c_int
import random
from ctypes import POINTER, c_int, cast, c_void_p
from datetime import datetime
from threading import Event
from collections import namedtuple
from pyocf.types.volume import RamVolume, ErrorDevice, TraceDevice, IoFlags
from pyocf.ocf import OcfLib
from pyocf.types.volume import RamVolume, ErrorDevice, TraceDevice, IoFlags, VolumeIoPriv
from pyocf.types.cvolume import CVolume
from pyocf.types.data import Data
from pyocf.types.io import IoDir
from pyocf.types.shared import OcfError, OcfCompletion
from pyocf.types.cache import Cache
from pyocf.types.shared import OcfError, OcfErrorCode, OcfCompletion
from pyocf.utils import Size as S
@ -99,7 +104,7 @@ def test_add_max_subvolumes(pyocf_ctx):
cvol.destroy()
def _cvol_io(cvol, addr, size, func, flags=0):
def prepare_cvol_io(cvol, addr, size, func, flags=0):
io = cvol.new_io(
queue=None,
addr=addr,
@ -110,16 +115,78 @@ def _cvol_io(cvol, addr, size, func, flags=0):
)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(byte_count=size)
data = Data(size)
io.set_data(data, 0)
submit_fn = getattr(io, func)
submit_fn()
return io, completion
def cvol_submit_data_io(cvol, addr, size, flags=0):
io, completion = prepare_cvol_io(cvol, addr, size, flags)
io.submit()
completion.wait()
return int(completion.results["err"])
def cvol_submit_flush_io(cvol, addr, size, flags=0):
io, completion = prepare_cvol_io(cvol, addr, size, flags)
io.submit_flush()
completion.wait()
return int(completion.results["err"])
def cvol_submit_discard_io(cvol, addr, size, flags=0):
io, completion = prepare_cvol_io(cvol, addr, size, flags)
io.submit_discard()
completion.wait()
return int(completion.results["err"])
IoEvent = namedtuple("IoEvent", ["dir", "addr", "bytes"])
def setup_tracing(backends):
io_trace = {}
vols = []
for vol in backends:
trace_vol = TraceDevice(vol)
vols.append(trace_vol)
io_trace[trace_vol] = {
TraceDevice.IoType.Flush: [],
TraceDevice.IoType.Discard: [],
TraceDevice.IoType.Data: [],
}
def trace(vol, io, io_type):
if int(io.contents._flags) & IoFlags.FLUSH:
io_type = TraceDevice.IoType.Flush
io_trace[vol][io_type].append(
IoEvent(io.contents._dir, io.contents._addr, io.contents._bytes)
)
return True
for vol in vols:
vol.trace_fcn = trace
return vols, io_trace
def clear_tracing(io_trace):
for io_types in io_trace.values():
for ios in io_types.values():
ios.clear()
def test_basic_volume_operations(pyocf_ctx):
"""
title: Perform basic volume operations.
@ -140,59 +207,38 @@ def test_basic_volume_operations(pyocf_ctx):
- composite_volume::volume_api
- composite_volume::io_request_passing
"""
count = {"flush": 0, "discard": 0, "io": 0}
expected = {"flush": 0, "discard": 0, "io": 0}
pyocf_ctx.register_volume_type(TraceDevice)
addr = S.from_KiB(512).B
size = S.from_KiB(4).B
def trace(vol, io, io_type):
if io_type == TraceDevice.IoType.Flush or int(io.contents._flags) & IoFlags.FLUSH:
count["flush"] += 1
elif io_type == TraceDevice.IoType.Discard:
count["discard"] += 1
else:
assert io_type == TraceDevice.IoType.Data
count["io"] += 1
assert io.contents._dir == IoDir.WRITE
assert io.contents._addr == addr
assert io.contents._bytes == size
return True
size = S.from_KiB(4)
backend = RamVolume(S.from_MiB(1))
trace_dev = TraceDevice(backend, trace_fcn=trace)
(vol,), io_trace = setup_tracing([backend])
cvol = CVolume(pyocf_ctx)
cvol.add(trace_dev)
cvol.add(vol)
cvol.open()
# verify data properly propagated
ret = _cvol_io(cvol, addr, size, "submit")
ret = cvol_submit_data_io(cvol, addr, size)
assert ret == 0
expected["io"] += 1
assert expected == count
assert len(io_trace[vol][TraceDevice.IoType.Data]) == 1
# verify flush properly propagated
ret = _cvol_io(cvol, addr, size, "submit_flush", IoFlags.FLUSH)
ret = cvol_submit_flush_io(cvol, addr, size, IoFlags.FLUSH)
assert ret == 0
expected["flush"] += 1
assert expected == count
assert len(io_trace[vol][TraceDevice.IoType.Flush]) == 1
# verify discard properly propagated
ret = _cvol_io(cvol, addr, size, "submit_discard")
ret = cvol_submit_discard_io(cvol, addr, size)
assert ret == 0
expected["discard"] += 1
assert expected == count
assert len(io_trace[vol][TraceDevice.IoType.Discard]) == 1
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_propagation_basic(pyocf_ctx):
"""
title: Perform volume operations with multiple subvolumes.
@ -216,10 +262,51 @@ def test_io_propagation_basic(pyocf_ctx):
- composite_volume::volume_api
- composite_volume::io_request_passing
"""
pass
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size * i) for i in range(1, 17)]
vols, io_trace = setup_tracing(ram_vols)
running_sum = S(0)
vol_begin = []
for v in ram_vols:
vol_begin.append(S(running_sum))
running_sum += S(v.size)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
# hit each subvolume at different offset (vol number * 1 KiB)
io_addr = [i * S.from_KiB(1) + (vol_begin[i]) for i in range(len(vols))]
io_size = S.from_KiB(12)
for i, (vol, addr) in enumerate(zip(vols, io_addr)):
ret = cvol_submit_data_io(cvol, addr, io_size)
assert ret == 0
ret = cvol_submit_flush_io(cvol, addr, io_size, IoFlags.FLUSH)
assert ret == 0
ret = cvol_submit_discard_io(cvol, addr, io_size)
assert ret == 0
for io_type in TraceDevice.IoType:
ios = io_trace[vol][io_type]
assert len(ios) == 1
io = ios[0]
assert io.dir == IoDir.WRITE
assert io.addr == addr.B - int(vol_begin[i])
assert io.bytes == io_size.B
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_propagation_cross_boundary(pyocf_ctx):
"""
title: Perform cross-subvolume operations.
@ -242,11 +329,106 @@ def test_io_propagation_cross_boundary(pyocf_ctx):
requirements:
- composite_volume::io_request_passing
"""
pass
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size * i) for i in range(16, 0, -1)]
vols, io_trace = setup_tracing(ram_vols)
running_sum = S(0)
vol_begin = []
for v in ram_vols:
vol_begin.append(S(running_sum))
running_sum += S(v.size)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
io_size = S.from_KiB(12)
io_addr = [S(end) - (io_size / 2) for end in vol_begin[1:]]
for i, addr in enumerate(io_addr):
clear_tracing(io_trace)
ret = cvol_submit_data_io(cvol, addr, io_size)
assert ret == 0
ret = cvol_submit_flush_io(cvol, addr, io_size, IoFlags.FLUSH)
assert ret == 0
ret = cvol_submit_discard_io(cvol, addr, io_size)
assert ret == 0
for io_type in TraceDevice.IoType:
ios1 = io_trace[vols[i]][io_type]
ios2 = io_trace[vols[i + 1]][io_type]
assert len(ios1) == 1
io = ios1[0]
assert io.dir == IoDir.WRITE
assert io.addr == int(vols[i].vol.size - (io_size / 2))
assert io.bytes == io_size.B / 2
assert len(ios2) == 1
io = ios2[0]
assert io.dir == IoDir.WRITE
assert io.addr == 0
assert io.bytes == io_size.B / 2
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_propagation_multiple_subvolumes(pyocf_ctx):
def test_io_propagation_entire_dev(pyocf_ctx):
"""
title: Perform flush with 0 size
description: |
Check that flush operation with 0 size gets propagated to all
subvolumes.
pass_criteria:
- Composite volume is created without an error.
- Subvolumes are added without an error.
- Flush is propagated to all subvolumes
steps:
- Create composite volume
- Add 16 mock volumes as subvolumes
- Submit flush with size == 0
- Check if flush is sent to all subvolumes
- Destroy composite volume
requirements:
- composite_volume::io_request_passing
"""
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size * (3 if i % 2 else 1)) for i in range(16)]
vols, io_trace = setup_tracing(ram_vols)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
ret = cvol_submit_flush_io(cvol, 0, 0, IoFlags.FLUSH)
assert ret == 0
for vol, io_types in io_trace.items():
assert len(io_types[TraceDevice.IoType.Flush]) == 1
assert io_types[TraceDevice.IoType.Flush][0].addr == 0
assert io_types[TraceDevice.IoType.Flush][0].bytes == 0
cvol.close()
cvol.destroy()
@pytest.mark.parametrize("rand_seed", [datetime.now().timestamp()])
def test_io_propagation_multiple_subvolumes(pyocf_ctx, rand_seed):
"""
title: Perform multi-subvolume operations.
description: |
@ -268,11 +450,67 @@ def test_io_propagation_multiple_subvolumes(pyocf_ctx):
requirements:
- composite_volume::io_request_passing
"""
pass
random.seed(rand_seed)
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size) for _ in range(16)]
vols, io_trace = setup_tracing(ram_vols)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
for subvol_count in range(2, len(vols) + 1):
clear_tracing(io_trace)
first_idx = random.randint(0, len(vols) - subvol_count)
# I/O addres range start/end offsets within a subvolume
start_offset = S.from_B(random.randint(0, vol_size.B // 512 - 1) * 512)
end_offset = S.from_B(random.randint(0, vol_size.B // 512 - 1) * 512)
size = (vol_size - start_offset) + (subvol_count - 2) * vol_size + end_offset
addr = first_idx * vol_size + start_offset
# aliases for subvolumes for easy referencing
first = vols[first_idx]
middle = vols[(first_idx + 1):(first_idx + subvol_count - 1)]
last = vols[first_idx + subvol_count - 1]
subvols = vols[(first_idx):(first_idx + subvol_count)]
ret = cvol_submit_data_io(cvol, addr, size)
assert ret == 0
ret = cvol_submit_flush_io(cvol, addr, size, IoFlags.FLUSH)
assert ret == 0
ret = cvol_submit_discard_io(cvol, addr, size)
assert ret == 0
for vol in middle:
for io in io_trace[vol].values():
assert len(io) == 1
assert io[0].addr == 0
assert io[0].bytes == int(vol.vol.size)
for io in io_trace[first].values():
assert io[0].addr == int(start_offset)
assert io[0].bytes == int(vol_size - start_offset)
for io in io_trace[last].values():
assert io[0].addr == 0
assert io[0].bytes == int(end_offset)
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_completion(pyocf_ctx):
@pytest.mark.parametrize("rand_seed", [datetime.now().timestamp()])
def test_io_completion(pyocf_ctx, rand_seed):
"""
title: Composite volume completion order.
description: |
@ -294,11 +532,102 @@ def test_io_completion(pyocf_ctx):
requirements:
- composite_volume::io_request_completion
"""
pass
random.seed(rand_seed)
class PendingIoVolume(RamVolume):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pending_ios = []
self.io_submitted = Event()
def do_submit_io(self, io):
self.pending_ios.append(("io", io))
self.io_submitted.set()
def do_submit_flush(self, flush):
self.pending_ios.append(("flush", flush))
self.io_submitted.set()
def do_submit_discard(self, discard):
self.pending_ios.append(("discard", discard))
self.io_submitted.set()
def wait_submitted(self):
self.io_submitted.wait()
self.io_submitted.clear()
def resume_next(self):
if not self.pending_ios:
return False
io_type, io = self.pending_ios.pop()
if io_type == "io":
super().do_submit_io(io)
elif io_type == "flush":
super().do_submit_flush(io)
elif io_type == "discard":
super().do_submit_discard(io)
else:
assert False
return True
pyocf_ctx.register_volume_type(PendingIoVolume)
vol_size = S.from_MiB(1)
vols = [PendingIoVolume(vol_size) for _ in range(16)]
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
for subvol_count in range(2, len(vols)):
# start I/O at an offset in the first volume
addr = vol_size / 2
size = (subvol_count - 1) * vol_size
for op, flags in [("submit", 0), ("submit_flush", IoFlags.FLUSH), ("submit_discard", 0)]:
io = cvol.new_io(
queue=None,
addr=addr,
length=size,
direction=IoDir.WRITE,
io_class=0,
flags=flags,
)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(size)
io.set_data(data, 0)
submit_fn = getattr(io, op)
submit_fn()
pending_vols = vols[:subvol_count]
for v in pending_vols:
v.wait_submitted()
assert not completion.completed()
random.shuffle(pending_vols)
for v in pending_vols:
assert not completion.completed()
assert v.resume_next()
assert not v.resume_next()
assert completion.wait(timeout=10)
assert int(completion.results["err"]) == 0
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_completion(pyocf_ctx):
@pytest.mark.parametrize("rand_seed", [datetime.now().timestamp()])
def test_io_error(pyocf_ctx, rand_seed):
"""
title: Composite volume error propagation.
description: |
@ -321,7 +650,43 @@ def test_io_completion(pyocf_ctx):
requirements:
- composite_volume::io_error_handling
"""
pass
random.seed(rand_seed)
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size) for _ in range(16)]
err_vols = [ErrorDevice(rv, armed=False, error_seq_no={IoDir.WRITE: 0}) for rv in ram_vols]
cvol = CVolume(pyocf_ctx)
for vol in err_vols:
cvol.add(vol)
cvol.open()
for subvol_count in range(2, len(err_vols)):
# start I/O at an offset in the first volume
addr = vol_size / 2
size = subvol_count * vol_size
error_idx = random.randrange(0, subvol_count)
err_vols[error_idx].arm()
# verify data properly propagated
ret = cvol_submit_data_io(cvol, addr, size)
assert ret == -OcfErrorCode.OCF_ERR_IO
# verify flush properly propagated
ret = cvol_submit_flush_io(cvol, addr, size, IoFlags.FLUSH)
assert ret == -OcfErrorCode.OCF_ERR_IO
# verdiscard discard properly propagated
ret = cvol_submit_discard_io(cvol, addr, size)
assert ret == -OcfErrorCode.OCF_ERR_IO
err_vols[error_idx].disarm()
cvol.close()
cvol.destroy()
def test_attach(pyocf_ctx):

View File

@ -70,7 +70,10 @@ def prepare_failover(pyocf_2_ctx, cache_backend_vol, error_io_seq_no):
error_io = {IoDir.WRITE: error_io_seq_no}
err_vol = ErrorDevice(cache2_exp_obj_vol, error_seq_no=error_io, armed=False)
# TODO: Adjust tests to work with error injection for flushes and discards (data_only=False
# below). Currently the test fails with data_only=False as it assumes metadata is not updated
# if error had been injected, which is not true in case of error in flush.
err_vol = ErrorDevice(cache2_exp_obj_vol, error_seq_no=error_io, data_only=True, armed=False)
cache = Cache.start_on_device(err_vol, cache_mode=CacheMode.WB, owner=ctx1)
return cache, cache2, err_vol
@ -81,7 +84,7 @@ def prepare_normal(pyocf_2_ctx, cache_backend_vol, error_io_seq_no):
error_io = {IoDir.WRITE: error_io_seq_no}
err_vol = ErrorDevice(cache_backend_vol, error_seq_no=error_io, armed=False)
err_vol = ErrorDevice(cache_backend_vol, error_seq_no=error_io, data_only=True, armed=False)
cache = Cache.start_on_device(err_vol, cache_mode=CacheMode.WB, owner=ctx1)
return cache, err_vol
@ -330,9 +333,11 @@ def test_surprise_shutdown_start_cache(pyocf_2_ctx, failover):
cache2.start_cache()
cache2.standby_attach(ramdisk)
cache2_exp_obj_vol = CacheVolume(cache2, open=True)
err_device = ErrorDevice(cache2_exp_obj_vol, error_seq_no=error_io, armed=True)
err_device = ErrorDevice(
cache2_exp_obj_vol, error_seq_no=error_io, data_only=True, armed=True
)
else:
err_device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=True)
err_device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, armed=True)
# call tested management function
try:
@ -808,7 +813,7 @@ def test_surprise_shutdown_standby_activate(pyocf_ctx):
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=False)
device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, rmed=False)
core_device = RamVolume(S.from_MiB(10))
device.disarm()
@ -882,7 +887,7 @@ def test_surprise_shutdown_standby_init_clean(pyocf_ctx):
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=True)
device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, armed=True)
cache = Cache(owner=OcfCtx.get_default())
cache.start_cache()
@ -942,7 +947,7 @@ def test_surprise_shutdown_standby_init_force_1(pyocf_ctx):
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=False)
device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, armed=False)
# start and stop cache with cacheline inserted
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
@ -1032,7 +1037,7 @@ def test_surprise_shutdown_standby_init_force_2(pyocf_ctx):
# Start cache device without error injection
error_io = {IoDir.WRITE: error_io_seq_no}
ramdisk = RamVolume(mngmt_op_surprise_shutdown_test_cache_size)
device = ErrorDevice(ramdisk, error_seq_no=error_io, armed=False)
device = ErrorDevice(ramdisk, error_seq_no=error_io, data_only=True, armed=False)
# start and stop cache with cacheline inserted
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)