* \param pool The pool to shut down
*/
void ast_threadpool_shutdown(struct ast_threadpool *pool);
+
+/*!
+ * \brief Serialized execution of tasks within a \ref ast_threadpool.
+ *
+ * \since 12.0.0
+ *
+ * A \ref ast_taskprocessor with the same contract as a default taskprocessor
+ * (tasks execute serially) except instead of executing out of a dedicated
+ * thread, execution occurs in a thread from a \ref ast_threadpool. Think of it
+ * as a lightweight thread.
+ *
+ * While it guarantees that each task will complete before executing the next,
+ * there is no guarantee as to which thread from the \c pool individual tasks
+ * will execute. This normally only matters if your code relys on thread
+ * specific information, such as thread locals.
+ *
+ * Use ast_taskprocessor_unreference() to dispose of the returned \ref
+ * ast_taskprocessor.
+ *
+ * Only a single taskprocessor with a given name may exist. This function will fail
+ * if a taskprocessor with the given name already exists.
+ *
+ * \param name Name of the serializer. (must be unique)
+ * \param pool \ref ast_threadpool for execution.
+ * \return \ref ast_taskprocessor for enqueuing work.
+ * \return \c NULL on error.
+ */
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool);
+
#endif /* ASTERISK_THREADPOOL_H */
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
struct ast_taskprocessor_listener *listener;
- /*! Indicates if the taskprocessor is in the process of shuting down */
- unsigned int shutting_down:1;
+ /*! Indicates if the taskprocessor is currently executing a task */
+ unsigned int executing:1;
};
/*!
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ ast_assert(!pvt->dead);
+
if (was_empty) {
default_tps_wake_up(pvt, 0);
}
struct tps_task *task;
SCOPED_AO2LOCK(lock, tps);
- if (tps->shutting_down) {
- return NULL;
- }
-
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
}
{
struct tps_task *t;
int previous_size;
+ int was_empty;
if (!tps || !task_exe) {
ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
+ /* The currently executing task counts as still in queue */
+ was_empty = tps->executing ? 0 : previous_size == 0;
ao2_unlock(tps);
- tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
+ tps->listener->callbacks->task_pushed(tps->listener, was_empty);
return 0;
}
struct tps_task *t;
int size;
- if (!(t = tps_taskprocessor_pop(tps))) {
- return 0;
- }
+ ao2_lock(tps);
+ tps->executing = 1;
+ ao2_unlock(tps);
- t->execute(t->datap);
+ t = tps_taskprocessor_pop(tps);
- tps_task_free(t);
+ if (t) {
+ t->execute(t->datap);
+ tps_task_free(t);
+ }
ao2_lock(tps);
+ /* We need to check size in the same critical section where we reset the
+ * executing bit. Avoids a race condition where a task is pushed right
+ * after we pop an empty stack.
+ */
+ tps->executing = 0;
size = tps_taskprocessor_depth(tps);
- if (tps->stats) {
+ /* If we executed a task, bump the stats */
+ if (t && tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
}
ao2_unlock(tps);
- if (size == 0 && tps->listener->callbacks->emptied) {
+ /* If we executed a task, check for the transition to empty */
+ if (t && size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
- return 0;
}
- return 1;
+ return size > 0;
}
if (!pool) {
return NULL;
}
-
+
tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
if (!tps_listener) {
return NULL;
ast_cond_signal(&worker->cond);
}
+struct serializer {
+ struct ast_threadpool *pool;
+};
+
+static void serializer_dtor(void *obj)
+{
+ struct serializer *ser = obj;
+ ao2_cleanup(ser->pool);
+ ser->pool = NULL;
+}
+
+static struct serializer *serializer_create(struct ast_threadpool *pool)
+{
+ struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor);
+ if (!ser) {
+ return NULL;
+ }
+ ao2_ref(pool, +1);
+ ser->pool = pool;
+ return ser;
+}
+
+static int execute_tasks(void *data)
+{
+ struct ast_taskprocessor *tps = data;
+
+ while (ast_taskprocessor_execute(tps)) {
+ /* No-op */
+ }
+
+ ast_taskprocessor_unreference(tps);
+ return 0;
+}
+
+static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
+ if (was_empty) {
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+ ast_threadpool_push(ser->pool, execute_tasks, tps);
+ }
+};
+
+static int serializer_start(struct ast_taskprocessor_listener *listener)
+{
+ /* No-op */
+ return 0;
+}
+
+static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+{
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ ao2_cleanup(ser);
+}
+
+static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+ .task_pushed = serializer_task_pushed,
+ .start = serializer_start,
+ .shutdown = serializer_shutdown,
+};
+
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+{
+ RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
+ struct ast_taskprocessor *tps = NULL;
+
+ ser = serializer_create(pool);
+ if (!ser) {
+ return NULL;
+ }
+
+ listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
+ if (!listener) {
+ return NULL;
+ }
+ ser = NULL; /* ownership transferred to listener */
+
+ tps = ast_taskprocessor_create_with_listener(name, listener);
+ if (!tps) {
+ return NULL;
+ }
+ listener = NULL; /* ownership transferred to tps */
+
+ return tps;
+}
return res;
}
+struct shutdown_data {
+ ast_cond_t in;
+ ast_cond_t out;
+ ast_mutex_t lock;
+ int task_complete;
+ int task_started;
+ int task_stop_waiting;
+};
+
+static void shutdown_data_dtor(void *data)
+{
+ struct shutdown_data *shutdown_data = data;
+ ast_mutex_destroy(&shutdown_data->lock);
+ ast_cond_destroy(&shutdown_data->in);
+ ast_cond_destroy(&shutdown_data->out);
+}
+
+static struct shutdown_data *shutdown_data_create(int dont_wait)
+{
+ RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
+
+ shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
+ if (!shutdown_data) {
+ return NULL;
+ }
+
+ ast_mutex_init(&shutdown_data->lock);
+ ast_cond_init(&shutdown_data->in, NULL);
+ ast_cond_init(&shutdown_data->out, NULL);
+ shutdown_data->task_stop_waiting = dont_wait;
+ ao2_ref(shutdown_data, +1);
+ return shutdown_data;
+}
+
+static int shutdown_task_exec(void *data)
+{
+ struct shutdown_data *shutdown_data = data;
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ shutdown_data->task_started = 1;
+ ast_cond_signal(&shutdown_data->out);
+ while (!shutdown_data->task_stop_waiting) {
+ ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
+ }
+ shutdown_data->task_complete = 1;
+ ast_cond_signal(&shutdown_data->out);
+ return 0;
+}
+
+static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+ while (!shutdown_data->task_complete) {
+ if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return shutdown_data->task_complete;
+}
+
+static int shutdown_has_completed(struct shutdown_data *shutdown_data)
+{
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ return shutdown_data->task_complete;
+}
+
+static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+ while (!shutdown_data->task_started) {
+ if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return shutdown_data->task_started;
+}
+
+static void shutdown_poke(struct shutdown_data *shutdown_data)
+{
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ shutdown_data->task_stop_waiting = 1;
+ ast_cond_signal(&shutdown_data->in);
+}
+
+static void *tps_shutdown_thread(void *data)
+{
+ struct ast_taskprocessor *tps = data;
+ ast_taskprocessor_unreference(tps);
+ return NULL;
+}
+
+AST_TEST_DEFINE(taskprocessor_shutdown)
+{
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+ RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
+ RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
+ int push_res;
+ int wait_res;
+ int pthread_res;
+ pthread_t shutdown_thread;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "taskprocessor_shutdown";
+ info->category = "/main/taskprocessor/";
+ info->summary = "Test of taskproccesor shutdown sequence";
+ info->description =
+ "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
+ task1 = shutdown_data_create(0); /* task1 waits to be poked */
+ task2 = shutdown_data_create(1); /* task2 waits for nothing */
+
+ if (!tps || !task1 || !task2) {
+ ast_test_status_update(test, "Allocation error\n");
+ return AST_TEST_FAIL;
+ }
+
+ push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
+ if (push_res != 0) {
+ ast_test_status_update(test, "Could not push task1\n");
+ return AST_TEST_FAIL;
+ }
+
+ push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
+ if (push_res != 0) {
+ ast_test_status_update(test, "Could not push task2\n");
+ return AST_TEST_FAIL;
+ }
+
+ wait_res = shutdown_waitfor_start(task1);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task1 didn't start\n");
+ return AST_TEST_FAIL;
+ }
+
+ pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
+ if (pthread_res != 0) {
+ ast_test_status_update(test, "Failed to create shutdown thread\n");
+ return AST_TEST_FAIL;
+ }
+ tps = NULL;
+
+ /* Wakeup task1; it should complete */
+ shutdown_poke(task1);
+ wait_res = shutdown_waitfor_completion(task1);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task1 didn't complete\n");
+ return AST_TEST_FAIL;
+ }
+
+ /* Wait for shutdown to complete */
+ pthread_join(shutdown_thread, NULL);
+
+ /* Should have also also completed task2 */
+ wait_res = shutdown_has_completed(task2);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task2 didn't finish\n");
+ return AST_TEST_FAIL;
+ }
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(taskprocessor_listener);
+ ast_test_unregister(taskprocessor_shutdown);
return 0;
}
ast_test_register(default_taskprocessor);
ast_test_register(default_taskprocessor_load);
ast_test_register(taskprocessor_listener);
+ ast_test_register(taskprocessor_shutdown);
return AST_MODULE_LOAD_SUCCESS;
}
#include "asterisk.h"
-#include "asterisk/test.h"
-#include "asterisk/threadpool.h"
-#include "asterisk/module.h"
-#include "asterisk/lock.h"
#include "asterisk/astobj2.h"
+#include "asterisk/lock.h"
#include "asterisk/logger.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/test.h"
+#include "asterisk/threadpool.h"
struct test_listener_data {
int num_active;
}
struct complex_task_data {
+ int task_started;
int task_executed;
int continue_task;
ast_mutex_t lock;
ast_cond_t stall_cond;
- ast_cond_t done_cond;
+ ast_cond_t notify_cond;
};
static struct complex_task_data *complex_task_data_alloc(void)
}
ast_mutex_init(&ctd->lock);
ast_cond_init(&ctd->stall_cond, NULL);
- ast_cond_init(&ctd->done_cond, NULL);
+ ast_cond_init(&ctd->notify_cond, NULL);
return ctd;
}
{
struct complex_task_data *ctd = data;
SCOPED_MUTEX(lock, &ctd->lock);
+ /* Notify that we started */
+ ctd->task_started = 1;
+ ast_cond_signal(&ctd->notify_cond);
while (!ctd->continue_task) {
ast_cond_wait(&ctd->stall_cond, lock);
}
/* We got poked. Finish up */
ctd->task_executed = 1;
- ast_cond_signal(&ctd->done_cond);
+ ast_cond_signal(&ctd->notify_cond);
return 0;
}
ast_cond_signal(&ctd->stall_cond);
}
+static int wait_for_complex_start(struct complex_task_data *ctd)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &ctd->lock);
+
+ while (!ctd->task_started) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return ctd->task_started;
+}
+
+static int has_complex_started(struct complex_task_data *ctd)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 1,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &ctd->lock);
+
+ while (!ctd->task_started) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return ctd->task_started;
+}
+
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_executed) {
- if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
break;
}
}
return res;
}
+AST_TEST_DEFINE(threadpool_serializer)
+{
+ int started = 0;
+ int finished = 0;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool *pool = NULL;
+ struct ast_taskprocessor *uut = NULL;
+ struct complex_task_data *data1 = NULL;
+ struct complex_task_data *data2 = NULL;
+ struct complex_task_data *data3 = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 2,
+ .max_size = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_serializer";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that serializers";
+ info->description =
+ "Ensures that tasks enqueued to a serialize execute in sequence.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+ if (!pool) {
+ ast_test_status_update(test, "Could not create threadpool\n");
+ goto end;
+ }
+ uut = ast_threadpool_serializer("ser1", pool);
+ data1 = complex_task_data_alloc();
+ data2 = complex_task_data_alloc();
+ data3 = complex_task_data_alloc();
+ if (!uut || !data1 || !data2 || !data3) {
+ ast_test_status_update(test, "Allocation failed\n");
+ goto end;
+ }
+
+ /* This should start right away */
+ if (ast_taskprocessor_push(uut, complex_task, data1)) {
+ ast_test_status_update(test, "Failed to enqueue data1\n");
+ goto end;
+ }
+ started = wait_for_complex_start(data1);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data1\n");
+ goto end;
+ }
+
+ /* This should not start until data 1 is complete */
+ if (ast_taskprocessor_push(uut, complex_task, data2)) {
+ ast_test_status_update(test, "Failed to enqueue data2\n");
+ goto end;
+ }
+ started = has_complex_started(data2);
+ if (started) {
+ ast_test_status_update(test, "data2 started out of order\n");
+ goto end;
+ }
+
+ /* But the free thread in the pool can still run */
+ if (ast_threadpool_push(pool, complex_task, data3)) {
+ ast_test_status_update(test, "Failed to enqueue data3\n");
+ }
+ started = wait_for_complex_start(data3);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data3\n");
+ goto end;
+ }
+
+ /* Finishing data1 should allow data2 to start */
+ poke_worker(data1);
+ finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data1 couldn't finish\n");
+ goto end;
+ }
+ started = wait_for_complex_start(data2);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data2\n");
+ goto end;
+ }
+
+ /* Finish up */
+ poke_worker(data2);
+ finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data2 couldn't finish\n");
+ goto end;
+ }
+ poke_worker(data3);
+ finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data3 couldn't finish\n");
+ goto end;
+ }
+
+ res = AST_TEST_PASS;
+
+end:
+ poke_worker(data1);
+ poke_worker(data2);
+ poke_worker(data3);
+ ast_taskprocessor_unreference(uut);
+ ast_threadpool_shutdown(pool);
+ ast_free(data1);
+ ast_free(data2);
+ ast_free(data3);
+ return res;
+}
+
+AST_TEST_DEFINE(threadpool_serializer_dupe)
+{
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool *pool = NULL;
+ struct ast_taskprocessor *uut = NULL;
+ struct ast_taskprocessor *there_can_be_only_one = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 2,
+ .max_size = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_serializer_dupe";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that serializers are uniquely named";
+ info->description =
+ "Creating two serializers with the same name should\n"
+ "result in error.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+ if (!pool) {
+ ast_test_status_update(test, "Could not create threadpool\n");
+ goto end;
+ }
+
+ uut = ast_threadpool_serializer("highlander", pool);
+ if (!uut) {
+ ast_test_status_update(test, "Allocation failed\n");
+ goto end;
+ }
+
+ there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
+ if (there_can_be_only_one) {
+ ast_taskprocessor_unreference(there_can_be_only_one);
+ ast_test_status_update(test, "Duplicate name error\n");
+ goto end;
+ }
+
+ res = AST_TEST_PASS;
+
+end:
+ ast_taskprocessor_unreference(uut);
+ ast_threadpool_shutdown(pool);
+ return res;
+}
+
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
ast_test_unregister(threadpool_more_destruction);
+ ast_test_unregister(threadpool_serializer);
+ ast_test_unregister(threadpool_serializer_dupe);
return 0;
}
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
ast_test_register(threadpool_more_destruction);
+ ast_test_register(threadpool_serializer);
+ ast_test_register(threadpool_serializer_dupe);
return AST_MODULE_LOAD_SUCCESS;
}