Merge pull request #588 from arutk/rio-io-tester

Rio - I/O exerciser for pyocf
This commit is contained in:
Robert Baldyga 2022-03-01 14:07:04 +01:00 committed by GitHub
commit b94518130a
6 changed files with 403 additions and 127 deletions

View File

@ -0,0 +1,289 @@
#
# Copyright(c) 2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from ctypes import c_int, c_void_p, CFUNCTYPE
from enum import Enum, auto
from random import Random
from dataclasses import dataclass
from datetime import timedelta, datetime
from itertools import cycle
from threading import Thread, Condition, Event
from copy import deepcopy
from pyocf.utils import Size
from pyocf.types.volume import Volume
from pyocf.types.io import Io, IoDir
from pyocf.types.data import Data
class ReadWrite(Enum):
READ = auto()
WRITE = auto()
RANDREAD = auto()
RANDWRITE = auto()
RANDRW = auto()
def is_random(self):
return self in [self.RANDREAD, self.RANDWRITE, self.RANDRW]
class IoGen:
def __init__(self, extent, blocksize=512, seed=0, random=False, randommap=True):
self.random_gen = Random(seed)
self.random = random
self.randommap = randommap
gen = list(range(extent[0].B, extent[0].B + extent[1].B, blocksize.B))
self.cycle_len = len(gen)
if random:
self.random_gen.shuffle(gen)
if self.randommap:
self.gen = cycle(gen)
self.gen_fcn = lambda: next(self.gen)
else:
self.gen = gen
self.gen_fcn = lambda: self.random_gen.choice(self.gen)
def __iter__(self):
return self
def __next__(self):
return self.gen_fcn()
@dataclass
class JobSpec:
readwrite: ReadWrite = ReadWrite.READ
randseed: int = 1
rwmixwrite: int = 50
randommap: bool = True
bs: Size = Size.from_B(512)
offset: Size = Size(0)
njobs: int = 1
qd: int = 1
size: Size = Size(0)
io_size: Size = Size(0)
target: Volume = None
time_based: bool = False
time: timedelta = None
continue_on_error: bool = False
def merge(self, other):
# TODO implement
return other
class Rio:
class RioThread(Thread):
def __init__(self, jobspec: JobSpec, queue):
super().__init__()
self.jobspec = jobspec
self.queue = queue
self.ios = Size(0)
self.io_target = 0
self.finish_time = None
self.qd_condition = Condition()
self.qd = 0
self.stop_event = Event()
self.errors = []
def should_finish(self):
if self.stop_event.is_set():
return True
if self.jobspec.time_based:
if datetime.now() >= self.finish_time:
return True
elif self.ios >= self.io_target:
return True
return False
def get_io_cb(self):
def cb(error):
if error != 0:
self.errors.append(error)
if not self.jobspec.continue_on_error:
print(f"Aborting on error {error}")
self.abort()
with self.qd_condition as c:
self.qd -= 1
self.qd_condition.notify_all()
return cb
def abort(self):
self.stop_event.set()
def run(self):
iogen = IoGen(
(self.jobspec.offset, self.jobspec.size),
self.jobspec.bs,
self.jobspec.randseed,
self.jobspec.readwrite.is_random(),
self.jobspec.randommap,
)
if self.jobspec.time_based:
self.finish_time = datetime.now() + self.jobspec.time
else:
if int(self.jobspec.io_size) != 0:
self.io_target = min(
self.jobspec.io_size, self.jobspec.size - self.jobspec.offset
)
else:
self.io_target = self.jobspec.size - self.jobspec.offset
# TODO randrw
iodir = (
IoDir.WRITE
if self.jobspec.readwrite in [ReadWrite.WRITE, ReadWrite.RANDWRITE]
else IoDir.READ
)
while not self.should_finish():
with self.qd_condition:
self.qd_condition.wait_for(lambda: self.qd <= self.jobspec.qd)
data = Data(self.jobspec.bs) # TODO pattern and verify
io = self.jobspec.target.new_io(
self.queue,
next(iogen),
self.jobspec.bs,
iodir,
0,
0,
)
io.set_data(data)
io.callback = self.get_io_cb()
self.ios += self.jobspec.bs
io.submit()
with self.qd_condition:
self.qd += 1
with self.qd_condition:
self.qd_condition.wait_for(lambda: self.qd == 0)
def __init__(self):
self.global_jobspec = JobSpec()
self.jobs = []
self._threads = []
self.errors = {}
self.error_count = 0
def readwrite(self, rw: ReadWrite):
self.global_jobspec.readwrite = rw
return self
def rwmixwrite(self, mix: int):
self.global_jobspec.rwmixwrite = mix
return self
def rwmixread(self, mix: int):
self.global_jobspec.rwmixwrite = 100 - mix
return self
def norandommap(self):
self.global_jobspec.randommap = False
return self
def bs(self, bs: Size):
self.global_jobspec.bs = bs
return self
def offset(self, offset: Size):
self.global_jobspec.offset = offset
return self
def njobs(self, njobs: int):
self.global_jobspec.njobs = njobs
return self
def qd(self, qd: int):
self.global_jobspec.qd = qd
return self
def target(self, target: Volume):
self.global_jobspec.target = target
return self
def add_job(self, job: JobSpec):
self.jobs.append(job)
return self
def size(self, size: Size):
self.global_jobspec.size = size
return self
def io_size(self, size: Size):
self.global_jobspec.io_size = size
return self
def time_based(self):
self.global_jobspec.time_based = True
return self
def time(self, time: timedelta):
self.global_jobspec.time = time
return self
def continue_on_error(self):
self.global_jobspec.continue_on_error = True
return self
def abort(self):
for thread in self._threads:
thread.abort()
self.wait_for_completion()
return self
def wait_for_completion(self):
for thread in self._threads:
thread.join()
self.errors.update({thread.name: thread.errors})
self.error_count += len(thread.errors)
return self
def __del__(self):
self.wait_for_completion()
def clear(self):
self._threads = []
self.errors = {}
def run(self, queues=None):
self.run_async(queues)
self.wait_for_completion()
return self
def run_async(self, queues=None):
self.clear()
jobs = deepcopy(self.jobs)
if not jobs:
jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)]
if not queues:
queues = [self.global_jobspec.target.cache.get_default_queue()]
queues = cycle(queues)
for job in jobs:
spec = job.merge(self.global_jobspec)
thread = Rio.RioThread(spec, next(queues))
self._threads.append(thread)
for thread in self._threads:
thread.start()
return self
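
For reference, the Rio fluent interface defined above is driven from the tests below roughly as follows (a minimal sketch; core and core_device are assumed to come from the standard pyocf test fixtures):

from datetime import timedelta

from pyocf.rio import Rio, ReadWrite
from pyocf.utils import Size

# Time-based random 4 KiB writes over the whole core, two worker threads,
# queue depth 10 per thread; run() blocks until all jobs complete.
r = (
    Rio()
    .target(core)                    # core exported object (from test fixture)
    .readwrite(ReadWrite.RANDWRITE)
    .bs(Size.from_KiB(4))
    .size(core_device.size)
    .njobs(2)
    .qd(10)
    .time_based()
    .time(timedelta(seconds=5))
    .run()
)
assert r.error_count == 0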

View File

@ -798,6 +798,9 @@ class Cache:
finally:
self.read_unlock()
# settle all queues associated with this cache (mngt and I/O)
def settle(self):
Queue.settle_many(self.io_queues + [self.mngt_queue])
lib = OcfLib.getInstance()
lib.ocf_mngt_cache_remove_core.argtypes = [c_void_p, c_void_p, c_void_p]

View File

@ -4,7 +4,7 @@
#
from ctypes import c_void_p, CFUNCTYPE, Structure, byref
from threading import Thread, Condition, Event
from threading import Thread, Condition, Event, Semaphore
import weakref
from ..ocf import OcfLib
@ -23,7 +23,7 @@ class Queue:
pass
def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
def io_queue_run(*, queue: Queue, kick: Condition, stop: Event, sem: Semaphore):
def wait_predicate():
return stop.is_set() or OcfLib.getInstance().ocf_queue_pending_io(queue)
@ -31,7 +31,9 @@ def io_queue_run(*, queue: Queue, kick: Condition, stop: Event):
with kick:
kick.wait_for(wait_predicate)
sem.acquire()
OcfLib.getInstance().ocf_queue_run(queue)
sem.release()
if stop.is_set() and not OcfLib.getInstance().ocf_queue_pending_io(queue):
break
@ -56,6 +58,7 @@ class Queue:
self.stop_event = Event()
self.kick_condition = Condition()
self.sem = Semaphore()
self.thread = Thread(
group=None,
target=io_queue_run,
@ -64,6 +67,7 @@ class Queue:
"queue": self,
"kick": self.kick_condition,
"stop": self.stop_event,
"sem": self.sem,
},
)
self.thread.start()
@ -103,3 +107,37 @@ class Queue:
self.kick_condition.notify_all()
self.thread.join()
# settle - wait for OCF to finish execution within this queue context
#
# In some tests simply waiting for I/O to finish is not enough. Most
# notably, some statistics are potentially incremented after user-triggered
# I/O has finished. This is due to the asynchronous nature of I/O operations
# and the OCF approach to statistics updates, where only eventual consistency
# is guaranteed and no explicit mechanism is available to query whether
# the final state has been reached yet. However, it is fully within the
# adapter's power to determine this, as OCF executes in the context of API
# calls from the adapter (I/O submission, ocf_queue_run, ocf_cleaner_run,
# management operations) and of I/O completion callbacks. Queue settle is a
# mechanism to assure that ocf_queue_run is not currently being executed by
# the thread associated with a queue.
#
# With the queue settle mechanism it is straightforward for the client to
# wait for cache statistics to reach fixed values:
# 1. wait for all I/O submitted to OCF to finish
# 2. settle all I/O queues
# 3. make sure the background cleaner is not active
#
def settle(self):
busy = not self.sem.acquire(blocking=False)
if busy:
self.sem.acquire()
self.sem.release()
return busy
# wait until all provided queues settle
@staticmethod
def settle_many(qlist: [Queue]):
status = [True]
while any(status):
status = [q.settle() for q in qlist]
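
In a test, the settle pattern described in the comment above looks roughly like this (a sketch; cache and core are assumed to come from the standard pyocf fixtures, with no cleaning policy active so step 3 holds trivially):

from pyocf.rio import Rio, ReadWrite
from pyocf.utils import Size

# 1. wait for all I/O to finish - Rio.run() blocks until completions arrive
r = Rio().target(core).readwrite(ReadWrite.WRITE).size(Size.from_KiB(64)).run()
assert r.error_count == 0

# 2. settle all queues associated with the cache (mngt and I/O)
cache.settle()

# 3. with no background cleaner active, statistics are now stable
stats = cache.get_stats()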

View File

@ -82,6 +82,24 @@ class Size:
else:
self.bytes = int(b)
def __lt__(self, other):
return int(self) < int(other)
def __le__(self, other):
return int(self) <= int(other)
def __eq__(self, other):
return int(self) == int(other)
def __ne__(self, other):
return int(self) != int(other)
def __gt__(self, other):
return int(self) > int(other)
def __ge__(self, other):
return int(self) >= int(other)
def __int__(self):
return self.bytes
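
These rich comparisons make Size objects comparable by byte count, which the Rio code above relies on (e.g. the check self.ios >= self.io_target and min() over sizes). A quick illustration, assuming the default 512-byte sector:

from pyocf.utils import Size

assert Size.from_KiB(4) > Size(512)
assert Size.from_sector(1) == Size(512)
assert min(Size.from_KiB(4), Size(512)) == Size(512)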

View File

@ -13,6 +13,7 @@ from pyocf.types.data import Data
from pyocf.types.io import IoDir
from pyocf.utils import Size as S
from pyocf.types.shared import OcfError, OcfCompletion
from pyocf.rio import Rio, ReadWrite
def test_ctx_fixture(pyocf_ctx):
@ -31,18 +32,9 @@ def test_simple_wt_write(pyocf_ctx):
cache_device.reset_stats()
core_device.reset_stats()
write_data = Data.from_string("This is test data")
io = core.new_io(cache.get_default_queue(), S.from_sector(1).B,
write_data.size, IoDir.WRITE, 0, 0)
io.set_data(write_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
assert cmpl.results["err"] == 0
r = Rio().target(core).readwrite(ReadWrite.WRITE).size(S.from_sector(1)).run()
assert cache_device.get_stats()[IoDir.WRITE] == 1
cache.settle()
stats = cache.get_stats()
assert stats["req"]["wr_full_misses"]["value"] == 1
assert stats["usage"]["occupancy"]["value"] == 1

View File

@ -6,6 +6,7 @@
from ctypes import c_int
import pytest
import math
from datetime import timedelta
from pyocf.types.cache import Cache, PromotionPolicy, NhitParams
from pyocf.types.core import Core
@ -14,6 +15,7 @@ from pyocf.types.data import Data
from pyocf.types.io import IoDir
from pyocf.utils import Size
from pyocf.types.shared import OcfCompletion
from pyocf.rio import Rio, ReadWrite
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
@ -62,49 +64,35 @@ def test_change_to_nhit_and_back_io_in_flight(pyocf_ctx):
cache.add_core(core)
# Step 2
completions = []
for i in range(2000):
comp = OcfCompletion([("error", c_int)])
write_data = Data(4096)
io = core.new_io(
cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0
)
completions += [comp]
io.set_data(write_data)
io.callback = comp.callback
io.submit()
r = (
Rio()
.target(core)
.njobs(10)
.bs(Size.from_KiB(4))
.readwrite(ReadWrite.RANDWRITE)
.size(core_device.size)
.time_based()
.time(timedelta(minutes=1))
.qd(10)
.run_async()
)
# Step 3
cache.set_promotion_policy(PromotionPolicy.NHIT)
# Step 4
for c in completions:
c.wait()
assert not c.results["error"], "No IO's should fail when turning NHIT policy on"
r.abort()
assert r.error_count == 0, "No IO's should fail when turning NHIT policy on"
# Step 5
completions = []
for i in range(2000):
comp = OcfCompletion([("error", c_int)])
write_data = Data(4096)
io = core.new_io(
cache.get_default_queue(), i * 4096, write_data.size, IoDir.WRITE, 0, 0
)
completions += [comp]
io.set_data(write_data)
io.callback = comp.callback
io.submit()
r.run_async()
# Step 6
cache.set_promotion_policy(PromotionPolicy.ALWAYS)
# Step 7
for c in completions:
c.wait()
assert not c.results[
"error"
], "No IO's should fail when turning NHIT policy off"
r.abort()
assert r.error_count == 0, "No IO's should fail when turning NHIT policy off"
def fill_cache(cache, fill_ratio):
"""
@ -116,47 +104,19 @@ def fill_cache(cache, fill_ratio):
cache_lines = cache.get_stats()["conf"]["size"]
bytes_to_fill = cache_lines.bytes * fill_ratio
max_io_size = cache.device.get_max_io_size().bytes
ios_to_issue = math.floor(bytes_to_fill / max_io_size)
bytes_to_fill = Size(round(cache_lines.bytes * fill_ratio))
core = cache.cores[0]
completions = []
for i in range(ios_to_issue):
comp = OcfCompletion([("error", c_int)])
write_data = Data(max_io_size)
io = core.new_io(
cache.get_default_queue(),
i * max_io_size,
write_data.size,
IoDir.WRITE,
0,
0,
)
io.set_data(write_data)
io.callback = comp.callback
completions += [comp]
io.submit()
if bytes_to_fill % max_io_size:
comp = OcfCompletion([("error", c_int)])
write_data = Data(Size.from_B(bytes_to_fill % max_io_size, sector_aligned=True))
io = core.new_io(
cache.get_default_queue(),
ios_to_issue * max_io_size,
write_data.size,
IoDir.WRITE,
0,
0,
)
io.set_data(write_data)
io.callback = comp.callback
completions += [comp]
io.submit()
for c in completions:
c.wait()
r = (
Rio()
.target(core)
.readwrite(ReadWrite.RANDWRITE)
.size(bytes_to_fill)
.bs(Size(512))
.qd(10)
.run()
)
@pytest.mark.parametrize("fill_percentage", [0, 1, 50, 99])
@ -194,33 +154,27 @@ def test_promoted_after_hits_various_thresholds(
# Step 3
fill_cache(cache, fill_percentage / 100)
cache.settle()
stats = cache.get_stats()
cache_lines = stats["conf"]["size"]
assert stats["usage"]["occupancy"]["fraction"] // 10 == fill_percentage * 10
filled_occupancy = stats["usage"]["occupancy"]["value"]
# Step 4
last_core_line = int(core_device.size) - cache_lines.line_size
completions = []
last_core_line = Size(int(core_device.size) - cache_lines.line_size)
r = (
Rio()
.readwrite(ReadWrite.WRITE)
.bs(Size(4096))
.offset(last_core_line)
.target(core)
.size(Size(4096) + last_core_line)
)
for i in range(insertion_threshold - 1):
comp = OcfCompletion([("error", c_int)])
write_data = Data(cache_lines.line_size)
io = core.new_io(
cache.get_default_queue(),
last_core_line,
write_data.size,
IoDir.WRITE,
0,
0,
)
completions += [comp]
io.set_data(write_data)
io.callback = comp.callback
io.submit()
for c in completions:
c.wait()
r.run()
cache.settle()
stats = cache.get_stats()
threshold_reached_occupancy = stats["usage"]["occupancy"]["value"]
assert threshold_reached_occupancy == filled_occupancy, (
@ -229,20 +183,13 @@ def test_promoted_after_hits_various_thresholds(
)
# Step 5
comp = OcfCompletion([("error", c_int)])
write_data = Data(cache_lines.line_size)
io = core.new_io(
cache.get_default_queue(), last_core_line, write_data.size, IoDir.WRITE, 0, 0
)
io.set_data(write_data)
io.callback = comp.callback
io.submit()
comp.wait()
r.run()
cache.settle()
stats = cache.get_stats()
assert (
threshold_reached_occupancy
== cache.get_stats()["usage"]["occupancy"]["value"] - 1
== stats["usage"]["occupancy"]["value"] - 1
), "Previous request should be promoted and occupancy should rise"
@ -268,14 +215,7 @@ def test_partial_hit_promotion(pyocf_ctx):
cache.add_core(core)
# Step 2
comp = OcfCompletion([("error", c_int)])
write_data = Data(Size.from_sector(1))
io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.READ, 0, 0)
io.set_data(write_data)
io.callback = comp.callback
io.submit()
comp.wait()
r = Rio().readwrite(ReadWrite.READ).bs(Size(512)).size(Size(512)).target(core).run()
stats = cache.get_stats()
cache_lines = stats["conf"]["size"]
@ -291,14 +231,10 @@ def test_partial_hit_promotion(pyocf_ctx):
)
# Step 4
comp = OcfCompletion([("error", c_int)])
write_data = Data(2 * cache_lines.line_size)
io = core.new_io(cache.get_default_queue(), 0, write_data.size, IoDir.WRITE, 0, 0)
io.set_data(write_data)
io.callback = comp.callback
io.submit()
comp.wait()
req_size = Size(2 * cache_lines.line_size)
r.size(req_size).bs(req_size).readwrite(ReadWrite.WRITE).run()
cache.settle()
stats = cache.get_stats()
assert (
stats["usage"]["occupancy"]["value"] == 2