pyocf: queue settle functionality
Queue settle is a mechanism to wait for all OCF processing on a given queue to finish. In some tests simply waiting for I/O to finish is not enough. Most notably some statistics are potentially incremented after user triggered I/O is finished. This is due to asynchronous nature of I/O operations and OCF approach to statistics update, where only eventual consistency is guaranteed without explicit mechanism available to query whether the final state is reached yet. However it is fully in the adapter power to determine this, as OCF executes in context of API calls from the adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management operations) and I/O completion callbacks. Queue settle is a mechanism to assure ocf_queue_run is not being executed by the thread associated with a queue. With queue settle mechanism it is straightforward for the adapter to wait for cache statistics to reach fixed values: 1. wait for all I/O to OCF to finish 2. settle all I/O queues 3. make sure background cleaner is not active Signed-off-by: Adam Rutkowski <adam.j.rutkowski@intel.com>
This commit is contained in:
parent
41732090ae
commit
17eddab094
@ -798,6 +798,9 @@ class Cache:
|
|||||||
finally:
|
finally:
|
||||||
self.read_unlock()
|
self.read_unlock()
|
||||||
|
|
||||||
|
# settle all queues accociated with this cache (mngt and I/O)
|
||||||
|
def settle(self):
|
||||||
|
Queue.settle_many(self.io_queues + [self.mngt_queue])
|
||||||
|
|
||||||
lib = OcfLib.getInstance()
|
lib = OcfLib.getInstance()
|
||||||
lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
|
lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
from ctypes import c_void_p, CFUNCTYPE, Structure, byref
|
from ctypes import c_void_p, CFUNCTYPE, Structure, byref
|
||||||
from threading import Thread, Condition, Event
|
from threading import Thread, Condition, Event, Semaphore
|
||||||
import weakref
|
import weakref
|
||||||
|
|
||||||
from ..ocf import OcfLib
|
from ..ocf import OcfLib
|
||||||
@ -23,7 +23,7 @@ class Queue:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
|
def io_queue_run(*, queue: Queue, kick: Condition, stop: Event, sem: Semaphore):
|
||||||
def wait_predicate():
|
def wait_predicate():
|
||||||
return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue)
|
return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue)
|
||||||
|
|
||||||
@ -31,7 +31,9 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
|
|||||||
with kick:
|
with kick:
|
||||||
kick.wait_for(wait_predicate)
|
kick.wait_for(wait_predicate)
|
||||||
|
|
||||||
|
sem.acquire()
|
||||||
OcfLib.getInstance().ocf_queue_run(queue)
|
OcfLib.getInstance().ocf_queue_run(queue)
|
||||||
|
sem.release()
|
||||||
|
|
||||||
if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue):
|
if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue):
|
||||||
break
|
break
|
||||||
@ -56,6 +58,7 @@ class Queue:
|
|||||||
|
|
||||||
self.stop_event = Event()
|
self.stop_event = Event()
|
||||||
self.kick_condition = Condition()
|
self.kick_condition = Condition()
|
||||||
|
self.sem = Semaphore()
|
||||||
self.thread = Thread(
|
self.thread = Thread(
|
||||||
group=None,
|
group=None,
|
||||||
target=io_queue_run,
|
target=io_queue_run,
|
||||||
@ -64,6 +67,7 @@ class Queue:
|
|||||||
"queue": self,
|
"queue": self,
|
||||||
"kick": self.kick_condition,
|
"kick": self.kick_condition,
|
||||||
"stop": self.stop_event,
|
"stop": self.stop_event,
|
||||||
|
"sem": self.sem,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
@ -103,3 +107,37 @@ class Queue:
|
|||||||
self.kick_condition.notify_all()
|
self.kick_condition.notify_all()
|
||||||
|
|
||||||
self.thread.join()
|
self.thread.join()
|
||||||
|
|
||||||
|
# settle - wait for OCF to finish execution within this queue context
|
||||||
|
#
|
||||||
|
# In some tests simply waiting for I/O to finish is not enough. Most
|
||||||
|
# notably some statistics are potentially incremented after user triggered
|
||||||
|
# I/O is finished. This is due to asynchronous nature of I/O operations
|
||||||
|
# and OCF approach to statistics update, where only eventual consistency
|
||||||
|
# is guaranteed without explicit mechanism available to query whether
|
||||||
|
# the final state is reached yet. However it is fully within the adapter power
|
||||||
|
# to determine this, as OCF executes in context of API calls from the
|
||||||
|
# adapter (like I/O submission, ocf_queue_run, ocf_cleaner_run, management
|
||||||
|
# operations) and I/O completion callbacks. Queue settle is a mechanism to
|
||||||
|
# assure ocf_queue_run is not being executed by the thread associated with
|
||||||
|
# a queue.
|
||||||
|
#
|
||||||
|
# With queue settle mechanism it is straightforward for the client to
|
||||||
|
# wait for cache statistics to reach fixed values:
|
||||||
|
# 1. wait for all I/O to OCF to finish
|
||||||
|
# 2. settle all I/O queues
|
||||||
|
# 3. make sure background cleaner is not active
|
||||||
|
#
|
||||||
|
def settle(self):
|
||||||
|
busy = not self.sem.acquire(blocking=False)
|
||||||
|
if busy:
|
||||||
|
self.sem.acquire()
|
||||||
|
self.sem.release()
|
||||||
|
return busy
|
||||||
|
|
||||||
|
# wait until all queues provided settle
|
||||||
|
@staticmethod
|
||||||
|
def settle_many(qlist: [Queue]):
|
||||||
|
status = [True]
|
||||||
|
while any(status):
|
||||||
|
status = [q.settle() for q in qlist]
|
||||||
|
Loading…
Reference in New Issue
Block a user