struct ast_threadpool {
/*! Threadpool listener */
struct ast_threadpool_listener *listener;
- /*!
+ /*!
* \brief The container of active threads.
* Active threads are those that are currently running tasks
*/
struct ao2_container *active_threads;
- /*!
+ /*!
* \brief The container of idle threads.
* Idle threads are those that are currenly waiting to run tasks
*/
struct ao2_container *idle_threads;
- /*!
+ /*!
* \brief The container of zombie threads.
* Zombie threads may be running tasks, but they are scheduled to die soon
*/
struct ao2_container *zombie_threads;
- /*!
+ /*!
* \brief The main taskprocessor
- *
+ *
* Tasks that are queued in this taskprocessor are
* doled out to the worker threads. Worker threads that
* execute tasks from the threadpool are executing tasks
DEAD,
};
+/*!
+ * A thread that executes threadpool tasks
+ */
+struct worker_thread {
+ /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
+ int id;
+ /*! Condition used in conjunction with state changes */
+ ast_cond_t cond;
+ /*! Lock used alongside the condition for state changes */
+ ast_mutex_t lock;
+ /*! The actual thread that is executing tasks */
+ pthread_t thread;
+ /*! A pointer to the threadpool. Needed to be able to execute tasks */
+ struct ast_threadpool *pool;
+ /*! The current state of the worker thread */
+ enum worker_state state;
+ /*! A boolean used to determine if an idle thread should become active */
+ int wake_up;
+ /*! Options for this threadpool */
+ struct ast_threadpool_options options;
+};
+
/* Worker thread forward declarations. See definitions for documentation */
-struct worker_thread;
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);
static void worker_active(struct worker_thread *worker);
static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
+static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
int active_size = ao2_container_count(pool->active_threads);
int idle_size = ao2_container_count(pool->idle_threads);
- pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
+ if (pool->listener && pool->listener->callbacks->state_changed) {
+ pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
+ }
}
/*!
/*!
* \brief Execute a task in the threadpool
- *
+ *
* This is the function that worker threads call in order to execute tasks
* in the threadpool
*
struct worker_thread *worker = obj;
struct ast_threadpool *pool = arg;
- ao2_link(pool->active_threads, worker);
+ if (!ao2_link(pool->active_threads, worker)) {
+ /* If we can't link the idle thread into the active container, then
+ * we'll just leave the thread idle and not wake it up.
+ */
+ ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
+ worker->id);
+ return 0;
+ }
worker_set_state(worker, ALIVE);
return CMP_MATCH;
}
{
int i;
- ast_debug(1, "Going to increase threadpool size by %d\n", delta);
+ ast_debug(3, "Increasing threadpool %s's size by %d\n",
+ ast_taskprocessor_name(pool->tps), delta);
for (i = 0; i < delta; ++i) {
struct worker_thread *worker = worker_thread_alloc(pool);
if (!worker) {
return;
}
- ao2_link(pool->active_threads, worker);
+ if (ao2_link(pool->active_threads, worker)) {
+ if (worker_thread_start(worker)) {
+ ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
+ ao2_unlink(pool->active_threads, worker);
+ }
+ } else {
+ ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
+ }
ao2_ref(worker, -1);
}
}
int was_empty = tpd->was_empty;
int state_changed;
- pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
+ if (pool->listener && pool->listener->callbacks->task_pushed) {
+ pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
+ }
if (ao2_container_count(pool->idle_threads) == 0) {
if (pool->options.auto_increment > 0) {
grow(pool, pool->options.auto_increment);
{
struct ast_threadpool *pool = data;
+ /* We already checked for existence of this callback when this was queued */
pool->listener->callbacks->emptied(pool, pool->listener);
return 0;
}
return;
}
- ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
+ if (pool->listener && pool->listener->callbacks->emptied) {
+ ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
+ }
}
/*!
int *num_to_zombify = data;
if ((*num_to_zombify)-- > 0) {
- ao2_link(pool->zombie_threads, worker);
+ if (!ao2_link(pool->zombie_threads, worker)) {
+ ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
+ return 0;
+ }
worker_set_state(worker, ZOMBIE);
return CMP_MATCH;
} else {
*/
static void shrink(struct ast_threadpool *pool, int delta)
{
- /*
+ /*
* Preference is to kill idle threads, but
* we'll move on to deactivating active threads
* if we have to
int idle_threads_to_kill = MIN(delta, idle_threads);
int active_threads_to_zombify = delta - idle_threads_to_kill;
- ast_debug(1, "Going to kill off %d idle threads\n", idle_threads_to_kill);
+ ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
+ ast_taskprocessor_name(pool->tps));
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
kill_threads, &idle_threads_to_kill);
- ast_debug(1, "Going to kill off %d active threads\n", active_threads_to_zombify);
+ ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
+ ast_taskprocessor_name(pool->tps));
ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
zombify_threads, pool, &active_threads_to_zombify);
pool = tps_listener->private_data;
pool->tps = tps;
- ao2_ref(listener, +1);
- pool->listener = listener;
+ if (listener) {
+ ao2_ref(listener, +1);
+ pool->listener = listener;
+ }
pool->options = *options;
ast_threadpool_set_size(pool, initial_size);
return pool;
}
/*!
- * A thread that executes threadpool tasks
- */
-struct worker_thread {
- /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
- int id;
- /*! Condition used in conjunction with state changes */
- ast_cond_t cond;
- /*! Lock used alongside the condition for state changes */
- ast_mutex_t lock;
- /*! The actual thread that is executing tasks */
- pthread_t thread;
- /*! A pointer to the threadpool. Needed to be able to execute tasks */
- struct ast_threadpool *pool;
- /*! The current state of the worker thread */
- enum worker_state state;
- /*! A boolean used to determine if an idle thread should become active */
- int wake_up;
- /*! Options for this threadpool */
- struct ast_threadpool_options options;
-};
-
-/*!
* A monotonically increasing integer used for worker
* thread identification.
*/
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
- ast_debug(1, "Destroying worker thread\n");
+ ast_debug(3, "Destroying worker thread %d\n", worker->id);
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
worker->options = pool->options;
- if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
- ast_log(LOG_ERROR, "Unable to start worker thread!\n");
- ao2_ref(worker, -1);
- return NULL;
- }
return worker;
}
+static int worker_thread_start(struct worker_thread *worker)
+{
+ return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
+}
+
/*!
* \brief Active loop for worker threads
*
{
int alive = 1;
while (alive) {
- if (threadpool_execute(worker->pool) == 0) {
+ if (!threadpool_execute(worker->pool)) {
alive = worker_idle(worker);
}
}
return res;
}
+AST_TEST_DEFINE(threadpool_initial_threads)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "initial_threads";
+ info->category = "/main/threadpool/";
+ info->summary = "Test threadpool initialization state";
+ info->description =
+ "Ensure that a threadpool created with a specific size contains the\n"
+ "proper number of idle threads.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks);
+ if (!listener) {
+ return AST_TEST_FAIL;
+ }
+ tld = listener->private_data;
+
+ pool = ast_threadpool_create(info->name, listener, 3, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 3);
+
+end:
+ if (pool) {
+ ast_threadpool_shutdown(pool);
+ }
+ ao2_cleanup(listener);
+ return res;
+}
+
+
AST_TEST_DEFINE(threadpool_thread_creation)
{
struct ast_threadpool *pool = NULL;
if (res == AST_TEST_FAIL) {
goto end;
}
-
+
/* After completing the task, the thread should go idle */
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
- struct simple_task_data *std = NULL;
+ struct simple_task_data *std1 = NULL;
+ struct simple_task_data *std2 = NULL;
+ struct simple_task_data *std3 = NULL;
+ struct simple_task_data *std4 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
struct ast_threadpool_options options = {
goto end;
}
- std = simple_task_data_alloc();
- if (!std) {
+ std1 = simple_task_data_alloc();
+ std2 = simple_task_data_alloc();
+ std3 = simple_task_data_alloc();
+ std4 = simple_task_data_alloc();
+ if (!std1 || !std2 || !std3 || !std4) {
goto end;
}
- ast_threadpool_push(pool, simple_task, std);
+ ast_threadpool_push(pool, simple_task, std1);
/* Pushing the task should result in the threadpool growing
* by three threads. This will allow the task to actually execute
*/
- res = wait_for_completion(test, std);
+ res = wait_for_completion(test, std1);
if (res == AST_TEST_FAIL) {
goto end;
}
goto end;
}
- res = listener_check(test, listener, 1, 1, 1, 0, 3, 1);
+ /* Now push three tasks into the pool and ensure the pool does not
+ * grow.
+ */
+ ast_threadpool_push(pool, simple_task, std2);
+ ast_threadpool_push(pool, simple_task, std3);
+ ast_threadpool_push(pool, simple_task, std4);
+
+ res = wait_for_completion(test, std2);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+ res = wait_for_completion(test, std3);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+ res = wait_for_completion(test, std4);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = wait_for_empty_notice(test, tld);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 3);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+ res = listener_check(test, listener, 1, 0, 4, 0, 3, 1);
end:
if (pool) {
ast_threadpool_shutdown(pool);
}
ao2_cleanup(listener);
- ast_free(std);
+ ast_free(std1);
+ ast_free(std2);
+ ast_free(std3);
+ ast_free(std4);
return res;
}
if (res == AST_TEST_FAIL) {
goto end;
}
-
+
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
goto end;
if (res == AST_TEST_FAIL) {
goto end;
}
-
+
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
goto end;
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
+ ast_test_unregister(threadpool_initial_threads);
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
ast_test_unregister(threadpool_thread_timeout);
static int load_module(void)
{
ast_test_register(threadpool_push);
+ ast_test_register(threadpool_initial_threads);
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
ast_test_register(threadpool_thread_timeout);