diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py new file mode 100644 index 0000000..6231b42 --- /dev/null +++ b/tests/functional/pyocf/rio.py @@ -0,0 +1,289 @@ +# +# Copyright(c) 2021 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# + +from ctypes import c_int, c_void_p, CFUNCTYPE +from enum import Enum, auto +from random import Random +from dataclasses import dataclass +from datetime import timedelta, datetime +from itertools import cycle +from threading import Thread, Condition, Event +from copy import deepcopy + +from pyocf.utils import Size +from pyocf.types.volume import Volume +from pyocf.types.io import Io, IoDir +from pyocf.types.data import Data + + +class ReadWrite(Enum): + READ = auto() + WRITE = auto() + RANDREAD = auto() + RANDWRITE = auto() + RANDRW = auto() + + def is_random(self): + return self in [self.RANDREAD, self.RANDWRITE, self.RANDRW] + + +class IoGen: + def __init__(self, extent, blocksize=512, seed=0, random=False, randommap=True): + self.random_gen = Random(seed) + self.random = random + self.randommap = randommap + + gen = list(range(extent[0].B, extent[0].B + extent[1].B, blocksize.B)) + self.cycle_len = len(gen) + + if random: + self.random_gen.shuffle(gen) + + if self.randommap: + self.gen = cycle(gen) + self.gen_fcn = lambda: next(self.gen) + else: + self.gen = gen + self.gen_fcn = lambda: self.random_gen.choice(self.gen) + + def __iter__(self): + return self + + def __next__(self): + return self.gen_fcn() + + +@dataclass +class JobSpec: + readwrite: ReadWrite = ReadWrite.READ + randseed: int = 1 + rwmixwrite: int = 50 + randommap: bool = True + bs: Size = Size.from_B(512) + offset: Size = Size(0) + njobs: int = 1 + qd: int = 1 + size: Size = Size(0) + io_size: Size = Size(0) + target: Volume = None + time_based: bool = False + time: timedelta = None + continue_on_error: bool = False + + def merge(self, other): + # TODO implement + return other + + +class Rio: + class RioThread(Thread): + def 
__init__(self, jobspec: JobSpec, queue): + super().__init__() + self.jobspec = jobspec + self.queue = queue + self.ios = Size(0) + self.io_target = 0 + self.finish_time = None + + self.qd_condition = Condition() + self.qd = 0 + + self.stop_event = Event() + + self.errors = [] + + def should_finish(self): + if self.stop_event.is_set(): + return True + + if self.jobspec.time_based: + if datetime.now() >= self.finish_time: + return True + elif self.ios >= self.io_target: + return True + + return False + + def get_io_cb(self): + def cb(error): + if error != 0: + self.errors.append(error) + if not self.jobspec.continue_on_error: + print(f"Aborting on error {error}") + self.abort() + with self.qd_condition as c: + self.qd -= 1 + self.qd_condition.notify_all() + + return cb + + def abort(self): + self.stop_event.set() + + def run(self): + iogen = IoGen( + (self.jobspec.offset, self.jobspec.size), + self.jobspec.bs, + self.jobspec.randseed, + self.jobspec.readwrite.is_random(), + self.jobspec.randommap, + ) + + if self.jobspec.time_based: + self.finish_time = datetime.now() + self.jobspec.time + else: + if int(self.jobspec.io_size) != 0: + self.io_target = min( + self.jobspec.io_size, self.jobspec.size - self.jobspec.offset + ) + else: + self.io_target = self.jobspec.size - self.jobspec.offset + + # TODO randrw + iodir = ( + IoDir.WRITE + if self.jobspec.readwrite in [ReadWrite.WRITE, ReadWrite.RANDWRITE] + else IoDir.READ + ) + + while not self.should_finish(): + with self.qd_condition: + self.qd_condition.wait_for(lambda: self.qd <= self.jobspec.qd) + + data = Data(self.jobspec.bs) # TODO pattern and verify + io = self.jobspec.target.new_io( + self.queue, + next(iogen), + self.jobspec.bs, + iodir, + 0, + 0, + ) + io.set_data(data) + io.callback = self.get_io_cb() + self.ios += self.jobspec.bs + io.submit() + with self.qd_condition: + self.qd += 1 + + with self.qd_condition: + self.qd_condition.wait_for(lambda: self.qd == 0) + + def __init__(self): + self.global_jobspec = 
JobSpec() + self.jobs = [] + + self._threads = [] + self.errors = {} + self.error_count = 0 + + def readwrite(self, rw: ReadWrite): + self.global_jobspec.readwrite = rw + return self + + def rwmixwrite(self, mix: int): + self.global_jobspec.rwmixwrite = mix + return self + + def rwmixread(self, mix: int): + self.global_jobspec.rwmixwrite = 100 - mix + return self + + def norandommap(self): + self.global_jobspec.randommap = False + return self + + def bs(self, bs: Size): + self.global_jobspec.bs = bs + return self + + def offset(self, offset: Size): + self.global_jobspec.offset = offset + return self + + def njobs(self, njobs: int): + self.global_jobspec.njobs = njobs + return self + + def qd(self, qd: int): + self.global_jobspec.qd = qd + return self + + def target(self, target: Volume): + self.global_jobspec.target = target + return self + + def add_job(self, job: JobSpec): + self.jobs.append(job) + return self + + def size(self, size: Size): + self.global_jobspec.size = size + return self + + def io_size(self, size: Size): + self.global_jobspec.io_size = size + return self + + def time_based(self): + self.global_jobspec.time_based = True + return self + + def time(self, time: timedelta): + self.global_jobspec.time = time + return self + + def continue_on_error(self): + self.global_jobspec.continue_on_error = True + return self + + def abort(self): + for thread in self._threads: + thread.abort() + + self.wait_for_completion() + return self + + def wait_for_completion(self): + for thread in self._threads: + thread.join() + self.errors.update({thread.name: thread.errors}) + self.error_count += len(thread.errors) + + return self + + def __del__(self): + self.wait_for_completion() + + def clear(self): + self._threads = [] + self.errors = {} + + def run(self, queues=None): + self.run_async(queues) + self.wait_for_completion() + return self + + def run_async(self, queues=None): + self.clear() + + jobs = deepcopy(self.jobs) + + if not jobs: + jobs = [self.global_jobspec 
for _ in range(self.global_jobspec.njobs)] + + if not queues: + queues = [self.global_jobspec.target.cache.get_default_queue()] + queues = cycle(queues) + + for job in jobs: + spec = job.merge(self.global_jobspec) + thread = Rio.RioThread(spec, next(queues)) + self._threads.append(thread) + + for thread in self._threads: + thread.start() + + return self diff --git a/tests/functional/pyocf/types/cache.py b/tests/functional/pyocf/types/cache.py index a90598f..3efc8ca 100644 --- a/tests/functional/pyocf/types/cache.py +++ b/tests/functional/pyocf/types/cache.py @@ -798,6 +798,9 @@ class Cache: finally: self.read_unlock() + # settle all queues associated with this cache (mngt and I/O) + def settle(self): + Queue.settle_many(self.io_queues + [self.mngt_queue]) lib = OcfLib.getInstance() lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p] diff --git a/tests/functional/pyocf/types/queue.py b/tests/functional/pyocf/types/queue.py index 112a848..f38ff35 100644 --- a/tests/functional/pyocf/types/queue.py +++ b/tests/functional/pyocf/types/queue.py @@ -4,7 +4,7 @@ # from ctypes import c_void_p, CFUNCTYPE, Structure, byref -from threading import Thread, Condition, Event +from threading import Thread, Condition, Event, Semaphore import weakref from ..ocf import OcfLib @@ -23,7 +23,7 @@ class Queue: pass -def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): +def io_queue_run(*, queue: Queue, kick: Condition, stop: Event, sem: Semaphore): def wait_predicate(): return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue) @@ -31,7 +31,9 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): with kick: kick.wait_for(wait_predicate) + sem.acquire() OcfLib.getInstance().ocf_queue_run(queue) + sem.release() if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue): break @@ -56,6 +58,7 @@ class Queue: self.stop_event = Event() self.kick_condition = Condition() + self.sem = Semaphore() self.thread = Thread(
group=None, target=io_queue_run, @@ -64,6 +67,7 @@ class Queue: "queue": self, "kick": self.kick_condition, "stop": self.stop_event, + "sem": self.sem, }, ) self.thread.start() @@ -103,3 +107,37 @@ class Queue: self.kick_condition.notify_all() self.thread.join() + + # settle - wait for OCF to finish execution within this queue context + # + # In some tests simply waiting for I/O to finish is not enough. Most + # notably some statistics are potentially incremented after user triggered + # I/O is finished. This is due to asynchronous nature of I/O operations + # and OCF approach to statistics update, where only eventual consistency + # is guaranteed without explicit mechanism available to query whether + # the final state is reached yet. However it is fully within the adapter power + # to determine this, as OCF executes in context of API calls from the + # adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management + # operations) and I/O completion callbacks. Queue settle is a mechanism to + # assure ocf_queue_run is not being executed by the thread associated with + # a queue. + # + # With queue settle mechanism it is straightforward for the client to + # wait for cache statistics to reach fixed values: + # 1. wait for all I/O to OCF to finish + # 2. settle all I/O queues + # 3. 
make sure background cleaner is not active + # + def settle(self): + busy = not self.sem.acquire(blocking=False) + if busy: + self.sem.acquire() + self.sem.release() + return busy + + # wait until all queues provided settle + @staticmethod + def settle_many(qlist: [Queue]): + status = [True] + while any(status): + status = [q.settle() for q in qlist] diff --git a/tests/functional/pyocf/utils.py b/tests/functional/pyocf/utils.py index 0c86ae1..3e00dbc 100644 --- a/tests/functional/pyocf/utils.py +++ b/tests/functional/pyocf/utils.py @@ -82,6 +82,24 @@ class Size: else: self.bytes = int(b) + def __lt__(self, other): + return int(self) < int(other) + + def __le__(self, other): + return int(self) <= int(other) + + def __eq__(self, other): + return int(self) == int(other) + + def __ne__(self, other): + return int(self) != int(other) + + def __gt__(self, other): + return int(self) > int(other) + + def __ge__(self, other): + return int(self) >= int(other) + def __int__(self): return self.bytes diff --git a/tests/functional/tests/basic/test_pyocf.py b/tests/functional/tests/basic/test_pyocf.py index 6899ea9..7a36b1d 100644 --- a/tests/functional/tests/basic/test_pyocf.py +++ b/tests/functional/tests/basic/test_pyocf.py @@ -13,6 +13,7 @@ from pyocf.types.data import Data from pyocf.types.io import IoDir from pyocf.utils import Size as S from pyocf.types.shared import OcfError, OcfCompletion +from pyocf.rio import Rio, ReadWrite def test_ctx_fixture(pyocf_ctx): @@ -31,18 +32,9 @@ def test_simple_wt_write(pyocf_ctx): cache_device.reset_stats() core_device.reset_stats() - write_data = Data.from_string("This is test data") - io = core.new_io(cache.get_default_queue(), S.from_sector(1).B, - write_data.size, IoDir.WRITE, 0, 0) - io.set_data(write_data) - - cmpl = OcfCompletion([("err", c_int)]) - io.callback = cmpl.callback - io.submit() - cmpl.wait() - - assert cmpl.results["err"] == 0 + r = Rio().target(core).readwrite(ReadWrite.WRITE).size(S.from_sector(1)).run() assert 
cache_device.get_stats()[IoDir.WRITE] == 1 + cache.settle() stats = cache.get_stats() assert stats["req"]["wr_full_misses"]["value"] == 1 assert stats["usage"]["occupancy"]["value"] == 1 diff --git a/tests/functional/tests/engine/test_pp.py b/tests/functional/tests/engine/test_pp.py index c9dde8d..5fab028 100644 --- a/tests/functional/tests/engine/test_pp.py +++ b/tests/functional/tests/engine/test_pp.py @@ -6,6 +6,7 @@ from ctypes import c_int import pytest import math +from datetime import timedelta from pyocf.types.cache import Cache, PromotionPolicy, NhitParams from pyocf.types.core import Core @@ -14,6 +15,7 @@ from pyocf.types.data import Data from pyocf.types.io import IoDir from pyocf.utils import Size from pyocf.types.shared import OcfCompletion +from pyocf.rio import Rio, ReadWrite @pytest.mark.parametrize("promotion_policy", PromotionPolicy) @@ -62,49 +64,35 @@ def test_change_to_nhit_and_back_io_in_flight(pyocf_ctx): cache.add_core(core) # Step 2 - completions = [] - for i in range(2000): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(4096) - io = core.new_io( - cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0 - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() + r = ( + Rio() + .target(core) + .njobs(10) + .bs(Size.from_KiB(4)) + .readwrite(ReadWrite.RANDWRITE) + .size(core_device.size) + .time_based() + .time(timedelta(minutes=1)) + .qd(10) + .run_async() + ) # Step 3 cache.set_promotion_policy(PromotionPolicy.NHIT) # Step 4 - for c in completions: - c.wait() - assert not c.results["error"], "No IO's should fail when turning NHIT policy on" + r.abort() + assert r.error_count == 0, "No IO's should fail when turning NHIT policy on" # Step 5 - completions = [] - for i in range(2000): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(4096) - io = core.new_io( - cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0 - ) - completions += [comp] 
- io.set_data(write_data) - io.callback = comp.callback - io.submit() + r.run_async() # Step 6 cache.set_promotion_policy(PromotionPolicy.ALWAYS) # Step 7 - for c in completions: - c.wait() - assert not c.results[ - "error" - ], "No IO's should fail when turning NHIT policy off" - + r.abort() + assert r.error_count == 0, "No IO's should fail when turning NHIT policy off" def fill_cache(cache, fill_ratio): """ @@ -116,47 +104,19 @@ def fill_cache(cache, fill_ratio): cache_lines = cache.get_stats()["conf"]["size"] - bytes_to_fill = cache_lines.bytes * fill_ratio - max_io_size = cache.device.get_max_io_size().bytes - - ios_to_issue = math.floor(bytes_to_fill / max_io_size) + bytes_to_fill = Size(round(cache_lines.bytes * fill_ratio)) core = cache.cores[0] - completions = [] - for i in range(ios_to_issue): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(max_io_size) - io = core.new_io( - cache.get_default_queue(), - i * max_io_size, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - io.set_data(write_data) - io.callback = comp.callback - completions += [comp] - io.submit() - if bytes_to_fill % max_io_size: - comp = OcfCompletion([("error", c_int)]) - write_data = Data(Size.from_B(bytes_to_fill % max_io_size, sector_aligned=True)) - io = core.new_io( - cache.get_default_queue(), - ios_to_issue * max_io_size, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - io.set_data(write_data) - io.callback = comp.callback - completions += [comp] - io.submit() - - for c in completions: - c.wait() + r = ( + Rio() + .target(core) + .readwrite(ReadWrite.RANDWRITE) + .size(bytes_to_fill) + .bs(Size(512)) + .qd(10) + .run() + ) @pytest.mark.parametrize("fill_percentage", [0, 1, 50, 99]) @@ -194,33 +154,27 @@ def test_promoted_after_hits_various_thresholds( # Step 3 fill_cache(cache, fill_percentage / 100) + cache.settle() stats = cache.get_stats() cache_lines = stats["conf"]["size"] assert stats["usage"]["occupancy"]["fraction"] // 10 == fill_percentage * 10 
filled_occupancy = stats["usage"]["occupancy"]["value"] # Step 4 - last_core_line = int(core_device.size) - cache_lines.line_size - completions = [] + last_core_line = Size(int(core_device.size) - cache_lines.line_size) + r = ( + Rio() + .readwrite(ReadWrite.WRITE) + .bs(Size(4096)) + .offset(last_core_line) + .target(core) + .size(Size(4096) + last_core_line) + ) + for i in range(insertion_threshold - 1): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(cache_lines.line_size) - io = core.new_io( - cache.get_default_queue(), - last_core_line, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - for c in completions: - c.wait() + r.run() + cache.settle() stats = cache.get_stats() threshold_reached_occupancy = stats["usage"]["occupancy"]["value"] assert threshold_reached_occupancy == filled_occupancy, ( @@ -229,20 +183,13 @@ def test_promoted_after_hits_various_thresholds( ) # Step 5 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(cache_lines.line_size) - io = core.new_io( - cache.get_default_queue(), last_core_line, write_data.size, IoDir.WRITE, 0, 0 - ) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - comp.wait() + r.run() + cache.settle() + stats = cache.get_stats() assert ( threshold_reached_occupancy - == cache.get_stats()["usage"]["occupancy"]["value"] - 1 + == stats["usage"]["occupancy"]["value"] - 1 ), "Previous request should be promoted and occupancy should rise" @@ -268,14 +215,7 @@ def test_partial_hit_promotion(pyocf_ctx): cache.add_core(core) # Step 2 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(Size.from_sector(1)) - io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.READ, 0, 0) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - comp.wait() + r = Rio().readwrite(ReadWrite.READ).bs(Size(512)).size(Size(512)).target(core).run() stats = 
cache.get_stats() cache_lines = stats["conf"]["size"] @@ -291,14 +231,10 @@ def test_partial_hit_promotion(pyocf_ctx): ) # Step 4 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(2 * cache_lines.line_size) - io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.WRITE, 0, 0) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - comp.wait() + req_size = Size(2 * cache_lines.line_size) + r.size(req_size).bs(req_size).readwrite(ReadWrite.WRITE).run() + cache.settle() stats = cache.get_stats() assert ( stats["usage"]["occupancy"]["value"] == 2