diff --git a/tests/functional/pyocf/types/cache.py b/tests/functional/pyocf/types/cache.py index a90598f..3efc8ca 100644 --- a/tests/functional/pyocf/types/cache.py +++ b/tests/functional/pyocf/types/cache.py @@ -798,6 +798,9 @@ class Cache: finally: self.read_unlock() + # settle all queues accociated with this cache (mngt and I/O) + def settle(self): + Queue.settle_many(self.io_queues + [self.mngt_queue]) lib = OcfLib.getInstance() lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p] diff --git a/tests/functional/pyocf/types/queue.py b/tests/functional/pyocf/types/queue.py index 112a848..f38ff35 100644 --- a/tests/functional/pyocf/types/queue.py +++ b/tests/functional/pyocf/types/queue.py @@ -4,7 +4,7 @@ # from ctypes import c_void_p, CFUNCTYPE, Structure, byref -from threading import Thread, Condition, Event +from threading import Thread, Condition, Event, Semaphore import weakref from ..ocf import OcfLib @@ -23,7 +23,7 @@ class Queue: pass -def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): +def io_queue_run(*, queue: Queue, kick: Condition, stop: Event, sem: Semaphore): def wait_predicate(): return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue) @@ -31,7 +31,9 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): with kick: kick.wait_for(wait_predicate) + sem.acquire() OcfLib.getInstance().ocf_queue_run(queue) + sem.release() if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue): break @@ -56,6 +58,7 @@ class Queue: self.stop_event = Event() self.kick_condition = Condition() + self.sem = Semaphore() self.thread = Thread( group=None, target=io_queue_run, @@ -64,6 +67,7 @@ class Queue: "queue": self, "kick": self.kick_condition, "stop": self.stop_event, + "sem": self.sem, }, ) self.thread.start() @@ -103,3 +107,37 @@ class Queue: self.kick_condition.notify_all() self.thread.join() + + # settle - wait for OCF to finish execution within this queue context + # + # In some tests simply waiting for I/O to finish is not enough. Most + # notably some statistics are potentially incremented after user triggered + # I/O is finished. This is due to asynchronous nature of I/O operations + # and OCF approach to statistics update, where only eventual consistency + # is guaranteed without explicit mechanism available to query whether + # the final state is reached yet. However it is fully within the adapter power + # to determine this, as OCF executes in context of API calls from the + # adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management + # operations) and I/O completion callbacks. Queue settle is a mechanism to + # assure ocf_queue_run is not being executed by the thread associated with + # a queue. + # + # With queue settle mechanism it is straightforward for the client to + # wait for cache statistics to reach fixed values: + # 1. wait for all I/O to OCF to finish + # 2. settle all I/O queues + # 3. make sure background cleaner is not active + # + def settle(self): + busy = not self.sem.acquire(blocking=False) + if busy: + self.sem.acquire() + self.sem.release() + return busy + + # wait until all queues provided settle + @staticmethod + def settle_many(qlist: [Queue]): + status = [True] + while any(status): + status = [q.settle() for q in qlist]