From 96124ca87a61d71ab6ad60787fbd132e35302730 Mon Sep 17 00:00:00 2001 From: Jan Musial Date: Tue, 31 Aug 2021 12:14:43 +0200 Subject: [PATCH 1/9] Enable Size comparisons in pyocf Signed-off-by: Jan Musial --- tests/functional/pyocf/utils.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/functional/pyocf/utils.py b/tests/functional/pyocf/utils.py index 0c86ae1..3e00dbc 100644 --- a/tests/functional/pyocf/utils.py +++ b/tests/functional/pyocf/utils.py @@ -82,6 +82,24 @@ class Size: else: self.bytes = int(b) + def __lt__(self, other): + return int(self) < int(other) + + def __le__(self, other): + return int(self) <= int(other) + + def __eq__(self, other): + return int(self) == int(other) + + def __ne__(self, other): + return int(self) != int(other) + + def __gt__(self, other): + return int(self) > int(other) + + def __ge__(self, other): + return int(self) >= int(other) + def __int__(self): return self.bytes From 41732090ae895afe014c07f1ee078b47251ffe57 Mon Sep 17 00:00:00 2001 From: Jan Musial Date: Tue, 31 Aug 2021 12:15:03 +0200 Subject: [PATCH 2/9] Implement rio io tester for pyocf rio stands for Rigid IO tester and is a simple mechanism for testing OCF cache IO. Signed-off-by: Jan Musial Signed-off-by: Adam Rutkowski --- tests/functional/pyocf/rio.py | 285 ++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 tests/functional/pyocf/rio.py diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py new file mode 100644 index 0000000..e53e95a --- /dev/null +++ b/tests/functional/pyocf/rio.py @@ -0,0 +1,285 @@ +# +# Copyright(c) 2021 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# + +from ctypes import c_int, c_void_p, CFUNCTYPE +from enum import Enum, auto +from random import Random +from dataclasses import dataclass +from datetime import timedelta, datetime +from itertools import cycle +from threading import Thread, Condition, Event +from copy import deepcopy + +from pyocf.utils import Size +from pyocf.types.volume import Volume +from pyocf.types.io import Io, IoDir +from pyocf.types.data import Data + + +class ReadWrite(Enum): + READ = auto() + WRITE = auto() + RANDREAD = auto() + RANDWRITE = auto() + RANDRW = auto() + + def is_random(self): + return self in [self.RANDREAD, self.RANDWRITE, self.RANDRW] + + +class IoGen: + def __init__(self, extent, blocksize=512, seed=0, random=False, randommap=True): + self.random_gen = Random(seed) + self.random = random + self.randommap = randommap + + gen = list(range(extent[0], extent[1], blocksize)) + self.cycle_len = len(gen) + + if random: + self.random_gen.shuffle(gen) + + if self.randommap: + self.gen = cycle(gen) + self.gen_fcn = lambda: next(self.gen) + else: + self.gen = gen + self.gen_fcn = lambda: self.random_gen.choice(self.gen) + + def __iter__(self): + return self + + def __next__(self): + return self.gen_fcn() + + +@dataclass +class JobSpec: + readwrite: ReadWrite = ReadWrite.READ + randseed: int = 1 + rwmixwrite: int = 50 + randommap: bool = True + bs: Size = Size.from_B(512) + offset: Size = Size(0) + njobs: int = 1 + qd: int = 1 + size: Size = Size(0) + io_size: Size = Size(0) + target: Volume = None + time_based: bool = False + time: timedelta = None + continue_on_error: bool = False + + def merge(self, other): + # TODO implement + return other + + +class Rio: + class RioThread(Thread): + def __init__(self, jobspec: JobSpec): + super().__init__() + self.jobspec = jobspec + self.ios = Size(0) + self.io_target = 0 + self.finish_time = None + + self.qd_condition = Condition() + self.qd = 0 + + self.stop_event = Event() + + self.errors = [] + + def should_finish(self): + if self.stop_event.is_set(): + return True + + if self.jobspec.time_based: + if datetime.now() >= self.finish_time: + return True + elif self.ios >= self.io_target: + return True + + return False + + def get_io_cb(self): + @CFUNCTYPE(c_void_p, c_int) + def cb(error): + if error != 0: + self.errors.append(error) + if not self.jobspec.continue_on_error: + print(f"Aborting on error {error}") + self.abort() + with self.qd_condition as c: + self.qd -= 1 + self.qd_condition.notify_all() + + return cb + + def abort(self): + self.stop_event.set() + + def run(self): + iogen = IoGen( + (self.jobspec.offset, self.jobspec.size), + self.jobspec.bs, + self.jobspec.randseed, + self.jobspec.readwrite.is_random(), + self.jobspec.randommap, + ) + + if self.jobspec.time_based: + self.finish_time = datetime.now() + self.jobspec.time + else: + if self.jobspec.io_size != 0: + self.io_target = min( + self.jobspec.io_size, self.jobspec.size - self.jobspec.offset + ) + else: + self.io_target = self.jobspec.size - self.jobspec.offset + + # TODO randrw + iodir = ( + IoDir.WRITE + if self.jobspec.readwrite in [ReadWrite.WRITE, ReadWrite.RANDWRITE] + else IoDir.READ + ) + + while not self.should_finish(): + with self.qd_condition: + self.qd_condition.wait_for(lambda: self.qd <= self.jobspec.qd) + + data = Data(self.jobspec.bs) # TODO pattern and verify + io = self.jobspec.target.new_io( + self.jobspec.target.cache.get_default_queue(), # TODO multiple queues? + next(iogen), + self.jobspec.bs, + iodir, + 0, + 0, + ) + io.set_data(data) + io.callback = self.get_io_cb() + self.ios += self.jobspec.bs + io.submit() + with self.qd_condition: + self.qd += 1 + + with self.qd_condition: + self.qd_condition.wait_for(lambda: self.qd == 0) + + def __init__(self): + self.global_jobspec = JobSpec() + self.jobs = [] + + self._threads = [] + self.errors = {} + self.error_count = 0 + + def readwrite(self, rw: ReadWrite): + self.global_jobspec.readwrite = rw + return self + + def rwmixwrite(self, mix: int): + self.global_jobspec.rwmixwrite = mix + return self + + def rwmixread(self, mix: int): + self.global_jobspec.rwmixwrite = 100 - mix + return self + + def norandommap(self): + self.global_jobspec.randommap = False + return self + + def bs(self, bs: Size): + self.global_jobspec.bs = bs + return self + + def offset(self, offset: Size): + self.global_jobspec.offset = offset + return self + + def njobs(self, njobs: int): + self.global_jobspec.njobs = njobs + return self + + def qd(self, qd: int): + self.global_jobspec.qd = qd + return self + + def target(self, target: Volume): + self.global_jobspec.target = target + return self + + def add_job(self, job: JobSpec): + self.jobs.append(job) + return self + + def size(self, size: Size): + self.global_jobspec.size = size + return self + + def io_size(self, size: Size): + self.global_jobspec.io_size = size + return self + + def time_based(self): + self.global_jobspec.time_based = True + return self + + def time(self, time: timedelta): + self.global_jobspec.time = time + return self + + def continue_on_error(self): + self.global_jobspec.continue_on_error = True + return self + + def abort(self): + for thread in self._threads: + thread.abort() + + self.wait_for_completion() + return self + + def wait_for_completion(self): + for thread in self._threads: + thread.join() + self.errors.update({thread.name: thread.errors}) + self.error_count += len(thread.errors) + + return self + + def __del__(self): + self.wait_for_completion() + + def clear(self): + self._threads = [] + self.errors = {} + + def run(self): + self.run_async() + self.wait_for_completion() + return self + + def run_async(self): + self.clear() + + jobs = deepcopy(self.jobs) + + if not jobs: + jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)] + + for job in jobs: + spec = job.merge(self.global_jobspec) + thread = Rio.RioThread(spec) + self._threads.append(thread) + + for thread in self._threads: + thread.start() + + return self From 17eddab0945639e48a7d154fa84544043fd3b358 Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Fri, 12 Nov 2021 13:07:41 +0100 Subject: [PATCH 3/9] pyocf: queue settle functionality Queue settle is a mechanism to wait for all OCF processing on a given queue to finish. In some tests simply waiting for I/O to finish is not enough. Most notably some statistics are potentially incremented after user triggered I/O is finished. This is due to asynchronous nature of I/O operations and OCF approach to statistics update, where only eventual consistency is guaranteed without explicit mechanism available to query whether the final state is reached yet. However it is fully in the adapter power to determine this, as OCF executes in context of API calls from the adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management operations) and I/O completion callbacks. Queue settle is a mechanism to assure ocf_queue_run is not being executed by the thread associated with a queue. With queue settle mechanism it is straightforward for the adapter to wait for cache statistics to reach fixed values: 1. wait for all I/O to OCF to finish 2. settle all I/O queues 3. make sure background cleaner is not active Signed-off-by: Adam Rutkowski --- tests/functional/pyocf/types/cache.py | 3 ++ tests/functional/pyocf/types/queue.py | 42 +++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/tests/functional/pyocf/types/cache.py b/tests/functional/pyocf/types/cache.py index a90598f..3efc8ca 100644 --- a/tests/functional/pyocf/types/cache.py +++ b/tests/functional/pyocf/types/cache.py @@ -798,6 +798,9 @@ class Cache: finally: self.read_unlock() + # settle all queues accociated with this cache (mngt and I/O) + def settle(self): + Queue.settle_many(self.io_queues + [self.mngt_queue]) lib = OcfLib.getInstance() lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p] diff --git a/tests/functional/pyocf/types/queue.py b/tests/functional/pyocf/types/queue.py index 112a848..f38ff35 100644 --- a/tests/functional/pyocf/types/queue.py +++ b/tests/functional/pyocf/types/queue.py @@ -4,7 +4,7 @@ # from ctypes import c_void_p, CFUNCTYPE, Structure, byref -from threading import Thread, Condition, Event +from threading import Thread, Condition, Event, Semaphore import weakref from ..ocf import OcfLib @@ -23,7 +23,7 @@ class Queue: pass -def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): +def io_queue_run(*, queue: Queue, kick: Condition, stop: Event, sem: Semaphore): def wait_predicate(): return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue) @@ -31,7 +31,9 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): with kick: kick.wait_for(wait_predicate) + sem.acquire() OcfLib.getInstance().ocf_queue_run(queue) + sem.release() if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue): break @@ -56,6 +58,7 @@ class Queue: self.stop_event = Event() self.kick_condition = Condition() + self.sem = Semaphore() self.thread = Thread( group=None, target=io_queue_run, @@ -64,6 +67,7 @@ class Queue: "queue": self, "kick": self.kick_condition, "stop": self.stop_event, + "sem": self.sem, }, ) self.thread.start() @@ -103,3 +107,37 @@ class Queue: self.kick_condition.notify_all() self.thread.join() + + # settle - wait for OCF to finish execution within this queue context + # + # In some tests simply waiting for I/O to finish is not enough. Most + # notably some statistics are potentially incremented after user triggered + # I/O is finished. This is due to asynchronous nature of I/O operations + # and OCF approach to statistics update, where only eventual consistency + # is guaranteed without explicit mechanism available to query whether + # the final state is reached yet. However it is fully within the adapter power + # to determine this, as OCF executes in context of API calls from the + # adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management + # operations) and I/O completion callbacks. Queue settle is a mechanism to + # assure ocf_queue_run is not being executed by the thread associated with + # a queue. + # + # With queue settle mechanism it is straightforward for the client to + # wait for cache statistics to reach fixed values: + # 1. wait for all I/O to OCF to finish + # 2. settle all I/O queues + # 3. make sure background cleaner is not active + # + def settle(self): + busy = not self.sem.acquire(blocking=False) + if busy: + self.sem.acquire() + self.sem.release() + return busy + + # wait until all queues provided settle + @staticmethod + def settle_many(qlist: [Queue]): + status = [True] + while any(status): + status = [q.settle() for q in qlist] From 9360e55851c448cae2ae3eb18db92bad2f11cb3c Mon Sep 17 00:00:00 2001 From: Jan Musial Date: Thu, 2 Sep 2021 15:14:17 +0200 Subject: [PATCH 4/9] Update basic pyOCF tests to use rio Signed-off-by: Jan Musial Signed-off-by: Adam Rutkowski --- tests/functional/tests/basic/test_pyocf.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/tests/functional/tests/basic/test_pyocf.py b/tests/functional/tests/basic/test_pyocf.py index 6899ea9..7a36b1d 100644 --- a/tests/functional/tests/basic/test_pyocf.py +++ b/tests/functional/tests/basic/test_pyocf.py @@ -13,6 +13,7 @@ from pyocf.types.data import Data from pyocf.types.io import IoDir from pyocf.utils import Size as S from pyocf.types.shared import OcfError, OcfCompletion +from pyocf.rio import Rio, ReadWrite def test_ctx_fixture(pyocf_ctx): @@ -31,18 +32,9 @@ def test_simple_wt_write(pyocf_ctx): cache_device.reset_stats() core_device.reset_stats() - write_data = Data.from_string("This is test data") - io = core.new_io(cache.get_default_queue(), S.from_sector(1).B, - write_data.size, IoDir.WRITE, 0, 0) - io.set_data(write_data) - - cmpl = OcfCompletion([("err", c_int)]) - io.callback = cmpl.callback - io.submit() - cmpl.wait() - - assert cmpl.results["err"] == 0 + r = Rio().target(core).readwrite(ReadWrite.WRITE).size(S.from_sector(1)).run() assert cache_device.get_stats()[IoDir.WRITE] == 1 + cache.settle() stats = cache.get_stats() assert stats["req"]["wr_full_misses"]["value"] == 1 assert stats["usage"]["occupancy"]["value"] == 1 From 9baf9a3876313875875827980f217fdaa37575ce Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Wed, 29 Dec 2021 23:55:45 +0100 Subject: [PATCH 5/9] rio: convert Size to bytes for extent calculation Signed-off-by: Adam Rutkowski --- tests/functional/pyocf/rio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py index e53e95a..e857e02 100644 --- a/tests/functional/pyocf/rio.py +++ b/tests/functional/pyocf/rio.py @@ -35,7 +35,7 @@ class IoGen: self.random = random self.randommap = randommap - gen = list(range(extent[0], extent[1], blocksize)) + gen = list(range(extent[0].B, extent[0].B + extent[1].B, blocksize.B)) self.cycle_len = len(gen) if random: From 372b1f1e1c82983929a78633194c52d518009030 Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Tue, 26 Oct 2021 16:51:07 +0200 Subject: [PATCH 6/9] remove unnecessary callback decorator in rio Signed-off-by: Adam Rutkowski --- tests/functional/pyocf/rio.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py index e857e02..09ef495 100644 --- a/tests/functional/pyocf/rio.py +++ b/tests/functional/pyocf/rio.py @@ -106,7 +106,6 @@ class Rio: return False def get_io_cb(self): - @CFUNCTYPE(c_void_p, c_int) def cb(error): if error != 0: self.errors.append(error) From a6d8bd0470fb4fe34d6790152e0adab4c181b1b2 Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Fri, 5 Nov 2021 16:44:59 +0100 Subject: [PATCH 7/9] rio: accept queue list in Rio constructor Signed-off-by: Adam Rutkowski Signed-off-by: Jan Musial --- tests/functional/pyocf/rio.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py index 09ef495..f11cba9 100644 --- a/tests/functional/pyocf/rio.py +++ b/tests/functional/pyocf/rio.py @@ -79,9 +79,10 @@ class JobSpec: class Rio: class RioThread(Thread): - def __init__(self, jobspec: JobSpec): + def __init__(self, jobspec: JobSpec, queue): super().__init__() self.jobspec = jobspec + self.queue = queue self.ios = Size(0) self.io_target = 0 self.finish_time = None @@ -153,7 +154,7 @@ class Rio: data = Data(self.jobspec.bs) # TODO pattern and verify io = self.jobspec.target.new_io( - self.jobspec.target.cache.get_default_queue(), # TODO multiple queues? + self.queue, next(iogen), self.jobspec.bs, iodir, @@ -260,12 +261,12 @@ class Rio: self._threads = [] self.errors = {} - def run(self): - self.run_async() + def run(self, queues=None): + self.run_async(queues) self.wait_for_completion() return self - def run_async(self): + def run_async(self, queues=None): self.clear() jobs = deepcopy(self.jobs) @@ -273,9 +274,13 @@ class Rio: if not jobs: jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)] + if not queues: + queues = [self.global_jobspec.target.cache.get_default_queue()] + queues = cycle(queues) + for job in jobs: spec = job.merge(self.global_jobspec) - thread = Rio.RioThread(spec) + thread = Rio.RioThread(spec, next(queues)) self._threads.append(thread) for thread in self._threads: From cd0551e72ecc8a592700510c9d324a5b81deab72 Mon Sep 17 00:00:00 2001 From: Jan Musial Date: Tue, 31 Aug 2021 12:21:40 +0200 Subject: [PATCH 8/9] Use rio in promotion policy tests Signed-off-by: Jan Musial --- tests/functional/tests/engine/test_pp.py | 164 +++++++---------------- 1 file changed, 50 insertions(+), 114 deletions(-) diff --git a/tests/functional/tests/engine/test_pp.py b/tests/functional/tests/engine/test_pp.py index c9dde8d..5fab028 100644 --- a/tests/functional/tests/engine/test_pp.py +++ b/tests/functional/tests/engine/test_pp.py @@ -6,6 +6,7 @@ from ctypes import c_int import pytest import math +from datetime import timedelta from pyocf.types.cache import Cache, PromotionPolicy, NhitParams from pyocf.types.core import Core @@ -14,6 +15,7 @@ from pyocf.types.data import Data from pyocf.types.io import IoDir from pyocf.utils import Size from pyocf.types.shared import OcfCompletion +from pyocf.rio import Rio, ReadWrite @pytest.mark.parametrize("promotion_policy", PromotionPolicy) @@ -62,49 +64,35 @@ def test_change_to_nhit_and_back_io_in_flight(pyocf_ctx): cache.add_core(core) # Step 2 - completions = [] - for i in range(2000): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(4096) - io = core.new_io( - cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0 - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() + r = ( + Rio() + .target(core) + .njobs(10) + .bs(Size.from_KiB(4)) + .readwrite(ReadWrite.RANDWRITE) + .size(core_device.size) + .time_based() + .time(timedelta(minutes=1)) + .qd(10) + .run_async() + ) # Step 3 cache.set_promotion_policy(PromotionPolicy.NHIT) # Step 4 - for c in completions: - c.wait() - assert not c.results["error"], "No IO's should fail when turning NHIT policy on" + r.abort() + assert r.error_count == 0, "No IO's should fail when turning NHIT policy on" # Step 5 - completions = [] - for i in range(2000): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(4096) - io = core.new_io( - cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0 - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() + r.run_async() # Step 6 cache.set_promotion_policy(PromotionPolicy.ALWAYS) # Step 7 - for c in completions: - c.wait() - assert not c.results[ - "error" - ], "No IO's should fail when turning NHIT policy off" - + r.abort() + assert r.error_count == 0, "No IO's should fail when turning NHIT policy off" def fill_cache(cache, fill_ratio): """ @@ -116,47 +104,19 @@ def fill_cache(cache, fill_ratio): cache_lines = cache.get_stats()["conf"]["size"] - bytes_to_fill = cache_lines.bytes * fill_ratio - max_io_size = cache.device.get_max_io_size().bytes - - ios_to_issue = math.floor(bytes_to_fill / max_io_size) + bytes_to_fill = Size(round(cache_lines.bytes * fill_ratio)) core = cache.cores[0] - completions = [] - for i in range(ios_to_issue): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(max_io_size) - io = core.new_io( - cache.get_default_queue(), - i * max_io_size, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - io.set_data(write_data) - io.callback = comp.callback - completions += [comp] - io.submit() - if bytes_to_fill % max_io_size: - comp = OcfCompletion([("error", c_int)]) - write_data = Data(Size.from_B(bytes_to_fill % max_io_size, sector_aligned=True)) - io = core.new_io( - cache.get_default_queue(), - ios_to_issue * max_io_size, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - io.set_data(write_data) - io.callback = comp.callback - completions += [comp] - io.submit() - - for c in completions: - c.wait() + r = ( + Rio() + .target(core) + .readwrite(ReadWrite.RANDWRITE) + .size(bytes_to_fill) + .bs(Size(512)) + .qd(10) + .run() + ) @pytest.mark.parametrize("fill_percentage", [0, 1, 50, 99]) @@ -194,33 +154,27 @@ def test_promoted_after_hits_various_thresholds( # Step 3 fill_cache(cache, fill_percentage / 100) + cache.settle() stats = cache.get_stats() cache_lines = stats["conf"]["size"] assert stats["usage"]["occupancy"]["fraction"] // 10 == fill_percentage * 10 filled_occupancy = stats["usage"]["occupancy"]["value"] # Step 4 - last_core_line = int(core_device.size) - cache_lines.line_size - completions = [] + last_core_line = Size(int(core_device.size) - cache_lines.line_size) + r = ( + Rio() + .readwrite(ReadWrite.WRITE) + .bs(Size(4096)) + .offset(last_core_line) + .target(core) + .size(Size(4096) + last_core_line) + ) + for i in range(insertion_threshold - 1): - comp = OcfCompletion([("error", c_int)]) - write_data = Data(cache_lines.line_size) - io = core.new_io( - cache.get_default_queue(), - last_core_line, - write_data.size, - IoDir.WRITE, - 0, - 0, - ) - completions += [comp] - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - for c in completions: - c.wait() + r.run() + cache.settle() stats = cache.get_stats() threshold_reached_occupancy = stats["usage"]["occupancy"]["value"] assert threshold_reached_occupancy == filled_occupancy, ( @@ -229,20 +183,13 @@ def test_promoted_after_hits_various_thresholds( ) # Step 5 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(cache_lines.line_size) - io = core.new_io( - cache.get_default_queue(), last_core_line, write_data.size, IoDir.WRITE, 0, 0 - ) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - comp.wait() + r.run() + cache.settle() + stats = cache.get_stats() assert ( threshold_reached_occupancy - == cache.get_stats()["usage"]["occupancy"]["value"] - 1 + == stats["usage"]["occupancy"]["value"] - 1 ), "Previous request should be promoted and occupancy should rise" @@ -268,14 +215,7 @@ def test_partial_hit_promotion(pyocf_ctx): cache.add_core(core) # Step 2 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(Size.from_sector(1)) - io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.READ, 0, 0) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - - comp.wait() + r = Rio().readwrite(ReadWrite.READ).bs(Size(512)).size(Size(512)).target(core).run() stats = cache.get_stats() cache_lines = stats["conf"]["size"] @@ -291,14 +231,10 @@ def test_partial_hit_promotion(pyocf_ctx): ) # Step 4 - comp = OcfCompletion([("error", c_int)]) - write_data = Data(2 * cache_lines.line_size) - io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.WRITE, 0, 0) - io.set_data(write_data) - io.callback = comp.callback - io.submit() - comp.wait() + req_size = Size(2 * cache_lines.line_size) + r.size(req_size).bs(req_size).readwrite(ReadWrite.WRITE).run() + cache.settle() stats = cache.get_stats() assert ( stats["usage"]["occupancy"]["value"] == 2 From 765d1c5d77567053958121b997b072d007328879 Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Mon, 3 Jan 2022 19:27:07 +0100 Subject: [PATCH 9/9] rio: fix size comparison in Signed-off-by: Robert Baldyga --- tests/functional/pyocf/rio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py index f11cba9..6231b42 100644 --- a/tests/functional/pyocf/rio.py +++ b/tests/functional/pyocf/rio.py @@ -134,7 +134,7 @@ class Rio: if self.jobspec.time_based: self.finish_time = datetime.now() + self.jobspec.time else: - if self.jobspec.io_size != 0: + if int(self.jobspec.io_size) != 0: self.io_target = min( self.jobspec.io_size, self.jobspec.size - self.jobspec.offset )