rio: accept queue list in Rio constructor
Signed-off-by: Adam Rutkowski <adam.j.rutkowski@intel.com> Signed-off-by: Jan Musial <jan.musial@intel.com>
This commit is contained in:
parent
372b1f1e1c
commit
a6d8bd0470
@ -79,9 +79,10 @@ class JobSpec:
|
|||||||
|
|
||||||
class Rio:
|
class Rio:
|
||||||
class RioThread(Thread):
|
class RioThread(Thread):
|
||||||
def __init__(self, jobspec: JobSpec):
|
def __init__(self, jobspec: JobSpec, queue):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.jobspec = jobspec
|
self.jobspec = jobspec
|
||||||
|
self.queue = queue
|
||||||
self.ios = Size(0)
|
self.ios = Size(0)
|
||||||
self.io_target = 0
|
self.io_target = 0
|
||||||
self.finish_time = None
|
self.finish_time = None
|
||||||
@ -153,7 +154,7 @@ class Rio:
|
|||||||
|
|
||||||
data = Data(self.jobspec.bs) # TODO pattern and verify
|
data = Data(self.jobspec.bs) # TODO pattern and verify
|
||||||
io = self.jobspec.target.new_io(
|
io = self.jobspec.target.new_io(
|
||||||
self.jobspec.target.cache.get_default_queue(), # TODO multiple queues?
|
self.queue,
|
||||||
next(iogen),
|
next(iogen),
|
||||||
self.jobspec.bs,
|
self.jobspec.bs,
|
||||||
iodir,
|
iodir,
|
||||||
@ -260,12 +261,12 @@ class Rio:
|
|||||||
self._threads = []
|
self._threads = []
|
||||||
self.errors = {}
|
self.errors = {}
|
||||||
|
|
||||||
def run(self):
|
def run(self, queues=None):
|
||||||
self.run_async()
|
self.run_async(queues)
|
||||||
self.wait_for_completion()
|
self.wait_for_completion()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def run_async(self):
|
def run_async(self, queues=None):
|
||||||
self.clear()
|
self.clear()
|
||||||
|
|
||||||
jobs = deepcopy(self.jobs)
|
jobs = deepcopy(self.jobs)
|
||||||
@ -273,9 +274,13 @@ class Rio:
|
|||||||
if not jobs:
|
if not jobs:
|
||||||
jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)]
|
jobs = [self.global_jobspec for _ in range(self.global_jobspec.njobs)]
|
||||||
|
|
||||||
|
if not queues:
|
||||||
|
queues = [self.global_jobspec.target.cache.get_default_queue()]
|
||||||
|
queues = cycle(queues)
|
||||||
|
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
spec = job.merge(self.global_jobspec)
|
spec = job.merge(self.global_jobspec)
|
||||||
thread = Rio.RioThread(spec)
|
thread = Rio.RioThread(spec, next(queues))
|
||||||
self._threads.append(thread)
|
self._threads.append(thread)
|
||||||
|
|
||||||
for thread in self._threads:
|
for thread in self._threads:
|
||||||
|
Loading…
Reference in New Issue
Block a user