diff --git a/example/simple/src/main.c b/example/simple/src/main.c index 00c05e7..cd503e7 100644 --- a/example/simple/src/main.c +++ b/example/simple/src/main.c @@ -7,9 +7,11 @@ #include #include #include +#include #include #include "data.h" #include "ctx.h" +#include "queue_thread.h" /* * Cache private data. Used to share information between async contexts. @@ -29,63 +31,38 @@ void error(char *msg) } /* - * Trigger queue asynchronously. Made synchronous for simplicity. - * Notice that it makes all asynchronous calls synchronous, because - * asynchronism in OCF is achieved mostly by using queues. - */ -static inline void queue_kick_async(ocf_queue_t q) -{ - ocf_queue_run(q); -} - -/* - * Trigger queue synchronously. May be implemented as asynchronous as well, - * but in some environments kicking queue synchronously may reduce latency, - * so to take advantage of such situations OCF call synchronous variant of - * queue kick callback where possible. - */ -static void queue_kick_sync(ocf_queue_t q) -{ - ocf_queue_run(q); -} - -/* - * Stop queue thread. To keep this example simple we handle queues - * synchronously, thus it's left non-implemented. - */ -static void queue_stop(ocf_queue_t q) -{ -} - -/* - * Queue ops providing interface for running queue thread in both synchronous - * and asynchronous way. The stop() operation in called just before queue is - * being destroyed. + * Queue ops providing interface for running queue thread in asynchronous + * way. Optional synchronous kick callback is not provided. The stop() + * operation is called just before queue is being destroyed. */ const struct ocf_queue_ops queue_ops = { - .kick_sync = queue_kick_sync, - .kick = queue_kick_async, - .stop = queue_stop, + .kick = queue_thread_kick, + .stop = queue_thread_stop, }; /* * Simple completion context. As lots of OCF API functions work asynchronously * and call completion callback when job is done, we need some structure to - * share program state with completion callback. In this case we have single - * variable pointer to propagate error code. + * share program state with completion callback. In this case we have a + * variable pointer to propagate error code and a semaphore to signal + * completion. + * */ struct simple_context { int *error; + sem_t sem; }; /* - * Basic asynchronous completion callback. Just propagate error code. + * Basic asynchronous completion callback. Just propagate error code and + * up the semaphore. */ static void simple_complete(ocf_cache_t cache, void *priv, int error) { struct simple_context *context= priv; *context->error = error; + sem_post(&context->sem); } /* @@ -99,6 +76,11 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache) struct simple_context context; int ret; + /* Initialize completion semaphore */ + ret = sem_init(&context.sem, 0, 0); + if (ret) + return ret; + /* * Asynchronous callbacks will assign error code to ret. That * way we have always the same variable holding last error code. @@ -114,7 +96,7 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache) attach_cfg.device.volume_type = VOL_TYPE; ret = ocf_uuid_set_str(&attach_cfg.device.uuid, "cache"); if (ret) - return ret; + goto err_sem; /* * Allocate cache private structure. We can not initialize it @@ -141,6 +123,7 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache) ret = ocf_queue_create(*cache, &cache_priv->mngt_queue, &queue_ops); if (ret) { ocf_mngt_cache_stop(*cache, simple_complete, &context); + sem_wait(&context.sem); goto err_priv; } @@ -157,8 +140,13 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache) if (ret) goto err_cache; + ret = initialize_threads(cache_priv->mngt_queue, cache_priv->io_queue); + if (ret) + goto err_cache; + /* Attach volume to cache */ ocf_mngt_cache_attach(*cache, &attach_cfg, simple_complete, &context); + sem_wait(&context.sem); if (ret) goto err_cache; @@ -169,6 +157,8 @@ err_cache: ocf_queue_put(cache_priv->mngt_queue); err_priv: free(cache_priv); +err_sem: + sem_destroy(&context.sem); return ret; } @@ -179,9 +169,12 @@ err_priv: struct add_core_context { ocf_core_t *core; int *error; + sem_t sem; }; -/* Add core complete callback. Just rewrite args to context structure. */ +/* Add core complete callback. Just rewrite args to context structure and + * up the semaphore. + */ static void add_core_complete(ocf_cache_t cache, ocf_core_t core, void *priv, int error) { @@ -189,6 +182,7 @@ static void add_core_complete(ocf_cache_t cache, ocf_core_t core, *context->core = core; *context->error = error; + sem_post(&context->sem); } /* @@ -200,6 +194,11 @@ int initialize_core(ocf_cache_t cache, ocf_core_t *core) struct add_core_context context; int ret; + /* Initialize completion semaphore */ + ret = sem_init(&context.sem, 0, 0); + if (ret) + return ret; + /* * Asynchronous callback will assign core handle to core, * and to error code to ret. @@ -213,10 +212,14 @@ int initialize_core(ocf_cache_t cache, ocf_core_t *core) core_cfg.volume_type = VOL_TYPE; ret = ocf_uuid_set_str(&core_cfg.uuid, "core"); if (ret) - return ret; + goto err_sem; /* Add core to cache */ ocf_mngt_cache_add_core(cache, &core_cfg, add_core_complete, &context); + sem_wait(&context.sem); + +err_sem: + sem_destroy(&context.sem); return ret; } @@ -330,6 +333,7 @@ static void remove_core_complete(void *priv, int error) struct simple_context *context = priv; *context->error = error; + sem_post(&context->sem); } int main(int argc, char *argv[]) @@ -341,6 +345,10 @@ int main(int argc, char *argv[]) ocf_core_t core1; int ret; + /* Initialize completion semaphore */ + ret = sem_init(&context.sem, 0, 0); + if (ret) + error("Unable to initialize completion semaphore\n"); context.error = &ret; /* Initialize OCF context */ @@ -360,11 +368,13 @@ int main(int argc, char *argv[]) /* Remove core from cache */ ocf_mngt_cache_remove_core(core1, remove_core_complete, &context); + sem_wait(&context.sem); if (ret) error("Unable to remove core\n"); /* Stop cache */ ocf_mngt_cache_stop(cache1, simple_complete, &context); + sem_wait(&context.sem); if (ret) error("Unable to stop cache\n"); @@ -378,5 +388,8 @@ int main(int argc, char *argv[]) /* Deinitialize context */ ctx_cleanup(ctx); + /* Destroy completion semaphore */ + sem_destroy(&context.sem); + return 0; } diff --git a/example/simple/src/queue_thread.c b/example/simple/src/queue_thread.c new file mode 100644 index 0000000..8eb2b13 --- /dev/null +++ b/example/simple/src/queue_thread.c @@ -0,0 +1,152 @@ +/* + * Copyright(c) 2021-2021 Intel Corporation + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include + +#include +#include "queue_thread.h" + +/* queue thread main function */ +static void* run(void *); + +/* helper class to store all synchronization related objects */ +struct queue_thread +{ + /* thread running the queue */ + pthread_t thread; + /* kick sets true, queue thread sets to false */ + bool signalled; + /* request thread to exit */ + bool stop; + /* conditional variable to sync queue thread and kick thread */ + pthread_cond_t cv; + /* mutex for variables shared across threads */ + pthread_mutex_t mutex; + /* associated OCF queue */ + struct ocf_queue *queue; +}; + +struct queue_thread *queue_thread_init(struct ocf_queue *q) +{ + struct queue_thread *qt = malloc(sizeof(*qt)); + int ret; + + if (!qt) + return NULL; + + ret = pthread_cond_init(&qt->cv, NULL); + if (ret) + goto err_mem; + + ret = pthread_mutex_init(&qt->mutex, NULL); + if (ret) + goto err_cond; + + qt->signalled = false; + qt->stop = false; + qt->queue = q; + + ret = pthread_create(&qt->thread, NULL, run, qt); + if (ret) + goto err_mutex; + + return qt; + +err_mutex: + pthread_mutex_destroy(&qt->mutex); +err_cond: + pthread_cond_destroy(&qt->cv); +err_mem: + free(qt); + + return NULL; +} + +void queue_thread_signal(struct queue_thread *qt, bool stop) +{ + pthread_mutex_lock(&qt->mutex); + qt->signalled = true; + qt->stop = stop; + pthread_cond_signal(&qt->cv); + pthread_mutex_unlock(&qt->mutex); +} + +void queue_thread_destroy(struct queue_thread *qt) +{ + if (!qt) + return; + + queue_thread_signal(qt, true); + pthread_join(qt->thread, NULL); + + pthread_mutex_destroy(&qt->mutex); + pthread_cond_destroy(&qt->cv); + free(qt); +} + +/* queue thread main function */ +static void* run(void *arg) +{ + struct queue_thread *qt = arg; + struct ocf_queue *q = qt->queue; + + pthread_mutex_lock(&qt->mutex); + + while (!qt->stop) { + if (qt->signalled) { + qt->signalled = false; + pthread_mutex_unlock(&qt->mutex); + + /* execute items on the queue */ + ocf_queue_run(q); + + pthread_mutex_lock(&qt->mutex); + } + + if (!qt->stop && !qt->signalled) + pthread_cond_wait(&qt->cv, &qt->mutex); + } + + pthread_mutex_unlock(&qt->mutex); + + pthread_exit(0); +} + +/* initialize I/O queue and management queue thread */ +int initialize_threads(struct ocf_queue *mngt_queue, struct ocf_queue *io_queue) +{ + int ret = 0; + + struct queue_thread* mngt_queue_thread = queue_thread_init(mngt_queue); + struct queue_thread* io_queue_thread = queue_thread_init(io_queue); + + if (!mngt_queue_thread || !io_queue_thread) { + queue_thread_destroy(io_queue_thread); + queue_thread_destroy(mngt_queue_thread); + return 1; + } + + ocf_queue_set_priv(mngt_queue, mngt_queue_thread); + ocf_queue_set_priv(io_queue, io_queue_thread); + + return ret; +} + +/* callback for OCF to kick the queue thread */ +void queue_thread_kick(ocf_queue_t q) +{ + struct queue_thread *qt = ocf_queue_get_priv(q); + + queue_thread_signal(qt, false); +} + +/* callback for OCF to stop the queue thread */ +void queue_thread_stop(ocf_queue_t q) +{ + struct queue_thread *qt = ocf_queue_get_priv(q); + + queue_thread_destroy(qt); +} diff --git a/example/simple/src/queue_thread.h b/example/simple/src/queue_thread.h new file mode 100644 index 0000000..eac4c1f --- /dev/null +++ b/example/simple/src/queue_thread.h @@ -0,0 +1,10 @@ +/* + * Copyright(c) 2021-2021 Intel Corporation + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +int initialize_threads(struct ocf_queue *mngt_queue, struct ocf_queue *io_queue); +void queue_thread_kick(ocf_queue_t q); +void queue_thread_stop(ocf_queue_t q);