simple example: implement queues based on threads

Signed-off-by: Adam Rutkowski <adam.j.rutkowski@intel.com>

Author: Adam Rutkowski <adam.j.rutkowski@intel.com>
Date:   2021-09-17 10:30:33 +02:00
Parent: aac21cab63
Commit: 02066f3cdf

3 changed files with 216 additions and 41 deletions

--- a/example/simple/src/main.c
+++ b/example/simple/src/main.c

@@ -7,9 +7,11 @@
 #include <stdlib.h>
 #include <errno.h>
 #include <string.h>
+#include <semaphore.h>
 #include <ocf/ocf.h>
 #include "data.h"
 #include "ctx.h"
+#include "queue_thread.h"
 
 /*
  * Cache private data. Used to share information between async contexts.
@@ -29,63 +31,38 @@ void error(char *msg)
 }
 
 /*
- * Trigger queue asynchronously. Made synchronous for simplicity.
- * Notice that it makes all asynchronous calls synchronous, because
- * asynchronism in OCF is achieved mostly by using queues.
- */
-static inline void queue_kick_async(ocf_queue_t q)
-{
-        ocf_queue_run(q);
-}
-
-/*
- * Trigger queue synchronously. May be implemented as asynchronous as well,
- * but in some environments kicking queue synchronously may reduce latency,
- * so to take advantage of such situations OCF call synchronous variant of
- * queue kick callback where possible.
- */
-static void queue_kick_sync(ocf_queue_t q)
-{
-        ocf_queue_run(q);
-}
-
-/*
- * Stop queue thread. To keep this example simple we handle queues
- * synchronously, thus it's left non-implemented.
- */
-static void queue_stop(ocf_queue_t q)
-{
-}
-
-/*
- * Queue ops providing interface for running queue thread in both synchronous
- * and asynchronous way. The stop() operation in called just before queue is
- * being destroyed.
+ * Queue ops providing interface for running queue thread in asynchronous
+ * way. Optional synchronous kick callback is not provided. The stop()
+ * operation is called just before queue is being destroyed.
  */
 const struct ocf_queue_ops queue_ops = {
-        .kick_sync = queue_kick_sync,
-        .kick = queue_kick_async,
-        .stop = queue_stop,
+        .kick = queue_thread_kick,
+        .stop = queue_thread_stop,
 };
 
 /*
  * Simple completion context. As lots of OCF API functions work asynchronously
  * and call completion callback when job is done, we need some structure to
- * share program state with completion callback. In this case we have single
- * variable pointer to propagate error code.
+ * share program state with completion callback. In this case we have a
+ * variable pointer to propagate error code and a semaphore to signal
+ * completion.
+ *
  */
 struct simple_context {
         int *error;
+        sem_t sem;
 };
 
 /*
- * Basic asynchronous completion callback. Just propagate error code.
+ * Basic asynchronous completion callback. Just propagate error code and
+ * up the semaphore.
  */
 static void simple_complete(ocf_cache_t cache, void *priv, int error)
 {
         struct simple_context *context = priv;
 
         *context->error = error;
+        sem_post(&context->sem);
 }
 
 /*
@@ -99,6 +76,11 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache)
         struct simple_context context;
         int ret;
 
+        /* Initialize completion semaphore */
+        ret = sem_init(&context.sem, 0, 0);
+        if (ret)
+                return ret;
+
         /*
          * Asynchronous callbacks will assign error code to ret. That
          * way we have always the same variable holding last error code.
@@ -114,7 +96,7 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache)
         attach_cfg.device.volume_type = VOL_TYPE;
         ret = ocf_uuid_set_str(&attach_cfg.device.uuid, "cache");
         if (ret)
-                return ret;
+                goto err_sem;
 
         /*
          * Allocate cache private structure. We can not initialize it
@@ -141,6 +123,7 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache)
         ret = ocf_queue_create(*cache, &cache_priv->mngt_queue, &queue_ops);
         if (ret) {
                 ocf_mngt_cache_stop(*cache, simple_complete, &context);
+                sem_wait(&context.sem);
                 goto err_priv;
         }
@@ -157,8 +140,13 @@ int initialize_cache(ocf_ctx_t ctx, ocf_cache_t *cache)
         if (ret)
                 goto err_cache;
 
+        ret = initialize_threads(cache_priv->mngt_queue, cache_priv->io_queue);
+        if (ret)
+                goto err_cache;
+
         /* Attach volume to cache */
         ocf_mngt_cache_attach(*cache, &attach_cfg, simple_complete, &context);
+        sem_wait(&context.sem);
         if (ret)
                 goto err_cache;
@@ -169,6 +157,8 @@ err_cache:
         ocf_queue_put(cache_priv->mngt_queue);
 err_priv:
         free(cache_priv);
+err_sem:
+        sem_destroy(&context.sem);
 
         return ret;
 }
@@ -179,9 +169,12 @@ err_priv:
 struct add_core_context {
         ocf_core_t *core;
         int *error;
+        sem_t sem;
 };
 
-/* Add core complete callback. Just rewrite args to context structure. */
+/* Add core complete callback. Just rewrite args to context structure and
+ * up the semaphore.
+ */
 static void add_core_complete(ocf_cache_t cache, ocf_core_t core,
                 void *priv, int error)
 {
@@ -189,6 +182,7 @@ static void add_core_complete(ocf_cache_t cache, ocf_core_t core,
         struct add_core_context *context = priv;
 
         *context->core = core;
         *context->error = error;
+        sem_post(&context->sem);
 }
 
 /*
@@ -200,6 +194,11 @@ int initialize_core(ocf_cache_t cache, ocf_core_t *core)
         struct add_core_context context;
         int ret;
 
+        /* Initialize completion semaphore */
+        ret = sem_init(&context.sem, 0, 0);
+        if (ret)
+                return ret;
+
         /*
          * Asynchronous callback will assign core handle to core,
          * and to error code to ret.
@@ -213,10 +212,14 @@ int initialize_core(ocf_cache_t cache, ocf_core_t *core)
         core_cfg.volume_type = VOL_TYPE;
         ret = ocf_uuid_set_str(&core_cfg.uuid, "core");
         if (ret)
-                return ret;
+                goto err_sem;
 
         /* Add core to cache */
         ocf_mngt_cache_add_core(cache, &core_cfg, add_core_complete, &context);
+        sem_wait(&context.sem);
+
+err_sem:
+        sem_destroy(&context.sem);
 
         return ret;
 }
@@ -330,6 +333,7 @@ static void remove_core_complete(void *priv, int error)
         struct simple_context *context = priv;
 
         *context->error = error;
+        sem_post(&context->sem);
 }
 
 int main(int argc, char *argv[])
@@ -341,6 +345,10 @@ int main(int argc, char *argv[])
         ocf_core_t core1;
         int ret;
 
+        /* Initialize completion semaphore */
+        ret = sem_init(&context.sem, 0, 0);
+        if (ret)
+                error("Unable to initialize completion semaphore\n");
         context.error = &ret;
 
         /* Initialize OCF context */
@@ -360,11 +368,13 @@ int main(int argc, char *argv[])
         /* Remove core from cache */
         ocf_mngt_cache_remove_core(core1, remove_core_complete, &context);
+        sem_wait(&context.sem);
         if (ret)
                 error("Unable to remove core\n");
 
         /* Stop cache */
         ocf_mngt_cache_stop(cache1, simple_complete, &context);
+        sem_wait(&context.sem);
         if (ret)
                 error("Unable to stop cache\n");
@@ -378,5 +388,8 @@ int main(int argc, char *argv[])
         /* Deinitialize context */
         ctx_cleanup(ctx);
 
+        /* Destroy completion semaphore */
+        sem_destroy(&context.sem);
+
         return 0;
 }
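
The shape that recurs in every hunk above is worth stating once: each asynchronous OCF management call is paired with a semaphore, so the caller blocks until the completion callback has run and written its error code. Condensed into a single hypothetical helper (stop_cache_sync is not part of the commit; simple_context, simple_complete and ocf_mngt_cache_stop are taken from the diff):

#include <semaphore.h>
#include <ocf/ocf.h>

static int stop_cache_sync(ocf_cache_t cache)
{
        struct simple_context context;
        int ret;

        context.error = &ret;
        ret = sem_init(&context.sem, 0, 0);
        if (ret)
                return ret;

        /* returns immediately; simple_complete() runs later on the
         * management queue thread */
        ocf_mngt_cache_stop(cache, simple_complete, &context);

        /* block until simple_complete() posts the semaphore */
        sem_wait(&context.sem);

        sem_destroy(&context.sem);
        return ret;     /* error code written by the callback */
}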

--- /dev/null
+++ b/example/simple/src/queue_thread.c

@@ -0,0 +1,152 @@
/*
 * Copyright(c) 2021-2021 Intel Corporation
 * SPDX-License-Identifier: BSD-3-Clause
 */

#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>

#include <ocf/ocf.h>

#include "queue_thread.h"

/* queue thread main function */
static void *run(void *);

/* helper structure to store all synchronization related objects */
struct queue_thread {
        /* thread running the queue */
        pthread_t thread;
        /* kick sets true, queue thread sets to false */
        bool signalled;
        /* request thread to exit */
        bool stop;
        /* condition variable to sync queue thread and kick thread */
        pthread_cond_t cv;
        /* mutex for variables shared across threads */
        pthread_mutex_t mutex;
        /* associated OCF queue */
        struct ocf_queue *queue;
};
struct queue_thread *queue_thread_init(struct ocf_queue *q)
{
        struct queue_thread *qt = malloc(sizeof(*qt));
        int ret;

        if (!qt)
                return NULL;

        ret = pthread_cond_init(&qt->cv, NULL);
        if (ret)
                goto err_mem;

        ret = pthread_mutex_init(&qt->mutex, NULL);
        if (ret)
                goto err_cond;

        qt->signalled = false;
        qt->stop = false;
        qt->queue = q;

        ret = pthread_create(&qt->thread, NULL, run, qt);
        if (ret)
                goto err_mutex;

        return qt;

err_mutex:
        pthread_mutex_destroy(&qt->mutex);
err_cond:
        pthread_cond_destroy(&qt->cv);
err_mem:
        free(qt);

        return NULL;
}
void queue_thread_signal(struct queue_thread *qt, bool stop)
{
        pthread_mutex_lock(&qt->mutex);
        qt->signalled = true;
        qt->stop = stop;
        pthread_cond_signal(&qt->cv);
        pthread_mutex_unlock(&qt->mutex);
}

void queue_thread_destroy(struct queue_thread *qt)
{
        if (!qt)
                return;

        queue_thread_signal(qt, true);
        pthread_join(qt->thread, NULL);

        pthread_mutex_destroy(&qt->mutex);
        pthread_cond_destroy(&qt->cv);

        free(qt);
}
/* queue thread main function */
static void *run(void *arg)
{
        struct queue_thread *qt = arg;
        struct ocf_queue *q = qt->queue;

        pthread_mutex_lock(&qt->mutex);

        while (!qt->stop) {
                if (qt->signalled) {
                        qt->signalled = false;
                        pthread_mutex_unlock(&qt->mutex);

                        /* execute items on the queue */
                        ocf_queue_run(q);

                        pthread_mutex_lock(&qt->mutex);
                }

                if (!qt->stop && !qt->signalled)
                        pthread_cond_wait(&qt->cv, &qt->mutex);
        }

        pthread_mutex_unlock(&qt->mutex);

        pthread_exit(0);
}
/* initialize I/O queue and management queue thread */
int initialize_threads(struct ocf_queue *mngt_queue, struct ocf_queue *io_queue)
{
        struct queue_thread *mngt_queue_thread = queue_thread_init(mngt_queue);
        struct queue_thread *io_queue_thread = queue_thread_init(io_queue);

        if (!mngt_queue_thread || !io_queue_thread) {
                queue_thread_destroy(io_queue_thread);
                queue_thread_destroy(mngt_queue_thread);
                return 1;
        }

        ocf_queue_set_priv(mngt_queue, mngt_queue_thread);
        ocf_queue_set_priv(io_queue, io_queue_thread);

        return 0;
}
/* callback for OCF to kick the queue thread */
void queue_thread_kick(ocf_queue_t q)
{
        struct queue_thread *qt = ocf_queue_get_priv(q);

        queue_thread_signal(qt, false);
}

/* callback for OCF to stop the queue thread */
void queue_thread_stop(ocf_queue_t q)
{
        struct queue_thread *qt = ocf_queue_get_priv(q);

        queue_thread_destroy(qt);
}
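
A note on the synchronization above: run() is the standard POSIX mutex-plus-condition-variable consumer, merely unrolled. The same handshake in the textbook while-predicate form (names taken from the file above; behavior is unchanged):

        pthread_mutex_lock(&qt->mutex);
        while (!qt->stop) {
                /* the predicate guards against spurious wakeups and against
                 * kicks that arrive before the thread goes to sleep */
                while (!qt->stop && !qt->signalled)
                        pthread_cond_wait(&qt->cv, &qt->mutex);

                if (qt->signalled) {
                        qt->signalled = false;
                        /* drop the lock while draining the queue, so further
                         * kicks just set 'signalled' and coalesce instead of
                         * blocking the kicker */
                        pthread_mutex_unlock(&qt->mutex);
                        ocf_queue_run(qt->queue);
                        pthread_mutex_lock(&qt->mutex);
                }
        }
        pthread_mutex_unlock(&qt->mutex);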

--- /dev/null
+++ b/example/simple/src/queue_thread.h

@@ -0,0 +1,10 @@
/*
 * Copyright(c) 2021-2021 Intel Corporation
 * SPDX-License-Identifier: BSD-3-Clause
 */

#pragma once

#include <ocf/ocf.h>

int initialize_threads(struct ocf_queue *mngt_queue, struct ocf_queue *io_queue);
void queue_thread_kick(ocf_queue_t q);
void queue_thread_stop(ocf_queue_t q);
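
For reference, this is how the three declarations above are wired together by the main.c changes: create the OCF queues with queue_ops pointing at queue_thread_kick()/queue_thread_stop(), then start one worker thread per queue. A condensed sketch (setup_queues is hypothetical and not part of the commit; cleanup of the first queue on the second failure path is elided):

#include <ocf/ocf.h>
#include "queue_thread.h"

/* route OCF's kick/stop callbacks to the worker threads */
static const struct ocf_queue_ops queue_ops = {
        .kick = queue_thread_kick,
        .stop = queue_thread_stop,
};

static int setup_queues(ocf_cache_t cache,
                ocf_queue_t *mngt_queue, ocf_queue_t *io_queue)
{
        int ret;

        /* create the OCF-side queue objects first... */
        ret = ocf_queue_create(cache, mngt_queue, &queue_ops);
        if (ret)
                return ret;

        ret = ocf_queue_create(cache, io_queue, &queue_ops);
        if (ret)
                return ret;

        /*
         * ...then spawn one worker per queue; initialize_threads() stores
         * each queue_thread in the queue's priv pointer, which is how
         * queue_thread_kick() and queue_thread_stop() find it later.
         */
        return initialize_threads(*mngt_queue, *io_queue);
}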