diff --git a/tests/functional/tests/engine/test_pp.py b/tests/functional/tests/engine/test_pp.py index c9dde8d..5fab028 100644 --- a/tests/functional/tests/engine/test_pp.py +++ b/tests/functional/tests/engine/test_pp.py @@ -6,6 +6,7 @@ from ctypes import c_int import pytest import math +from datetime import timedelta from pyocf.types.cache import Cache, PromotionPolicy, NhitParams from pyocf.types.core import Core @@ -14,6 +15,7 @@ from pyocf.types.data import Data from pyocf.types.io import IoDir from pyocf.utils import Size from pyocf.types.shared import OcfCompletion +from pyocf.rio import Rio, ReadWrite @pytest.mark.parametrize("promotion_policy", PromotionPolicy) @@ -62,49 +64,35 @@ def test_change_to_nhit_and_back_io_in_flight(pyocf_ctx): cache.add_core(core) # Step 2 - completions = [] - for i in range(2000): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(4096) - io = core.new_io( - cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0 - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() + r = ( + Rio() + .target(core) + .njobs(10) + .bs(Size.from_KiB(4)) + .readwrite(ReadWrite.RANDWRITE) + .size(core_device.size) + .time_based() + .time(timedelta(minutes=1)) + .qd(10) + .run_async() + ) # Step 3 cache.set_promotion_policy(PromotionPolicy.NHIT) # Step 4 - for c in completions: - c.wait() - assert not c.results["error"], "No IO's should fail when turning NHIT policy on" + r.abort() + assert r.error_count == 0, "No IO's should fail when turning NHIT policy on" # Step 5 - completions = [] - for i in range(2000): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(4096) - io = core.new_io( - cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0 - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() + r.run_async() # Step 6 cache.set_promotion_policy(PromotionPolicy.ALWAYS) # Step 7 - for c in completions: - c.wait() - assert not c.results[ - "error" - ], "No IO's should fail when turning NHIT policy off" - + r.abort() + assert r.error_count == 0, "No IO's should fail when turning NHIT policy off" def fill_cache(cache, fill_ratio): """ @@ -116,47 +104,19 @@ def fill_cache(cache, fill_ratio): cache_lines = cache.get_stats()["conf"]["size"] - bytes_to_fill = cache_lines.bytes * fill_ratio - max_io_size = cache.device.get_max_io_size().bytes - - ios_to_issue = math.floor(bytes_to_fill / max_io_size) + bytes_to_fill = Size(round(cache_lines.bytes * fill_ratio)) core = cache.cores[0] - completions = [] - for i in range(ios_to_issue): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(max_io_size) - io = core.new_io( - cache.get_default_queue(), - i * max_io_size, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - io.set_data(write_data) - io.callback = comp.callback - completions += [comp] - io.submit() - if bytes_to_fill % max_io_size: - comp = OcfCompletion([("error", c_int)]) - write_data = Data(Size.from_B(bytes_to_fill % max_io_size, sector_aligned=True)) - io = core.new_io( - cache.get_default_queue(), - ios_to_issue * max_io_size, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - io.set_data(write_data) - io.callback = comp.callback - completions += [comp] - io.submit() - - for c in completions: - c.wait() + r = ( + Rio() + .target(core) + .readwrite(ReadWrite.RANDWRITE) + .size(bytes_to_fill) + .bs(Size(512)) + .qd(10) + .run() + ) @pytest.mark.parametrize("fill_percentage", [0, 1, 50, 99]) @@ -194,33 +154,27 @@ def test_promoted_after_hits_various_thresholds( # Step 3 fill_cache(cache, fill_percentage / 100) + cache.settle() stats = cache.get_stats() cache_lines = stats["conf"]["size"] assert stats["usage"]["occupancy"]["fraction"] // 10 == fill_percentage * 10 filled_occupancy = stats["usage"]["occupancy"]["value"] # Step 4 - last_core_line = int(core_device.size) - cache_lines.line_size - completions = [] + last_core_line = Size(int(core_device.size) - cache_lines.line_size) + r = ( + Rio() + .readwrite(ReadWrite.WRITE) + .bs(Size(4096)) + .offset(last_core_line) + .target(core) + .size(Size(4096) + last_core_line) + ) + for i in range(insertion_threshold - 1): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(cache_lines.line_size) - io = core.new_io( - cache.get_default_queue(), - last_core_line, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - for c in completions: - c.wait() + r.run() + cache.settle() stats = cache.get_stats() threshold_reached_occupancy = stats["usage"]["occupancy"]["value"] assert threshold_reached_occupancy == filled_occupancy, ( @@ -229,20 +183,13 @@ def test_promoted_after_hits_various_thresholds( ) # Step 5 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(cache_lines.line_size) - io = core.new_io( - cache.get_default_queue(), last_core_line, write_data.size, IoDir.WRITE, 0, 0 - ) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - comp.wait() + r.run() + cache.settle() + stats = cache.get_stats() assert ( threshold_reached_occupancy - == cache.get_stats()["usage"]["occupancy"]["value"] - 1 + == stats["usage"]["occupancy"]["value"] - 1 ), "Previous request should be promoted and occupancy should rise" @@ -268,14 +215,7 @@ def test_partial_hit_promotion(pyocf_ctx): cache.add_core(core) # Step 2 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(Size.from_sector(1)) - io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.READ, 0, 0) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - comp.wait() + r = Rio().readwrite(ReadWrite.READ).bs(Size(512)).size(Size(512)).target(core).run() stats = cache.get_stats() cache_lines = stats["conf"]["size"] @@ -291,14 +231,10 @@ def test_partial_hit_promotion(pyocf_ctx): ) # Step 4 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(2 * cache_lines.line_size) - io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.WRITE, 0, 0) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - comp.wait() + req_size = Size(2 * cache_lines.line_size) + r.size(req_size).bs(req_size).readwrite(ReadWrite.WRITE).run() + cache.settle() stats = cache.get_stats() assert ( stats["usage"]["occupancy"]["value"] == 2