Merge pull request #80 from arutk/prv-async_flush

Asynchronous flush and purge operations - part 1
This commit is contained in:
Robert Bałdyga 2019-03-22 23:24:10 +01:00 committed by GitHub
commit a084c1cb1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 474 additions and 195 deletions

View File

@ -9,22 +9,96 @@
#include "../metadata/metadata.h"
#include "../cleaning/cleaning.h"
#include "../engine/cache_engine.h"
#include "../engine/engine_common.h"
#include "../utils/utils_cleaner.h"
#include "../utils/utils_cache_line.h"
#include "../utils/utils_part.h"
#include "../utils/utils_pipeline.h"
#include "../utils/utils_req.h"
#include "../ocf_def_priv.h"
static inline void _ocf_mngt_begin_flush(struct ocf_cache *cache)
struct ocf_mngt_cache_flush_context;
typedef void (*ocf_flush_complete_t)(struct ocf_mngt_cache_flush_context *, int);
struct flush_containers_context
{
/* array of container descriptors */
struct flush_container *fctbl;
/* fctbl array size */
uint32_t fcnum;
/* shared error for all concurrent container flushes */
env_atomic error;
/* number of outstanding container flushes */
env_atomic count;
/* first container flush to notice interrupt sets this to 1 */
env_atomic interrupt_seen;
/* completion to be called after all containers are flushed */
ocf_flush_complete_t complete;
};
/* common struct for cache/core flush/purge pipeline priv */
struct ocf_mngt_cache_flush_context
{
/* pipeline for flush / purge */
ocf_pipeline_t pipeline;
/* target cache */
ocf_cache_t cache;
/* target core */
ocf_core_t core;
/* true if flush interrupt respected */
bool allow_interruption;
/* management operation identifier */
enum {
flush_cache = 0,
flush_core,
purge_cache,
purge_core
} op;
/* ocf mngmt entry point completion */
union {
ocf_mngt_cache_flush_end_t flush_cache;
ocf_mngt_core_flush_end_t flush_core;
ocf_mngt_cache_purge_end_t purge_cache;
ocf_mngt_core_purge_end_t purge_core;
} cmpl;
/* completion pivate data */
void *priv;
/* purge parameters */
struct {
uint64_t end_byte;
uint64_t core_id;
} purge;
/* context for flush containers logic */
struct flush_containers_context fcs;
};
static void _ocf_mngt_begin_flush(ocf_pipeline_t pipeline, void *priv,
ocf_pipeline_arg_t arg)
{
struct ocf_mngt_cache_flush_context *context = priv;
ocf_cache_t cache = context->cache;
/* FIXME: need mechanism for async waiting for outstanding flushed to
* finish */
env_mutex_lock(&cache->flush_mutex);
env_atomic_inc(&cache->flush_started);
/* FIXME: remove waitqueue from async begin */
env_waitqueue_wait(cache->pending_dirty_wq,
!env_atomic_read(&cache->pending_dirty_requests));
cache->flushing_interrupted = 0;
ocf_pipeline_next(context->pipeline);
}
static inline void _ocf_mngt_end_flush(struct ocf_cache *cache)
static void _ocf_mngt_end_flush(ocf_cache_t cache)
{
ENV_BUG_ON(env_atomic_dec_return(&cache->flush_started) < 0);
@ -119,11 +193,6 @@ static int _ocf_mngt_get_sectors(struct ocf_cache *cache, int core_id,
return 0;
}
static void _ocf_mngt_free_sectors(void *tbl)
{
env_vfree(tbl);
}
static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
struct flush_container **fctbl, uint32_t *fcnum)
{
@ -140,7 +209,6 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
* TODO: Create containers for each physical device, not for
* each core. Cores can be partitions of single device.
*/
num = cache->conf_meta->core_count;
if (num == 0) {
*fcnum = 0;
@ -151,7 +219,7 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
if (!core_revmap)
return -OCF_ERR_NO_MEM;
/* TODO: Alloc flush_containers and data tables in single allocation */
/* TODO: Alloc fcs and data tables in single allocation */
fc = env_vzalloc(sizeof(**fctbl) * num);
if (!fc) {
env_vfree(core_revmap);
@ -233,7 +301,7 @@ static int _ocf_mngt_get_flush_containers(ocf_cache_t cache,
}
static void _ocf_mngt_free_flush_containers(struct flush_container *fctbl,
uint32_t num)
uint32_t num)
{
int i;
@ -278,133 +346,206 @@ static void _ocf_mngt_flush_portion(struct flush_container *fc)
fc->iter += curr_count;
}
static void _ocf_mngt_flush_end(void *private_data, int error)
static void _ocf_mngt_flush_portion_end(void *private_data, int error)
{
struct flush_container *fc = private_data;
struct ocf_mngt_cache_flush_context *context = fc->context;
struct flush_containers_context *fsc = &context->fcs;
ocf_cache_t cache = context->cache;
ocf_core_t core = &cache->core[fc->core_id];
bool first_interrupt;
env_atomic_set(&core->flushed, fc->iter);
fc->ticks2 = env_get_tick_count();
env_atomic_cmpxchg(fc->error, 0, error);
env_atomic_cmpxchg(&fsc->error, 0, error);
env_atomic_set(&fc->completed, 1);
env_atomic_inc(fc->progress);
env_waitqueue_wake_up(fc->wq);
if (cache->flushing_interrupted) {
first_interrupt = !env_atomic_cmpxchg(&fsc->interrupt_seen, 0, 1);
if (first_interrupt) {
if (context->allow_interruption) {
ocf_cache_log(cache, log_info,
"Flushing interrupted by "
"user\n");
} else {
ocf_cache_log(cache, log_err,
"Cannot interrupt flushing\n");
}
}
if (context->allow_interruption) {
env_atomic_cmpxchg(&fsc->error, 0,
-OCF_ERR_FLUSHING_INTERRUPTED);
}
}
if (env_atomic_read(&fsc->error) || fc->iter == fc->count) {
ocf_req_put(fc->req);
fc->end(context);
return;
}
ocf_engine_push_req_front(fc->req, false);
}
static int _ocf_mngt_flush_containers(ocf_cache_t cache,
struct flush_container *fctbl, uint32_t fcnum,
bool allow_interruption)
static int _ofc_flush_container_step(struct ocf_request *req)
{
struct flush_container *fc = req->priv;
ocf_cache_t cache = fc->cache;
ocf_metadata_lock(cache, OCF_METADATA_WR);
_ocf_mngt_flush_portion(fc);
ocf_metadata_unlock(cache, OCF_METADATA_WR);
return 0;
}
static const struct ocf_io_if _io_if_flush_portion = {
.read = _ofc_flush_container_step,
.write = _ofc_flush_container_step,
};
static void _ocf_mngt_flush_container(
struct ocf_mngt_cache_flush_context *context,
struct flush_container *fc, ocf_flush_containter_coplete_t end)
{
ocf_cache_t cache = context->cache;
struct ocf_request *req;
int error = 0;
if (!fc->count)
goto finish;
fc->end = end;
fc->context = context;
req = ocf_req_new(cache->mngt_queue, NULL, 0, 0, 0);
if (!req) {
error = OCF_ERR_NO_MEM;
goto finish;
}
req->info.internal = true;
req->io_if = &_io_if_flush_portion;
req->priv = fc;
fc->req = req;
fc->attribs.cache_line_lock = true;
fc->attribs.cmpl_context = fc;
fc->attribs.cmpl_fn = _ocf_mngt_flush_portion_end;
fc->attribs.io_queue = cache->mngt_queue;
fc->cache = cache;
fc->flush_portion = OCF_MNG_FLUSH_MIN;
fc->ticks1 = 0;
fc->ticks2 = UINT_MAX;
ocf_engine_push_req_front(fc->req, true);
return;
finish:
env_atomic_cmpxchg(&context->fcs.error, 0, error);
end(fc);
}
void _ocf_flush_container_complete(void *ctx)
{
struct ocf_mngt_cache_flush_context *context = ctx;
if (atomic_dec_return(&context->fcs.count)) {
return;
}
_ocf_mngt_free_flush_containers(context->fcs.fctbl,
context->fcs.fcnum);
context->fcs.complete(context,
env_atomic_read(&context->fcs.error));
}
static void _ocf_mngt_flush_containers(
struct ocf_mngt_cache_flush_context *context,
struct flush_container *fctbl,
uint32_t fcnum, ocf_flush_complete_t complete)
{
uint32_t fc_to_flush;
env_waitqueue wq;
env_atomic progress; /* incremented each time flushing of a portion of a
container is completed */
env_atomic error;
ocf_core_t core;
bool interrupt = false;
int i;
if (fcnum == 0)
return 0;
env_waitqueue_init(&wq);
if (fcnum == 0) {
complete(context, 0);
return;
}
/* Sort data. Smallest sectors first (0...n). */
ocf_cleaner_sort_flush_containers(fctbl, fcnum);
env_atomic_set(&error, 0);
env_atomic_set(&context->fcs.error, 0);
env_atomic_set(&context->fcs.count, 1);
context->fcs.complete = complete;
context->fcs.fctbl = fctbl;
context->fcs.fcnum = fcnum;
for (i = 0; i < fcnum; i++) {
fctbl[i].attribs.cache_line_lock = true;
fctbl[i].attribs.cmpl_context = &fctbl[i];
fctbl[i].attribs.cmpl_fn = _ocf_mngt_flush_end;
fctbl[i].attribs.io_queue = cache->mngt_queue;
fctbl[i].cache = cache;
fctbl[i].progress = &progress;
fctbl[i].error = &error;
fctbl[i].wq = &wq;
fctbl[i].flush_portion = OCF_MNG_FLUSH_MIN;
fctbl[i].ticks1 = 0;
fctbl[i].ticks2 = UINT_MAX;
env_atomic_set(&fctbl[i].completed, 1);
env_atomic_inc(&context->fcs.count);
_ocf_mngt_flush_container(context, &fctbl[i],
_ocf_flush_container_complete);
}
for (fc_to_flush = fcnum; fc_to_flush > 0;) {
env_atomic_set(&progress, 0);
for (i = 0; i < fcnum; i++) {
if (!env_atomic_read(&fctbl[i].completed))
continue;
core = &cache->core[fctbl[i].core_id];
env_atomic_set(&core->flushed, fctbl[i].iter);
env_atomic_set(&fctbl[i].completed, 0);
if (fctbl[i].iter == fctbl[i].count || interrupt ||
env_atomic_read(&error)) {
fc_to_flush--;
continue;
}
_ocf_mngt_flush_portion(&fctbl[i]);
}
if (fc_to_flush) {
ocf_metadata_unlock(cache, OCF_METADATA_WR);
env_cond_resched();
env_waitqueue_wait(wq, env_atomic_read(&progress));
ocf_metadata_lock(cache, OCF_METADATA_WR);
}
if (cache->flushing_interrupted && !interrupt) {
if (allow_interruption) {
interrupt = true;
ocf_cache_log(cache, log_info,
"Flushing interrupted by "
"user\n");
} else {
ocf_cache_log(cache, log_err,
"Cannot interrupt flushing\n");
}
}
}
return interrupt ? -OCF_ERR_FLUSHING_INTERRUPTED :
env_atomic_read(&error);
_ocf_flush_container_complete(context);
}
static int _ocf_mngt_flush_core(ocf_core_t core, bool allow_interruption)
static void _ocf_mngt_flush_core(
struct ocf_mngt_cache_flush_context *context,
ocf_flush_complete_t complete)
{
ocf_cache_t cache = context->cache;
ocf_core_t core = context->core;
ocf_core_id_t core_id = ocf_core_get_id(core);
ocf_cache_t cache = core->volume.cache;
struct flush_container fc;
struct flush_container *fc;
int ret;
fc = env_vzalloc(sizeof(*fc));
if (!fc) {
complete(context, -OCF_ERR_NO_MEM);
return;
}
ocf_metadata_lock(cache, OCF_METADATA_WR);
ret = _ocf_mngt_get_sectors(cache, core_id,
&fc.flush_data, &fc.count);
&fc->flush_data, &fc->count);
if (ret) {
ocf_core_log(core, log_err, "Flushing operation aborted, "
"no memory\n");
goto out;
env_vfree(fc);
complete(context, -OCF_ERR_NO_MEM);
return;
}
fc.core_id = core_id;
fc.iter = 0;
fc->core_id = core_id;
fc->iter = 0;
ret = _ocf_mngt_flush_containers(cache, &fc, 1, allow_interruption);
_ocf_mngt_flush_containers(context, fc, 1, complete);
_ocf_mngt_free_sectors(fc.flush_data);
out:
ocf_metadata_unlock(cache, OCF_METADATA_WR);
return ret;
}
static int _ocf_mngt_flush_all_cores(ocf_cache_t cache, bool allow_interruption)
static void _ocf_mngt_flush_all_cores(
struct ocf_mngt_cache_flush_context *context,
ocf_flush_complete_t complete)
{
ocf_cache_t cache = context->cache;
struct flush_container *fctbl = NULL;
uint32_t fcnum = 0;
int ret;
if (context->op == flush_cache)
ocf_cache_log(cache, log_info, "Flushing cache\n");
else if (context->op == purge_cache)
ocf_cache_log(cache, log_info, "Purging cache\n");
env_atomic_set(&cache->flush_in_progress, 1);
ocf_metadata_lock(cache, OCF_METADATA_WR);
/* Get all 'dirty' sectors for all cores */
@ -412,42 +553,23 @@ static int _ocf_mngt_flush_all_cores(ocf_cache_t cache, bool allow_interruption)
if (ret) {
ocf_cache_log(cache, log_err, "Flushing operation aborted, "
"no memory\n");
goto out;
complete(context, ret);
return;
}
ret = _ocf_mngt_flush_containers(cache, fctbl, fcnum,
allow_interruption);
_ocf_mngt_flush_containers(context, fctbl, fcnum, complete);
_ocf_mngt_free_flush_containers(fctbl, fcnum);
out:
ocf_metadata_unlock(cache, OCF_METADATA_WR);
return ret;
}
/**
* Flush all the dirty data stored on cache (all the cores attached to it)
* @param cache cache instance to which operation applies
* @param allow_interruption whenever to allow interruption of flushing process.
* if set to 0, all requests to interrupt flushing will be ignored
*/
static int _ocf_mng_cache_flush(ocf_cache_t cache, bool interruption)
static void _ocf_mngt_flush_all_cores_complete(
struct ocf_mngt_cache_flush_context *context, int error)
{
int result = 0;
int i, j;
env_atomic_set(&cache->flush_in_progress, 1);
cache->flushing_interrupted = 0;
do {
env_cond_resched();
result = _ocf_mngt_flush_all_cores(cache, interruption);
if (result) {
/* Cleaning error */
break;
}
} while (ocf_mngt_cache_is_dirty(cache));
ocf_cache_t cache = context->cache;
uint32_t i, j;
env_atomic_set(&cache->flush_in_progress, 0);
for (i = 0, j = 0; i < OCF_CORE_MAX; i++) {
if (!env_bit_test(i, cache->conf_meta->valid_core_bitmap))
continue;
@ -458,12 +580,87 @@ static int _ocf_mng_cache_flush(ocf_cache_t cache, bool interruption)
break;
}
return result;
if (error) {
ocf_pipeline_finish(context->pipeline, error);
return;
}
if (context->op == flush_cache)
ocf_cache_log(cache, log_info, "Flushing cache completed\n");
ocf_pipeline_next(context->pipeline);
}
/**
* Flush all the dirty data stored on cache (all the cores attached to it)
*/
static void _ocf_mngt_cache_flush(ocf_pipeline_t pipeline, void *priv,
ocf_pipeline_arg_t arg)
{
struct ocf_mngt_cache_flush_context *context = priv;
_ocf_mngt_flush_all_cores(context, _ocf_mngt_flush_all_cores_complete);
}
static void _ocf_mngt_flush_finish(ocf_pipeline_t pipeline, void *priv,
int error)
{
struct ocf_mngt_cache_flush_context *context = priv;
ocf_cache_t cache = context->cache;
int64_t core_id;
if (!error) {
switch(context->op) {
case flush_cache:
case purge_cache:
ENV_BUG_ON(ocf_mngt_cache_is_dirty(cache));
break;
case flush_core:
case purge_core:
core_id = ocf_core_get_id(context->core);
ENV_BUG_ON(env_atomic_read(&cache->core_runtime_meta
[core_id].dirty_clines));
break;
}
}
_ocf_mngt_end_flush(context->cache);
switch (context->op) {
case flush_cache:
context->cmpl.flush_cache(context->cache, context->priv, error);
break;
case flush_core:
context->cmpl.flush_core(context->core, context->priv, error);
break;
case purge_cache:
context->cmpl.purge_cache(context->cache, context->priv, error);
break;
case purge_core:
context->cmpl.purge_core(context->core, context->priv, error);
break;
default:
ENV_BUG();
}
ocf_pipeline_destroy(context->pipeline);
}
static struct ocf_pipeline_properties _ocf_mngt_cache_flush_pipeline_properties = {
.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
.finish = _ocf_mngt_flush_finish,
.steps = {
OCF_PL_STEP(_ocf_mngt_begin_flush),
OCF_PL_STEP(_ocf_mngt_cache_flush),
OCF_PL_STEP_TERMINATOR(),
},
};
void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption,
ocf_mngt_cache_flush_end_t cmpl, void *priv)
{
ocf_pipeline_t pipeline;
struct ocf_mngt_cache_flush_context *context;
int result = 0;
OCF_CHECK_NULL(cache);
@ -489,47 +686,75 @@ void ocf_mngt_cache_flush(ocf_cache_t cache, bool interruption,
return;
}
ocf_cache_log(cache, log_info, "Flushing cache\n");
result = ocf_pipeline_create(&pipeline, cache,
&_ocf_mngt_cache_flush_pipeline_properties);
if (result) {
cmpl(cache, priv, -OCF_ERR_NO_MEM);
return;
}
context = ocf_pipeline_get_priv(pipeline);
_ocf_mngt_begin_flush(cache);
context->pipeline = pipeline;
context->cmpl.flush_cache = cmpl;
context->priv = priv;
context->cache = cache;
context->allow_interruption = interruption;
context->op = flush_cache;
result = _ocf_mng_cache_flush(cache, interruption);
_ocf_mngt_end_flush(cache);
if (!result)
ocf_cache_log(cache, log_info, "Flushing cache completed\n");
cmpl(cache, priv, result);
ocf_pipeline_next(context->pipeline);
}
static int _ocf_mng_core_flush(ocf_core_t core, bool interruption)
static void _ocf_mngt_flush_core_complete(
struct ocf_mngt_cache_flush_context *context, int error)
{
ocf_cache_t cache = ocf_core_get_cache(core);
ocf_core_id_t core_id = ocf_core_get_id(core);
int ret;
cache->flushing_interrupted = 0;
do {
env_cond_resched();
ret = _ocf_mngt_flush_core(core, interruption);
if (ret == -OCF_ERR_FLUSHING_INTERRUPTED ||
ret == -OCF_ERR_WRITE_CORE) {
break;
}
} while (env_atomic_read(&cache->core_runtime_meta[core_id].
dirty_clines));
ocf_cache_t cache = context->cache;
ocf_core_t core = context->core;
env_atomic_set(&core->flushed, 0);
return ret;
if (error) {
ocf_pipeline_finish(context->pipeline, error);
return;
}
if (context->op == flush_core)
ocf_cache_log(cache, log_info, "Flushing completed\n");
ocf_pipeline_next(context->pipeline);
}
static void _ocf_mngt_core_flush(ocf_pipeline_t pipeline, void *priv,
ocf_pipeline_arg_t arg)
{
struct ocf_mngt_cache_flush_context *context = priv;
ocf_cache_t cache = context->cache;
if (context->op == flush_core)
ocf_cache_log(cache, log_info, "Flushing core\n");
else if (context->op == purge_core)
ocf_cache_log(cache, log_info, "Purging core\n");
_ocf_mngt_flush_core(context, _ocf_mngt_flush_core_complete);
}
static
struct ocf_pipeline_properties _ocf_mngt_core_flush_pipeline_properties = {
.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
.finish = _ocf_mngt_flush_finish,
.steps = {
OCF_PL_STEP(_ocf_mngt_begin_flush),
OCF_PL_STEP(_ocf_mngt_core_flush),
OCF_PL_STEP_TERMINATOR(),
},
};
void ocf_mngt_core_flush(ocf_core_t core, bool interruption,
ocf_mngt_core_flush_end_t cmpl, void *priv)
{
ocf_pipeline_t pipeline;
struct ocf_mngt_cache_flush_context *context;
ocf_cache_t cache;
int ret = 0;
int result;
OCF_CHECK_NULL(core);
@ -556,24 +781,61 @@ void ocf_mngt_core_flush(ocf_core_t core, bool interruption,
return;
}
ocf_core_log(core, log_info, "Flushing\n");
result = ocf_pipeline_create(&pipeline, cache,
&_ocf_mngt_core_flush_pipeline_properties);
if (result) {
cmpl(core, priv, -OCF_ERR_NO_MEM);
return;
}
context = ocf_pipeline_get_priv(pipeline);
_ocf_mngt_begin_flush(cache);
context->pipeline = pipeline;
context->cmpl.flush_core = cmpl;
context->priv = priv;
context->cache = cache;
context->allow_interruption = interruption;
context->op = flush_core;
context->core = core;
ret = _ocf_mng_core_flush(core, interruption);
_ocf_mngt_end_flush(cache);
if (!ret)
ocf_cache_log(cache, log_info, "Flushing completed\n");
cmpl(core, priv, ret);
ocf_pipeline_next(context->pipeline);
}
static void _ocf_mngt_cache_invalidate(ocf_pipeline_t pipeline, void *priv,
ocf_pipeline_arg_t arg)
{
struct ocf_mngt_cache_flush_context *context = priv;
ocf_cache_t cache = context->cache;
int result;
OCF_METADATA_LOCK_WR();
result = ocf_metadata_sparse_range(cache, context->purge.core_id, 0,
context->purge.end_byte);
OCF_METADATA_UNLOCK_WR();
if (result)
ocf_pipeline_finish(context->pipeline, result);
else
ocf_pipeline_next(context->pipeline);
}
static
struct ocf_pipeline_properties _ocf_mngt_cache_purge_pipeline_properties = {
.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
.finish = _ocf_mngt_flush_finish,
.steps = {
OCF_PL_STEP(_ocf_mngt_begin_flush),
OCF_PL_STEP(_ocf_mngt_cache_flush),
OCF_PL_STEP(_ocf_mngt_cache_invalidate),
OCF_PL_STEP_TERMINATOR(),
},
};
void ocf_mngt_cache_purge(ocf_cache_t cache,
ocf_mngt_cache_purge_end_t cmpl, void *priv)
{
ocf_pipeline_t pipeline;
int result = 0;
struct ocf_mngt_cache_flush_context *context;
OCF_CHECK_NULL(cache);
@ -584,30 +846,43 @@ void ocf_mngt_cache_purge(ocf_cache_t cache,
return;
}
_ocf_mngt_begin_flush(cache);
result = ocf_pipeline_create(&pipeline, cache,
&_ocf_mngt_cache_purge_pipeline_properties);
if (result) {
cmpl(cache, priv, -OCF_ERR_NO_MEM);
return;
}
context = ocf_pipeline_get_priv(pipeline);
ocf_cache_log(cache, log_info, "Purging\n");
context->pipeline = pipeline;
context->cmpl.purge_cache = cmpl;
context->priv = priv;
context->cache = cache;
context->allow_interruption = true;
context->op = purge_cache;
context->purge.core_id = OCF_CORE_ID_INVALID;
context->purge.end_byte = ~0ULL;
result = _ocf_mng_cache_flush(cache, true);
if (result)
goto out;
OCF_METADATA_LOCK_WR();
result = ocf_metadata_sparse_range(cache, OCF_CORE_ID_INVALID, 0,
~0ULL);
OCF_METADATA_UNLOCK_WR();
out:
_ocf_mngt_end_flush(cache);
cmpl(cache, priv, result);
ocf_pipeline_next(context->pipeline);
}
static
struct ocf_pipeline_properties _ocf_mngt_core_purge_pipeline_properties = {
.priv_size = sizeof(struct ocf_mngt_cache_flush_context),
.finish = _ocf_mngt_flush_finish,
.steps = {
OCF_PL_STEP(_ocf_mngt_begin_flush),
OCF_PL_STEP(_ocf_mngt_core_flush),
OCF_PL_STEP(_ocf_mngt_cache_invalidate),
OCF_PL_STEP_TERMINATOR(),
},
};
void ocf_mngt_core_purge(ocf_core_t core,
ocf_mngt_core_purge_end_t cmpl, void *priv)
{
ocf_pipeline_t pipeline;
struct ocf_mngt_cache_flush_context *context;
ocf_cache_t cache;
ocf_core_id_t core_id;
int result = 0;
@ -626,26 +901,27 @@ void ocf_mngt_core_purge(ocf_core_t core,
}
core_size = ocf_volume_get_length(&cache->core[core_id].volume);
core_size = core_size ?: ~0ULL;
_ocf_mngt_begin_flush(cache);
result = ocf_pipeline_create(&pipeline, cache,
&_ocf_mngt_core_purge_pipeline_properties);
if (result) {
cmpl(core, priv, -OCF_ERR_NO_MEM);
return;
}
ocf_core_log(core, log_info, "Purging\n");
context = ocf_pipeline_get_priv(pipeline);
result = _ocf_mng_core_flush(core, true);
context->pipeline = pipeline;
context->cmpl.purge_core = cmpl;
context->priv = priv;
context->cache = cache;
context->allow_interruption = true;
context->op = purge_core;
context->purge.core_id = core_id;
context->purge.end_byte = core_size ?: ~0ULL;
context->core = core;
if (result)
goto out;
OCF_METADATA_LOCK_WR();
result = ocf_metadata_sparse_range(cache, core_id, 0,
core_size);
OCF_METADATA_UNLOCK_WR();
out:
_ocf_mngt_end_flush(cache);
cmpl(core, priv, result);
ocf_pipeline_next(context->pipeline);
}
void ocf_mngt_cache_flush_interrupt(ocf_cache_t cache)

View File

@ -55,6 +55,8 @@ struct flush_data {
ocf_core_id_t core_id;
};
typedef void (*ocf_flush_containter_coplete_t)(void *ctx);
/**
* @brief Flush table container
*/
@ -66,14 +68,15 @@ struct flush_container {
struct ocf_cleaner_attribs attribs;
ocf_cache_t cache;
env_atomic *progress;
env_atomic *error;
env_waitqueue *wq;
env_atomic completed;
struct ocf_request *req;
uint64_t flush_portion;
uint64_t ticks1;
uint64_t ticks2;
ocf_flush_containter_coplete_t end;
struct ocf_mngt_cache_flush_context *context;
};
/**