From 5c714cb3de7d105eee57ab8278a5aeca3290d3f9 Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Mon, 26 Jun 2023 23:50:12 +0200 Subject: [PATCH 1/3] parallelize: Use mngt_queue only as a fallback Signed-off-by: Robert Baldyga Signed-off-by: Michal Mielewczyk --- src/utils/utils_parallelize.c | 41 +++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/utils/utils_parallelize.c b/src/utils/utils_parallelize.c index 43983bd..5a2fdb6 100644 --- a/src/utils/utils_parallelize.c +++ b/src/utils/utils_parallelize.c @@ -55,6 +55,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, ocf_parallelize_finish_t finish) { ocf_parallelize_t tmp_parallelize; + struct list_head *iter; ocf_queue_t queue; size_t prl_size; unsigned queue_count = 0; @@ -65,6 +66,9 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, if (shards_cnt == 0) shards_cnt = queue_count; + if (queue_count == 0) + shards_cnt = 1; + prl_size = sizeof(*tmp_parallelize) + shards_cnt * sizeof(*tmp_parallelize->reqs); @@ -86,23 +90,28 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, env_atomic_set(&tmp_parallelize->remaining, shards_cnt); env_atomic_set(&tmp_parallelize->error, 0); - for (i = 0; i < shards_cnt;) { - list_for_each_entry(queue, &cache->io_queues, list) { - if (i == shards_cnt) - break; - tmp_parallelize->reqs[i] = ocf_req_new(queue, - NULL, 0, 0, 0); - if (!tmp_parallelize->reqs[i]) { - result = -OCF_ERR_NO_MEM; - goto err_reqs; - } - tmp_parallelize->reqs[i]->info.internal = true; - tmp_parallelize->reqs[i]->engine_handler = - _ocf_parallelize_hndl; - tmp_parallelize->reqs[i]->byte_position = i; - tmp_parallelize->reqs[i]->priv = tmp_parallelize; - i++; + + iter = cache->io_queues.next; + for (i = 0; i < shards_cnt; i++) { + if (queue_count > 0) { + queue = list_entry(iter, struct ocf_queue, list); + iter = iter->next; + if (iter == &cache->io_queues) + iter = iter->next; + } else { + queue = cache->mngt_queue; } + tmp_parallelize->reqs[i] = ocf_req_new(queue, + NULL, 0, 0, 0); + if (!tmp_parallelize->reqs[i]) { + result = -OCF_ERR_NO_MEM; + goto err_reqs; + } + tmp_parallelize->reqs[i]->info.internal = true; + tmp_parallelize->reqs[i]->engine_handler = + _ocf_parallelize_hndl; + tmp_parallelize->reqs[i]->byte_position = i; + tmp_parallelize->reqs[i]->priv = tmp_parallelize; } *parallelize = tmp_parallelize; From 55b99518eda8d86edc9b2f347fd09cbb4ad4b322 Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Wed, 5 Jul 2023 23:10:36 +0200 Subject: [PATCH 2/3] parallelize: Create number of shards requested by user In some scenarios running the exact number of shards, regardless of number of available queues is crucial for correctness of operation. Signed-off-by: Robert Baldyga Signed-off-by: Michal Mielewczyk --- src/utils/utils_parallelize.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/utils/utils_parallelize.c b/src/utils/utils_parallelize.c index 5a2fdb6..e6ca0e6 100644 --- a/src/utils/utils_parallelize.c +++ b/src/utils/utils_parallelize.c @@ -64,10 +64,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, queue_count = ocf_cache_get_queue_count(cache); if (shards_cnt == 0) - shards_cnt = queue_count; - - if (queue_count == 0) - shards_cnt = 1; + shards_cnt = queue_count ?: 1; prl_size = sizeof(*tmp_parallelize) + shards_cnt * sizeof(*tmp_parallelize->reqs); From 5e6a90a293c8523ff54332bedda3aee3f8d13d53 Mon Sep 17 00:00:00 2001 From: Robert Baldyga Date: Wed, 26 Jul 2023 04:04:09 +0200 Subject: [PATCH 3/3] parallelize: Fix race condition In situation when all the shards finish their work before parallelize loop does it's final loop condition check, which involves access to parallelize object, it's possible that parallelize object will be deinitialized before this final access. Increasing refcount by 1 before running parallelize and decreasing it only after the loop is finished addresses this problem. Signed-off-by: Robert Baldyga Signed-off-by: Michal Mielewczyk --- src/utils/utils_parallelize.c | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/utils/utils_parallelize.c b/src/utils/utils_parallelize.c index e6ca0e6..e22d436 100644 --- a/src/utils/utils_parallelize.c +++ b/src/utils/utils_parallelize.c @@ -25,11 +25,25 @@ struct ocf_parallelize { struct ocf_request *reqs[]; }; +static void _ocf_parallelize_finish(ocf_parallelize_t parallelize) +{ + ocf_parallelize_finish_t finish; + void *priv; + int error; + + if (env_atomic_dec_return(¶llelize->remaining)) + return; + + finish = parallelize->finish; + priv = parallelize->priv; + error = env_atomic_read(¶llelize->error); + + finish(parallelize, priv, error); +} + static int _ocf_parallelize_hndl(struct ocf_request *req) { ocf_parallelize_t parallelize = req->priv; - ocf_parallelize_finish_t finish; - void *priv; int error; error = parallelize->handle(parallelize, parallelize->priv, @@ -37,14 +51,7 @@ static int _ocf_parallelize_hndl(struct ocf_request *req) env_atomic_cmpxchg(¶llelize->error, 0, error); - if (env_atomic_dec_return(¶llelize->remaining)) - return 0; - - finish = parallelize->finish; - priv = parallelize->priv; - error = env_atomic_read(¶llelize->error); - - finish(parallelize, priv, error); + _ocf_parallelize_finish(parallelize); return 0; } @@ -84,7 +91,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, tmp_parallelize->finish = finish; tmp_parallelize->shards_cnt = shards_cnt; - env_atomic_set(&tmp_parallelize->remaining, shards_cnt); + env_atomic_set(&tmp_parallelize->remaining, shards_cnt + 1); env_atomic_set(&tmp_parallelize->error, 0); @@ -149,4 +156,6 @@ void ocf_parallelize_run(ocf_parallelize_t parallelize) for (i = 0; i < parallelize->shards_cnt; i++) ocf_queue_push_req(parallelize->reqs[i], OCF_QUEUE_PRIO_HIGH); + + _ocf_parallelize_finish(parallelize); }