Merge pull request #642 from robertbaldyga/parallelize

Parallelize metadata initialization
Robert Baldyga
2022-02-07 13:53:45 +01:00
committed by GitHub
16 changed files with 1384 additions and 355 deletions
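This change introduces a new utils_parallelize helper and uses it to split metadata rebuild work across shards. As a rough guide to the call pattern (inferred purely from the call sites in the diff below — the example_* names and context struct here are hypothetical and not part of the commit, and the signatures are taken from how the functions are used), a caller creates a parallelize object with a per-shard handler and a finish callback, fills in the private context, and then kicks it off:

#include "../utils/utils_parallelize.h"	/* helper added by this PR */

/* Hypothetical private context; the real one in this diff is
 * struct ocf_mngt_rebuild_metadata_context. */
struct example_context {
	ocf_cache_t cache;
	env_atomic processed;
};

/* Per-shard handler: called once for each shard_id in [0, shards_cnt). */
static int example_handle(ocf_parallelize_t parallelize, void *priv,
		unsigned shard_id, unsigned shards_cnt)
{
	struct example_context *context = priv;
	ocf_cache_t cache = context->cache;
	uint64_t entries = ocf_metadata_collision_table_entries(cache);
	uint64_t cline;

	/* Stripe the collision table across shards, as the rebuild
	 * handler in this diff does. */
	for (cline = shard_id; cline < entries; cline += shards_cnt)
		env_atomic_inc(&context->processed);

	return 0;
}

/* Finish callback: runs once after all shards have completed (or on the
 * first error); it is responsible for destroying the parallelize object. */
static void example_finish(ocf_parallelize_t parallelize, void *priv,
		int error)
{
	ocf_parallelize_destroy(parallelize);
}

static int example_run(ocf_cache_t cache)
{
	ocf_parallelize_t parallelize;
	struct example_context *context;
	int result;

	result = ocf_parallelize_create(&parallelize, cache,
			OCF_NUM_LRU_LISTS, sizeof(*context),
			example_handle, example_finish);
	if (result)
		return result;

	context = ocf_parallelize_get_priv(parallelize);
	context->cache = cache;
	env_atomic_set(&context->processed, 0);

	ocf_parallelize_run(parallelize);
	return 0;
}

In the rebuild path the shard count passed to ocf_parallelize_create equals OCF_NUM_LRU_LISTS, so each shard can feed its own LRU list without taking the list lock (see the comment at OCF_MNGT_REBUILD_METADATA_SHARDS_CNT in the diff).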


@@ -17,10 +17,12 @@
#include "../utils/utils_cache_line.h"
#include "../utils/utils_io.h"
#include "../utils/utils_cache_line.h"
#include "../utils/utils_parallelize.h"
#include "../utils/utils_pipeline.h"
#include "../utils/utils_refcnt.h"
#include "../utils/utils_async_lock.h"
#include "../concurrency/ocf_concurrency.h"
#include "../concurrency/ocf_metadata_concurrency.h"
#include "../ocf_lru.h"
#include "../ocf_ctx_priv.h"
#include "../cleaning/cleaning.h"
@@ -215,14 +217,6 @@ static void __init_parts_attached(ocf_cache_t cache)
ocf_lru_init(cache, &cache->free);
}
static void __populate_free(ocf_cache_t cache)
{
uint64_t free_clines = ocf_metadata_collision_table_entries(cache) -
ocf_get_cache_occupancy(cache);
ocf_lru_populate(cache, free_clines);
}
static ocf_error_t __init_cleaning_policy(ocf_cache_t cache)
{
int i;
@@ -300,29 +294,6 @@ static void __reset_stats(ocf_cache_t cache)
}
}
static ocf_error_t init_attached_data_structures(ocf_cache_t cache)
{
ocf_error_t result;
/* Lock to ensure consistency */
ocf_metadata_init_hash_table(cache);
ocf_metadata_init_collision(cache);
__init_parts_attached(cache);
__populate_free(cache);
result = __init_cleaning_policy(cache);
if (result) {
ocf_cache_log(cache, log_err,
"Cannot initialize cleaning policy\n");
return result;
}
__setup_promotion_policy(cache);
return 0;
}
static void init_attached_data_structures_recovery(ocf_cache_t cache,
bool init_collision)
{
@@ -487,28 +458,84 @@ err:
OCF_PL_FINISH_RET(pipeline, -OCF_ERR_START_CACHE_FAIL);
}
static void _recovery_reset_cline_metadata(struct ocf_cache *cache,
ocf_cache_line_t cline)
typedef void (*ocf_mngt_rebuild_metadata_end_t)(void *priv, int error);
/*
* IMPORTANT: This value must match the number of LRU lists so that adding
* cache lines to the list can be implemented without locking (each shard
* owns its own LRU list). Don't change this value unless you are really
* sure you know what you're doing.
*/
#define OCF_MNGT_REBUILD_METADATA_SHARDS_CNT OCF_NUM_LRU_LISTS
struct ocf_mngt_rebuild_metadata_context {
ocf_cache_t cache;
struct {
env_atomic lines;
} core[OCF_CORE_MAX];
struct {
struct {
uint32_t lines;
} core[OCF_CORE_MAX];
} shard[OCF_MNGT_REBUILD_METADATA_SHARDS_CNT];
env_atomic free_lines;
ocf_mngt_rebuild_metadata_end_t cmpl;
void *priv;
};
static void ocf_mngt_cline_reset_metadata(ocf_cache_t cache,
ocf_cache_line_t cline, uint32_t lru_list)
{
ocf_metadata_set_core_info(cache, cline, OCF_CORE_MAX, ULLONG_MAX);
metadata_init_status_bits(cache, cline);
ocf_cleaning_init_cache_block(cache, cline);
ocf_metadata_set_partition_id(cache, cline, PARTITION_FREELIST);
ocf_lru_add_free(cache, cline);
}
static int _ocf_mngt_rebuild_metadata(ocf_cache_t cache)
static void ocf_mngt_cline_rebuild_metadata(ocf_cache_t cache,
ocf_core_id_t core_id, uint64_t core_line,
ocf_cache_line_t cline)
{
ocf_cache_line_t cline;
ocf_part_id_t part_id = PARTITION_DEFAULT;
ocf_cache_line_t hash_index;
ocf_metadata_set_partition_id(cache, cline, part_id);
hash_index = ocf_metadata_hash_func(cache, core_line, core_id);
ocf_hb_id_naked_lock_wr(&cache->metadata.lock, hash_index);
ocf_metadata_add_to_collision(cache, core_id, core_line, hash_index,
cline);
ocf_hb_id_naked_unlock_wr(&cache->metadata.lock, hash_index);
ocf_lru_init_cline(cache, cline);
ocf_lru_add(cache, cline);
}
static int ocf_mngt_rebuild_metadata_handle(ocf_parallelize_t parallelize,
void *priv, unsigned shard_id, unsigned shards_cnt)
{
struct ocf_mngt_rebuild_metadata_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_cache_line_t begin, increment, cline, free_lines;
ocf_core_t core;
ocf_core_id_t core_id;
uint64_t core_line;
unsigned char step = 0;
const uint64_t collision_table_entries =
ocf_metadata_collision_table_entries(cache);
const uint64_t entries = ocf_metadata_collision_table_entries(cache);
ocf_metadata_start_exclusive_access(&cache->metadata.lock);
begin = shard_id;
increment = shards_cnt;
for (cline = 0; cline < collision_table_entries; cline++) {
free_lines = 0;
for (cline = begin; cline < entries; cline += increment) {
bool any_valid = true;
OCF_COND_RESCHED(step, 128);
@@ -525,11 +552,12 @@ static int _ocf_mngt_rebuild_metadata(ocf_cache_t cache)
any_valid = metadata_clear_valid_if_clean(cache, cline);
if (!any_valid || core_id == OCF_CORE_MAX) {
/* Reset metadata for not mapped or clean cache line */
_recovery_reset_cline_metadata(cache, cline);
ocf_mngt_cline_reset_metadata(cache, cline, shard_id);
free_lines++;
continue;
}
if (!cache->core[core_id].added) {
if (!cache->core[core_id].conf_meta->valid) {
ocf_cache_log(cache, log_err, "Stale mapping in "
"on-disk metadata - refusing to "
"recover cache.\n");
@@ -537,58 +565,130 @@ static int _ocf_mngt_rebuild_metadata(ocf_cache_t cache)
}
/* Rebuild metadata for mapped cache line */
ocf_cline_rebuild_metadata(cache, core_id, core_line, cline);
ocf_mngt_cline_rebuild_metadata(cache, core_id,
core_line, cline);
context->shard[shard_id].core[core_id].lines++;
}
ocf_metadata_end_exclusive_access(&cache->metadata.lock);
for_each_core(cache, core, core_id) {
env_atomic_add(context->shard[shard_id].core[core_id].lines,
&context->core[core_id].lines);
}
env_atomic_add(free_lines, &context->free_lines);
return 0;
}
static int _ocf_mngt_recovery_rebuild_metadata(ocf_cache_t cache)
static void ocf_mngt_rebuild_metadata_finish(ocf_parallelize_t parallelize,
void *priv, int error)
{
return _ocf_mngt_rebuild_metadata(cache);
struct ocf_mngt_rebuild_metadata_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_part_id_t part_id = PARTITION_DEFAULT;
struct ocf_part_runtime *part;
ocf_core_t core;
ocf_core_id_t core_id;
uint32_t lines_total = 0;
for_each_core(cache, core, core_id) {
uint32_t lines = env_atomic_read(&context->core[core_id].lines);
env_atomic_set(&core->runtime_meta->cached_clines, lines);
env_atomic_set(&core->runtime_meta->
part_counters[part_id].cached_clines, lines);
env_atomic_set(&core->runtime_meta->dirty_clines, lines);
env_atomic_set(&core->runtime_meta->
part_counters[part_id].dirty_clines, lines);
if (lines) {
env_atomic64_set(&core->runtime_meta->dirty_since,
env_ticks_to_secs(env_get_tick_count()));
}
lines_total += lines;
}
part = cache->user_parts[part_id].part.runtime;
env_atomic_set(&part->curr_size, lines_total);
env_atomic_set(&cache->free.runtime->curr_size,
env_atomic_read(&context->free_lines));
context->cmpl(context->priv, error);
ocf_parallelize_destroy(parallelize);
}
static inline ocf_error_t _ocf_init_cleaning_policy(ocf_cache_t cache,
ocf_cleaning_t cleaning_policy,
enum ocf_metadata_shutdown_status shutdown_status)
static void ocf_mngt_rebuild_metadata(ocf_cache_t cache,
ocf_mngt_rebuild_metadata_end_t cmpl, void *priv)
{
ocf_error_t result;
struct ocf_mngt_rebuild_metadata_context *context;
ocf_parallelize_t parallelize;
int result;
if (shutdown_status == ocf_metadata_clean_shutdown)
result = ocf_cleaning_initialize(cache, cleaning_policy, 0);
else
result = ocf_cleaning_initialize(cache, cleaning_policy, 1);
result = ocf_parallelize_create(&parallelize, cache,
OCF_MNGT_REBUILD_METADATA_SHARDS_CNT,
sizeof(*context), ocf_mngt_rebuild_metadata_handle,
ocf_mngt_rebuild_metadata_finish);
if (result) {
cmpl(priv, result);
return;
}
if (result)
ocf_cache_log(cache, log_err, "Cannot initialize cleaning policy\n");
context = ocf_parallelize_get_priv(parallelize);
context->cache = cache;
context->cmpl = cmpl;
context->priv = priv;
return result;
ocf_parallelize_run(parallelize);
}
static void _ocf_mngt_load_post_metadata_load(ocf_pipeline_t pipeline,
static void _ocf_mngt_load_rebuild_metadata_complete(void *priv, int error)
{
struct ocf_cache_attach_context *context = priv;
OCF_PL_NEXT_ON_SUCCESS_RET(context->pipeline, error);
}
static void _ocf_mngt_load_rebuild_metadata(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
if (context->metadata.shutdown_status != ocf_metadata_clean_shutdown) {
ocf_mngt_rebuild_metadata(cache,
_ocf_mngt_load_rebuild_metadata_complete,
context);
return;
}
ocf_pipeline_next(pipeline);
}
static void _ocf_mngt_cleaning_recovery_complete(void *priv, int error)
{
struct ocf_cache_attach_context *context = priv;
OCF_PL_NEXT_ON_SUCCESS_RET(context->pipeline, error);
}
static void _ocf_mngt_load_init_cleaning(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_error_t result;
int ret;
if (context->metadata.shutdown_status != ocf_metadata_clean_shutdown) {
ret = _ocf_mngt_recovery_rebuild_metadata(cache);
if (ret)
OCF_PL_FINISH_RET(pipeline, ret);
__populate_free(cache);
if (context->metadata.shutdown_status == ocf_metadata_clean_shutdown) {
result = ocf_cleaning_initialize(cache,
cache->cleaner.policy, 0);
OCF_PL_NEXT_ON_SUCCESS_RET(pipeline, result);
}
result = _ocf_init_cleaning_policy(cache, cache->cleaner.policy,
context->metadata.shutdown_status);
if (result)
OCF_PL_FINISH_RET(pipeline, result);
ocf_pipeline_next(pipeline);
ocf_cleaning_recovery(cache, cache->cleaner.policy,
_ocf_mngt_cleaning_recovery_complete, context);
}
void _ocf_mngt_load_metadata_complete(void *priv, int error)
@@ -1108,16 +1208,51 @@ static void _ocf_mngt_attach_prepare_metadata(ocf_pipeline_t pipeline,
/**
* @brief initializing cache anew (not loading or recovering)
*/
static void _ocf_mngt_attach_init_instance(ocf_pipeline_t pipeline,
static void _ocf_mngt_attach_init_metadata(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_metadata_init_hash_table(cache);
ocf_metadata_init_collision(cache);
__init_parts_attached(cache);
ocf_pipeline_next(pipeline);
}
static void _ocf_mngt_attach_populate_free_complete(void *priv, int error)
{
struct ocf_cache_attach_context *context = priv;
OCF_PL_NEXT_ON_SUCCESS_RET(context->pipeline, error);
}
static void _ocf_mngt_attach_populate_free(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_lru_populate(cache, _ocf_mngt_attach_populate_free_complete,
context);
}
static void _ocf_mngt_attach_init_services(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_error_t result;
result = init_attached_data_structures(cache);
if (result)
result = __init_cleaning_policy(cache);
if (result) {
ocf_cache_log(cache, log_err,
"Cannot initialize cleaning policy\n");
OCF_PL_FINISH_RET(pipeline, result);
}
__setup_promotion_policy(cache);
/* In initial cache state there is no dirty data, so all dirty data is
considered to be flushed
@@ -1728,7 +1863,9 @@ struct ocf_pipeline_properties _ocf_mngt_cache_attach_pipeline_properties = {
OCF_PL_STEP(_ocf_mngt_test_volume),
OCF_PL_STEP(_ocf_mngt_init_cleaner),
OCF_PL_STEP(_ocf_mngt_init_promotion),
OCF_PL_STEP(_ocf_mngt_attach_init_instance),
OCF_PL_STEP(_ocf_mngt_attach_init_metadata),
OCF_PL_STEP(_ocf_mngt_attach_populate_free),
OCF_PL_STEP(_ocf_mngt_attach_init_services),
OCF_PL_STEP(_ocf_mngt_zero_superblock),
OCF_PL_STEP(_ocf_mngt_attach_flush_metadata),
OCF_PL_STEP(_ocf_mngt_attach_discard),
@@ -1756,7 +1893,8 @@ struct ocf_pipeline_properties _ocf_mngt_cache_load_pipeline_properties = {
OCF_PL_STEP(_ocf_mngt_init_promotion),
OCF_PL_STEP(_ocf_mngt_load_add_cores),
OCF_PL_STEP(_ocf_mngt_load_metadata),
OCF_PL_STEP(_ocf_mngt_load_post_metadata_load),
OCF_PL_STEP(_ocf_mngt_load_rebuild_metadata),
OCF_PL_STEP(_ocf_mngt_load_init_cleaning),
OCF_PL_STEP(_ocf_mngt_attach_shutdown_status),
OCF_PL_STEP(_ocf_mngt_attach_flush_metadata),
OCF_PL_STEP(_ocf_mngt_attach_shutdown_status),
@@ -2098,7 +2236,6 @@ static void _ocf_mngt_standby_init_structures_attach(ocf_pipeline_t pipeline,
ocf_cache_t cache = context->cache;
init_attached_data_structures_recovery(cache, true);
__populate_free(cache);
ocf_pipeline_next(pipeline);
}
@@ -2128,37 +2265,6 @@ static void _ocf_mngt_standby_init_pio_concurrency(ocf_pipeline_t pipeline,
OCF_PL_NEXT_ON_SUCCESS_RET(context->pipeline, result);
}
static void _ocf_mngt_standby_recovery(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
int ret;
ret = _ocf_mngt_recovery_rebuild_metadata(cache);
if (ret)
OCF_PL_FINISH_RET(pipeline, ret);
__populate_free(cache);
ocf_pipeline_next(pipeline);
}
static void _ocf_mngt_standby_init_cleaning(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;
ocf_error_t result;
result = _ocf_init_cleaning_policy(cache, cache->cleaner.policy,
context->metadata.shutdown_status);
if (result)
OCF_PL_FINISH_RET(pipeline, result);
ocf_pipeline_next(pipeline);
}
static void _ocf_mngt_standby_post_init(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
@@ -2185,7 +2291,8 @@ struct ocf_pipeline_properties _ocf_mngt_cache_standby_attach_pipeline_propertie
OCF_PL_STEP(_ocf_mngt_test_volume),
OCF_PL_STEP(_ocf_mngt_init_cleaner),
OCF_PL_STEP(_ocf_mngt_standby_init_structures_attach),
OCF_PL_STEP(_ocf_mngt_standby_init_cleaning),
OCF_PL_STEP(_ocf_mngt_attach_populate_free),
OCF_PL_STEP(_ocf_mngt_load_init_cleaning),
OCF_PL_STEP(_ocf_mngt_standby_preapre_mempool),
OCF_PL_STEP(_ocf_mngt_standby_init_pio_concurrency),
OCF_PL_STEP(_ocf_mngt_zero_superblock),
@@ -2213,10 +2320,10 @@ struct ocf_pipeline_properties _ocf_mngt_cache_standby_load_pipeline_properties
OCF_PL_STEP(_ocf_mngt_load_metadata_recovery),
OCF_PL_STEP(_ocf_mngt_init_cleaner),
OCF_PL_STEP(_ocf_mngt_standby_init_structures_load),
OCF_PL_STEP(_ocf_mngt_standby_init_cleaning),
OCF_PL_STEP(_ocf_mngt_load_init_cleaning),
OCF_PL_STEP(_ocf_mngt_standby_preapre_mempool),
OCF_PL_STEP(_ocf_mngt_standby_init_pio_concurrency),
OCF_PL_STEP(_ocf_mngt_standby_recovery),
OCF_PL_STEP(_ocf_mngt_load_rebuild_metadata),
OCF_PL_STEP(_ocf_mngt_standby_post_init),
OCF_PL_STEP_TERMINATOR(),
},
@@ -2430,6 +2537,7 @@ struct ocf_pipeline_properties _ocf_mngt_cache_activate_pipeline_properties = {
.steps = {
OCF_PL_STEP(_ocf_mngt_copy_uuid_data),
OCF_PL_STEP(_ocf_mngt_activate_set_cache_device),
OCF_PL_STEP(_ocf_mngt_activate_init_properties),
OCF_PL_STEP(_ocf_mngt_activate_compare_superblock),
OCF_PL_STEP(_ocf_mngt_load_superblock),
OCF_PL_STEP(_ocf_mngt_activate_check_superblock),