diff --git a/src/utils/utils_parallelize.c b/src/utils/utils_parallelize.c index 43983bd..e22d436 100644 --- a/src/utils/utils_parallelize.c +++ b/src/utils/utils_parallelize.c @@ -25,11 +25,25 @@ struct ocf_parallelize { struct ocf_request *reqs[]; }; +static void _ocf_parallelize_finish(ocf_parallelize_t parallelize) +{ + ocf_parallelize_finish_t finish; + void *priv; + int error; + + if (env_atomic_dec_return(¶llelize->remaining)) + return; + + finish = parallelize->finish; + priv = parallelize->priv; + error = env_atomic_read(¶llelize->error); + + finish(parallelize, priv, error); +} + static int _ocf_parallelize_hndl(struct ocf_request *req) { ocf_parallelize_t parallelize = req->priv; - ocf_parallelize_finish_t finish; - void *priv; int error; error = parallelize->handle(parallelize, parallelize->priv, @@ -37,14 +51,7 @@ static int _ocf_parallelize_hndl(struct ocf_request *req) env_atomic_cmpxchg(¶llelize->error, 0, error); - if (env_atomic_dec_return(¶llelize->remaining)) - return 0; - - finish = parallelize->finish; - priv = parallelize->priv; - error = env_atomic_read(¶llelize->error); - - finish(parallelize, priv, error); + _ocf_parallelize_finish(parallelize); return 0; } @@ -55,6 +62,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, ocf_parallelize_finish_t finish) { ocf_parallelize_t tmp_parallelize; + struct list_head *iter; ocf_queue_t queue; size_t prl_size; unsigned queue_count = 0; @@ -63,7 +71,7 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, queue_count = ocf_cache_get_queue_count(cache); if (shards_cnt == 0) - shards_cnt = queue_count; + shards_cnt = queue_count ?: 1; prl_size = sizeof(*tmp_parallelize) + shards_cnt * sizeof(*tmp_parallelize->reqs); @@ -83,26 +91,31 @@ int ocf_parallelize_create(ocf_parallelize_t *parallelize, tmp_parallelize->finish = finish; tmp_parallelize->shards_cnt = shards_cnt; - env_atomic_set(&tmp_parallelize->remaining, shards_cnt); + env_atomic_set(&tmp_parallelize->remaining, shards_cnt + 1); env_atomic_set(&tmp_parallelize->error, 0); - for (i = 0; i < shards_cnt;) { - list_for_each_entry(queue, &cache->io_queues, list) { - if (i == shards_cnt) - break; - tmp_parallelize->reqs[i] = ocf_req_new(queue, - NULL, 0, 0, 0); - if (!tmp_parallelize->reqs[i]) { - result = -OCF_ERR_NO_MEM; - goto err_reqs; - } - tmp_parallelize->reqs[i]->info.internal = true; - tmp_parallelize->reqs[i]->engine_handler = - _ocf_parallelize_hndl; - tmp_parallelize->reqs[i]->byte_position = i; - tmp_parallelize->reqs[i]->priv = tmp_parallelize; - i++; + + iter = cache->io_queues.next; + for (i = 0; i < shards_cnt; i++) { + if (queue_count > 0) { + queue = list_entry(iter, struct ocf_queue, list); + iter = iter->next; + if (iter == &cache->io_queues) + iter = iter->next; + } else { + queue = cache->mngt_queue; } + tmp_parallelize->reqs[i] = ocf_req_new(queue, + NULL, 0, 0, 0); + if (!tmp_parallelize->reqs[i]) { + result = -OCF_ERR_NO_MEM; + goto err_reqs; + } + tmp_parallelize->reqs[i]->info.internal = true; + tmp_parallelize->reqs[i]->engine_handler = + _ocf_parallelize_hndl; + tmp_parallelize->reqs[i]->byte_position = i; + tmp_parallelize->reqs[i]->priv = tmp_parallelize; } *parallelize = tmp_parallelize; @@ -143,4 +156,6 @@ void ocf_parallelize_run(ocf_parallelize_t parallelize) for (i = 0; i < parallelize->shards_cnt; i++) ocf_queue_push_req(parallelize->reqs[i], OCF_QUEUE_PRIO_HIGH); + + _ocf_parallelize_finish(parallelize); }