Merge pull request #807 from mmichal10/parallelize-fixes

Parallelize fixes
This commit is contained in:
Robert Baldyga 2024-09-03 12:49:54 +02:00 committed by GitHub
commit 01902e1206
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -25,11 +25,25 @@ struct ocf_parallelize {
struct ocf_request *reqs[]; struct ocf_request *reqs[];
}; };
static void _ocf_parallelize_finish(ocf_parallelize_t parallelize)
{
ocf_parallelize_finish_t finish;
void *priv;
int error;
if (env_atomic_dec_return(&parallelize->remaining))
return;
finish = parallelize->finish;
priv = parallelize->priv;
error = env_atomic_read(&parallelize->error);
finish(parallelize, priv, error);
}
static int _ocf_parallelize_hndl(struct ocf_request *req) static int _ocf_parallelize_hndl(struct ocf_request *req)
{ {
ocf_parallelize_t parallelize = req->priv; ocf_parallelize_t parallelize = req->priv;
ocf_parallelize_finish_t finish;
void *priv;
int error; int error;
error = parallelize->handle(parallelize, parallelize->priv, error = parallelize->handle(parallelize, parallelize->priv,
@ -37,14 +51,7 @@ static int _ocf_parallelize_hndl(struct ocf_request *req)
env_atomic_cmpxchg(&parallelize->error, 0, error); env_atomic_cmpxchg(&parallelize->error, 0, error);
if (env_atomic_dec_return(&parallelize->remaining)) _ocf_parallelize_finish(parallelize);
return 0;
finish = parallelize->finish;
priv = parallelize->priv;
error = env_atomic_read(&parallelize->error);
finish(parallelize, priv, error);
return 0; return 0;
} }
@ -55,6 +62,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize,
ocf_parallelize_finish_t finish) ocf_parallelize_finish_t finish)
{ {
ocf_parallelize_t tmp_parallelize; ocf_parallelize_t tmp_parallelize;
struct list_head *iter;
ocf_queue_t queue; ocf_queue_t queue;
size_t prl_size; size_t prl_size;
unsigned queue_count = 0; unsigned queue_count = 0;
@ -63,7 +71,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize,
queue_count = ocf_cache_get_queue_count(cache); queue_count = ocf_cache_get_queue_count(cache);
if (shards_cnt == 0) if (shards_cnt == 0)
shards_cnt = queue_count; shards_cnt = queue_count ?: 1;
prl_size = sizeof(*tmp_parallelize) + prl_size = sizeof(*tmp_parallelize) +
shards_cnt * sizeof(*tmp_parallelize->reqs); shards_cnt * sizeof(*tmp_parallelize->reqs);
@ -83,13 +91,20 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize,
tmp_parallelize->finish = finish; tmp_parallelize->finish = finish;
tmp_parallelize->shards_cnt = shards_cnt; tmp_parallelize->shards_cnt = shards_cnt;
env_atomic_set(&tmp_parallelize->remaining, shards_cnt); env_atomic_set(&tmp_parallelize->remaining, shards_cnt + 1);
env_atomic_set(&tmp_parallelize->error, 0); env_atomic_set(&tmp_parallelize->error, 0);
for (i = 0; i < shards_cnt;) {
list_for_each_entry(queue, &cache->io_queues, list) { iter = cache->io_queues.next;
if (i == shards_cnt) for (i = 0; i < shards_cnt; i++) {
break; if (queue_count > 0) {
queue = list_entry(iter, struct ocf_queue, list);
iter = iter->next;
if (iter == &cache->io_queues)
iter = iter->next;
} else {
queue = cache->mngt_queue;
}
tmp_parallelize->reqs[i] = ocf_req_new(queue, tmp_parallelize->reqs[i] = ocf_req_new(queue,
NULL, 0, 0, 0); NULL, 0, 0, 0);
if (!tmp_parallelize->reqs[i]) { if (!tmp_parallelize->reqs[i]) {
@ -101,8 +116,6 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize,
_ocf_parallelize_hndl; _ocf_parallelize_hndl;
tmp_parallelize->reqs[i]->byte_position = i; tmp_parallelize->reqs[i]->byte_position = i;
tmp_parallelize->reqs[i]->priv = tmp_parallelize; tmp_parallelize->reqs[i]->priv = tmp_parallelize;
i++;
}
} }
*parallelize = tmp_parallelize; *parallelize = tmp_parallelize;
@ -143,4 +156,6 @@ void ocf_parallelize_run(ocf_parallelize_t parallelize)
for (i = 0; i < parallelize->shards_cnt; i++) for (i = 0; i < parallelize->shards_cnt; i++)
ocf_queue_push_req(parallelize->reqs[i], OCF_QUEUE_PRIO_HIGH); ocf_queue_push_req(parallelize->reqs[i], OCF_QUEUE_PRIO_HIGH);
_ocf_parallelize_finish(parallelize);
} }