From 17eddab0945639e48a7d154fa84544043fd3b358 Mon Sep 17 00:00:00 2001 From: Adam Rutkowski Date: Fri, 12 Nov 2021 13:07:41 +0100 Subject: [PATCH] pyocf: queue settle functionality Queue settle is a mechanism to wait for all OCF processing on a given queue to finish. In some tests simply waiting for I/O to finish is not enough. Most notably some statistics are potentially incremented after user triggered I/O is finished. This is due to asynchronous nature of I/O operations and OCF approach to statistics update, where only eventual consistency is guaranteed without explicit mechanism available to query whether the final state is reached yet. However it is fully in the adapter power to determine this, as OCF executes in context of API calls from the adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management operations) and I/O completion callbacks. Queue settle is a mechanism to assure ocf_queue_run is not being executed by the thread associated with a queue. With queue settle mechanism it is straightforward for the adapter to wait for cache statistics to reach fixed values: 1. wait for all I/O to OCF to finish 2. settle all I/O queues 3. make sure background cleaner is not active Signed-off-by: Adam Rutkowski --- tests/functional/pyocf/types/cache.py | 3 ++ tests/functional/pyocf/types/queue.py | 42 +++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/tests/functional/pyocf/types/cache.py b/tests/functional/pyocf/types/cache.py index a90598f..3efc8ca 100644 --- a/tests/functional/pyocf/types/cache.py +++ b/tests/functional/pyocf/types/cache.py @@ -798,6 +798,9 @@ class Cache: finally: self.read_unlock() + # settle all queues accociated with this cache (mngt and I/O) + def settle(self): + Queue.settle_many(self.io_queues + [self.mngt_queue]) lib = OcfLib.getInstance() lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p] diff --git a/tests/functional/pyocf/types/queue.py b/tests/functional/pyocf/types/queue.py index 112a848..f38ff35 100644 --- a/tests/functional/pyocf/types/queue.py +++ b/tests/functional/pyocf/types/queue.py @@ -4,7 +4,7 @@ # from ctypes import c_void_p, CFUNCTYPE, Structure, byref -from threading import Thread, Condition, Event +from threading import Thread, Condition, Event, Semaphore import weakref from ..ocf import OcfLib @@ -23,7 +23,7 @@ class Queue: pass -def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): +def io_queue_run(*, queue: Queue, kick: Condition, stop: Event, sem: Semaphore): def wait_predicate(): return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue) @@ -31,7 +31,9 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event): with kick: kick.wait_for(wait_predicate) + sem.acquire() OcfLib.getInstance().ocf_queue_run(queue) + sem.release() if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue): break @@ -56,6 +58,7 @@ class Queue: self.stop_event = Event() self.kick_condition = Condition() + self.sem = Semaphore() self.thread = Thread( group=None, target=io_queue_run, @@ -64,6 +67,7 @@ class Queue: "queue": self, "kick": self.kick_condition, "stop": self.stop_event, + "sem": self.sem, }, ) self.thread.start() @@ -103,3 +107,37 @@ class Queue: self.kick_condition.notify_all() self.thread.join() + + # settle - wait for OCF to finish execution within this queue context + # + # In some tests simply waiting for I/O to finish is not enough. Most + # notably some statistics are potentially incremented after user triggered + # I/O is finished. This is due to asynchronous nature of I/O operations + # and OCF approach to statistics update, where only eventual consistency + # is guaranteed without explicit mechanism available to query whether + # the final state is reached yet. However it is fully within the adapter power + # to determine this, as OCF executes in context of API calls from the + # adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management + # operations) and I/O completion callbacks. Queue settle is a mechanism to + # assure ocf_queue_run is not being executed by the thread associated with + # a queue. + # + # With queue settle mechanism it is straightforward for the client to + # wait for cache statistics to reach fixed values: + # 1. wait for all I/O to OCF to finish + # 2. settle all I/O queues + # 3. make sure background cleaner is not active + # + def settle(self): + busy = not self.sem.acquire(blocking=False) + if busy: + self.sem.acquire() + self.sem.release() + return busy + + # wait until all queues provided settle + @staticmethod + def settle_many(qlist: [Queue]): + status = [True] + while any(status): + status = [q.settle() for q in qlist]