diff --git a/tests/functional/pyocf/rio.py b/tests/functional/pyocf/rio.py index 09ef495..f11cba9 100644 --- a/tests/functional/pyocf/rio.py +++ b/tests/functional/pyocf/rio.py @@ -79,9 +79,10 @@ class JobSpec: class Rio: class RioThread(Thread): - def __init__(self, jobspec: JobSpec): + def __init__(self, jobspec: JobSpec, queue): super().__init__() self.jobspec = jobspec + self.queue = queue self.ios = Size(0) self.io_target = 0 self.finish_time = None @@ -153,7 +154,7 @@ class Rio: data = Data(self.jobspec.bs) # TODO pattern and verify io = self.jobspec.target.new_io( - self.jobspec.target.cache.get_default_queue(), # TODO multiple queues? + self.queue, next(iogen), self.jobspec.bs, iodir, @@ -260,12 +261,12 @@ class Rio: self._threads = [] self.errors = {} - def run(self): - self.run_async() + def run(self, queues=None): + self.run_async(queues) self.wait_for_completion() return self - def run_async(self): + def run_async(self, queues=None): self.clear() jobs = deepcopy(self.jobs) @@ -273,9 +274,13 @@ class Rio: if not jobs: jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)] + if not queues: + queues = [self.global_jobspec.target.cache.get_default_queue()] + queues = cycle(queues) + for job in jobs: spec = job.merge(self.global_jobspec) - thread = Rio.RioThread(spec) + thread = Rio.RioThread(spec, next(queues)) self._threads.append(thread) for thread in self._threads: