*/
struct ao2_container *idle_threads;
/*!
+ * \brief The container of zombie threads.
+ * Zombie threads may be running tasks, but they are scheduled to die soon
+ */
+ struct ao2_container *zombie_threads;
+ /*!
* \brief The main taskprocessor
*
* Tasks that are queued in this taskprocessor are
* This is done for three main reasons
* 1) It ensures that listeners are given an accurate portrayal
* of the threadpool's current state. In other words, when a listener
- * gets told a count of active and idle threads, it does not
+ * gets told a count of active, idle and zombie threads, it does not
* need to worry that internal state of the threadpool might be different
* from what it has been told.
* 2) It minimizes the locking required in both the threadpool and in
enum worker_state {
/*! The worker is either active or idle */
ALIVE,
- /*! The worker has been asked to shut down. */
+ /*!
+ * The worker has been asked to shut down but
+ * may still be in the process of executing tasks.
+ * This transition happens when the threadpool needs
+ * to shrink and needs to kill active threads in order
+ * to do so.
+ */
+ ZOMBIE,
+ /*!
+ * The worker has been asked to shut down. Typically
+ * only idle threads go to this state directly, but
+ * active threads may go straight to this state when
+ * the threadpool is shut down.
+ */
DEAD,
};
}
/*!
+ * \brief Kill a zombie thread
+ *
+ * This runs from the threadpool's control taskprocessor thread.
+ *
+ * \param data A thread_worker_pair containing the threadpool and the zombie thread
+ * \return 0
+ */
+static int queued_zombie_thread_dead(void *data)
+{
+ struct thread_worker_pair *pair = data;
+
+ ao2_unlink(pair->pool->zombie_threads, pair->worker);
+ threadpool_send_state_changed(pair->pool);
+
+ ao2_ref(pair, -1);
+ return 0;
+}
+
+/*!
+ * \brief Queue a task to kill a zombie thread
+ *
+ * This is called by a worker thread when it acknowledges that it is time for
+ * it to die.
+ */
+static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ if (!pair) {
+ return;
+ }
+ ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
+}
+
+/*!
* \brief Execute a task in the threadpool
*
* This is the function that worker threads call in order to execute tasks
if (!pool->idle_threads) {
return NULL;
}
+ pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+ if (!pool->zombie_threads) {
+ return NULL;
+ }
ao2_ref(pool, +1);
return pool;
ao2_cleanup(pool->active_threads);
ao2_cleanup(pool->idle_threads);
+ ao2_cleanup(pool->zombie_threads);
}
/*!
return;
}
ao2_link(pool->active_threads, worker);
+ ao2_ref(worker, -1);
}
}
{
int *num_to_kill = arg;
- if ((*num_to_kill)-- > 0) {
+ ast_log(LOG_NOTICE, "num to kill is %d\n", *num_to_kill);
+
+ if (*num_to_kill > 0) {
+ --(*num_to_kill);
+ ast_log(LOG_NOTICE, "Should be killing a thread\n");
+ return CMP_MATCH;
+ } else {
+ return CMP_STOP;
+ }
+}
+
+/*!
+ * \brief ao2 callback to zombify a set number of threads.
+ *
+ * Threads will be zombified as long as as the counter has not reached
+ * zero. The counter is decremented with each thread that is zombified.
+ *
+ * Zombifying a thread involves removing it from its current container,
+ * adding it to the zombie container, and changing the state of the
+ * worker to a zombie
+ *
+ * This callback is called from the threadpool control taskprocessor thread.
+ *
+ * \param obj The worker thread that may be zombified
+ * \param arg The pool to which the worker belongs
+ * \param data The counter
+ * \param flags Unused
+ * \retval CMP_MATCH The zombified thread should be removed from its current container
+ * \retval CMP_STOP Stop attempting to zombify threads
+ */
+static int zombify_threads(void *obj, void *arg, void *data, int flags)
+{
+ struct worker_thread *worker = obj;
+ struct ast_threadpool *pool = arg;
+ int *num_to_zombify = data;
+
+ if ((*num_to_zombify)-- > 0) {
+ ast_log(LOG_NOTICE, "Should be zombifying a thread\n");
+ ao2_link(pool->zombie_threads, worker);
+ worker_set_state(worker, ZOMBIE);
return CMP_MATCH;
} else {
return CMP_STOP;
*
* The preference is to kill idle threads. However, if there are
* more threads to remove than there are idle threads, then active
- * threads will be removed too.
+ * threads will be zombified instead.
*
* This function is called from the threadpool control taskprocessor thread.
*
*/
static void shrink(struct ast_threadpool *pool, int delta)
{
+ /*
+ * Preference is to kill idle threads, but
+ * we'll move on to deactivating active threads
+ * if we have to
+ */
int idle_threads = ao2_container_count(pool->idle_threads);
int idle_threads_to_kill = MIN(delta, idle_threads);
- int active_threads_to_kill = delta - idle_threads_to_kill;
+ int active_threads_to_zombify = delta - idle_threads_to_kill;
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
+ ast_log(LOG_NOTICE, "Going to kill off %d idle threads\n", idle_threads_to_kill);
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
kill_threads, &idle_threads_to_kill);
- ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
- kill_threads, &active_threads_to_kill);
+ ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ zombify_threads, pool, &active_threads_to_zombify);
}
/*!
{
struct set_size_data *ssd = data;
struct ast_threadpool *pool = ssd->pool;
- unsigned int new_size = ssd->size;
+ unsigned int num_threads = ssd->size;
+
+ /* We don't count zombie threads as being "live when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
ao2_container_count(pool->idle_threads);
- if (current_size == new_size) {
+ if (current_size == num_threads) {
ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
- new_size, current_size);
+ num_threads, current_size);
return 0;
}
- if (current_size < new_size) {
- grow(pool, new_size - current_size);
+ if (current_size < num_threads) {
+ grow(pool, num_threads - current_size);
} else {
- shrink(pool, current_size - new_size);
+ shrink(pool, current_size - num_threads);
}
threadpool_send_state_changed(pool);
alive = worker_idle(worker);
}
}
+
+ /* Reaching this portion means the thread is
+ * on death's door. It may have been killed while
+ * it was idle, in which case it can just die
+ * peacefully. If it's a zombie, though, then
+ * it needs to let the pool know so
+ * that the thread can be removed from the
+ * list of zombie threads.
+ */
+ if (worker->state == ZOMBIE) {
+ threadpool_zombie_thread_dead(worker->pool, worker);
+ }
}
/*!
return res;
}
+AST_TEST_DEFINE(threadpool_more_destruction)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ struct complex_task_data *ctd1 = NULL;
+ struct complex_task_data *ctd2 = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_more_destruction";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that threads are destroyed as expected";
+ info->description =
+ "Push two tasks into a threadpool. Set the threadpool size to 4\n"
+ "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
+ "threadpool down to 1 thread. Ensure that the thread leftove is active\n"
+ "and ensure that both tasks complete.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks);
+ if (!listener) {
+ return AST_TEST_FAIL;
+ }
+ tld = listener->private_data;
+
+ pool = ast_threadpool_create(listener, 0);
+ if (!pool) {
+ goto end;
+ }
+
+ ctd1 = complex_task_data_alloc();
+ ctd2 = complex_task_data_alloc();
+ if (!ctd1 || !ctd2) {
+ goto end;
+ }
+
+ ast_threadpool_push(pool, complex_task, ctd1);
+ ast_threadpool_push(pool, complex_task, ctd2);
+
+ ast_threadpool_set_size(pool, 4);
+
+ WAIT_WHILE(tld, tld->num_idle < 2);
+
+ res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ ast_threadpool_set_size(pool, 1);
+
+ /* Shrinking the threadpool should kill off the two idle threads
+ * and one of the active threads.
+ */
+ WAIT_WHILE(tld, tld->num_idle > 0 || tld->num_active > 1);
+
+ res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ /* The tasks are stalled until we poke them */
+ poke_worker(ctd1);
+ poke_worker(ctd2);
+
+ res = wait_for_complex_completion(ctd1);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+ res = wait_for_complex_completion(ctd2);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ WAIT_WHILE(tld, tld->num_idle < 1);
+
+ res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
+
+end:
+ if (pool) {
+ ast_threadpool_shutdown(pool);
+ }
+ ao2_cleanup(listener);
+ ast_free(ctd1);
+ ast_free(ctd2);
+ return res;
+}
+
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
+ ast_test_unregister(threadpool_more_destruction);
return 0;
}
ast_test_register(threadpool_one_thread_multiple_tasks);
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
+ ast_test_register(threadpool_more_destruction);
return AST_MODULE_LOAD_SUCCESS;
}