diff --git a/src/mngt/ocf_mngt_flush.c b/src/mngt/ocf_mngt_flush.c index bc771e5..b586f09 100644 --- a/src/mngt/ocf_mngt_flush.c +++ b/src/mngt/ocf_mngt_flush.c @@ -9,22 +9,96 @@ #include "../metadata/metadata.h" #include "../cleaning/cleaning.h" #include "../engine/cache_engine.h" +#include "../engine/engine_common.h" #include "../utils/utils_cleaner.h" #include "../utils/utils_cache_line.h" #include "../utils/utils_part.h" +#include "../utils/utils_pipeline.h" +#include "../utils/utils_req.h" #include "../ocf_def_priv.h" -static inline void _ocf_mngt_begin_flush(struct ocf_cache *cache) +struct ocf_mngt_cache_flush_context; +typedef void (*ocf_flush_complete_t)(struct ocf_mngt_cache_flush_context *, int); + +struct flush_containers_context { + /* array of container descriptors */ + struct flush_container *fctbl; + /* fctbl array size */ + uint32_t fcnum; + /* shared error for all concurrent container flushes */ + env_atomic error; + /* number of outstanding container flushes */ + env_atomic count; + /* first container flush to notice interrupt sets this to 1 */ + env_atomic interrupt_seen; + /* completion to be called after all containers are flushed */ + ocf_flush_complete_t complete; +}; + +/* common struct for cache/core flush/purge pipeline priv */ +struct ocf_mngt_cache_flush_context +{ + /* pipeline for flush / purge */ + ocf_pipeline_t pipeline; + /* target cache */ + ocf_cache_t cache; + /* target core */ + ocf_core_t core; + /* true if flush interrupt respected */ + bool allow_interruption; + + /* management operation identifier */ + enum { + flush_cache = 0, + flush_core, + purge_cache, + purge_core + } op; + + /* ocf mngmt entry point completion */ + union { + ocf_mngt_cache_flush_end_t flush_cache; + ocf_mngt_core_flush_end_t flush_core; + ocf_mngt_cache_purge_end_t purge_cache; + ocf_mngt_core_purge_end_t purge_core; + } cmpl; + + /* completion pivate data */ + void *priv; + + /* purge parameters */ + struct { + uint64_t end_byte; + uint64_t core_id; + } purge; + + /* context for flush containers logic */ + struct flush_containers_context fcs; +}; + +static void _ocf_mngt_begin_flush(ocf_pipeline_t pipeline, void *priv, + ocf_pipeline_arg_t arg) +{ + struct ocf_mngt_cache_flush_context *context = priv; + ocf_cache_t cache = context->cache; + + /* FIXME: need mechanism for async waiting for outstanding flushed to + * finish */ env_mutex_lock(&cache->flush_mutex); env_atomic_inc(&cache->flush_started); + /* FIXME: remove waitqueue from async begin */ env_waitqueue_wait(cache->pending_dirty_wq, !env_atomic_read(&cache->pending_dirty_requests)); + + cache->flushing_interrupted = 0; + + ocf_pipeline_next(context->pipeline); } -static inline void _ocf_mngt_end_flush(struct ocf_cache *cache) +static void _ocf_mngt_end_flush(ocf_cache_t cache) { ENV_BUG_ON(env_atomic_dec_return(&cache->flush_started) < 0); @@ -119,11 +193,6 @@ static int _ocf_mngt_get_sectors(struct ocf_cache *cache, int core_id, return 0; } -static void _ocf_mngt_free_sectors(void *tbl) -{ - env_vfree(tbl); -} - static int _ocf_mngt_get_flush_containers(ocf_cache_t cache, struct flush_container **fctbl, uint32_t *fcnum) { @@ -140,7 +209,6 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache, * TODO: Create containers for each physical device, not for * each core. Cores can be partitions of single device. */ - num = cache->conf_meta->core_count; if (num == 0) { *fcnum = 0; @@ -151,7 +219,7 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache, if (!core_revmap) return -OCF_ERR_NO_MEM; - /* TODO: Alloc flush_containers and data tables in single allocation */ + /* TODO: Alloc fcs and data tables in single allocation */ fc = env_vzalloc(sizeof(**fctbl) * num); if (!fc) { env_vfree(core_revmap); @@ -233,7 +301,7 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache, } static void _ocf_mngt_free_flush_containers(struct flush_container *fctbl, - uint32_t num) + uint32_t num) { int i; @@ -278,133 +346,206 @@ static void _ocf_mngt_flush_portion(struct flush_container *fc) fc->iter += curr_count; } -static void _ocf_mngt_flush_end(void *private_data, int error) +static void _ocf_mngt_flush_portion_end(void *private_data, int error) { struct flush_container *fc = private_data; + struct ocf_mngt_cache_flush_context *context = fc->context; + struct flush_containers_context *fsc = &context->fcs; + ocf_cache_t cache = context->cache; + ocf_core_t core = &cache->core[fc->core_id]; + bool first_interrupt; + + env_atomic_set(&core->flushed, fc->iter); fc->ticks2 = env_get_tick_count(); - env_atomic_cmpxchg(fc->error, 0, error); + env_atomic_cmpxchg(&fsc->error, 0, error); - env_atomic_set(&fc->completed, 1); - env_atomic_inc(fc->progress); - env_waitqueue_wake_up(fc->wq); + if (cache->flushing_interrupted) { + first_interrupt = !env_atomic_cmpxchg(&fsc->interrupt_seen, 0, 1); + if (first_interrupt) { + if (context->allow_interruption) { + ocf_cache_log(cache, log_info, + "Flushing interrupted by " + "user\n"); + } else { + ocf_cache_log(cache, log_err, + "Cannot interrupt flushing\n"); + } + } + if (context->allow_interruption) { + env_atomic_cmpxchg(&fsc->error, 0, + -OCF_ERR_FLUSHING_INTERRUPTED); + } + } + + if (env_atomic_read(&fsc->error) || fc->iter == fc->count) { + ocf_req_put(fc->req); + fc->end(context); + return; + } + + ocf_engine_push_req_front(fc->req, false); } -static int _ocf_mngt_flush_containers(ocf_cache_t cache, - struct flush_container *fctbl, uint32_t fcnum, - bool allow_interruption) + +static int _ofc_flush_container_step(struct ocf_request *req) +{ + struct flush_container *fc = req->priv; + ocf_cache_t cache = fc->cache; + + ocf_metadata_lock(cache, OCF_METADATA_WR); + _ocf_mngt_flush_portion(fc); + ocf_metadata_unlock(cache, OCF_METADATA_WR); + + return 0; +} + +static const struct ocf_io_if _io_if_flush_portion = { + .read = _ofc_flush_container_step, + .write = _ofc_flush_container_step, +}; + +static void _ocf_mngt_flush_container( + struct ocf_mngt_cache_flush_context *context, + struct flush_container *fc, ocf_flush_containter_coplete_t end) +{ + ocf_cache_t cache = context->cache; + struct ocf_request *req; + int error = 0; + + if (!fc->count) + goto finish; + + fc->end = end; + fc->context = context; + + req = ocf_req_new(cache->mngt_queue, NULL, 0, 0, 0); + if (!req) { + error = OCF_ERR_NO_MEM; + goto finish; + } + + req->info.internal = true; + req->io_if = &_io_if_flush_portion; + req->priv = fc; + + fc->req = req; + fc->attribs.cache_line_lock = true; + fc->attribs.cmpl_context = fc; + fc->attribs.cmpl_fn = _ocf_mngt_flush_portion_end; + fc->attribs.io_queue = cache->mngt_queue; + fc->cache = cache; + fc->flush_portion = OCF_MNG_FLUSH_MIN; + fc->ticks1 = 0; + fc->ticks2 = UINT_MAX; + + ocf_engine_push_req_front(fc->req, true); + return; + +finish: + env_atomic_cmpxchg(&context->fcs.error, 0, error); + end(fc); +} + +void _ocf_flush_container_complete(void *ctx) +{ + struct ocf_mngt_cache_flush_context *context = ctx; + + if (atomic_dec_return(&context->fcs.count)) { + return; + } + + _ocf_mngt_free_flush_containers(context->fcs.fctbl, + context->fcs.fcnum); + + context->fcs.complete(context, + env_atomic_read(&context->fcs.error)); +} + +static void _ocf_mngt_flush_containers( + struct ocf_mngt_cache_flush_context *context, + struct flush_container *fctbl, + uint32_t fcnum, ocf_flush_complete_t complete) { - uint32_t fc_to_flush; - env_waitqueue wq; - env_atomic progress; /* incremented each time flushing of a portion of a - container is completed */ - env_atomic error; - ocf_core_t core; - bool interrupt = false; int i; - if (fcnum == 0) - return 0; - - env_waitqueue_init(&wq); + if (fcnum == 0) { + complete(context, 0); + return; + } /* Sort data. Smallest sectors first (0...n). */ ocf_cleaner_sort_flush_containers(fctbl, fcnum); - env_atomic_set(&error, 0); + env_atomic_set(&context->fcs.error, 0); + env_atomic_set(&context->fcs.count, 1); + context->fcs.complete = complete; + context->fcs.fctbl = fctbl; + context->fcs.fcnum = fcnum; for (i = 0; i < fcnum; i++) { - fctbl[i].attribs.cache_line_lock = true; - fctbl[i].attribs.cmpl_context = &fctbl[i]; - fctbl[i].attribs.cmpl_fn = _ocf_mngt_flush_end; - fctbl[i].attribs.io_queue = cache->mngt_queue; - fctbl[i].cache = cache; - fctbl[i].progress = &progress; - fctbl[i].error = &error; - fctbl[i].wq = &wq; - fctbl[i].flush_portion = OCF_MNG_FLUSH_MIN; - fctbl[i].ticks1 = 0; - fctbl[i].ticks2 = UINT_MAX; - env_atomic_set(&fctbl[i].completed, 1); + env_atomic_inc(&context->fcs.count); + _ocf_mngt_flush_container(context, &fctbl[i], + _ocf_flush_container_complete); } - for (fc_to_flush = fcnum; fc_to_flush > 0;) { - env_atomic_set(&progress, 0); - for (i = 0; i < fcnum; i++) { - if (!env_atomic_read(&fctbl[i].completed)) - continue; - - core = &cache->core[fctbl[i].core_id]; - env_atomic_set(&core->flushed, fctbl[i].iter); - env_atomic_set(&fctbl[i].completed, 0); - - if (fctbl[i].iter == fctbl[i].count || interrupt || - env_atomic_read(&error)) { - fc_to_flush--; - continue; - } - - _ocf_mngt_flush_portion(&fctbl[i]); - } - if (fc_to_flush) { - ocf_metadata_unlock(cache, OCF_METADATA_WR); - env_cond_resched(); - env_waitqueue_wait(wq, env_atomic_read(&progress)); - ocf_metadata_lock(cache, OCF_METADATA_WR); - } - if (cache->flushing_interrupted && !interrupt) { - if (allow_interruption) { - interrupt = true; - ocf_cache_log(cache, log_info, - "Flushing interrupted by " - "user\n"); - } else { - ocf_cache_log(cache, log_err, - "Cannot interrupt flushing\n"); - } - } - } - - return interrupt ? -OCF_ERR_FLUSHING_INTERRUPTED : - env_atomic_read(&error); + _ocf_flush_container_complete(context); } -static int _ocf_mngt_flush_core(ocf_core_t core, bool allow_interruption) + +static void _ocf_mngt_flush_core( + struct ocf_mngt_cache_flush_context *context, + ocf_flush_complete_t complete) { + ocf_cache_t cache = context->cache; + ocf_core_t core = context->core; ocf_core_id_t core_id = ocf_core_get_id(core); - ocf_cache_t cache = core->volume.cache; - struct flush_container fc; + struct flush_container *fc; int ret; + fc = env_vzalloc(sizeof(*fc)); + if (!fc) { + complete(context, -OCF_ERR_NO_MEM); + return; + } + ocf_metadata_lock(cache, OCF_METADATA_WR); ret = _ocf_mngt_get_sectors(cache, core_id, - &fc.flush_data, &fc.count); + &fc->flush_data, &fc->count); if (ret) { ocf_core_log(core, log_err, "Flushing operation aborted, " "no memory\n"); - goto out; + env_vfree(fc); + complete(context, -OCF_ERR_NO_MEM); + return; } - fc.core_id = core_id; - fc.iter = 0; + fc->core_id = core_id; + fc->iter = 0; - ret = _ocf_mngt_flush_containers(cache, &fc, 1, allow_interruption); + _ocf_mngt_flush_containers(context, fc, 1, complete); - _ocf_mngt_free_sectors(fc.flush_data); - -out: ocf_metadata_unlock(cache, OCF_METADATA_WR); - return ret; } -static int _ocf_mngt_flush_all_cores(ocf_cache_t cache, bool allow_interruption) +static void _ocf_mngt_flush_all_cores( + struct ocf_mngt_cache_flush_context *context, + ocf_flush_complete_t complete) { + ocf_cache_t cache = context->cache; struct flush_container *fctbl = NULL; uint32_t fcnum = 0; int ret; + if (context->op == flush_cache) + ocf_cache_log(cache, log_info, "Flushing cache\n"); + else if (context->op == purge_cache) + ocf_cache_log(cache, log_info, "Purging cache\n"); + + env_atomic_set(&cache->flush_in_progress, 1); + ocf_metadata_lock(cache, OCF_METADATA_WR); /* Get all 'dirty' sectors for all cores */ @@ -412,42 +553,23 @@ static int _ocf_mngt_flush_all_cores(ocf_cache_t cache, bool allow_interruption) if (ret) { ocf_cache_log(cache, log_err, "Flushing operation aborted, " "no memory\n"); - goto out; + complete(context, ret); + return; } - ret = _ocf_mngt_flush_containers(cache, fctbl, fcnum, - allow_interruption); + _ocf_mngt_flush_containers(context, fctbl, fcnum, complete); - _ocf_mngt_free_flush_containers(fctbl, fcnum); - -out: ocf_metadata_unlock(cache, OCF_METADATA_WR); - return ret; } -/** - * Flush all the dirty data stored on cache (all the cores attached to it) - * @param cache cache instance to which operation applies - * @param allow_interruption whenever to allow interruption of flushing process. - * if set to 0, all requests to interrupt flushing will be ignored - */ -static int _ocf_mng_cache_flush(ocf_cache_t cache, bool interruption) +static void _ocf_mngt_flush_all_cores_complete( + struct ocf_mngt_cache_flush_context *context, int error) { - int result = 0; - int i, j; - - env_atomic_set(&cache->flush_in_progress, 1); - cache->flushing_interrupted = 0; - do { - env_cond_resched(); - result = _ocf_mngt_flush_all_cores(cache, interruption); - if (result) { - /* Cleaning error */ - break; - } - } while (ocf_mngt_cache_is_dirty(cache)); + ocf_cache_t cache = context->cache; + uint32_t i, j; env_atomic_set(&cache->flush_in_progress, 0); + for (i = 0, j = 0; i < OCF_CORE_MAX; i++) { if (!env_bit_test(i, cache->conf_meta->valid_core_bitmap)) continue; @@ -458,12 +580,87 @@ static int _ocf_mng_cache_flush(ocf_cache_t cache, bool interruption) break; } - return result; + if (error) { + ocf_pipeline_finish(context->pipeline, error); + return; + } + + if (context->op == flush_cache) + ocf_cache_log(cache, log_info, "Flushing cache completed\n"); + + ocf_pipeline_next(context->pipeline); } +/** + * Flush all the dirty data stored on cache (all the cores attached to it) + */ +static void _ocf_mngt_cache_flush(ocf_pipeline_t pipeline, void *priv, + ocf_pipeline_arg_t arg) +{ + struct ocf_mngt_cache_flush_context *context = priv; + _ocf_mngt_flush_all_cores(context, _ocf_mngt_flush_all_cores_complete); +} + +static void _ocf_mngt_flush_finish(ocf_pipeline_t pipeline, void *priv, + int error) + +{ + struct ocf_mngt_cache_flush_context *context = priv; + ocf_cache_t cache = context->cache; + int64_t core_id; + + if (!error) { + switch(context->op) { + case flush_cache: + case purge_cache: + ENV_BUG_ON(ocf_mngt_cache_is_dirty(cache)); + break; + case flush_core: + case purge_core: + core_id = ocf_core_get_id(context->core); + ENV_BUG_ON(env_atomic_read(&cache->core_runtime_meta + [core_id].dirty_clines)); + break; + } + } + + _ocf_mngt_end_flush(context->cache); + + switch (context->op) { + case flush_cache: + context->cmpl.flush_cache(context->cache, context->priv, error); + break; + case flush_core: + context->cmpl.flush_core(context->core, context->priv, error); + break; + case purge_cache: + context->cmpl.purge_cache(context->cache, context->priv, error); + break; + case purge_core: + context->cmpl.purge_core(context->core, context->priv, error); + break; + default: + ENV_BUG(); + } + + ocf_pipeline_destroy(context->pipeline); +} + +static struct ocf_pipeline_properties _ocf_mngt_cache_flush_pipeline_properties = { + .priv_size = sizeof(struct ocf_mngt_cache_flush_context), + .finish = _ocf_mngt_flush_finish, + .steps = { + OCF_PL_STEP(_ocf_mngt_begin_flush), + OCF_PL_STEP(_ocf_mngt_cache_flush), + OCF_PL_STEP_TERMINATOR(), + }, +}; + void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption, ocf_mngt_cache_flush_end_t cmpl, void *priv) { + ocf_pipeline_t pipeline; + struct ocf_mngt_cache_flush_context *context; int result = 0; OCF_CHECK_NULL(cache); @@ -489,47 +686,75 @@ void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption, return; } - ocf_cache_log(cache, log_info, "Flushing cache\n"); + result = ocf_pipeline_create(&pipeline, cache, + &_ocf_mngt_cache_flush_pipeline_properties); + if (result) { + cmpl(cache, priv, -OCF_ERR_NO_MEM); + return; + } + context = ocf_pipeline_get_priv(pipeline); - _ocf_mngt_begin_flush(cache); + context->pipeline = pipeline; + context->cmpl.flush_cache = cmpl; + context->priv = priv; + context->cache = cache; + context->allow_interruption = interruption; + context->op = flush_cache; - result = _ocf_mng_cache_flush(cache, interruption); - - _ocf_mngt_end_flush(cache); - - if (!result) - ocf_cache_log(cache, log_info, "Flushing cache completed\n"); - - cmpl(cache, priv, result); + ocf_pipeline_next(context->pipeline); } -static int _ocf_mng_core_flush(ocf_core_t core, bool interruption) +static void _ocf_mngt_flush_core_complete( + struct ocf_mngt_cache_flush_context *context, int error) { - ocf_cache_t cache = ocf_core_get_cache(core); - ocf_core_id_t core_id = ocf_core_get_id(core); - int ret; - - cache->flushing_interrupted = 0; - do { - env_cond_resched(); - ret = _ocf_mngt_flush_core(core, interruption); - if (ret == -OCF_ERR_FLUSHING_INTERRUPTED || - ret == -OCF_ERR_WRITE_CORE) { - break; - } - } while (env_atomic_read(&cache->core_runtime_meta[core_id]. - dirty_clines)); + ocf_cache_t cache = context->cache; + ocf_core_t core = context->core; env_atomic_set(&core->flushed, 0); - return ret; + if (error) { + ocf_pipeline_finish(context->pipeline, error); + return; + } + + if (context->op == flush_core) + ocf_cache_log(cache, log_info, "Flushing completed\n"); + + ocf_pipeline_next(context->pipeline); } +static void _ocf_mngt_core_flush(ocf_pipeline_t pipeline, void *priv, + ocf_pipeline_arg_t arg) +{ + struct ocf_mngt_cache_flush_context *context = priv; + ocf_cache_t cache = context->cache; + + if (context->op == flush_core) + ocf_cache_log(cache, log_info, "Flushing core\n"); + else if (context->op == purge_core) + ocf_cache_log(cache, log_info, "Purging core\n"); + + _ocf_mngt_flush_core(context, _ocf_mngt_flush_core_complete); +} + +static +struct ocf_pipeline_properties _ocf_mngt_core_flush_pipeline_properties = { + .priv_size = sizeof(struct ocf_mngt_cache_flush_context), + .finish = _ocf_mngt_flush_finish, + .steps = { + OCF_PL_STEP(_ocf_mngt_begin_flush), + OCF_PL_STEP(_ocf_mngt_core_flush), + OCF_PL_STEP_TERMINATOR(), + }, +}; + void ocf_mngt_core_flush(ocf_core_t core, bool interruption, ocf_mngt_core_flush_end_t cmpl, void *priv) { + ocf_pipeline_t pipeline; + struct ocf_mngt_cache_flush_context *context; ocf_cache_t cache; - int ret = 0; + int result; OCF_CHECK_NULL(core); @@ -556,24 +781,61 @@ void ocf_mngt_core_flush(ocf_core_t core, bool interruption, return; } - ocf_core_log(core, log_info, "Flushing\n"); + result = ocf_pipeline_create(&pipeline, cache, + &_ocf_mngt_core_flush_pipeline_properties); + if (result) { + cmpl(core, priv, -OCF_ERR_NO_MEM); + return; + } + context = ocf_pipeline_get_priv(pipeline); - _ocf_mngt_begin_flush(cache); + context->pipeline = pipeline; + context->cmpl.flush_core = cmpl; + context->priv = priv; + context->cache = cache; + context->allow_interruption = interruption; + context->op = flush_core; + context->core = core; - ret = _ocf_mng_core_flush(core, interruption); - - _ocf_mngt_end_flush(cache); - - if (!ret) - ocf_cache_log(cache, log_info, "Flushing completed\n"); - - cmpl(core, priv, ret); + ocf_pipeline_next(context->pipeline); } +static void _ocf_mngt_cache_invalidate(ocf_pipeline_t pipeline, void *priv, + ocf_pipeline_arg_t arg) +{ + struct ocf_mngt_cache_flush_context *context = priv; + ocf_cache_t cache = context->cache; + int result; + + OCF_METADATA_LOCK_WR(); + result = ocf_metadata_sparse_range(cache, context->purge.core_id, 0, + context->purge.end_byte); + OCF_METADATA_UNLOCK_WR(); + + if (result) + ocf_pipeline_finish(context->pipeline, result); + else + ocf_pipeline_next(context->pipeline); +} + +static +struct ocf_pipeline_properties _ocf_mngt_cache_purge_pipeline_properties = { + .priv_size = sizeof(struct ocf_mngt_cache_flush_context), + .finish = _ocf_mngt_flush_finish, + .steps = { + OCF_PL_STEP(_ocf_mngt_begin_flush), + OCF_PL_STEP(_ocf_mngt_cache_flush), + OCF_PL_STEP(_ocf_mngt_cache_invalidate), + OCF_PL_STEP_TERMINATOR(), + }, +}; + void ocf_mngt_cache_purge(ocf_cache_t cache, ocf_mngt_cache_purge_end_t cmpl, void *priv) { + ocf_pipeline_t pipeline; int result = 0; + struct ocf_mngt_cache_flush_context *context; OCF_CHECK_NULL(cache); @@ -584,30 +846,43 @@ void ocf_mngt_cache_purge(ocf_cache_t cache, return; } - _ocf_mngt_begin_flush(cache); + result = ocf_pipeline_create(&pipeline, cache, + &_ocf_mngt_cache_purge_pipeline_properties); + if (result) { + cmpl(cache, priv, -OCF_ERR_NO_MEM); + return; + } + context = ocf_pipeline_get_priv(pipeline); - ocf_cache_log(cache, log_info, "Purging\n"); + context->pipeline = pipeline; + context->cmpl.purge_cache = cmpl; + context->priv = priv; + context->cache = cache; + context->allow_interruption = true; + context->op = purge_cache; + context->purge.core_id = OCF_CORE_ID_INVALID; + context->purge.end_byte = ~0ULL; - result = _ocf_mng_cache_flush(cache, true); - - if (result) - goto out; - - OCF_METADATA_LOCK_WR(); - result = ocf_metadata_sparse_range(cache, OCF_CORE_ID_INVALID, 0, - ~0ULL); - OCF_METADATA_UNLOCK_WR(); - -out: - _ocf_mngt_end_flush(cache); - - cmpl(cache, priv, result); + ocf_pipeline_next(context->pipeline); } +static +struct ocf_pipeline_properties _ocf_mngt_core_purge_pipeline_properties = { + .priv_size = sizeof(struct ocf_mngt_cache_flush_context), + .finish = _ocf_mngt_flush_finish, + .steps = { + OCF_PL_STEP(_ocf_mngt_begin_flush), + OCF_PL_STEP(_ocf_mngt_core_flush), + OCF_PL_STEP(_ocf_mngt_cache_invalidate), + OCF_PL_STEP_TERMINATOR(), + }, +}; void ocf_mngt_core_purge(ocf_core_t core, ocf_mngt_core_purge_end_t cmpl, void *priv) { + ocf_pipeline_t pipeline; + struct ocf_mngt_cache_flush_context *context; ocf_cache_t cache; ocf_core_id_t core_id; int result = 0; @@ -626,26 +901,27 @@ void ocf_mngt_core_purge(ocf_core_t core, } core_size = ocf_volume_get_length(&cache->core[core_id].volume); - core_size = core_size ?: ~0ULL; - _ocf_mngt_begin_flush(cache); + result = ocf_pipeline_create(&pipeline, cache, + &_ocf_mngt_core_purge_pipeline_properties); + if (result) { + cmpl(core, priv, -OCF_ERR_NO_MEM); + return; + } - ocf_core_log(core, log_info, "Purging\n"); + context = ocf_pipeline_get_priv(pipeline); - result = _ocf_mng_core_flush(core, true); + context->pipeline = pipeline; + context->cmpl.purge_core = cmpl; + context->priv = priv; + context->cache = cache; + context->allow_interruption = true; + context->op = purge_core; + context->purge.core_id = core_id; + context->purge.end_byte = core_size ?: ~0ULL; + context->core = core; - if (result) - goto out; - - OCF_METADATA_LOCK_WR(); - result = ocf_metadata_sparse_range(cache, core_id, 0, - core_size); - OCF_METADATA_UNLOCK_WR(); - -out: - _ocf_mngt_end_flush(cache); - - cmpl(core, priv, result); + ocf_pipeline_next(context->pipeline); } void ocf_mngt_cache_flush_interrupt(ocf_cache_t cache) diff --git a/src/utils/utils_cleaner.h b/src/utils/utils_cleaner.h index b2f3579..5773779 100644 --- a/src/utils/utils_cleaner.h +++ b/src/utils/utils_cleaner.h @@ -55,6 +55,8 @@ struct flush_data { ocf_core_id_t core_id; }; +typedef void (*ocf_flush_containter_coplete_t)(void *ctx); + /** * @brief Flush table container */ @@ -66,14 +68,15 @@ struct flush_container { struct ocf_cleaner_attribs attribs; ocf_cache_t cache; - env_atomic *progress; - env_atomic *error; - env_waitqueue *wq; - env_atomic completed; + + struct ocf_request *req; uint64_t flush_portion; uint64_t ticks1; uint64_t ticks2; + + ocf_flush_containter_coplete_t end; + struct ocf_mngt_cache_flush_context *context; }; /**