Asynchronous implementation of flush and purge - part 1

For the flush/purge entry points to be fully asynchronous, we still
need to rework the flush mutex and the wait for outstanding dirty
requests.

Signed-off-by: Adam Rutkowski <adam.j.rutkowski@intel.com>
Adam Rutkowski 2019-03-19 17:24:12 -04:00
parent 70df1d80a1
commit 0d0fd0be75
2 changed files with 474 additions and 195 deletions


@@ -9,22 +9,96 @@
 #include "../metadata/metadata.h"
 #include "../cleaning/cleaning.h"
 #include "../engine/cache_engine.h"
+#include "../engine/engine_common.h"
 #include "../utils/utils_cleaner.h"
 #include "../utils/utils_cache_line.h"
 #include "../utils/utils_part.h"
+#include "../utils/utils_pipeline.h"
+#include "../utils/utils_req.h"
 #include "../ocf_def_priv.h"
 
-static inline void _ocf_mngt_begin_flush(struct ocf_cache *cache)
+struct ocf_mngt_cache_flush_context;
+
+typedef void (*ocf_flush_complete_t)(struct ocf_mngt_cache_flush_context *, int);
+
+struct flush_containers_context
 {
+	/* array of container descriptors */
+	struct flush_container *fctbl;
+	/* fctbl array size */
+	uint32_t fcnum;
+	/* shared error for all concurrent container flushes */
+	env_atomic error;
+	/* number of outstanding container flushes */
+	env_atomic count;
+	/* first container flush to notice interrupt sets this to 1 */
+	env_atomic interrupt_seen;
+	/* completion to be called after all containers are flushed */
+	ocf_flush_complete_t complete;
+};
+
+/* common struct for cache/core flush/purge pipeline priv */
+struct ocf_mngt_cache_flush_context
+{
+	/* pipeline for flush / purge */
+	ocf_pipeline_t pipeline;
+	/* target cache */
+	ocf_cache_t cache;
+	/* target core */
+	ocf_core_t core;
+	/* true if flush interruption is allowed */
+	bool allow_interruption;
+	/* management operation identifier */
+	enum {
+		flush_cache = 0,
+		flush_core,
+		purge_cache,
+		purge_core
+	} op;
+	/* ocf mngt entry point completion */
+	union {
+		ocf_mngt_cache_flush_end_t flush_cache;
+		ocf_mngt_core_flush_end_t flush_core;
+		ocf_mngt_cache_purge_end_t purge_cache;
+		ocf_mngt_core_purge_end_t purge_core;
+	} cmpl;
+	/* completion private data */
+	void *priv;
+	/* purge parameters */
+	struct {
+		uint64_t end_byte;
+		uint64_t core_id;
+	} purge;
+	/* context for flush containers logic */
+	struct flush_containers_context fcs;
+};
+
+static void _ocf_mngt_begin_flush(ocf_pipeline_t pipeline, void *priv,
+		ocf_pipeline_arg_t arg)
+{
+	struct ocf_mngt_cache_flush_context *context = priv;
+	ocf_cache_t cache = context->cache;
+
+	/* FIXME: need a mechanism for asynchronously waiting for outstanding
+	 * flushes to finish */
 	env_mutex_lock(&cache->flush_mutex);
 	env_atomic_inc(&cache->flush_started);
+
+	/* FIXME: remove waitqueue from async begin */
 	env_waitqueue_wait(cache->pending_dirty_wq,
 			!env_atomic_read(&cache->pending_dirty_requests));
+
+	cache->flushing_interrupted = 0;
+
+	ocf_pipeline_next(context->pipeline);
 }
 
-static inline void _ocf_mngt_end_flush(struct ocf_cache *cache)
+static void _ocf_mngt_end_flush(ocf_cache_t cache)
 {
 	ENV_BUG_ON(env_atomic_dec_return(&cache->flush_started) < 0);
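The begin step above shows the new control-flow convention: a pipeline step never returns a status, it hands control onward with `ocf_pipeline_next()` (or aborts with `ocf_pipeline_finish()`), so any step can complete asynchronously from a callback. A minimal sketch of that step-chaining idea in plain C; the `pipeline`/`step_fn` names are illustrative stand-ins, not the OCF API:

```c
#include <stdio.h>

struct pipeline;
typedef void (*step_fn)(struct pipeline *p);

struct pipeline {
	const step_fn *steps;	/* array terminated by a NULL step */
	int current;		/* index of the next step to run */
	void *priv;		/* shared state, like the flush context */
};

static void pipeline_next(struct pipeline *p)
{
	step_fn next = p->steps[p->current++];

	if (next)
		next(p);	/* a real step may defer this call */
	else
		printf("pipeline finished\n");
}

static void begin_flush(struct pipeline *p)
{
	printf("begin: take flush mutex, wait for dirty requests\n");
	pipeline_next(p);	/* completion is an explicit call, not a return */
}

static void flush_cores(struct pipeline *p)
{
	printf("flush all cores\n");
	pipeline_next(p);
}

int main(void)
{
	const step_fn steps[] = { begin_flush, flush_cores, NULL };
	struct pipeline p = { .steps = steps, .current = 0, .priv = NULL };

	pipeline_next(&p);	/* kick off, like ocf_pipeline_next() */
	return 0;
}
```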
@@ -119,11 +193,6 @@ static int _ocf_mngt_get_sectors(struct ocf_cache *cache, int core_id,
 	return 0;
 }
 
-static void _ocf_mngt_free_sectors(void *tbl)
-{
-	env_vfree(tbl);
-}
-
 static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
 		struct flush_container **fctbl, uint32_t *fcnum)
 {
@@ -140,7 +209,6 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
 	 * TODO: Create containers for each physical device, not for
 	 * each core. Cores can be partitions of single device.
 	 */
-
 	num = cache->conf_meta->core_count;
 	if (num == 0) {
 		*fcnum = 0;
@@ -151,7 +219,7 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
 	if (!core_revmap)
 		return -OCF_ERR_NO_MEM;
 
-	/* TODO: Alloc flush_containers and data tables in single allocation */
+	/* TODO: Alloc fcs and data tables in single allocation */
 	fc = env_vzalloc(sizeof(**fctbl) * num);
 	if (!fc) {
 		env_vfree(core_revmap);
@@ -233,7 +301,7 @@
 }
 
 static void _ocf_mngt_free_flush_containers(struct flush_container *fctbl,
 		uint32_t num)
 {
 	int i;
@@ -278,133 +346,206 @@ static void _ocf_mngt_flush_portion(struct flush_container *fc)
 	fc->iter += curr_count;
 }
 
-static void _ocf_mngt_flush_end(void *private_data, int error)
+static void _ocf_mngt_flush_portion_end(void *private_data, int error)
 {
 	struct flush_container *fc = private_data;
+	struct ocf_mngt_cache_flush_context *context = fc->context;
+	struct flush_containers_context *fsc = &context->fcs;
+	ocf_cache_t cache = context->cache;
+	ocf_core_t core = &cache->core[fc->core_id];
+	bool first_interrupt;
+
+	env_atomic_set(&core->flushed, fc->iter);
 
 	fc->ticks2 = env_get_tick_count();
 
-	env_atomic_cmpxchg(fc->error, 0, error);
-	env_atomic_set(&fc->completed, 1);
-	env_atomic_inc(fc->progress);
-	env_waitqueue_wake_up(fc->wq);
+	env_atomic_cmpxchg(&fsc->error, 0, error);
+
+	if (cache->flushing_interrupted) {
+		first_interrupt = !env_atomic_cmpxchg(&fsc->interrupt_seen, 0, 1);
+		if (first_interrupt) {
+			if (context->allow_interruption) {
+				ocf_cache_log(cache, log_info,
+						"Flushing interrupted by "
+						"user\n");
+			} else {
+				ocf_cache_log(cache, log_err,
						"Cannot interrupt flushing\n");
+			}
+		}
+
+		if (context->allow_interruption) {
+			env_atomic_cmpxchg(&fsc->error, 0,
+					-OCF_ERR_FLUSHING_INTERRUPTED);
+		}
+	}
+
+	if (env_atomic_read(&fsc->error) || fc->iter == fc->count) {
+		ocf_req_put(fc->req);
+		fc->end(context);
+		return;
+	}
+
+	ocf_engine_push_req_front(fc->req, false);
 }
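`_ocf_mngt_flush_portion_end` leans twice on the same atomic idiom: `env_atomic_cmpxchg(&fsc->error, 0, error)` latches only the first non-zero status, and `interrupt_seen` grants exactly one of the concurrent container completions the right to log the interrupt message. A self-contained sketch of the idiom in C11 atomics (`env_atomic_*` is OCF's portability wrapper; the rest of the names are invented for the example):

```c
#include <stdatomic.h>
#include <stdio.h>

static atomic_int error;		/* 0 = no error recorded yet */
static atomic_int interrupt_seen;	/* 0 = interrupt not logged yet */

static void portion_end(int err, int interrupted)
{
	int expected = 0;

	/* keep the first non-zero error, ignore all later ones */
	atomic_compare_exchange_strong(&error, &expected, err);

	if (interrupted) {
		int zero = 0;
		/* exactly one completion wins the right to log */
		if (atomic_compare_exchange_strong(&interrupt_seen, &zero, 1))
			printf("Flushing interrupted by user\n");
	}
}

int main(void)
{
	portion_end(0, 1);	/* logs once */
	portion_end(-5, 1);	/* latches -5; no second log */
	portion_end(-7, 0);	/* ignored: -5 already recorded */
	printf("final error: %d\n", atomic_load(&error));
	return 0;
}
```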
-static int _ocf_mngt_flush_containers(ocf_cache_t cache,
-		struct flush_container *fctbl, uint32_t fcnum,
-		bool allow_interruption)
+static int _ofc_flush_container_step(struct ocf_request *req)
+{
+	struct flush_container *fc = req->priv;
+	ocf_cache_t cache = fc->cache;
+
+	ocf_metadata_lock(cache, OCF_METADATA_WR);
+	_ocf_mngt_flush_portion(fc);
+	ocf_metadata_unlock(cache, OCF_METADATA_WR);
+
+	return 0;
+}
+
+static const struct ocf_io_if _io_if_flush_portion = {
+	.read = _ofc_flush_container_step,
+	.write = _ofc_flush_container_step,
+};
+
+static void _ocf_mngt_flush_container(
+		struct ocf_mngt_cache_flush_context *context,
+		struct flush_container *fc, ocf_flush_containter_coplete_t end)
+{
+	ocf_cache_t cache = context->cache;
+	struct ocf_request *req;
+	int error = 0;
+
+	if (!fc->count)
+		goto finish;
+
+	fc->end = end;
+	fc->context = context;
+
+	req = ocf_req_new(cache->mngt_queue, NULL, 0, 0, 0);
+	if (!req) {
+		error = OCF_ERR_NO_MEM;
+		goto finish;
+	}
+
+	req->info.internal = true;
+	req->io_if = &_io_if_flush_portion;
+	req->priv = fc;
+
+	fc->req = req;
+	fc->attribs.cache_line_lock = true;
+	fc->attribs.cmpl_context = fc;
+	fc->attribs.cmpl_fn = _ocf_mngt_flush_portion_end;
+	fc->attribs.io_queue = cache->mngt_queue;
+	fc->cache = cache;
+	fc->flush_portion = OCF_MNG_FLUSH_MIN;
+	fc->ticks1 = 0;
+	fc->ticks2 = UINT_MAX;
+
+	ocf_engine_push_req_front(fc->req, true);
+	return;
+
+finish:
+	env_atomic_cmpxchg(&context->fcs.error, 0, error);
+	end(fc);
+}
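`_ocf_mngt_flush_container` replaces the old blocking loop with a self-re-queuing request: the request's io_if flushes one portion under the metadata lock, and the cleaner completion (`_ocf_mngt_flush_portion_end` above) pushes the same request back onto the management queue until the container is drained or an error/interrupt is latched. A rough stand-alone sketch of that drive pattern; the names and the synchronous while-loop "queue" are illustrative only:

```c
#include <stdbool.h>
#include <stdio.h>

struct flush_job {
	unsigned iter, count;	/* progress through the dirty-line table */
	int error;
};

/* one queue job; returns true if the job should be re-queued */
static bool flush_portion(struct flush_job *job)
{
	unsigned batch = 2;	/* stand-in for OCF_MNG_FLUSH_MIN */
	unsigned left = job->count - job->iter;

	job->iter += (left < batch) ? left : batch;
	printf("flushed %u/%u\n", job->iter, job->count);

	return job->error == 0 && job->iter < job->count;
}

int main(void)
{
	struct flush_job job = { .iter = 0, .count = 5, .error = 0 };

	/* in OCF the management queue re-executes the request each time
	 * it is pushed; here a plain loop plays the queue's role */
	while (flush_portion(&job))
		;
	return 0;
}
```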
+void _ocf_flush_container_complete(void *ctx)
+{
+	struct ocf_mngt_cache_flush_context *context = ctx;
+
+	if (env_atomic_dec_return(&context->fcs.count)) {
+		return;
+	}
+
+	_ocf_mngt_free_flush_containers(context->fcs.fctbl,
+			context->fcs.fcnum);
+
+	context->fcs.complete(context,
+			env_atomic_read(&context->fcs.error));
+}
+static void _ocf_mngt_flush_containers(
+		struct ocf_mngt_cache_flush_context *context,
+		struct flush_container *fctbl,
+		uint32_t fcnum, ocf_flush_complete_t complete)
 {
-	uint32_t fc_to_flush;
-	env_waitqueue wq;
-	env_atomic progress; /* incremented each time flushing of a portion of a
-				container is completed */
-	env_atomic error;
-	ocf_core_t core;
-	bool interrupt = false;
 	int i;
 
-	if (fcnum == 0)
-		return 0;
-
-	env_waitqueue_init(&wq);
+	if (fcnum == 0) {
+		complete(context, 0);
+		return;
+	}
 
 	/* Sort data. Smallest sectors first (0...n). */
 	ocf_cleaner_sort_flush_containers(fctbl, fcnum);
 
-	env_atomic_set(&error, 0);
+	env_atomic_set(&context->fcs.error, 0);
+	env_atomic_set(&context->fcs.count, 1);
+	context->fcs.complete = complete;
+	context->fcs.fctbl = fctbl;
+	context->fcs.fcnum = fcnum;
 
 	for (i = 0; i < fcnum; i++) {
-		fctbl[i].attribs.cache_line_lock = true;
-		fctbl[i].attribs.cmpl_context = &fctbl[i];
-		fctbl[i].attribs.cmpl_fn = _ocf_mngt_flush_end;
-		fctbl[i].attribs.io_queue = cache->mngt_queue;
-		fctbl[i].cache = cache;
-		fctbl[i].progress = &progress;
-		fctbl[i].error = &error;
-		fctbl[i].wq = &wq;
-		fctbl[i].flush_portion = OCF_MNG_FLUSH_MIN;
-		fctbl[i].ticks1 = 0;
-		fctbl[i].ticks2 = UINT_MAX;
-		env_atomic_set(&fctbl[i].completed, 1);
+		env_atomic_inc(&context->fcs.count);
+		_ocf_mngt_flush_container(context, &fctbl[i],
+				_ocf_flush_container_complete);
 	}
 
-	for (fc_to_flush = fcnum; fc_to_flush > 0;) {
-		env_atomic_set(&progress, 0);
-		for (i = 0; i < fcnum; i++) {
-			if (!env_atomic_read(&fctbl[i].completed))
-				continue;
-			core = &cache->core[fctbl[i].core_id];
-			env_atomic_set(&core->flushed, fctbl[i].iter);
-			env_atomic_set(&fctbl[i].completed, 0);
-
-			if (fctbl[i].iter == fctbl[i].count || interrupt ||
-					env_atomic_read(&error)) {
-				fc_to_flush--;
-				continue;
-			}
-
-			_ocf_mngt_flush_portion(&fctbl[i]);
-		}
-		if (fc_to_flush) {
-			ocf_metadata_unlock(cache, OCF_METADATA_WR);
-			env_cond_resched();
-			env_waitqueue_wait(wq, env_atomic_read(&progress));
-			ocf_metadata_lock(cache, OCF_METADATA_WR);
-		}
-		if (cache->flushing_interrupted && !interrupt) {
-			if (allow_interruption) {
-				interrupt = true;
-				ocf_cache_log(cache, log_info,
-						"Flushing interrupted by "
-						"user\n");
-			} else {
-				ocf_cache_log(cache, log_err,
-						"Cannot interrupt flushing\n");
-			}
-		}
-	}
-
-	return interrupt ? -OCF_ERR_FLUSHING_INTERRUPTED :
-			env_atomic_read(&error);
+	_ocf_flush_container_complete(context);
 }
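Note how `_ocf_mngt_flush_containers` initializes `fcs.count` to 1 and drops that extra reference itself through the final `_ocf_flush_container_complete(context)` call: the bias guarantees the shared completion cannot fire while the launch loop is still running, even if every container completes instantly. The same pattern reduced to stand-alone C11 (illustrative names):

```c
#include <stdatomic.h>
#include <stdio.h>

static atomic_int count;

static void complete_one(void)
{
	if (atomic_fetch_sub(&count, 1) - 1)
		return;		/* flushes still outstanding */
	printf("all containers done -> run the shared completion\n");
}

static void launch_container(int id)
{
	printf("container %d launched\n", id);
	complete_one();		/* in OCF this happens asynchronously */
}

int main(void)
{
	int i, n = 3;

	atomic_store(&count, 1);	/* the +1 bias */
	for (i = 0; i < n; i++) {
		atomic_fetch_add(&count, 1);
		launch_container(i);
	}
	complete_one();			/* drop the bias; may fire completion */
	return 0;
}
```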
-static int _ocf_mngt_flush_core(ocf_core_t core, bool allow_interruption)
+static void _ocf_mngt_flush_core(
+		struct ocf_mngt_cache_flush_context *context,
+		ocf_flush_complete_t complete)
 {
+	ocf_cache_t cache = context->cache;
+	ocf_core_t core = context->core;
 	ocf_core_id_t core_id = ocf_core_get_id(core);
-	ocf_cache_t cache = core->volume.cache;
-	struct flush_container fc;
+	struct flush_container *fc;
 	int ret;
 
+	fc = env_vzalloc(sizeof(*fc));
+	if (!fc) {
+		complete(context, -OCF_ERR_NO_MEM);
+		return;
+	}
+
 	ocf_metadata_lock(cache, OCF_METADATA_WR);
 
 	ret = _ocf_mngt_get_sectors(cache, core_id,
-			&fc.flush_data, &fc.count);
+			&fc->flush_data, &fc->count);
 	if (ret) {
 		ocf_core_log(core, log_err, "Flushing operation aborted, "
 				"no memory\n");
-		goto out;
+		env_vfree(fc);
+		complete(context, -OCF_ERR_NO_MEM);
+		return;
 	}
 
-	fc.core_id = core_id;
-	fc.iter = 0;
+	fc->core_id = core_id;
+	fc->iter = 0;
 
-	ret = _ocf_mngt_flush_containers(cache, &fc, 1, allow_interruption);
-	_ocf_mngt_free_sectors(fc.flush_data);
+	_ocf_mngt_flush_containers(context, fc, 1, complete);
 
-out:
 	ocf_metadata_unlock(cache, OCF_METADATA_WR);
-
-	return ret;
 }
-static int _ocf_mngt_flush_all_cores(ocf_cache_t cache, bool allow_interruption)
+static void _ocf_mngt_flush_all_cores(
+		struct ocf_mngt_cache_flush_context *context,
+		ocf_flush_complete_t complete)
 {
+	ocf_cache_t cache = context->cache;
 	struct flush_container *fctbl = NULL;
 	uint32_t fcnum = 0;
 	int ret;
 
+	if (context->op == flush_cache)
+		ocf_cache_log(cache, log_info, "Flushing cache\n");
+	else if (context->op == purge_cache)
+		ocf_cache_log(cache, log_info, "Purging cache\n");
+
+	env_atomic_set(&cache->flush_in_progress, 1);
+
 	ocf_metadata_lock(cache, OCF_METADATA_WR);
 
 	/* Get all 'dirty' sectors for all cores */
@@ -412,42 +553,23 @@ static int _ocf_mngt_flush_all_cores(ocf_cache_t cache, bool allow_interruption)
 	if (ret) {
 		ocf_cache_log(cache, log_err, "Flushing operation aborted, "
 				"no memory\n");
-		goto out;
+		complete(context, ret);
+		return;
 	}
 
-	ret = _ocf_mngt_flush_containers(cache, fctbl, fcnum,
-			allow_interruption);
-	_ocf_mngt_free_flush_containers(fctbl, fcnum);
+	_ocf_mngt_flush_containers(context, fctbl, fcnum, complete);
 
-out:
 	ocf_metadata_unlock(cache, OCF_METADATA_WR);
-
-	return ret;
 }
 
-/**
- * Flush all the dirty data stored on cache (all the cores attached to it)
- * @param cache cache instance to which operation applies
- * @param allow_interruption whenever to allow interruption of flushing process.
- *		if set to 0, all requests to interrupt flushing will be ignored
- */
-static int _ocf_mng_cache_flush(ocf_cache_t cache, bool interruption)
+static void _ocf_mngt_flush_all_cores_complete(
+		struct ocf_mngt_cache_flush_context *context, int error)
 {
-	int result = 0;
-	int i, j;
-
-	env_atomic_set(&cache->flush_in_progress, 1);
-	cache->flushing_interrupted = 0;
-	do {
-		env_cond_resched();
-		result = _ocf_mngt_flush_all_cores(cache, interruption);
-		if (result) {
-			/* Cleaning error */
-			break;
-		}
-	} while (ocf_mngt_cache_is_dirty(cache));
+	ocf_cache_t cache = context->cache;
+	uint32_t i, j;
 
 	env_atomic_set(&cache->flush_in_progress, 0);
 
 	for (i = 0, j = 0; i < OCF_CORE_MAX; i++) {
 		if (!env_bit_test(i, cache->conf_meta->valid_core_bitmap))
 			continue;
@@ -458,12 +580,87 @@ static int _ocf_mng_cache_flush(ocf_cache_t cache, bool interruption)
 			break;
 	}
 
-	return result;
+	if (error) {
+		ocf_pipeline_finish(context->pipeline, error);
+		return;
+	}
+
+	if (context->op == flush_cache)
+		ocf_cache_log(cache, log_info, "Flushing cache completed\n");
+
+	ocf_pipeline_next(context->pipeline);
 }
 
+/**
+ * Flush all the dirty data stored on cache (all the cores attached to it)
+ */
+static void _ocf_mngt_cache_flush(ocf_pipeline_t pipeline, void *priv,
+		ocf_pipeline_arg_t arg)
+{
+	struct ocf_mngt_cache_flush_context *context = priv;
+
+	_ocf_mngt_flush_all_cores(context, _ocf_mngt_flush_all_cores_complete);
+}
+static void _ocf_mngt_flush_finish(ocf_pipeline_t pipeline, void *priv,
+		int error)
+{
+	struct ocf_mngt_cache_flush_context *context = priv;
+	ocf_cache_t cache = context->cache;
+	int64_t core_id;
+
+	if (!error) {
+		switch (context->op) {
+		case flush_cache:
+		case purge_cache:
+			ENV_BUG_ON(ocf_mngt_cache_is_dirty(cache));
+			break;
+		case flush_core:
+		case purge_core:
+			core_id = ocf_core_get_id(context->core);
+			ENV_BUG_ON(env_atomic_read(&cache->core_runtime_meta
+					[core_id].dirty_clines));
+			break;
+		}
+	}
+
+	_ocf_mngt_end_flush(context->cache);
+
+	switch (context->op) {
+	case flush_cache:
+		context->cmpl.flush_cache(context->cache, context->priv, error);
+		break;
+	case flush_core:
+		context->cmpl.flush_core(context->core, context->priv, error);
+		break;
+	case purge_cache:
+		context->cmpl.purge_cache(context->cache, context->priv, error);
+		break;
+	case purge_core:
+		context->cmpl.purge_core(context->core, context->priv, error);
+		break;
+	default:
+		ENV_BUG();
+	}
+
+	ocf_pipeline_destroy(context->pipeline);
+}
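`_ocf_mngt_flush_finish` serves all four pipelines: the `op` tag selects the matching member of the `cmpl` union, so the caller always gets back the callback type it registered. The mechanism reduced to its essentials (illustrative types, two ops instead of four):

```c
#include <stdio.h>

typedef void (*flush_cache_end_t)(void *priv, int error);
typedef void (*flush_core_end_t)(void *priv, int error);

struct flush_context {
	enum { flush_cache, flush_core } op;
	union {
		flush_cache_end_t flush_cache;
		flush_core_end_t flush_core;
	} cmpl;
	void *priv;
};

static void finish(struct flush_context *c, int error)
{
	switch (c->op) {
	case flush_cache:
		c->cmpl.flush_cache(c->priv, error);
		break;
	case flush_core:
		c->cmpl.flush_core(c->priv, error);
		break;
	}
}

static void cache_done(void *priv, int error)
{
	(void)priv;
	printf("cache flush done, error=%d\n", error);
}

int main(void)
{
	struct flush_context c = {
		.op = flush_cache,
		.cmpl.flush_cache = cache_done,
		.priv = NULL,
	};

	finish(&c, 0);
	return 0;
}
```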
+static struct ocf_pipeline_properties _ocf_mngt_cache_flush_pipeline_properties = {
+	.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
+	.finish = _ocf_mngt_flush_finish,
+	.steps = {
+		OCF_PL_STEP(_ocf_mngt_begin_flush),
+		OCF_PL_STEP(_ocf_mngt_cache_flush),
+		OCF_PL_STEP_TERMINATOR(),
+	},
+};
+
 void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption,
 		ocf_mngt_cache_flush_end_t cmpl, void *priv)
 {
+	ocf_pipeline_t pipeline;
+	struct ocf_mngt_cache_flush_context *context;
 	int result = 0;
 
 	OCF_CHECK_NULL(cache);
@@ -489,47 +686,75 @@ void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption,
 		return;
 	}
 
-	ocf_cache_log(cache, log_info, "Flushing cache\n");
+	result = ocf_pipeline_create(&pipeline, cache,
+			&_ocf_mngt_cache_flush_pipeline_properties);
+	if (result) {
+		cmpl(cache, priv, -OCF_ERR_NO_MEM);
+		return;
+	}
 
-	_ocf_mngt_begin_flush(cache);
+	context = ocf_pipeline_get_priv(pipeline);
 
-	result = _ocf_mng_cache_flush(cache, interruption);
+	context->pipeline = pipeline;
+	context->cmpl.flush_cache = cmpl;
+	context->priv = priv;
+	context->cache = cache;
+	context->allow_interruption = interruption;
+	context->op = flush_cache;
 
-	_ocf_mngt_end_flush(cache);
-
-	if (!result)
-		ocf_cache_log(cache, log_info, "Flushing cache completed\n");
-
-	cmpl(cache, priv, result);
+	ocf_pipeline_next(context->pipeline);
 }
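From the caller's side the entry point now returns immediately and delivers the result through the completion, which `_ocf_mngt_flush_finish` invokes as `cmpl(cache, priv, error)`. A hedged usage sketch, assuming an initialized cache and the public OCF headers (`my_flush_done`/`start_flush` are example names, not OCF symbols):

```c
#include "ocf/ocf.h"

/* signature follows ocf_mngt_cache_flush_end_t as dispatched above */
static void my_flush_done(ocf_cache_t cache, void *priv, int error)
{
	(void)cache;
	(void)priv;

	if (error == -OCF_ERR_FLUSHING_INTERRUPTED) {
		/* interruption was allowed and requested by the user */
	}
	/* runs once the pipeline finishes; safe to continue mngt ops here */
}

static void start_flush(ocf_cache_t cache)
{
	/* true = allow ocf_mngt_cache_flush_interrupt() to abort the flush */
	ocf_mngt_cache_flush(cache, true, my_flush_done, NULL);
	/* returns immediately; no flush status until my_flush_done runs */
}
```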
-static int _ocf_mng_core_flush(ocf_core_t core, bool interruption)
+static void _ocf_mngt_flush_core_complete(
+		struct ocf_mngt_cache_flush_context *context, int error)
 {
-	ocf_cache_t cache = ocf_core_get_cache(core);
-	ocf_core_id_t core_id = ocf_core_get_id(core);
-	int ret;
-
-	cache->flushing_interrupted = 0;
-	do {
-		env_cond_resched();
-		ret = _ocf_mngt_flush_core(core, interruption);
-		if (ret == -OCF_ERR_FLUSHING_INTERRUPTED ||
-				ret == -OCF_ERR_WRITE_CORE) {
-			break;
-		}
-	} while (env_atomic_read(&cache->core_runtime_meta[core_id].
-			dirty_clines));
+	ocf_cache_t cache = context->cache;
+	ocf_core_t core = context->core;
 
 	env_atomic_set(&core->flushed, 0);
 
-	return ret;
+	if (error) {
+		ocf_pipeline_finish(context->pipeline, error);
+		return;
+	}
+
+	if (context->op == flush_core)
+		ocf_cache_log(cache, log_info, "Flushing completed\n");
+
+	ocf_pipeline_next(context->pipeline);
 }
+static void _ocf_mngt_core_flush(ocf_pipeline_t pipeline, void *priv,
+		ocf_pipeline_arg_t arg)
+{
+	struct ocf_mngt_cache_flush_context *context = priv;
+	ocf_cache_t cache = context->cache;
+
+	if (context->op == flush_core)
+		ocf_cache_log(cache, log_info, "Flushing core\n");
+	else if (context->op == purge_core)
+		ocf_cache_log(cache, log_info, "Purging core\n");
+
+	_ocf_mngt_flush_core(context, _ocf_mngt_flush_core_complete);
+}
+
+static
+struct ocf_pipeline_properties _ocf_mngt_core_flush_pipeline_properties = {
+	.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
+	.finish = _ocf_mngt_flush_finish,
+	.steps = {
+		OCF_PL_STEP(_ocf_mngt_begin_flush),
+		OCF_PL_STEP(_ocf_mngt_core_flush),
+		OCF_PL_STEP_TERMINATOR(),
+	},
+};
+
 void ocf_mngt_core_flush(ocf_core_t core, bool interruption,
 		ocf_mngt_core_flush_end_t cmpl, void *priv)
 {
+	ocf_pipeline_t pipeline;
+	struct ocf_mngt_cache_flush_context *context;
 	ocf_cache_t cache;
-	int ret = 0;
+	int result;
 
 	OCF_CHECK_NULL(core);
@@ -556,24 +781,61 @@ void ocf_mngt_core_flush(ocf_core_t core, bool interruption,
 		return;
 	}
 
-	ocf_core_log(core, log_info, "Flushing\n");
+	result = ocf_pipeline_create(&pipeline, cache,
+			&_ocf_mngt_core_flush_pipeline_properties);
+	if (result) {
+		cmpl(core, priv, -OCF_ERR_NO_MEM);
+		return;
+	}
 
-	_ocf_mngt_begin_flush(cache);
+	context = ocf_pipeline_get_priv(pipeline);
 
-	ret = _ocf_mng_core_flush(core, interruption);
+	context->pipeline = pipeline;
+	context->cmpl.flush_core = cmpl;
+	context->priv = priv;
+	context->cache = cache;
+	context->allow_interruption = interruption;
+	context->op = flush_core;
+	context->core = core;
 
-	_ocf_mngt_end_flush(cache);
-
-	if (!ret)
-		ocf_cache_log(cache, log_info, "Flushing completed\n");
-
-	cmpl(core, priv, ret);
+	ocf_pipeline_next(context->pipeline);
 }
+static void _ocf_mngt_cache_invalidate(ocf_pipeline_t pipeline, void *priv,
+		ocf_pipeline_arg_t arg)
+{
+	struct ocf_mngt_cache_flush_context *context = priv;
+	ocf_cache_t cache = context->cache;
+	int result;
+
+	OCF_METADATA_LOCK_WR();
+	result = ocf_metadata_sparse_range(cache, context->purge.core_id, 0,
+			context->purge.end_byte);
+	OCF_METADATA_UNLOCK_WR();
+
+	if (result)
+		ocf_pipeline_finish(context->pipeline, result);
+	else
+		ocf_pipeline_next(context->pipeline);
+}
+
+static
+struct ocf_pipeline_properties _ocf_mngt_cache_purge_pipeline_properties = {
+	.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
+	.finish = _ocf_mngt_flush_finish,
+	.steps = {
+		OCF_PL_STEP(_ocf_mngt_begin_flush),
+		OCF_PL_STEP(_ocf_mngt_cache_flush),
+		OCF_PL_STEP(_ocf_mngt_cache_invalidate),
+		OCF_PL_STEP_TERMINATOR(),
+	},
+};
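The purge pipelines are the flush pipelines plus one step: after the flush stages succeed, `_ocf_mngt_cache_invalidate` sparses the purge range out of the metadata. A tiny self-contained illustration of that composition (synchronous stand-ins, not OCF calls):

```c
#include <stdio.h>

struct ctx { const char *name; };
typedef void (*step_fn)(struct ctx *c);

static void begin(struct ctx *c)      { printf("%s: begin flush\n", c->name); }
static void flush_all(struct ctx *c)  { printf("%s: flush cores\n", c->name); }
static void invalidate(struct ctx *c) { printf("%s: invalidate metadata\n", c->name); }

static void run(const step_fn *steps, struct ctx *c)
{
	for (; *steps; steps++)
		(*steps)(c);	/* synchronous stand-in for ocf_pipeline_next */
}

int main(void)
{
	const step_fn flush_steps[] = { begin, flush_all, NULL };
	const step_fn purge_steps[] = { begin, flush_all, invalidate, NULL };
	struct ctx f = { "flush" }, p = { "purge" };

	run(flush_steps, &f);
	run(purge_steps, &p);	/* purge = flush + invalidate */
	return 0;
}
```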
 void ocf_mngt_cache_purge(ocf_cache_t cache,
 		ocf_mngt_cache_purge_end_t cmpl, void *priv)
 {
+	ocf_pipeline_t pipeline;
 	int result = 0;
+	struct ocf_mngt_cache_flush_context *context;
 
 	OCF_CHECK_NULL(cache);
@@ -584,30 +846,43 @@ void ocf_mngt_cache_purge(ocf_cache_t cache,
 		return;
 	}
 
-	_ocf_mngt_begin_flush(cache);
+	result = ocf_pipeline_create(&pipeline, cache,
+			&_ocf_mngt_cache_purge_pipeline_properties);
+	if (result) {
+		cmpl(cache, priv, -OCF_ERR_NO_MEM);
+		return;
+	}
 
-	ocf_cache_log(cache, log_info, "Purging\n");
+	context = ocf_pipeline_get_priv(pipeline);
 
-	result = _ocf_mng_cache_flush(cache, true);
+	context->pipeline = pipeline;
+	context->cmpl.purge_cache = cmpl;
+	context->priv = priv;
+	context->cache = cache;
+	context->allow_interruption = true;
+	context->op = purge_cache;
+	context->purge.core_id = OCF_CORE_ID_INVALID;
+	context->purge.end_byte = ~0ULL;
 
-	if (result)
-		goto out;
-
-	OCF_METADATA_LOCK_WR();
-	result = ocf_metadata_sparse_range(cache, OCF_CORE_ID_INVALID, 0,
-			~0ULL);
-	OCF_METADATA_UNLOCK_WR();
-
-out:
-	_ocf_mngt_end_flush(cache);
-
-	cmpl(cache, priv, result);
+	ocf_pipeline_next(context->pipeline);
 }
+static
+struct ocf_pipeline_properties _ocf_mngt_core_purge_pipeline_properties = {
+	.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
+	.finish = _ocf_mngt_flush_finish,
+	.steps = {
+		OCF_PL_STEP(_ocf_mngt_begin_flush),
+		OCF_PL_STEP(_ocf_mngt_core_flush),
+		OCF_PL_STEP(_ocf_mngt_cache_invalidate),
+		OCF_PL_STEP_TERMINATOR(),
+	},
+};
+
 void ocf_mngt_core_purge(ocf_core_t core,
 		ocf_mngt_core_purge_end_t cmpl, void *priv)
 {
+	ocf_pipeline_t pipeline;
+	struct ocf_mngt_cache_flush_context *context;
 	ocf_cache_t cache;
 	ocf_core_id_t core_id;
 	int result = 0;
@@ -626,26 +901,27 @@ void ocf_mngt_core_purge(ocf_core_t core,
 	}
 
 	core_size = ocf_volume_get_length(&cache->core[core_id].volume);
-	core_size = core_size ?: ~0ULL;
 
-	_ocf_mngt_begin_flush(cache);
+	result = ocf_pipeline_create(&pipeline, cache,
+			&_ocf_mngt_core_purge_pipeline_properties);
+	if (result) {
+		cmpl(core, priv, -OCF_ERR_NO_MEM);
+		return;
+	}
 
-	ocf_core_log(core, log_info, "Purging\n");
+	context = ocf_pipeline_get_priv(pipeline);
 
-	result = _ocf_mng_core_flush(core, true);
+	context->pipeline = pipeline;
+	context->cmpl.purge_core = cmpl;
+	context->priv = priv;
+	context->cache = cache;
+	context->allow_interruption = true;
+	context->op = purge_core;
+	context->purge.core_id = core_id;
+	context->purge.end_byte = core_size ?: ~0ULL;
+	context->core = core;
 
-	if (result)
-		goto out;
-
-	OCF_METADATA_LOCK_WR();
-	result = ocf_metadata_sparse_range(cache, core_id, 0,
-			core_size);
-	OCF_METADATA_UNLOCK_WR();
-
-out:
-	_ocf_mngt_end_flush(cache);
-
-	cmpl(core, priv, result);
+	ocf_pipeline_next(context->pipeline);
 }
 
 void ocf_mngt_cache_flush_interrupt(ocf_cache_t cache)


@@ -55,6 +55,8 @@ struct flush_data {
 	ocf_core_id_t core_id;
 };
 
+typedef void (*ocf_flush_containter_coplete_t)(void *ctx);
+
 /**
  * @brief Flush table container
  */
@@ -66,14 +68,15 @@ struct flush_container {
 	struct ocf_cleaner_attribs attribs;
 	ocf_cache_t cache;
 
-	env_atomic *progress;
-	env_atomic *error;
-	env_waitqueue *wq;
-	env_atomic completed;
+	struct ocf_request *req;
 
 	uint64_t flush_portion;
 	uint64_t ticks1;
 	uint64_t ticks2;
 
+	ocf_flush_containter_coplete_t end;
+	struct ocf_mngt_cache_flush_context *context;
 };
 
 /**