pyocf: composite volume tests

Signed-off-by: Adam Rutkowski <adam.j.rutkowski@intel.com>
This commit is contained in:
Adam Rutkowski 2022-06-13 11:35:16 +02:00 committed by Jan Musial
parent 81396681f4
commit 69ef673bd1
2 changed files with 416 additions and 51 deletions

View File

@ -479,13 +479,13 @@ class ErrorDevice(Volume):
self.complete_with_error(io) self.complete_with_error(io)
def do_submit_flush(self, flush): def do_submit_flush(self, flush):
if self.data_only and self.should_forward_io(flush): if self.data_only or self.should_forward_io(flush):
self.vol.do_submit_flush(flush) self.vol.do_submit_flush(flush)
else: else:
self.complete_with_error(flush) self.complete_with_error(flush)
def do_submit_discard(self, discard): def do_submit_discard(self, discard):
if self.data_only and self.should_forward_io(discard): if self.data_only or self.should_forward_io(discard):
self.vol.do_submit_discard(discard) self.vol.do_submit_discard(discard)
else: else:
self.complete_with_error(discard) self.complete_with_error(discard)

View File

@ -4,14 +4,19 @@
# #
import pytest import pytest
from ctypes import c_int import random
from ctypes import POINTER, c_int, cast, c_void_p
from datetime import datetime
from threading import Event
from collections import namedtuple
from pyocf.types.volume import RamVolume, ErrorDevice, TraceDevice, IoFlags from pyocf.ocf import OcfLib
from pyocf.types.volume import RamVolume, ErrorDevice, TraceDevice, IoFlags, VolumeIoPriv
from pyocf.types.cvolume import CVolume from pyocf.types.cvolume import CVolume
from pyocf.types.data import Data from pyocf.types.data import Data
from pyocf.types.io import IoDir from pyocf.types.io import IoDir
from pyocf.types.shared import OcfError, OcfCompletion
from pyocf.types.cache import Cache from pyocf.types.cache import Cache
from pyocf.types.shared import OcfError, OcfErrorCode, OcfCompletion
from pyocf.utils import Size as S from pyocf.utils import Size as S
@ -99,7 +104,7 @@ def test_add_max_subvolumes(pyocf_ctx):
cvol.destroy() cvol.destroy()
def _cvol_io(cvol, addr, size, func, flags=0): def prepare_cvol_io(cvol, addr, size, func, flags=0):
io = cvol.new_io( io = cvol.new_io(
queue=None, queue=None,
addr=addr, addr=addr,
@ -110,16 +115,78 @@ def _cvol_io(cvol, addr, size, func, flags=0):
) )
completion = OcfCompletion([("err", c_int)]) completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback io.callback = completion.callback
data = Data(byte_count=size)
data = Data(size)
io.set_data(data, 0) io.set_data(data, 0)
submit_fn = getattr(io, func) return io, completion
submit_fn()
def cvol_submit_data_io(cvol, addr, size, flags=0):
io, completion = prepare_cvol_io(cvol, addr, size, flags)
io.submit()
completion.wait() completion.wait()
return int(completion.results["err"]) return int(completion.results["err"])
def cvol_submit_flush_io(cvol, addr, size, flags=0):
io, completion = prepare_cvol_io(cvol, addr, size, flags)
io.submit_flush()
completion.wait()
return int(completion.results["err"])
def cvol_submit_discard_io(cvol, addr, size, flags=0):
io, completion = prepare_cvol_io(cvol, addr, size, flags)
io.submit_discard()
completion.wait()
return int(completion.results["err"])
IoEvent = namedtuple("IoEvent", ["dir", "addr", "bytes"])
def setup_tracing(backends):
io_trace = {}
vols = []
for vol in backends:
trace_vol = TraceDevice(vol)
vols.append(trace_vol)
io_trace[trace_vol] = {
TraceDevice.IoType.Flush: [],
TraceDevice.IoType.Discard: [],
TraceDevice.IoType.Data: [],
}
def trace(vol, io, io_type):
if int(io.contents._flags) & IoFlags.FLUSH:
io_type = TraceDevice.IoType.Flush
io_trace[vol][io_type].append(
IoEvent(io.contents._dir, io.contents._addr, io.contents._bytes)
)
return True
for vol in vols:
vol.trace_fcn = trace
return vols, io_trace
def clear_tracing(io_trace):
for io_types in io_trace.values():
for ios in io_types.values():
ios.clear()
def test_basic_volume_operations(pyocf_ctx): def test_basic_volume_operations(pyocf_ctx):
""" """
title: Perform basic volume operations. title: Perform basic volume operations.
@ -140,59 +207,38 @@ def test_basic_volume_operations(pyocf_ctx):
- composite_volume::volume_api - composite_volume::volume_api
- composite_volume::io_request_passing - composite_volume::io_request_passing
""" """
count = {"flush": 0, "discard": 0, "io": 0}
expected = {"flush": 0, "discard": 0, "io": 0}
pyocf_ctx.register_volume_type(TraceDevice) pyocf_ctx.register_volume_type(TraceDevice)
addr = S.from_KiB(512).B addr = S.from_KiB(512).B
size = S.from_KiB(4).B size = S.from_KiB(4)
def trace(vol, io, io_type):
if io_type == TraceDevice.IoType.Flush or int(io.contents._flags) & IoFlags.FLUSH:
count["flush"] += 1
elif io_type == TraceDevice.IoType.Discard:
count["discard"] += 1
else:
assert io_type == TraceDevice.IoType.Data
count["io"] += 1
assert io.contents._dir == IoDir.WRITE
assert io.contents._addr == addr
assert io.contents._bytes == size
return True
backend = RamVolume(S.from_MiB(1)) backend = RamVolume(S.from_MiB(1))
trace_dev = TraceDevice(backend, trace_fcn=trace) (vol,), io_trace = setup_tracing([backend])
cvol = CVolume(pyocf_ctx) cvol = CVolume(pyocf_ctx)
cvol.add(trace_dev) cvol.add(vol)
cvol.open() cvol.open()
# verify data properly propagated # verify data properly propagated
ret = _cvol_io(cvol, addr, size, "submit") ret = cvol_submit_data_io(cvol, addr, size)
assert ret == 0 assert ret == 0
expected["io"] += 1 assert len(io_trace[vol][TraceDevice.IoType.Data]) == 1
assert expected == count
# verify flush properly propagated # verify flush properly propagated
ret = _cvol_io(cvol, addr, size, "submit_flush", IoFlags.FLUSH) ret = cvol_submit_flush_io(cvol, addr, size, IoFlags.FLUSH)
assert ret == 0 assert ret == 0
expected["flush"] += 1 assert len(io_trace[vol][TraceDevice.IoType.Flush]) == 1
assert expected == count
# verify discard properly propagated # verify discard properly propagated
ret = _cvol_io(cvol, addr, size, "submit_discard") ret = cvol_submit_discard_io(cvol, addr, size)
assert ret == 0 assert ret == 0
expected["discard"] += 1 assert len(io_trace[vol][TraceDevice.IoType.Discard]) == 1
assert expected == count
cvol.close() cvol.close()
cvol.destroy() cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_propagation_basic(pyocf_ctx): def test_io_propagation_basic(pyocf_ctx):
""" """
title: Perform volume operations with multiple subvolumes. title: Perform volume operations with multiple subvolumes.
@ -216,10 +262,51 @@ def test_io_propagation_basic(pyocf_ctx):
- composite_volume::volume_api - composite_volume::volume_api
- composite_volume::io_request_passing - composite_volume::io_request_passing
""" """
pass pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size * i) for i in range(1, 17)]
vols, io_trace = setup_tracing(ram_vols)
running_sum = S(0)
vol_begin = []
for v in ram_vols:
vol_begin.append(S(running_sum))
running_sum += S(v.size)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
# hit each subvolume at different offset (vol number * 1 KiB)
io_addr = [i * S.from_KiB(1) + (vol_begin[i]) for i in range(len(vols))]
io_size = S.from_KiB(12)
for i, (vol, addr) in enumerate(zip(vols, io_addr)):
ret = cvol_submit_data_io(cvol, addr, io_size)
assert ret == 0
ret = cvol_submit_flush_io(cvol, addr, io_size, IoFlags.FLUSH)
assert ret == 0
ret = cvol_submit_discard_io(cvol, addr, io_size)
assert ret == 0
for io_type in TraceDevice.IoType:
ios = io_trace[vol][io_type]
assert len(ios) == 1
io = ios[0]
assert io.dir == IoDir.WRITE
assert io.addr == addr.B - int(vol_begin[i])
assert io.bytes == io_size.B
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented")
def test_io_propagation_cross_boundary(pyocf_ctx): def test_io_propagation_cross_boundary(pyocf_ctx):
""" """
title: Perform cross-subvolume operations. title: Perform cross-subvolume operations.
@ -242,11 +329,106 @@ def test_io_propagation_cross_boundary(pyocf_ctx):
requirements: requirements:
- composite_volume::io_request_passing - composite_volume::io_request_passing
""" """
pass pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size * i) for i in range(16, 0, -1)]
vols, io_trace = setup_tracing(ram_vols)
running_sum = S(0)
vol_begin = []
for v in ram_vols:
vol_begin.append(S(running_sum))
running_sum += S(v.size)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
io_size = S.from_KiB(12)
io_addr = [S(end) - (io_size / 2) for end in vol_begin[1:]]
for i, addr in enumerate(io_addr):
clear_tracing(io_trace)
ret = cvol_submit_data_io(cvol, addr, io_size)
assert ret == 0
ret = cvol_submit_flush_io(cvol, addr, io_size, IoFlags.FLUSH)
assert ret == 0
ret = cvol_submit_discard_io(cvol, addr, io_size)
assert ret == 0
for io_type in TraceDevice.IoType:
ios1 = io_trace[vols[i]][io_type]
ios2 = io_trace[vols[i + 1]][io_type]
assert len(ios1) == 1
io = ios1[0]
assert io.dir == IoDir.WRITE
assert io.addr == int(vols[i].vol.size - (io_size / 2))
assert io.bytes == io_size.B / 2
assert len(ios2) == 1
io = ios2[0]
assert io.dir == IoDir.WRITE
assert io.addr == 0
assert io.bytes == io_size.B / 2
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented") def test_io_propagation_entire_dev(pyocf_ctx):
def test_io_propagation_multiple_subvolumes(pyocf_ctx): """
title: Perform flush with 0 size
description: |
Check that flush operation with 0 size gets propagated to all
subvolumes.
pass_criteria:
- Composite volume is created without an error.
- Subvolumes are added without an error.
- Flush is propagated to all subvolumes
steps:
- Create composite volume
- Add 16 mock volumes as subvolumes
- Submit flush with size == 0
- Check if flush is sent to all subvolumes
- Destroy composite volume
requirements:
- composite_volume::io_request_passing
"""
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size * (3 if i % 2 else 1)) for i in range(16)]
vols, io_trace = setup_tracing(ram_vols)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
ret = cvol_submit_flush_io(cvol, 0, 0, IoFlags.FLUSH)
assert ret == 0
for vol, io_types in io_trace.items():
assert len(io_types[TraceDevice.IoType.Flush]) == 1
assert io_types[TraceDevice.IoType.Flush][0].addr == 0
assert io_types[TraceDevice.IoType.Flush][0].bytes == 0
cvol.close()
cvol.destroy()
@pytest.mark.parametrize("rand_seed", [datetime.now().timestamp()])
def test_io_propagation_multiple_subvolumes(pyocf_ctx, rand_seed):
""" """
title: Perform multi-subvolume operations. title: Perform multi-subvolume operations.
description: | description: |
@ -268,11 +450,67 @@ def test_io_propagation_multiple_subvolumes(pyocf_ctx):
requirements: requirements:
- composite_volume::io_request_passing - composite_volume::io_request_passing
""" """
pass random.seed(rand_seed)
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size) for _ in range(16)]
vols, io_trace = setup_tracing(ram_vols)
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
for subvol_count in range(2, len(vols) + 1):
clear_tracing(io_trace)
first_idx = random.randint(0, len(vols) - subvol_count)
# I/O addres range start/end offsets within a subvolume
start_offset = S.from_B(random.randint(0, vol_size.B // 512 - 1) * 512)
end_offset = S.from_B(random.randint(0, vol_size.B // 512 - 1) * 512)
size = (vol_size - start_offset) + (subvol_count - 2) * vol_size + end_offset
addr = first_idx * vol_size + start_offset
# aliases for subvolumes for easy referencing
first = vols[first_idx]
middle = vols[(first_idx + 1):(first_idx + subvol_count - 1)]
last = vols[first_idx + subvol_count - 1]
subvols = vols[(first_idx):(first_idx + subvol_count)]
ret = cvol_submit_data_io(cvol, addr, size)
assert ret == 0
ret = cvol_submit_flush_io(cvol, addr, size, IoFlags.FLUSH)
assert ret == 0
ret = cvol_submit_discard_io(cvol, addr, size)
assert ret == 0
for vol in middle:
for io in io_trace[vol].values():
assert len(io) == 1
assert io[0].addr == 0
assert io[0].bytes == int(vol.vol.size)
for io in io_trace[first].values():
assert io[0].addr == int(start_offset)
assert io[0].bytes == int(vol_size - start_offset)
for io in io_trace[last].values():
assert io[0].addr == 0
assert io[0].bytes == int(end_offset)
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented") @pytest.mark.parametrize("rand_seed", [datetime.now().timestamp()])
def test_io_completion(pyocf_ctx): def test_io_completion(pyocf_ctx, rand_seed):
""" """
title: Composite volume completion order. title: Composite volume completion order.
description: | description: |
@ -294,11 +532,102 @@ def test_io_completion(pyocf_ctx):
requirements: requirements:
- composite_volume::io_request_completion - composite_volume::io_request_completion
""" """
pass random.seed(rand_seed)
class PendingIoVolume(RamVolume):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pending_ios = []
self.io_submitted = Event()
def do_submit_io(self, io):
self.pending_ios.append(("io", io))
self.io_submitted.set()
def do_submit_flush(self, flush):
self.pending_ios.append(("flush", flush))
self.io_submitted.set()
def do_submit_discard(self, discard):
self.pending_ios.append(("discard", discard))
self.io_submitted.set()
def wait_submitted(self):
self.io_submitted.wait()
self.io_submitted.clear()
def resume_next(self):
if not self.pending_ios:
return False
io_type, io = self.pending_ios.pop()
if io_type == "io":
super().do_submit_io(io)
elif io_type == "flush":
super().do_submit_flush(io)
elif io_type == "discard":
super().do_submit_discard(io)
else:
assert False
return True
pyocf_ctx.register_volume_type(PendingIoVolume)
vol_size = S.from_MiB(1)
vols = [PendingIoVolume(vol_size) for _ in range(16)]
cvol = CVolume(pyocf_ctx)
for vol in vols:
cvol.add(vol)
cvol.open()
for subvol_count in range(2, len(vols)):
# start I/O at an offset in the first volume
addr = vol_size / 2
size = (subvol_count - 1) * vol_size
for op, flags in [("submit", 0), ("submit_flush", IoFlags.FLUSH), ("submit_discard", 0)]:
io = cvol.new_io(
queue=None,
addr=addr,
length=size,
direction=IoDir.WRITE,
io_class=0,
flags=flags,
)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(size)
io.set_data(data, 0)
submit_fn = getattr(io, op)
submit_fn()
pending_vols = vols[:subvol_count]
for v in pending_vols:
v.wait_submitted()
assert not completion.completed()
random.shuffle(pending_vols)
for v in pending_vols:
assert not completion.completed()
assert v.resume_next()
assert not v.resume_next()
assert completion.wait(timeout=10)
assert int(completion.results["err"]) == 0
cvol.close()
cvol.destroy()
@pytest.mark.skip(reason="not implemented") @pytest.mark.parametrize("rand_seed", [datetime.now().timestamp()])
def test_io_completion(pyocf_ctx): def test_io_error(pyocf_ctx, rand_seed):
""" """
title: Composite volume error propagation. title: Composite volume error propagation.
description: | description: |
@ -321,7 +650,43 @@ def test_io_completion(pyocf_ctx):
requirements: requirements:
- composite_volume::io_error_handling - composite_volume::io_error_handling
""" """
pass random.seed(rand_seed)
pyocf_ctx.register_volume_type(TraceDevice)
vol_size = S.from_MiB(1)
ram_vols = [RamVolume(vol_size) for _ in range(16)]
err_vols = [ErrorDevice(rv, armed=False, error_seq_no={IoDir.WRITE: 0}) for rv in ram_vols]
cvol = CVolume(pyocf_ctx)
for vol in err_vols:
cvol.add(vol)
cvol.open()
for subvol_count in range(2, len(err_vols)):
# start I/O at an offset in the first volume
addr = vol_size / 2
size = subvol_count * vol_size
error_idx = random.randrange(0, subvol_count)
err_vols[error_idx].arm()
# verify data properly propagated
ret = cvol_submit_data_io(cvol, addr, size)
assert ret == -OcfErrorCode.OCF_ERR_IO
# verify flush properly propagated
ret = cvol_submit_flush_io(cvol, addr, size, IoFlags.FLUSH)
assert ret == -OcfErrorCode.OCF_ERR_IO
# verdiscard discard properly propagated
ret = cvol_submit_discard_io(cvol, addr, size)
assert ret == -OcfErrorCode.OCF_ERR_IO
err_vols[error_idx].disarm()
cvol.close()
cvol.destroy()
def test_attach(pyocf_ctx): def test_attach(pyocf_ctx):